2.2. Lectura de registros en states.json de un Punto con Python

El procesamiento de los datos históricos de «un sensor» en una ubicación se realiza con Python desde el archivo generado en la sección anterior. Esta forma se obtiene un archivo nuevo y de menor tamaño para procesar con los datos de solo el dispositivo que se propone analizar.

states20211022.json

El formato de la tabla es el mismo que el observado en «DB Browser».

Los campos a usar son principalmente:

  • «entity_id» para la selección de un sensor,
  • «attributes» donde previamente se configuró para guardar  el mensaje mqtt de cada transmisión
  • «created» que contine la fecha y hora de creación del registro. referenciada a GMT=0.

Parametros guardados en attributes

El campo «attributes» de cada registro contiene varios parámetros posibles a usar en el formato del mensaje. Un ejemplo del contenido se indica en el recuadro en rojo mostrado en lala figura anterior, y se presenta a continuación.

{"applicationID":"4",
 "applicationName":"Leer_RssiSNR",
 "deviceName":"cc11Pruebas",
 "devEUI":"pT7GFa7ePxE=",
 "rxInfo":[{"gatewayID":"uCfr//4dylc=",
            "time":null,"timeSinceGPSEpoch":null,
            "rssi":-63,
            "loRaSNR":10.8,
            "channel":3,
            "rfChain":0,
            "board":0,
            "antenna":0,
            "location":{"latitude":-2.1418242484279535,
                        "longitude":-79.96711134910585,
                        "altitude":20,
                        "source":"UNKNOWN",
                        "accuracy":0},
            "fineTimestampType":"NONE",
            "context":"CmMVZA==",
            "uplinkID":"I7NWhENCQyaFtYoZIDw3KQ==",
            "crcStatus":"CRC_OK"}],
 "txInfo":{"frequency":902900000,
           "modulation":"LORA",
           "loRaModulationInfo":{"bandwidth":125,
                                 "spreadingFactor":10,
                                 "codeRate":"4/5",
                                 "polarizationInversion":false}
           },
 "adr":true,
 "dr":0,
 "fCnt":778,
 "fPort":4,
 "data":"KwcK9A8=",
 "objectJSON":"{\"Down_datarate\":10,\"Down_rssi\":-43,\"Down_snr\":7,\"bateria_V\":4.08}",
 "tags":{},
 "confirmedUplink":true,
 "devAddr":"AVqCdA==",
 "publishedAt":"2021-10-21T16:28:36.265518462Z",
 "unit_of_measurement":"dBm",
 "friendly_name":"rssi_up_cc11"
 }

El formato del ejemplo mostrado se realizó con el editor de Python para facilidad de lectura,

Procesamiento de datos para un sensor

El procesamiento de cada registro se realiza separando los campos a usar, en una tabla los campos representan las  columnas a usar.

La selección del dispositivos se realiza comparando con «entity_id«.

El intervalo de fechas permite usar ventanas de tiempo de observación. La fecha y hora registrada se encuentra referenciadas a GMT=0, dado que en el pais donde se realizan las muestras usa GMT=-5, se realiza un desplazamiento en las horas para facilitar la identificación de las franjas horarias.

Los identificadores de gateways se simplifican con el diccionario gateway, que deben ser previamente identificados buscando en el archivo.json como «gatewayID». También se revisa el número de trama (fCnt) para registrar las recepción de tramas repetidas en cada gateway, registrando los valores en una lista, para posteriormente, seleccionar el valor mayor, menor, promedio).

Los parámetros de recepción en el gateway se encuentran en «rx_datos» en el siguiente orden: rssi, loRaSNR, channel, rfChain, frequency

El resultado de la selección de los datos se guarda en un nuevo archivo en formato.csv para facilitar la lectura de los datos con otros programas.

Otra forma de guardar los datos en en formato.json, que puede ser leida con la librería Pandas.

Instrucciones en Python para tabla CSV

# Lectura de archivo.json hacia una tabla0
# selecciona registros de "un sensor"
# graba unreporte.csv con pandas
# http://blog.espol.edu.ec/girni/

import numpy as np
import json
import pandas as pd
import datetime as dt
import os

# INGRESO
# archivo de entrada
unarchivoDB = "states20211022.json"
carpeta_DB  = "BaseDatosJSON"

# fecha intervalo
fechainicio = "2021-10-21 11:00:00.0"
fechafin    = "2021-10-22 11:50:00.0"
zonaGMT     = -5

unsensor    = "sensor.rssi_up_cc12"
modelo_disp = "modulo" # capsula, modulo desarrollo
ubicado     = "LOS22"
carpeta_rsm = "resultado_Test"

# gatewayID simplifica identificador 
gatewayDB   = {'uCfr//4dylc=':'Gw03'}

# PROCEDIMIENTO -----
campos = ['domain','entity_id','state','attributes','created']

