El archivo contiene un grupo de funciones usadas para procesar los datos de este prototipo, se separaron del bloque principal de instrucciones con el objetivo de simplificar el desarrollo del las actividades principales.
Recuerde disponer de este archivo en la misma carpeta que el algoritmo que invoca a la librería.
archivo de libreria: girni_lora_libreria
datos de muestras
tabulaPunto(unarchivo,directorio):
Lee el archivo de un punto, tabula cada lectura por dispositivo en diccionario,
por modo: rx, rx y remitente de cada paquete: baliza.
describePunto(punto):
estadistica descriptiva de un punto, calcula: count, mean, std, min, 25%, 50%, 75%, min, max
resumen_medida(punto,medida,descriptor):
Realiza la tabla resumen de puntos por la medida y descriptor a partir de la tabla de los puntos estadisticos descritos
Linealización para ecuación
pares_usar(tabla, baliza, analiza,unabaliza, unsector =», medida = ‘rssi’,modo = ‘rx’)
Selecciona desde la tabla puntos a usar respecto a una baliza, el resultado se entrega en una lista que contiene los pares ordenados y sus etiquetas [pares, par_etiqueta]
linealiza_lstsq(xi,yi,digitos = 3)
emplea el método de minimos cuadrados para entregar la ecuacion mediante un diccionario que contiene los parámetros para aplicarla.
La variable dígitos indica cuántos dígitos se usarán para la ecuación en formato latex.
El procedimiento se describe en: Rssi(distancia) Linealización – función Python
unaecuacion = {'alpha' : alpha,
'beta' : beta,
'eq_latex': fdtxt0,
'intervalox' : [np.min(xi),np.max(xi)],
'error_medio': dyi0mean,
'error_std' : dyi0std,
'eqg_latex' : grtxt0,
'intervaloy' : [np.min(yi),np.max(yi)],
'errorx_medio': dxi0mean,
'errorx_std' : dxi0std,
}
Proceso de triangulación
dist_rssi(valor,ecuacion_rssi). Evalua la ecuacion de distancia(rssi) revisando los intervalos disponibles para el valor de rssi dado.
cruce2circulos(x1,y1,r1,x2,y2,r2). Revisa intervalo de area de cruce entre dos círculos de centro y radio: x1, y1, r1 // x2, y2, r2.
raices2circulos(x1,y1,r1,x2,y2,r2,tolera=1e-10). Busca las intersección entre 2 circulos de centro y radio: x1,y1,r1 || x2,y2,r2 . Revisa con cruce2circulos().
intersectacirculos(radio,centro,tolera=1e-10). Busca las intersecciones entre parejas de varios círculos y las entrega como [raicesx,raicesy], usa las funciones cruce2circulos() y con raices2circulos() que se encuentran en el enlace: Solución General de intersección de círculos
trilatera(radio,centro,tolera=1e-10). Busca el baricentro entre las intersecciones de varios círculos, punto central en el área de
intersección entre varios circulos. Requiere el resultado de la función:
raiztodas = intersectacirculos(centro,radio, tolera = 1e-10)
Algoritmo Python
# Girni LoRa librerias 2020-10-07
# LoRa-Multipunto, lecturas de Rssi y SNR
# Girni 2020-10-07 propuesta: edelros@espol.edu.ec
import numpy as np
import pandas as pd
import scipy.optimize as sp
def tabulaPunto(unarchivo,carpeta, prefijo = 'multipunto'):
''' Lee el archivo de un punto dentro del carpeta,
elimina el prefijo el nombre del archivo,
tabula cada lectura por dispositivo en diccionario,
por modo: rx, rx
y remitente de cada paquete: baliza
Prepara para procesar estadistica descriptiva
'''
# Datos estructura
punto = {'rx':{},
'tx':{},
'nombre': ' '}
# Lectura de unarchivo
unarchivoubica = carpeta+'/'+unarchivo
archivoPunto = open(unarchivoubica,'r')
# nombre del punto desde nombre archivo
pnombre = unarchivo
pnombre = pnombre.strip('.txt')
n = len(prefijo)
pnombre = pnombre[n:]
punto['nombre'] = pnombre
linea = archivoPunto.readline()
while (linea!=''):
linea_rx = linea.startswith('rx')
linea_tx = linea.startswith('tx')
if linea_rx or linea_tx:
# formato de trama:
# tx_rx, c1_ff, d1_d2_d3, numtrama,
# rssitx, snrtx, rssi_rx,snr_rx
texto = linea.strip('\n')
texto = texto.split(',')
rx_tx = texto[0]
dir_recibe = texto[1]
dir_remite = texto[2]
ID_paquete = int(texto[3])
rssi_tx = float(texto[4])
snr_tx = float(texto[5])
rssi_rx = float(texto[6])
snr_rx = float(texto[7])
# llena datos
if dir_remite in punto[rx_tx].keys():
punto[rx_tx][dir_remite]['rssi_rx'].append(rssi_rx)
punto[rx_tx][dir_remite]['snr_rx'].append(snr_rx)
punto[rx_tx][dir_remite]['secuencia_rx'].append(ID_paquete)
punto[rx_tx][dir_remite]['rssi_tx'].append(rssi_tx)
punto[rx_tx][dir_remite]['snr_tx'].append(snr_tx)
else:
punto[rx_tx][dir_remite] = {'rssi_rx': [rssi_rx],
'snr_rx' : [snr_rx],
'secuencia_rx':[ID_paquete],
'rssi_tx': [rssi_tx],
'snr_tx' : [snr_tx]}
# siguiente línea
linea = archivoPunto.readline()
archivoPunto.close()
return(punto)
def describePunto(punto):
''' estadistica descriptiva de un punto
calcula y registra count, mean, std,
min,25%,50%,75%,max
'''
estadistica = {}
for modo in punto.keys():
estadistica[modo] = {}
# analiza rssi y snr para rx y tx
for disp in punto[modo].keys():
estadistica[modo][disp] = ''
valores = pd.DataFrame(punto[modo][disp])
descrito = valores.describe()
descrito = descrito.drop('secuencia_rx',axis=1)
estadistica[modo][disp] = descrito
return(estadistica)
def resumen_medida(punto,medida,descriptor):
'''
Realiza la tabla resumen de puntos por
medida ('rssi' o 'snr')
y descriptor ('count, mean, std,
min,25%,50%,75%,max)
a partir de la tabla de los puntos estadisticos descritos
'''
rsm_disp = pd.DataFrame()
for disp in punto:
undisp_rx = punto[disp][medida+'_rx']
etiquetarx = medida+'_rx_'+disp
rsm_disp[etiquetarx] = undisp_rx.copy()
for disp in punto:
undisp_tx = punto[disp][medida+'_tx']
etiquetatx = medida+'_tx_'+disp
rsm_disp[etiquetatx] = undisp_tx.copy()
unafila = rsm_disp.loc[descriptor]
return(unafila)
def pares_usar(tabla,baliza, analiza,
unabaliza, unsector ='',
medida = 'rssi', modo = 'rx'):
''' Selecciona en tabla los puntos a usar
respecto a una baliza y sector
resultado en [pares, par_etiqueta]
'''
# balizas referencia para analizar
baliza_key = list(baliza.keys())
baliza_val = list(baliza.values())
donde = baliza_val.index(unabaliza)
cualbaliza = baliza_key[donde]
# banderas de uso y atipicos
bal_sec = cualbaliza+unsector
tabla['usar_'+bal_sec] = 0
tabla['atip_'+bal_sec] = 0
# Parametros
if unsector == '':
analizarque = analiza[unabaliza]
else:
analizarque = analiza[unabaliza][unsector]
atipico_std = analizarque['atipico_std']
bal_grp = analizarque['grp']
bal_tip = analizarque['tip']
bal_LOS = analizarque['LOS']
# usar segmento de grupo/tipo, bandera True/False
for cadapunto in tabla.index:
cond1 = tabla['grupo'][cadapunto] in bal_grp
cond2 = tabla['tipo'][cadapunto] in bal_tip
cond3 = tabla['LOS_'+cualbaliza][cadapunto] in bal_LOS
cond4 = True
if unsector != '':
cond4 = tabla['sector_'+cualbaliza][cadapunto] == int(unsector.strip('s'))
usar = cond1 and cond2 and cond3 and cond4
valor = 0
if usar:
valor = 1
tabla.loc[cadapunto,'usar_'+bal_sec] = valor
# datos hacia baliza
pares = []
par_etiqueta = []
for cadapunto in tabla.index:
columna = medida+'_'+modo+'_'+cualbaliza
xk = tabla['dist_'+cualbaliza][cadapunto]
yk = tabla[columna][cadapunto]
# no vacio y para usar
cond1 = not(np.isnan(yk))
cond2 = tabla['usar_'+bal_sec][cadapunto]
if cond1 and cond2:
unpar = np.array([xk,yk])
# llena pares y etiquetas
if len(pares)>0:
pares = np.concatenate((pares,[unpar]),axis=0)
par_etiqueta = np.concatenate((par_etiqueta,[cadapunto]),
axis=0)
else:
pares = np.array([unpar])
par_etiqueta = np.array([cadapunto])
# ordena pares para gráfica
if len(pares)>0:
ordenar = np.argsort(pares[:, 0])
pares = pares[ordenar]
par_etiqueta = par_etiqueta[ordenar]
return ([pares, par_etiqueta])
def linealiza_lstsq(xi,yi,digitos = 3):
''' usa minimos cuadrados para entregar la ecuacion
digitos: usados en expresion latex
'''
unaecuacion = {}
# Eje x en log10()
xilog = np.log10(xi)
n = len(xi)
# mínimos cuadrados (least square),
# distancia vs medida
A = np.vstack([xilog, np.ones(n)]).T
[m0, b0] = np.linalg.lstsq(A, yi, rcond=None)[0]
alpha = -m0/10
beta = b0
# ecuaciones expresion rssi(d)
fdist0 = lambda d: -10*alpha*(np.log10(d))+beta
fdtxt0 = r'$ rssi = -10(' + str(np.round(alpha,digitos))
fdtxt0 = fdtxt0 + ')log_{10}(d)' # +('
texto = '+'
if beta <0:
texto = '-'
fdtxt0 = fdtxt0 + texto + str(np.round(np.abs(beta),digitos))+' $'
# Errores respecto a rssi(d)
yi0 = fdist0(xi)
dyi0 = yi - yi0
dyi0mean = np.mean(np.abs(dyi0))
dyi0std = np.std(dyi0, dtype=np.float64)
# ecuaciones expresion d(rssi)
grssi0 = lambda rssi: 10**((beta-rssi)/(10*alpha))
grtxt0 = r"$ d = 10^{(" + str(np.round(beta,digitos)) + ' - '
grtxt0 = grtxt0 + 'rssi)/' + '(10('+str(np.round(alpha,digitos))+'))} $'
# Errores respecto a rssi(d)
xi0 = grssi0(yi)
dxi0 = xi - xi0
dxi0mean = np.mean(np.abs(dxi0))
dxi0std = np.std(dxi0, dtype=np.float64)
unaecuacion = {'alpha' : alpha,
'beta' : beta,
'eq_latex': fdtxt0,
'intervalox' : [np.min(xi),np.max(xi)],
'error_medio': dyi0mean,
'error_std' : dyi0std,
'eqg_latex' : grtxt0,
'intervaloy' : [np.min(yi),np.max(yi)],
'errorx_medio': dxi0mean,
'errorx_std' : dxi0std,
}
return(unaecuacion)
def dist_rssi(valor,ecuacion_rssi, desplazar = 0):
''' evalua ecuacion de distancia(rssi)
revisando los intervalos disponibles
'''
# resultados
distancia = np.nan
e_mean = np.nan
e_1std = np.nan
e_2std = np.nan
# Revisa intervalos en ecuacion
interv_fuera = 0
tamano = len(ecuacion_rssi)
eq_cual = list(ecuacion_rssi.keys())
# revisa si hay ecuaciones
if tamano >= 1:
# en intervalo de todos los puntos?
i_eq = 'r0'
a = ecuacion_rssi[i_eq]['intervaloy'][0]
b = ecuacion_rssi[i_eq]['intervaloy'][1]
desplaza = 0; valor0 = valor
if 'desplaza' in list(ecuacion_rssi[i_eq].keys()):
desplaza = ecuacion_rssi[i_eq]['desplaza']
if desplazar == 1:
valor0 = valor - desplaza
if valor0<a:
interv_fuera = -1 # izquierda
if valor0>=b:
interv_fuera = 1 # derecha
if interv_fuera!=0:
if interv_fuera == 1: # derecha
i_eq = eq_cual[1]
if interv_fuera == -1: # izquierda
i_eq = eq_cual[-1]
# intervalo rssi [a,b)
a = ecuacion_rssi[i_eq]['intervaloy'][0]
b = ecuacion_rssi[i_eq]['intervaloy'][1]
alpha = ecuacion_rssi[i_eq]['alpha']
beta = ecuacion_rssi[i_eq]['beta']
# correccion de formula por desplazamiento
desplaza = 0; valor1 = valor
if 'desplaza' in list(ecuacion_rssi[i_eq].keys()):
desplaza = ecuacion_rssi[i_eq]['desplaza']
if desplazar == 1:
valor1 = valor - desplaza
distancia = 10**((valor1-beta)/(-10*alpha))
# errores estimados de distancia
error_medio = ecuacion_rssi[i_eq]['error_medio']
error_std = ecuacion_rssi[i_eq]['error_std']
dist_mean = 10**((valor-error_medio-beta)/(-10*alpha))
dist_1std = 10**((valor-error_std-beta)/(-10*alpha))
dist_2std = 10**((valor-2*error_std-beta)/(-10*alpha))
e_mean = np.abs(distancia-dist_mean)
e_1std = np.abs(distancia-dist_1std)
e_2std = np.abs(distancia-dist_2std)
# evalua valor si hay ecuacion
if tamano>1 and interv_fuera == 0:
donde = eq_cual.index('r0')
eq_cual.pop(donde)
# Revisa que exista ecuacion en cada intervalo
for i_eq in eq_cual:
if ecuacion_rssi[i_eq] is None:
donde = eq_cual.index(i_eq)
eq_cual.pop(donde)
# revisa intervalo y evalua
for i_eq in eq_cual:
# correccion de formula por desplazamiento
desplaza = 0; valor1 = valor
if 'desplaza' in list(ecuacion_rssi[i_eq].keys()):
desplaza = ecuacion_rssi[i_eq]['desplaza']
if desplazar == 1:
valor1 = valor - desplaza
# intervalo rssi [a,b)
a = ecuacion_rssi[i_eq]['intervaloy'][0]
b = ecuacion_rssi[i_eq]['intervaloy'][1]
cond1 = (valor1>=a) and (valor1<b)
if cond1:
alpha = ecuacion_rssi[i_eq]['alpha']
beta = ecuacion_rssi[i_eq]['beta']
distancia = 10**((valor1-beta)/(-10*alpha))
# errores estimados de distancia
error_medio = ecuacion_rssi[i_eq]['error_medio']
error_std = ecuacion_rssi[i_eq]['error_std']
dist_mean = 10**((valor-error_medio-beta)/(-10*alpha))
dist_1std = 10**((valor-error_std-beta)/(-10*alpha))
dist_2std = 10**((valor-2*error_std-beta)/(-10*alpha))
e_mean = np.abs(distancia-dist_mean)
e_1std = np.abs(distancia-dist_1std)
e_2std = np.abs(distancia-dist_2std)
return([distancia,e_mean,e_1std,e_2std,interv_fuera])
# Las siguientes funciones tienen como objetivo
# realizar la trilateración entre varios circulos
# def trilatera(radio,centro,tolera=1e-10)
# requieren las funiones:
# def cruce2circulos(x1,y1,r1,x2,y2,r2)
# def raices2circulos(x1,y1,r1,x2,y2,r2,tolera=1e-10)
# def intersectacirculos(radio,centro,tolera=1e-10)
def cruce2circulos(x1,y1,r1,x2,y2,r2):
''' Revisa intervalo de area de cruce
entre dos círculos de centro y radio
x1,y1,r1 // x2,y2,r2
'''
uncruce = []
dx = x2 - x1
dy = y2 - y1
d_centros = np.sqrt(dx**2 + dy**2)
d_cruce = r2 + r1
# los circulos se cruzan o tocan
if d_cruce >= d_centros:
# intervalos de cruce
xa = np.max([x1-r1,x2-r2])
xb = np.min([x1+r1,x2+r2])
ya = np.max([y1-r1,y2-r2])
yb = np.min([y1+r1,y2+r2])
# cada circulo arriba, abajo
abajo1 = 0 ; arriba1 = 0
abajo2 = 0 ; arriba2 = 0
if ya<=y1:
abajo1 = 1
if yb>=y1:
arriba1 = 1
if ya<=y2:
abajo2 = 1
if yb>=y2:
arriba2 = 1
sector = [ abajo1*abajo2, abajo1*arriba2,
arriba1*abajo2, arriba1*arriba2]
uncruce = [xa,xb,ya,yb,sector]
return(uncruce)
def raices2circulos(x1,y1,r1,x2,y2,r2,tolera=1e-10):
''' busca las intersección entre 2 circulos
de centro y radio: x1,y1,r1 || x2,y2,r2
revisa con cruce2circulos()
'''
casicero = tolera*np.min([r1,r2])
uncruce = cruce2circulos(x1,y1,r1,x2,y2,r2)
raizx = []; raizy = []
secruzan = 0
# si hay cruce de circulos
if len(uncruce)>0:
sectores = [[-1,-1],[-1,1],
[ 1,-1],[ 1,1]]
[xa,xb,ya,yb,sector] = uncruce
xc = (xa+xb)/2
dx = xb-xa
dy = yb-ya
k = len(sector)
if dx<casicero: # se tocan en un punto
k = 1
for j in range(0,k,1):
if sector[j]==1:
s1 = sectores[j][0]
s2 = sectores[j][1]
def gx(x,x1,r1,casicero):
z = r1**2-(x-x1)**2
if np.abs(z)<casicero:
z = 0
return(z)
fx1 = lambda x: s1*np.sqrt(gx(x,x1,r1,casicero)) + y1
fx2 = lambda x: s2*np.sqrt(gx(x,x2,r2,casicero)) + y2
fx = lambda x: fx1(x)-fx2(x)
fa = fx(xa)
fb = fx(xb)
raiz1 = np.nan
raiz2 = np.nan
# intervalo/2 izquierda
xc = xc + dx*tolera
fc = fx(xc)
cambio = np.sign(fa)*np.sign(fc)
if cambio<0:
raiz1 = sp.bisect(fx,xa,xc,xtol=tolera)
# intervalo/2 derecha
xc = xc - 2*dx*tolera
fc = fx(xc)
cambio = np.sign(fc)*np.sign(fb)
if cambio<0:
raiz2 = sp.bisect(fx,xc,xb,xtol=tolera)
# si hay contacto en un borde
if dx<casicero and dy>0:
raiz1 = xa
if dy<casicero and dx>0:
raiz1 = x1
# Añade si existe raiz
if not(np.isnan(raiz1)):
raizx.append(raiz1)
raizy.append(fx1(raiz1))
if not(np.isnan(raiz2)):
raizx.append(raiz2)
raizy.append(fx1(raiz2))
secruzan = 1
# No hay cruce de circulos
if len(uncruce) == 0:
dx = x2 - x1
dy = y2 - y1
m = dy/dx
theta = np.arctan2(dy,dx)
dx1 = r1* np.cos(theta)
dx2 = r2* np.cos(theta)
xi1 = x1 + dx1
xi2 = x2 - dx2
b = y1 - m*x1
raizx = [(xi1+xi2)/2]
raizy = [m*raizx[0]+b]
raices = [raizx,raizy,secruzan]
return(raices)
def intersectacirculos(radio,centro,tolera=1e-10):
''' busca las intersecciones entre parejas de varios
círculos y las entrega como [raicesx,raicesy]
usa las funciones cruce2circulos()
y con raices2circulos() que se encuentran en el enlace:
s3Eva_IT2018_T1 Intersección de dos círculos
'''
vertices = list(centro.keys())
n = len (vertices)
# agrupa raices en todasx y todasy
todasx = [] ; todasy = []
cruces = np.zeros(shape=(n,n),dtype=int)
for i in range(0,n-1,1):
for j in range(i+1,n,1):
x1 = centro[vertices[i]][0]
y1 = centro[vertices[i]][1]
r1 = radio[vertices[i]]
x2 = centro[vertices[j]][0]
y2 = centro[vertices[j]][1]
r2 = radio[vertices[j]]
# busca raices entre 2 circulos
raices = raices2circulos(x1,y1,r1,x2,y2,r2,tolera)
raizx = raices[0]
raizy = raices[1]
cruces[i,j] = raices[2]
cruces[j,i] = raices[2]
m = len(raizx)
if m>0:
for k in range(0,m,1):
todasx.append(raizx[k])
todasy.append(raizy[k])
raiztodas = [todasx,todasy,cruces]
return(raiztodas)
def trilatera(radio,centro,tolera=1e-10):
''' busca el baricentro entre las intersecciones
de varios círculos punto central en el área de
intersección entre varios circulos.
requiere el resultado de la función:
raiztodas = intersectacirculos(centro,radio, tolera = 1e-10)
'''
vertices = list(centro.keys())
n = len (vertices)
# revisa raiz dentro de cada circulo
raiztodas = intersectacirculos(radio,centro,tolera)
todasx = raiztodas[0]
todasy = raiztodas[1]
cruces = raiztodas[2]
m = len(todasx)
raicesx = []
raicesy = []
fuera = []
for k in range(0,m,1):
xk = todasx[k]
yk = todasy[k]
dentro = 0
for i in range(0,n,1):
x1 = centro[vertices[i]][0]
y1 = centro[vertices[i]][1]
r1 = radio[vertices[i]]
dx = x1-xk
dy = y1-yk
d_centro = np.sqrt(dx**2+dy**2)
if d_centro<=(r1*(1+tolera)):
dentro = dentro + 1
if dentro == n:
raicesx.append(xk)
raicesy.append(yk)
# busca baricentro
baricentro = np.nan
barerror = np.nan
q = len(raicesx)
if q>0:
xbar = np.mean(raicesx)
ybar = np.mean(raicesy)
baricentro = [xbar,ybar]
barerror = 0
for i in range(0,q,1):
d = np.sqrt((xbar-raicesx[i])**2+(ybar-raicesy[i])**2)
if d>barerror:
barerror = d
poligono = [raicesx,raicesy]
else:
poligono = [todasx,todasy]
resultado = {'baricentro': baricentro,
'barerror' : barerror,
'poligono' : poligono,
'nocruzaen' : ''}
# revisa espacio entre circulos
sumacruces = np.sum(cruces,axis = 0)
if 0 in list(sumacruces):
raicesx = todasx.copy()
raicesy = todasy.copy()
for i in range(0,n,1):
if sumacruces[i]==0:
x1 = centro[vertices[i]][0]
y1 = centro[vertices[i]][1]
lejanamax = -1
lejana_en = -1
for k in range(0,len(raicesx),1):
dx = x1 - raicesx[k]
dy = y1 - raicesy[k]
d_centro = np.sqrt(dx**2+dy**2)
if d_centro>lejanamax:
lejanamax = d_centro
lejana_en = k
if lejana_en>=0:
raicesx.pop(lejana_en)
raicesy.pop(lejana_en)
# busca baricentro
baricentro = np.nan
barerror = np.nan
q = len(raicesx)
if q>0:
xbar = np.mean(raicesx)
ybar = np.mean(raicesy)
baricentro = [xbar,ybar]
barerror = 0
for i in range(0,q,1):
d = np.sqrt((xbar-raicesx[i])**2+(ybar-raicesy[i])**2)
if d>barerror:
barerror = d
poligono = [raicesx,raicesy]
else:
poligono = [todasx,todasy]
resultado = {'baricentro': baricentro,
'barerror' : barerror,
'poligono' : poligono,
'nocruzaen' : vertices[i] }
return(resultado)