# Lectura de archivo json, cambia a formato DataFrame
tabla0 = pd.DataFrame()
archivo_ruta  = carpeta_DB + '/' + unarchivoDB
archivoexiste = os.path.exists(archivo_ruta)
if archivoexiste:
    tabla0 = pd.read_json(archivo_ruta)
    tabla0 = pd.DataFrame(tabla0,columns=campos)
else:
    print(' ERROR: NO se encuentra el archivo...')
    print('        revise el nombre de archivo. ')

# Revisa cada registro 
leidos = 0
tabla  = {}
if archivoexiste:
    # Intervalo de fecha como datetime
    hora_desplaza  = dt.timedelta(hours = abs(zonaGMT))
    fechaformatoDB = '%Y-%m-%dT%H:%M:%S.%f'
    fechaformato   = '%Y-%m-%d %H:%M:%S.%f'
    fechatxt     = fechainicio[0:np.min([26,len(fechainicio)])]
    fechainicio  = dt.datetime.strptime(fechatxt,fechaformato)
    fechatxt     = fechafin[0:np.min([26,len(fechafin)])]
    fechafin     = dt.datetime.strptime(fechatxt,fechaformato)

    # datos de trama
    for cadaregistro in tabla0.index:
        unatrama   = tabla0['attributes'][cadaregistro]
        trama_mqtt = json.loads(unatrama)
        
        # selecciona registro por sensor, en fecha y con datos
        cualsensor = (tabla0['entity_id'][cadaregistro] == unsensor)
        
        unafecha = tabla0['created'][cadaregistro]
        unafecha = unafecha[0:np.min([26,len(unafecha)])]
        unafecha = dt.datetime.strptime(unafecha,fechaformato)
        unafecha = unafecha - hora_desplaza
        
        enfecha  = (unafecha>=fechainicio) and (unafecha<=fechafin)
        condatos = 'applicationID' in trama_mqtt.keys()
        
        selecciona = cualsensor and condatos and enfecha
        if (selecciona == True):        
            # datos en la mensaje MQTT
            publishedAt = trama_mqtt["publishedAt"]
            publishedAt = publishedAt[0:np.min([26,len(publishedAt)])]
            publishedAt = dt.datetime.strptime(publishedAt,fechaformatoDB)
            publishedAt = publishedAt - hora_desplaza
            
            friendly_name = trama_mqtt["friendly_name"]
            deviceName    = trama_mqtt["deviceName"]
            dispositivo   = friendly_name.split('_')[2]
            
            dr      = trama_mqtt["dr"]
            fCnt    = trama_mqtt["fCnt"]
            fPort   = trama_mqtt["fPort"]
            freq_tx = trama_mqtt["txInfo"]["frequency"]
            bandwidth = trama_mqtt["txInfo"]["loRaModulationInfo"]["bandwidth"]
            spreadingFactor = trama_mqtt["txInfo"]["loRaModulationInfo"]["spreadingFactor"]

            objectJSON = trama_mqtt["objectJSON"]
            datosensor = json.loads(objectJSON)
                 
            datostrama = {"publishedAt": publishedAt,
                          "dispositivo": dispositivo,
                          "fCnt": fCnt}
            
            # revisa gateway en la red LoRaWan
            tamano = len(trama_mqtt["rxInfo"])
            i = 0
            while i<tamano:
                gatewayID = trama_mqtt["rxInfo"][i]["gatewayID"]
                rssi      = trama_mqtt["rxInfo"][i]["rssi"]
                loRaSNR   = trama_mqtt["rxInfo"][i]["loRaSNR"]
                channel   = trama_mqtt["rxInfo"][i]["channel"]
                rfChain   = trama_mqtt["rxInfo"][i]["rfChain"]
                gtwNum    = gatewayDB[gatewayID]
                
                # registra en tabla, incluyendo tramas repetidas
                datostrama['rssi_up']    = rssi
                datostrama['snr_up']     = loRaSNR
                datostrama['channel_up'] = channel
                datostrama['rfChain_up'] = rfChain
                datostrama['gtw_rx']     = gtwNum
                i = i + 1

            # dato del sensor
            equivale = {'Down_datarate': 'dr_down',
                        'Down_rssi'    : 'rssi_down',
                        'Down_snr'     : 'snr_down',
                        'bateria_V'    : 'bateria_V'}
            for undato in datosensor:
                if undato in equivale.keys():
                    datostrama[equivale[undato]] = datosensor[undato]
                else:
                    datostrama[undato] = datosensor[undato]

            # datos, otros valores
            datostrama["frequency"] = freq_tx
            datostrama["bandwidth"] = bandwidth
            datostrama["spreadingFactor"] = spreadingFactor
            datostrama["fPort"]   = fPort
            datostrama["dr"]      = dr
            datostrama["created"] = unafecha

            leidos = leidos + 1
            
            # revisa registro repetido
            repetido = False
            if leidos>1:
                trama_num  = (fCnt == tabla[leidos-1]['fCnt'])
                fecha_pub  = (publishedAt == tabla[leidos-1]['publishedAt'])
                gtwNum_rep = (gtwNum == tabla[leidos-1]['gtw_rx'])
                repetido   = (trama_num and fecha_pub and gtwNum_rep)
            if not(repetido):
                tabla[leidos] = datostrama
            else:
                leidos = leidos - 1

    # convierte diccionario a DataFrame
    if len(tabla)>0:
        tabla = pd.DataFrame(tabla)
        tabla = tabla.T
        # añade ubicación y modelo (capsula o modulo desarrollo)
        tabla.insert(1,'ubicado',ubicado)
        tabla.insert(2,'modelo_disp',modelo_disp)
        
# directorio de resultados
if len(carpeta_rsm) > 0:
    if not(os.path.exists(carpeta_rsm)):
        os.makedirs(carpeta_rsm)
    carpeta_rsm = carpeta_rsm + '/'

# SALIDA
print('registros leidos:     ', len(tabla0))
print('registros procesados: ', leidos)
print(tabla)

# guarda el reporte en csv
if len(tabla)>0:
    unreporte = carpeta_rsm+'data_'+ubicado+'.csv'
    tabla.to_csv(unreporte)

el resultado de puede visualiza como:

registros leidos:      7090
registros procesados:  98
                  publishedAt ubicado modelo_disp dispositivo  ... spreadingFactor fPort dr                    created
1  2021-10-21 11:34:07.079188   LOS22      modulo        cc12  ...              10     4  0 2021-10-21 11:34:07.105036
2  2021-10-21 11:49:07.862255   LOS22      modulo        cc12  ...              10     4  0 2021-10-21 11:49:07.876219
3  2021-10-21 12:04:07.931128   LOS22      modulo        cc12  ...              10     4  0 2021-10-21 12:04:07.943781
4  2021-10-21 12:19:08.489097   LOS22      modulo        cc12  ...              10     4  0 2021-10-21 12:19:08.504848
5  2021-10-21 12:34:08.479274   LOS22      modulo        cc12  ...              10     4  0 2021-10-21 12:34:08.494229
..                        ...     ...         ...         ...  ...             ...   ... ..                        ...
94 2021-10-22 10:49:46.742877   LOS22      modulo        cc12  ...              10     4  0 2021-10-22 10:49:46.756601
95 2021-10-22 11:04:47.636628   LOS22      modulo        cc12  ...              10     4  0 2021-10-22 11:04:47.652299
96 2021-10-22 11:19:48.310535   LOS22      modulo        cc12  ...              10     4  0 2021-10-22 11:19:48.331143
97 2021-10-22 11:34:49.209626   LOS22      modulo        cc12  ...              10     4  0 2021-10-22 11:34:49.224717
98 2021-10-22 11:49:49.638194   LOS22      modulo        cc12  ...              10     4  0 2021-10-22 11:49:49.651398

[98 rows x 20 columns]
>>>

y el archivo resultante se presenta como:

data_LOS22.csv


Instrucciones en Python para tabla JSON

# Lectura de archivo.json hacia una tabla1
# selecciona registros de "un sensor"
# graba unreporte.json con pandas

import numpy as np
import json
import pandas as pd
import datetime as dt

# INGRESO

# archivo de entrada
unarchivo   = 'states20211022.json'

fechainicio = '2021-10-21 19:00:00.0'
fechafin    = '2021-10-22 17:10:00.0'

unsensor    = 'sensor.rssi_up_cc22'
# archivo de salida
unreporte   = 'tramas20211022_cc12.json'

# PROCEDIMIENTO -----
# columnas o campos a usar
campos    = ['domain','entity_id','state',
             'attributes','created']
# gatewayID simplifica identificador 
gateway   = {'uCfr//5zvhk=':'Gw01',
             'uCfr//4dylc=':'Gw03'}

# Lectura de archivo json
tabla0 = pd.read_json(unarchivo)
# Cambia a formato DataFrame
tabla0 = pd.DataFrame(tabla0,columns=campos)

# Revisa cada registro 
leidos = 0
tabla  = {}

# Intervalo de fecha como datetime
formato     = '%Y-%m-%d %H:%M:%S.%f'
fechainicio = fechainicio[0:np.min([26,len(fechainicio)])]
fechainicio = dt.datetime.strptime(fechainicio,formato)
fechafin    = fechafin[0:np.min([26,len(fechafin)])]
fechafin    = dt.datetime.strptime(fechafin,formato)

# datos de trama
for cadaregistro in tabla0.index:
    unatrama   = tabla0['attributes'][cadaregistro]
    trama_mqtt = json.loads(unatrama)
    
    # selecciona registro por sensor, en fecha y con datos
    cualsensor = (tabla0['entity_id'][cadaregistro] == unsensor)
    
    unafecha = tabla0['created'][cadaregistro]
    unafecha = unafecha[0:np.min([26,len(unafecha)])]
    unafecha = dt.datetime.strptime(unafecha,formato)
    
    enfecha  = (unafecha>=fechainicio) and (unafecha<=fechafin)
    condatos   = 'applicationID' in trama_mqtt.keys()
    
    selecciona = cualsensor and condatos and enfecha
    if (selecciona == True):
        leidos =  leidos + 1
        
        # datos en la mensaje MQTT
        publishedAt = trama_mqtt["publishedAt"]
        friendly_name = trama_mqtt["friendly_name"]
        deviceName    = trama_mqtt["deviceName"]
        dr    = trama_mqtt["dr"]
        fCnt  = trama_mqtt["fCnt"]
        fPort = trama_mqtt["fPort"]

        objectJSON  = trama_mqtt["objectJSON"]
        datosensor  = json.loads(objectJSON)
        
        frequency   = trama_mqtt["txInfo"]["frequency"]
        bandwidth   = trama_mqtt["txInfo"]["loRaModulationInfo"]["bandwidth"]
        spreadingFactor = trama_mqtt["txInfo"]["loRaModulationInfo"]["spreadingFactor"]
             
        datostrama = {"publishedAt":publishedAt,
                      "fPort":fPort, "dr": dr,"fCnt": fCnt,
                      "frequency":  frequency,
                      "bandwidth":  bandwidth,
                      "spreadingFactor": spreadingFactor,
                      "datosensor":datosensor}
        
        # revisa por cada gateway en la red LoRaWan
        tamano = len(trama_mqtt["rxInfo"])
        i = 0
        while i<tamano:
            gatewayID = trama_mqtt["rxInfo"][i]["gatewayID"]
            rssi      = trama_mqtt["rxInfo"][i]["rssi"]
            loRaSNR   = trama_mqtt["rxInfo"][i]["loRaSNR"]
            channel   = trama_mqtt["rxInfo"][i]["channel"]
            rfChain   = trama_mqtt["rxInfo"][i]["rfChain"]
            gtwNum    = gateway[gatewayID]
            i = i+1
            
            # registra en tabla, incluyendo tramas repetidas
            rx_datos = [rssi, loRaSNR,
                        channel, rfChain,frequency/1e6]  
            if not(gtwNum in datostrama.keys()):
                datostrama[gtwNum] = [rx_datos]
            else:
                if not(rx_datos in datostrama[gtwNum]):
                    datostrama[gwNum].append(rx_datos)
        dispositivo = friendly_name.split('_')[2]

        # revisa registro repetido
        disp_existe = dispositivo in tabla
        if not(disp_existe):
            tabla[dispositivo] = {leidos:datostrama}
        else:
            trama_num = (fCnt == tabla[dispositivo][leidos-1]['fCnt'])
            fecha_pub = (publishedAt == tabla[dispositivo][leidos-1]['publishedAt'])
            repetido  = (trama_num and fecha_pub)
            if not(repetido):
                tabla[dispositivo][leidos] = datostrama
            else:
                leidos = leidos - 1

# convierte diccionario en DataFrame
tabla = pd.DataFrame(tabla)

# SALIDA
print('registros leidos:     ',len(tabla0))
print('registros procesados: ', leidos)

# guarda el reporte en json
tabla.to_json(unreporte)

El procesamiento de los datos se puede realizar con librerias como Pandas.

En el procesamiento de registros, se descarta el registro de «join».

Una muestra del reporte es:

registros leidos:      7090
registros procesados:  98
>>> tabla.keys()
Index(['cc12'], dtype='object')
>>> tabla['cc12'].keys()
Int64Index([ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11, 12, 13, 14, 15, 16, 17,
            18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34,
            35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51,
            52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68,
            69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85,
            86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98],
           dtype='int64')
>>> tabla['cc12'][2].keys()
dict_keys(['publishedAt', 'fPort', 'dr', 'fCnt',
 'frequency', 'bandwidth', 'spreadingFactor', 'datosensor',
 'Gw03'])
>>> 

>>> tabla['cc12'][2]['datosensor'].keys()
dict_keys(['Down_datarate', 'Down_rssi', 'Down_snr',
 'bateria_V'])

>>> tabla['cc12'][2]['Gw03']
[[-84, 8.5, 1, 0, 902.5]]

Esta forma se obtiene un nuevo y mas pequeño archivo para procesar con los datos de solo el dispositivo que se propone analizar.

tramas20211022_cc12.json

Referencia: Archivos.json – Pandas en blog de Fundamentos de Programación