Ubiquitous Environment Control System(called UECS) is japanese GreenHouse control system. Front UI is Grafana.Database is Influxdb. each sensor data is UDP on UECS protcol. Grafux is capture softwear of UECS data .
# -*- coding: utf-8 -*-
import sys,time,json,subprocess,os
from socket import *
import xml.etree.ElementTree as ET
from datetime import datetime
import configparser,codecs
def serch_table():
print('Please wait a few minuite............')
print('')
print('Please wait a few minuite............')
print('')
# cmd = "ps -aux |grep uecs_rec|awk \'{print \"sudo kill\",$2}\' | sh"
cmd = "ps -aux |grep uecs_main|awk \'{print \"sudo kill\",$2}\' | sh"
subprocess.call( cmd, shell=True )
HOST = ''
PORT = 16520
s =socket(AF_INET,SOCK_DGRAM)
s.bind((HOST, PORT))
data=[]
i = 0
for i in range(0, 100):
i+=1
msg, address = s.recvfrom(8192)
if msg == ".":
print("Sender is closed")
break
XmlData = msg.decode('utf-8')
root = ET.fromstring(XmlData)
for child in root:
#print(child.tag)
if child.tag == "DATA" :
c = child.get("type")
type = str(c[0:c.find('.')])
table = type + "_" + child.get("room") + "_" + child.get("region") + "_" + child.get("order") + "_" + child.get("priority")
data.append(table.lower())
data_uniq = []
for x in data:
i += 1
print(" Now Loading..... " + str(i) +" "+ x)
if x not in data_uniq:
data_uniq.append(x)
return sorted(data_uniq)
def make_cfg_first(filepath):
with open(filepath, 'w') as cfg_new:
cfg_new.write('\r\n[user]')
# cfg_new.write('\r\n; https://support.google.com/accounts/answer/185833')
cfg_new.write('\r\ngmail=')
cfg_new.write('\r\npass=')
cfg_new.write('\r\nname=')
cfg_new.write('\r\nrecipient= \r\n')
cfg_new.write('\r\n[locate]')
cfg_new.write('\r\nlongitude = 139.767')
cfg_new.write('\r\nlatitude = 35.681 \r\n')
table=[]
table = serch_table()
cfg_new.write('\r\n[table_name]')
for s in table:
cfg_new.write('\r\n'+ s + ' = ')
cfg_new.write('\r\n')
cfg_new.write('\r\n[influx_db]')
cfg_new.write('\r\nlocal_host=localhost')
cfg_new.write('\r\nuser_name=root')
cfg_new.write('user_pass=root')
cfg_new.write('\r\ninterval=10')
cfg_new.write('\r\ndb_name=uecs \r\n')
cfg_new.write('\r\n')
cfg_new.write('\r\n[influx_db_cloud]')
cfg_new.write('\r\ncloud_host=')
cfg_new.write('\r\nuser_name=root')
cfg_new.write('user_pass=root')
cfg_new.write('\r\ninterval=10')
cfg_new.write('\r\ndb_name=uecs \r\n')
cfg_new.write('\r\n[alertmail]')
cfg_new.write('\r\n')
cfg_new.write('\r\n[openweathermap]')
# cfg_new.write('\r\n; https://home.openweathermap.org/users/sign_in')
cfg_new.write('\r\nowm_key = \r\n')
cfg_new.write('\r\n[twitter]')
# cfg_new.write('\r\n; https://apps.twitter.com/')
cfg_new.write('\r\nCONSUMER_KEY =')
cfg_new.write('\r\nCONSUMER_SECRET =')
cfg_new.write('\r\nACCESS_KEY =')
cfg_new.write('\r\nCCESS_SECRET = \r\n')
cfg_new.write('\r\n[yahoo_weather]')
cfg_new.write('\r\nAPP_ID =')
cfg_new.write('\r\ninterval=')
cfg_new.write('\r\ntype = ')
cfg_new.write('\r\nroom = ')
cfg_new.write('\r\nregion = ')
cfg_new.write('\r\norder = ')
cfg_new.write('\r\npriority = ')
cfg_new.write('\r\ndistance1 = ')
cfg_new.write('\r\ndistance2 = ')
cfg_new.write('\r\nsend_ccm_time = 20')
cfg_new.write('\r\nsend_ccm_range = \r\n')
# read uecs_c.cfg
filepath = os.path.dirname(os.path.abspath(__file__))
filepath = os.path.dirname(filepath) + '/grafux.cfg'
if os.path.exists(filepath) is False:
make_cfg_first(filepath)
sys.exit()
config = configparser.ConfigParser()
config.read(filepath)
cfg = {}
for sect in config.sections() :
cfg[sect] = {}
for opt in config.options(sect) :
cfg[sect][opt] = config[sect][opt]
#remove config
if os.path.exists(filepath):
os.remove(filepath)
# make config
table=[]
table = serch_table()
table_opt =''
for s in table:
if s in dict(cfg['table_name']) :
table_opt = table_opt + '\r\n' + s + '= ' + cfg['table_name'][s]
else :
table_opt = table_opt + '\r\n' + s + '= '
with open(filepath, 'w') as cfg_new:
for key in cfg.keys():
cfg_new.write('\r\n[%s]' % (key))
if key == 'table_name' :
cfg_new.write(table_opt)
for opt in cfg[key].keys():
if key != 'table_name' :
cfg_new.write('\r\n' + opt + '=' + cfg[key][opt])
cfg_new.write('\r\n')
# print grafux.cfg
fin = codecs.open(filepath,'r','utf-8')
fout = ""
for line in fin:
fout = fout + str(line)
print(fout)
#---------------------------------------------------
# yahoo weather YOLP
# reference : https://qiita.com/takahirono7/items/01b1ef6bd364fffdb3c4
# https://developer.yahoo.co.jp/webapi/map/openlocalplatform/v1/weather.html
# APIsample : http://weather.olp.yahooapis.jp/v1/place?appid=dj00aiZpPVVyTHJ3VkdmNkVvMiZzPWNvbnN1bWVyc2VjcmV0Jng9ZTI-&coordinates=139.767,35.681&output=json&interval=5
#---------------------------------------------------
#!/usr/bin/python
# encoding:utf-8
import urllib.request,pprint,json
import Exec_Influxdb
from influxdb import InfluxDBClient
import sys,time,os,math
from datetime import datetime
import Initial_set,configparser
import geohash
def insert_ob_influxdb(config,host_name,table,data,lat,lon,hash,dirct,type,epoch_time):
db_user,db_pass,db_database = Exec_Influxdb.set_dbuser(config,host_name)
client = InfluxDBClient(host_name, 8086, db_user, db_pass, db_database)
json_body = [
{
"measurement": table,
"tags": {
"direction":dirct
},
"time": epoch_time,
"precision": "s",
"fields": {
"ob" : data["ob"],
"geohash" :hash
}
}
]
print("influxdb(%s): %s" %(host_name,json_body))
client.write_points(json_body)
def insert_ccm_influxdb(config,host_name,table,ccm_data):
db_user,db_pass,db_database = Exec_Influxdb.set_dbuser(config,host_name)
client = InfluxDBClient(host_name, 8086, db_user, db_pass, db_database)
json_body = [
{
"measurement": table,
"precision": "s",
"fields": {
"value" : float(ccm_data[0]),
"direction" : str(ccm_data[1]),
"fc_time" : str(ccm_data[2])
}
}
]
print("influxdb(%s): %s" %(host_name,json_body))
client.write_points(json_body)
def insert_fc_influxdb(config,host_name,table,data,lat,lon,hash,dirct,type):
db_user,db_pass,db_database = Exec_Influxdb.set_dbuser(config,host_name)
client = InfluxDBClient(host_name, 8086, db_user, db_pass, db_database)
# now = datetime.now()
# epoch = datetime_to_epoch(now) * 1000000000'
json_body = [
{
"measurement": table,
"tags": {
"direction":dirct
},
# speed up "time": epoch,
"precision": "s",
"fields": {
# "ob" : data["ob"],
"fc5" : data.get(type + "5",None),
"fc10": data.get(type + "10",None),
"fc15": data.get(type + "15",None),
"fc20": data.get(type + "20",None),
"fc25": data.get(type + "25",None),
"fc30": data.get(type + "30",None),
"fc35": data.get(type + "35",None),
"fc40": data.get(type + "40",None),
"fc45": data.get(type + "45",None),
"fc50": data.get(type + "50",None),
"fc55": data.get(type + "55",None),
"fc60": data.get(type + "60",None),
"geohash" :hash ,
"lat" :lat,
"lon" :lon
}
}
]
print("influxdb(%s): %s" %(host_name,json_body))
client.write_points(json_body)
def get_coordinates(config):
lon = config['locate']['longitude']
lat = config['locate']['latitude']
if config['yahoo_weather']['distance1'] =='':
config['yahoo_weather']['distance1'] = '2.5' #Defalut set 2.5km
if config['yahoo_weather']['distance2'] =='':
config['yahoo_weather']['distance2'] = str(float(config['yahoo_weather']['distance1']) *2) # Default set distance1 * 2
D={} # return value dict
Center={} #nest dict
Center['km'] = '0'
Center['lon'] = lon
Center['lat'] = lat
Center['hash'] = geohash.encode(float(lat),float(lon))
D['Center'] = Center
for distance in [config['yahoo_weather']['distance1'],config['yahoo_weather']['distance2']]:
for s in ['N','E','S','W']:
N={} # nest dict
E={} # nest dict
S={} # nest dict
W={} # nest dict
coord = round(float(distance)*0.01,4)
if s== 'N': # North
N['km'] =distance
N['lon'] = str(round(float(lon) + coord ,4))
N['lat'] = lat
N['hash'] = geohash.encode(float(N['lat']),float(N['lon']))
D[s + distance] = N
elif s == 'E': # East
E['km'] = distance
E['lon'] = lon
E['lat'] = str(round(float(lat) + coord ,4))
E['hash'] = geohash.encode(float(E['lat']),float(E['lon']))
D[s + distance] = E
elif s == 'S': # South
S['km'] = distance
S['lon'] = str(round(float(lon) - coord ,4))
S['lat'] = lat
S['hash'] = geohash.encode(float(S['lat']),float(S['lon']))
D[s + distance] = S
elif s == 'W': # West
W['km'] = distance
W['lon'] = lon
W['lat'] = str(round(float(lat) - coord ,4))
W['hash'] = geohash.encode(float(W['lat']),float(W['lon']))
D[s + distance] = W
del N,E,S,W
return D
def get_yahoo_weather(config,ip_add,Coord):
APP_ID = config['yahoo_weather']['App_ID']
lon = config['locate']['longitude']
lat = config['locate']['latitude']
list = Coord.keys()
COORDINATES=''
for s in list:
COORDINATES = COORDINATES + str(Coord[s]['lon']) + ',' + str(Coord[s]['lat']) + '%20'
# print(COORDINATES)
BASE_URL = "https://map.yahooapis.jp/weather/V1/place"
OUTPUT="json"
url = BASE_URL + "?appid=%s&coordinates=%s&output=%s&interval=5" % (APP_ID,COORDINATES,OUTPUT)
print (url)
response = urllib.request.urlopen(url).read()
j = json.loads(response.decode("utf-8"))
# pprint.pprint(j)
CCM=[]
ccm_data=[0,'','']
for k in range(len(j['Feature'])):
geo_lon = j['Feature'][k]['Geometry']['Coordinates'].split(",")[0]
geo_lat = j['Feature'][k]['Geometry']['Coordinates'].split(",")[1]
geo_hash = geohash.encode(round(float(geo_lat),4),round(float(geo_lon),4))
for s in list:
if str(Coord[s]['hash']) == str(geo_hash):
Dirct = s # Coord.keys()
km = float(Coord[s]['km'])
date = []
rainfall=[]
type =[]
data = {}
for i in range(len(j['Feature'][k]['Property']['WeatherList']['Weather'])):
type.append(j['Feature'][k]['Property']['WeatherList']['Weather'][i]['Type'])
now = datetime.now()
forcast_time = datetime.strptime(j['Feature'][k]['Property']['WeatherList']['Weather'][i]['Date'], '%Y%m%d%H%M')
delta = forcast_time - now
fc_time = math.floor(math.floor(delta.total_seconds()/60+5)/5)*5
# print(now,forcast_time,str(fc_time),j['Feature'][k]['Property']['WeatherList']['Weather'][i]['Rainfall'])
if type[i]=='observation':
epoch_time = int(time.mktime(forcast_time.timetuple())) * 1000000000
data['ob'] = float(j['Feature'][k]['Property']['WeatherList']['Weather'][i]['Rainfall'])*1.0
if fc_time >= 5 : #forcast time check
data[config['yahoo_weather']['type'] + str(fc_time)] = float(j['Feature'][k]['Property']['WeatherList']['Weather'][i]['Rainfall'])*1.0
# CCM data
if config['yahoo_weather']['send_ccm_time']=='':
config['yahoo_weather']['send_ccm_time']='20' # Defalut set 20 Minute
if config['yahoo_weather']['send_ccm_range']=='':
config['yahoo_weather']['send_ccm_range']='0' # Defalut set 0km = Center
m = int(config['yahoo_weather']['send_ccm_time'])//5 # check time
for mm in range(1,m+1):
if float(config['yahoo_weather']['send_ccm_range'])*1.0 >= km: #check range km
if ccm_data[0] < data[config['yahoo_weather']['type'] + str(mm*5)]: # check MAX data
ccm_data[0] = data[config['yahoo_weather']['type'] + str(mm*5)]
ccm_data[1] = str(Dirct)
ccm_data[2] = 'fc' + str(mm*5)
print('Coordnates: ' + ccm_data[1] + ' forcast:' + str(ccm_data[2]) + ' CCM :' + str(ccm_data[0]))
# local Forcast & Observation
insert_fc_influxdb(config,config['influx_db']['local_host'],'YOLP',data,geo_lat,geo_lon,geo_hash,Dirct,config['yahoo_weather']['type'])
insert_ob_influxdb(config,config['influx_db']['local_host'],'YOLP_ob',data,geo_lat,geo_lon,geo_hash,Dirct,config['yahoo_weather']['type'],epoch_time)
# cloud Forcast & Observation
if config['influx_db_cloud']['cloud_host']!='':
insert_fc_influxdb(config,config['influx_db_cloud']['cloud_host'],'YOLP',data,geo_lat,geo_lon,geo_hash,Dirct,config['yahoo_weather']['type'])
insert_ob_influxdb(config,config['influx_db_cloud']['cloud_host'],'YOLP_ob',data,geo_lat,geo_lon,geo_hash,Dirct,config['yahoo_weather']['type'],epoch_time)
del date,rainfall,type,data
# CCM SEND
type = config['yahoo_weather']['type'] + '.oMC'
msg = "<?xml version=\"1.0\"?><UECS ver=\"1.00-E10\">"
msg = msg + "<DATA type=\"" + type + "\""
msg = msg + " room=" + "\"" + config['yahoo_weather']['room'] + "\""
msg = msg + " region=" + "\"" + config['yahoo_weather']['region'] + "\""
msg = msg + " order=" + "\"" + config['yahoo_weather']['order'] + "\""
msg = msg + " priority=" + "\"" + config['yahoo_weather']['priority'] + "\">"
msg = msg + str(ccm_data[0]) + "</DATA>"
msg = msg + "<IP>" + ip_add + "</IP></UECS>"
CCM.append(msg)
# print(msg)
insert_ccm_influxdb(config,config['influx_db']['local_host'],'CCM',ccm_data)
if config['influx_db_cloud']['cloud_host']!='':
insert_ccm_influxdb(config,config['influx_db_cloud']['cloud_host'],'CCM',ccm_data)
return CCM
"""
#filepath = os.path.dirname(os.getcwd()) + '/uecs_c.cfg'
filepath = os.path.dirname(os.path.abspath(__file__))
filepath = os.path.dirname(filepath) + '/uecs_c.cfg'
config = configparser.ConfigParser()
config.read(filepath)
Coord ={}
Coord = get_coordinates(config)
print(Coord)
get_yahoo_weather(config,'192.168.1.13',Coord)
#print("test")
"""
#-------------------------------------------------------------------------------
# Name: UECS protcol to (swlite3 and influxdb)
# Purpose:
# referencd : http://localhost:8083
# Author: ookuma
# InfluxDB Setup Lib: git clone git://github.com/influxdata/influxdb-python.git
# sudo python3 setup.py install
# curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=test" --data-urlencode "q=SELECT * FROM cpu WHERE region='jp'"
# curl -i -XPOST 'http://localhost:8086/write?db=test' --data-binary 'cpu,host=server04,region=jp value=0.99 '
# Created:
# Copyright: (c) ookuma 2015
# Licence: <your licence>
#-------------------------------------------------------------------------------
#!/usr/bin python3
# -*- coding: utf-8 -*-
import sys,time,os
from datetime import datetime
import Initial_set,configparser
import Exec_sqlite,Exec_Influxdb
def db_write(root,config,FLG_UP,FLG_DIFF):
data=[]
for child in root:
#print(child.tag)
if child.tag == "DATA" :
c = child.get("type")
type = str(c[0:c.find('.')])
table = type + "_" + child.get("room") + "_" + child.get("region") + "_" + child.get("order") + "_" + child.get("priority")
data.append(child.text)
if child.tag == "IP" :
data.append(child.text)
dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
table = table.lower() #small charactor
now_data =''
if table in FLG_DIFF :
script = "select avg(value) from " + table + "_diff"
rtn = Exec_sqlite.exec(config,"select", script, table + "_diff") # create_table_day
if rtn is not None:
now_data = float(data[0]) - float(rtn)
#delete data
script = "delete from " + table + "_diff"
Exec_sqlite.exec(config,"delete", script, table + "_diff")
#insert into table_diff
script = "insert into " + table + "_diff(time,ip_add, value) values (\'%s\',\'%s\',%f )" % (dt, data[1], float(data[0]))
Exec_sqlite.exec(config,"insert", script, table + "_diff")
if table in FLG_UP : #
if table in FLG_DIFF:
data[0] = now_data
if config['table_name'][table] in ("on","off"):
data[0] =str(round(float(data[0])))
#print(table,data[0])
#Loacl influxDB
if config['influx_db']['local_host'] != '' :
Exec_Influxdb.insert_influxdb(config,config['influx_db']['local_host'],table,float(data[0])*1.0,data[1],'0',None)
#Cloud influxDB
# if config['influx_db_cloud']['cloud_host'] != '' :
# Exec_Influxdb.cloud_data_up(config,table)
# print(table)
print('process id:'+ str(os.getpid()) + ' ' + table +'('+ str(data[0])+')')
# time.sleep(0.5)
#-------------------------------------------------------------------------------
# Name: InfluxDB Downsampling
# Purpose: Downsampling and Method for management of empty capacity
# referencd : http://localhost:8083
# Author: ookuma
# Created:
# Copyright: (c) ookuma 2017
# Licence: <your licence>
#-------------------------------------------------------------------------------
#!/usr/bin python3
# -*- coding: utf-8 -*-
import sys,time,os
from datetime import datetime, timedelta, timezone,date
import Initial_set,configparser
import Exec_Influxdb
"""
# read config
filepath = os.path.dirname(os.path.abspath(__file__))
filepath = os.path.dirname(filepath) + '/grafux.cfg'
config = configparser.ConfigParser()
config.read(filepath)
# FLG_Cloud up
FLG_UP=[]
FLG_DIFF=[]
FLG_MAX=[]
FLG_UP,FLG_DIFF,FLG_MAX,FLG_ABC = Initial_set.chk_table()
"""
def downsamling(config,FLG_UP,FLG_DIFF,FLG_MAX,FLG_ABC):
table =[]
for table in FLG_UP:
func = ' mean(value) '
if table in FLG_MAX :
func = ' max(value) '
elif table in FLG_DIFF :
func = ' sum(value) '
print('DownSampling : ' + table )
Exec_Influxdb.downsampling(config,config['influx_db']['local_host'],table,func)
if config['influx_db_cloud']['cloud_host'] != '':
Exec_Influxdb.downsampling(config,config['influx_db_cloud']['cloud_host'],table,func)
path = '/'
st = os.statvfs(path)
# G byte
total = round(st.f_frsize * st.f_blocks/1000000000.0 ,2)
used = round(st.f_frsize * (st.f_blocks - st.f_bfree)/1000000000.0 ,2)
free = round(st.f_frsize * st.f_bavail/1000000000.0 ,2)
print('total: ', total)
print(' used: ', used)
print(' free: ', free)
used = round((used / total * 100.0),2)
script=( 'delete from disk_capa')
Exec_Influxdb.delete_influxdb(config,config['influx_db']['local_host'],'disk_capa',script)
Exec_Influxdb.insert_influxdb(config,config['influx_db']['local_host'],'disk_capa',used,None,'1',None)
if config['influx_db_cloud']['cloud_host'] != '':
Exec_Influxdb.delete_influxdb(config,config['influx_db_cloud']['cloud_host'],'disk_capa',script)
Exec_Influxdb.insert_influxdb(config,config['influx_db_cloud']['cloud_host'],'disk_capa',used,None,'1',None)
print(str(used)+'%')
if used > 90 :
print("no disk")
for table in FLG_UP:
r = {}
script = 'select * from %s limit 2' % (table)
r = Exec_Influxdb.select_influxdb(config,config['influx_db']['local_host'],table,None,None,script)
for x in r:
for i in x:
old_time = x[1]['time']
break
break
print(str(old_time))
del_day= str(24*20)+'h' #delete 20days
script = 'delete from %s where time < (\'%s\' - %s)' % (table,str(old_time),del_day)
Exec_Influxdb.delete_influxdb(config,config['influx_db']['local_host'],table,script)
else :
print("ok disk")
#-------------------------------------------------------------------------------
# Name: UECS Aggregate
# Purpose: ABCD average
# referencd : http://localhost:8083
# Author: ookuma
# Created:
# Copyright: (c) ookuma 2017
# Licence: <your licence>
#-------------------------------------------------------------------------------
#!/usr/bin python3
# -*- coding: utf-8 -*-
import os
import Initial_set,configparser
import Exec_Influxdb,CalSun
from chk_process import checkURL
from datetime import datetime
def aggregate(config,FLG_UP,FLG_DIFF,FLG_MAX,FLG_ABC):
for table in FLG_ABC:
# base table
script ='select * from %s order by time asc' % (table)
result = Exec_Influxdb.select_influxdb(config,config['influx_db']['local_host'],table,None,script)
table_list = []
for x in result:
for i in range(0,len(x)):
day = x[i]['time'].split('T')[0]
time = x[i]['time'].split('T')[1].split(':')
table_list.append(day)
# aggregate table
script =('delete from ABC_%s where time >= now() - 2d ') % (table) # delete 2days
Exec_Influxdb.delete_influxdb(config,config['influx_db']['local_host'] ,'ABC_'+table,script)
if config['influx_db_cloud']['cloud_host'] !='':
Exec_Influxdb.delete_influxdb(config,config['influx_db_cloud']['cloud_host'] ,'ABC_'+table,script)
script ='select * from ABC_%s where time < now() - 2d order by time asc ' % (table)
result = Exec_Influxdb.select_influxdb(config,config['influx_db']['local_host'],table,None,script)
ag_list = []
for x in result:
for i in range(0,len(x)):
day = x[i]['time'].split('T')[0]
time = x[i]['time'].split('T')[1].split(':')
ag_list.append(day)
# difference
tb = set(table_list)
ag = set(ag_list)
dif = tb -ag
print(dif)
for d in dif:
print(d)
basedate1 = d.replace('-','/') + ' 12:00:00' #12h
basedate2 = datetime.strptime(basedate1, '%Y/%m/%d %H:%M:%S') # str2datetime
now_jst,pre_sr,pre_ss,nex_sr,nex_ss,H12,H12_utc,H00,H24 = CalSun.sun_exec(basedate2,config['locate']['longitude'],config['locate']['latitude'])
data=[]
print('START !! '+ table)
# A_table Night from pre_sunset to sunrise
print("A_table make")
print("Night",pre_ss,pre_sr)
data.append(Exec_Influxdb.ABC_data(config,config['influx_db']['local_host'],table,pre_ss,pre_sr))
# B_table AM from sunrise to 12:00 and same day
print("B_table make")
print("AM",pre_sr,H12)
data.append(Exec_Influxdb.ABC_data(config,config['influx_db']['local_host'],table,pre_sr,H12))
# C_table PM from 12:00 to sunset
print("C_table make")
print("PM",H12,nex_ss)
data.append(Exec_Influxdb.ABC_data(config,config['influx_db']['local_host'],table,H12,nex_ss))
# D_table daytime from sunrise to sunset
print("D_table make")
print("DAY",pre_sr,nex_ss)
data.append(Exec_Influxdb.ABC_data(config,config['influx_db']['local_host'],table,pre_sr,nex_ss))
# Diff D_table-A_table
if data[3] is not None:
data.append(data[3]-data[0])
else:
data.append(None)
# today avg
print("today avg")
print("TODAY",H00,H24)
data.append(Exec_Influxdb.ABC_data(config,config['influx_db']['local_host'],table,H00,H24))
print("data: ")
print(data)
for d in data:
if d is not None:
sr = datetime.fromtimestamp(int(pre_sr)/1000000000.0) #sunrise
ssr = str(sr.year) +'/'+ str(sr.month) +'/'+ str(sr.day) +'T'+ str(sr.hour) +':' + str(sr.minute)
ss = datetime.fromtimestamp(int(nex_ss)/1000000000.0) #sunset
sss = str(ss.year) +'/'+ str(ss.month) +'/'+ str(ss.day) +'T'+ str(ss.hour) +':' + str(ss.minute)
Exec_Influxdb.aggregate_influxdb(config,config['influx_db']['local_host'] , table,data,None,H12_utc,ssr,sss)
if config['influx_db_cloud']['cloud_host'] !='':
Exec_Influxdb.aggregate_influxdb(config,config['influx_db_cloud']['cloud_host'] , table,data,None,H12_utc,ssr,sss)
break
del tb,ag,dif
"""
filepath = os.path.dirname(os.path.abspath(__file__))
filepath = os.path.dirname(filepath) + '/grafux.cfg'
config = configparser.ConfigParser()
config.read(filepath)
# FLG up
FLG_UP=[]
FLG_DIFF=[]
FLG_MAX=[]
FLG_ABC=[]
FLG_UP,FLG_DIFF,FLG_MAX,FLG_ABC = Initial_set.chk_table(config)
aggregate(config,FLG_UP,FLG_DIFF,FLG_MAX,FLG_ABC)
"""
import os
import configparser
import Exec_sqlite
# -*- coding: utf-8 -*-
def chk_table(config):
#read cfg
# filepath = os.path.dirname(os.getcwd()) + '/grafux.cfg'
# filepath = os.path.dirname(os.path.abspath(__file__))
# filepath = os.path.dirname(filepath) + '/grafux.cfg'
# config = configparser.ConfigParser()
# config.read(filepath)
FLG_UP=[]
FLG_DIFF=[]
FLG_MAX=[]
FLG_ABC=[]
for tb in config.options('table_name'):
# create data store table
if config['table_name'][tb] !="":
FLG_UP.append(tb)
# last data table
if config['table_name'][tb] == "diff":
FLG_DIFF.append(tb)
tb_diff = tb + "_diff"
script = "create table if not exists " + tb_diff + "(time datetime not null,ip_add varchar(20) not null, value float not null);"
print(tb_diff +' : ' + script)
Exec_sqlite.exec(config,'create',script,tb_diff) #create_table_sum
# aggregate
if config['table_name'][tb] in ("on","off"):
FLG_MAX.append(tb)
if config['table_name'][tb] == "abc":
FLG_ABC.append(tb)
return FLG_UP,FLG_DIFF,FLG_MAX,FLG_ABC
"""
def chk_Aggregate():
#read cfg
# filepath = os.path.dirname(os.getcwd()) + '/uecs_c.cfg'
filepath = os.path.dirname(os.path.abspath(__file__))
filepath = os.path.dirname(filepath) + '/uecs_c.cfg'
config = configparser.ConfigParser()
config.read(filepath)
FLG_ABC=[]
FLG_SUM=[]
FLG_MAX=[]
for tb in config.options('table_name'):
# create data store table
if config['table_name'][tb] =="abc":
FLG_ABC.append(tb)
if config['table_name'][tb] =="diff":
FLG_SUM.append(tb)
if config['table_name'][tb] in ("on","off"):
FLG_MAX.append(tb)
return FLG_ABC,FLG_SUM,FLG_MAX
"""
#!/usr/bin python3
# -*- coding: utf-8 -*-
# parameter
#
# cmd : not null : "None,create,insert,update,"
# script : not null : "None, select * from table" or "update_table.sql"
# root_table : null :table
#Initialset_create table
import sqlite3,os
def exec(config,cmd ,script,table):
#db_file = os.path.dirname(os.getcwd()) + '/data/db.sqlite3'
filepath = os.path.dirname(os.path.abspath(__file__))
db_file = os.path.dirname(filepath) + '/data/db.sqlite3'
if not os.path.exists(db_file):
open(db_file, 'w')
conn = sqlite3.connect(db_file)
c = conn.cursor()
if cmd in ('delete','insert','create') : # return is None
c.execute(script)
rtn = 0
else : # select
c.execute(script)
rtn = c.fetchone()[0]
conn.commit()
conn.close()
return rtn
# cnt!= 0 data ari
#script="create table if not exists wairtemp_1_2_1_1 (time datetime not null,ip_add varchar(20) not null, value varchar(20) not null);"
#exec("create",script,"wairtemp_1_2_1_1",None)
#exec("create","sqlite_create_table.sql","wairtemp_1_2_1_1" , "wairtemp_1_2_1_1_day") #create_table_day
#print("test")
#!/usr/bin python3
# -*- coding: utf-8 -*-
#from pprint import pprint
from influxdb import InfluxDBClient
import sys,time,os,json
import Initial_set,configparser
def set_dbuser(config,host_name):
# local db set
db_user = config['influx_db']['user_name']
db_pass = config['influx_db']['user_pass']
db_database = config['influx_db']['db_name']
if host_name != config['influx_db']['local_host']:
# cloud db set
db_user = config['influx_db_cloud']['user_name']
db_pass = config['influx_db_cloud']['user_pass']
db_database = config['influx_db_cloud']['db_name']
return db_user,db_pass,db_database
def insert_influxdb(config,host_name,table,data,host_ip,tag,time):
db_user,db_pass,db_database = set_dbuser(config,host_name)
client = InfluxDBClient(host_name, 8086, db_user, db_pass, db_database)
# now = datetime.now()
# epoch = datetime_to_epoch(now) * 1000000000'
if time is None :
json_body = [
{
"measurement": table,
"tags": {
"sum": tag
},
# speed up "time": epoch,
"precision": "s",
"fields": {
"value": data
}
}
]
else :
json_body = [
{
"measurement": table,
"tags": {
"sum": tag
},
"time": time,
"precision": "s",
"fields": {
"value": data
}
}
]
print("influxdb(%s): %s" %(host_name,json_body))
client.write_points(json_body)
def delete_influxdb(config,host_name,table,script):
db_user,db_pass,db_database = set_dbuser(config,host_name)
client = InfluxDBClient(host_name, 8086, db_user, db_pass, db_database)
if script is None:
script=( 'delete from %s where sum = \'0\' and time < now()-24h; ' % (table))
client.query(script)
def select_influxdb(config,host_name,table,func,script):
db_user,db_pass,db_database = set_dbuser(config,host_name)
client = InfluxDBClient(host_name, 8086, db_user, db_pass, db_database)
if script is None:
interval = config['influx_db']['interval'] + 'm'
script = ('SELECT %s as value FROM %s where time < now()-24h and sum = \'0\' GROUP BY time(%s) ' % (func,table,interval))
result = client.query(script)
return result
def downsampling(config,host_name,table,func):
r={}
insert_data={}
r = select_influxdb(config,host_name,table,func,None)
for x in r:
for i in range(0,len(x)):
# print(x[i]['time'],x[i]['value'])
if x[i]['value'] is not None:
print(x[i]['time'],x[i]['value'])
insert_influxdb(config,host_name,table,x[i]['value']*1.0,None,'1',x[i]['time'])
delete_influxdb(config,host_name,table,None)
print('downsampling !')
def cloud_data_up(config,table):
script = 'SHOW MEASUREMENTS'
r = select_influxdb(config,config['influx_db_cloud']['cloud_host'],table,None,script)
table_list=[]
for x in r:
for i in range(0,len(x)):
table_list.append(x[i]['name'])
# print(table_list)
if table not in table_list:
script_A='select * from %s order by time asc limit 30' % (table)
if table in table_list:
script='select * from ' + table + ' order by time desc limit 1'
r = select_influxdb(config,config['influx_db_cloud']['cloud_host'],table,None,script)
for x in r:
print(table , x[0]['time'])
script_A='select * from %s where time > \'%s\' order by time asc limit 30' % (table , x[0]['time'])
# Get upload data
print(script_A)
r_local = select_influxdb(config,config['influx_db']['local_host'],table,None,script_A)
data = list(r_local.get_points(measurement=table))
json_body = []
for i in data:
json_body.append({
"measurement": table,
"tags": {
"sum": i['sum']
},
"time": i['time'],
"fields": {
"value": float(i['value'])*1.0
}
})
host_name = config['influx_db_cloud']['cloud_host']
db_user,db_pass,db_database = set_dbuser(config,host_name)
client = InfluxDBClient(host_name, 8086, db_user, db_pass, db_database)
print("influxdb(%s): %s" %(host_name,json_body))
client.write_points(json_body)
def aggregate_influxdb(config,host_name,table,data,host_ip,time,sr,ss):
db_user,db_pass,db_database = set_dbuser(config,host_name)
client = InfluxDBClient(host_name, 8086, db_user, db_pass, db_database)
json_body = [
{
"measurement": 'ABC_'+table,
# "tags": {
# "class": tag
# },
"time": time,
"precision": "m",
"fields": {
"Sunrise":sr,
"Sunset":ss,
"Night": data[0] ,
"AM": data[1] ,
"PM": data[2] ,
"Day time":data[3] ,
"Day-Night":data[4],
"TodayAVG":data[5]
}
}
]
print("influxdb(%s): %s" % (host_name,json_body))
client.write_points(json_body)
def ABC_data(config,host_name,table,start,end):
db_user,db_pass,db_database = set_dbuser(config,host_name)
client = InfluxDBClient(host_name, 8086, db_user, db_pass, db_database)
# script = ('select time,mean(value) as value from %s where time > %s and time < %s tz(\'Asia/Tokyo\')') % (table,start,end)
script = ('select time,mean(value) as value from %s where time > %s and time < %s ') % (table,start,end)
r = select_influxdb(config,config['influx_db']['local_host'] ,'ABC_'+table,None,script)
print(script)
for x in r:
for i in range(0,len(x)):
if x[i]['value'] is not None:
return x[i]['value']*1.0
else:
return None
#-------------------------------------------
# UTC Sunrise Sunset Calculate
# reference: http://rhodesmill.org/pyephem/rise-set.html
# install : sudo pip3 install pyephem
#-------------------------------------------
#!/usr/bin/env python
import ephem,time
#import datetime, time
from datetime import datetime, timedelta, timezone
def date2epoch(daytime):
dt = str(daytime).split('-')
dt_day = dt[2].split(' ')
dt_time = dt_day[1].split(':')
year= int(dt[0])
month = int(dt[1])
day = int(dt_day[0])
h= int(dt_time[0])
m= int(dt_time[1])
t = datetime(year,month,day,h,m)
return str(time.mktime(t.timetuple())).split('.')[0] + '000000000'
def sun_exec(basedate,lon,lat):
#ephem
pl = ephem.Observer()
pl.lat, pl.lon , pl.elevation = lat,lon ,0
# pl.date = basedate
# now=datetime.now()
# t=datetime(now.year,now.month,now.day,12,00)
t=datetime(basedate.year,basedate.month,basedate.day,basedate.hour,basedate.minute)
unix_time=int(time.mktime(t.timetuple()))
pl.date = datetime.utcfromtimestamp(unix_time)
pre_sunrise_utc = pl.previous_rising(ephem.Sun())
pre_sunset_utc = pl.previous_setting(ephem.Sun())
nex_sunrise_utc = pl.next_rising(ephem.Sun())
nex_sunset_utc = pl.next_setting(ephem.Sun())
pre_sunset_jst = ephem.localtime(pre_sunset_utc)
pre_sunrise_jst = ephem.localtime(pre_sunrise_utc)
nex_sunrise_jst = ephem.localtime(nex_sunrise_utc)
nex_sunset_jst = ephem.localtime(nex_sunset_utc)
JST = timezone(timedelta(hours=+9), 'JST')
UTC = timezone(timedelta(hours=-9), 'UTC')
now_jst = datetime.now(JST).strftime('%s')+'000000000'
pre_sr = date2epoch(pre_sunrise_jst)
pre_ss = date2epoch(pre_sunset_jst)
nex_sr = date2epoch(nex_sunrise_jst)
nex_ss = date2epoch(nex_sunset_jst)
H12s = str(basedate.year) +'/'+ str(basedate.month) +'/'+ str(basedate.day) + ' 12:00:00' #12h
H12dt = datetime.strptime(H12s, '%Y/%m/%d %H:%M:%S') # str2datetime
H12 = H12dt.strftime('%s') + '000000000'
H00s= str(basedate.year) +'/'+ str(basedate.month) +'/'+ str(basedate.day) + ' 00:00:00' #0h
H00dt = datetime.strptime(H00s, '%Y/%m/%d %H:%M:%S') # str2datetime
H00 = H00dt.strftime('%s') + '000000000'
H24s= str(basedate.year) +'/'+ str(basedate.month) +'/'+ str(basedate.day) + ' 23:59:59' #24h
H24dt = datetime.strptime(H24s, '%Y/%m/%d %H:%M:%S') # str2datetime
H24 = H24dt.strftime('%s') + '000000000'
# H12_utc = H12dt - timedelta(hours=9) 12:00
H12_utc = H12dt - timedelta(hours=12) #09:00
return now_jst,pre_sr,pre_ss,nex_sr,nex_ss,H12,H12_utc,H00,H24
"""
pre_sunset_jst = ephem.localtime(pre_sunset_utc)
pre_sunrise_jst = ephem.localtime(pre_sunrise_utc)
nex_sunrise_jst = ephem.localtime(nex_sunrise_utc)
nex_sunset_jst = ephem.localtime(nex_sunset_utc)
now = datetime.now()
now_utc = now.strftime('%s')+'000000000'
pre_sr = pre_sunrise_jst.strftime('%s')+'000000000'
pre_ss = pre_sunset_jst.strftime('%s')+'000000000'
H12s = datetime.now().strftime('%Y/%m/%d 12:00:00')
H12dt = datetime.strptime(H12s,'%Y/%m/%d %H:%M:%S')
H12 = H12dt.strftime('%s') + '000000000'
nex_sr = nex_sunrise_jst.strftime('%s')+'000000000'
nex_ss = nex_sunset_jst.strftime('%s')+'000000000'
return now_utc,pre_sr,pre_ss,H12,nex_sr,nex_ss
"""
#-------------------------------------------------------------------------------
# Name: UECS protcol to (swlite3 and influxdb)
# Purpose:
# referencd : http://localhost:8083
# Author: ookuma
# InfluxDB Setup Lib: git clone git://github.com/influxdata/influxdb-python.git
# sudo python3 setup.py install
# curl -G 'http://localhost:8086/query?pretty=true' --data-urlencode "db=test" --data-urlencode "q=SELECT * FROM cpu WHERE region='jp'"
# curl -i -XPOST 'http://localhost:8086/write?db=test' --data-binary 'cpu,host=server04,region=jp value=0.99 '
# Created:
# Copyright: (c) ookuma 2015
# Licence: <your licence>
#-------------------------------------------------------------------------------
#!/usr/bin python3
# -*- coding: utf-8 -*-
from socket import *
import sys,time,os,uecs_rec
import xml.etree.ElementTree as ET
from datetime import datetime
import Initial_set,configparser
from chk_process import checkURL
from multiprocessing import Process
import YOLP_weather3
import uecs_downsampling,uecs_aggregate2
if __name__ == '__main__':
# read config
#filepath = os.path.dirname(os.getcwd()) + '/uecs_c.cfg'
filepath = os.path.dirname(os.path.abspath(__file__))
filepath = os.path.dirname(filepath) + '/grafux.cfg'
config = configparser.ConfigParser()
config.read(filepath)
# get ipaddress
ip = socket(AF_INET, SOCK_DGRAM)
ip.connect(("8.8.8.8", 80))
ip_add = ip.getsockname()[0]
ip.close()
# FLG up
FLG_UP=[]
FLG_DIFF=[]
FLG_MAX=[]
FLG_ABC=[]
FLG_UP,FLG_DIFF,FLG_MAX,FLG_ABC = Initial_set.chk_table(config)
startm = int(datetime.now().strftime('%M'))
starts = startm * 60 + int(datetime.now().strftime('%S'))
ccm_interval= int(config['yahoo_weather']['interval'])
CCM=[]
Coord ={}
Coord = YOLP_weather3.get_coordinates(config)
startm_c = startm
starts_c = starts
cloud_interval= int(config['influx_db_cloud']['interval'])*60
starth = int(datetime.now().strftime('%H'))
# Capture start
HOST = ''
PORT = 16520
ADDRESS = "255.255.255.255"
s =socket(AF_INET,SOCK_DGRAM)
s.bind((HOST, PORT))
s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
while True:
#receve ccm
msg, address = s.recvfrom(8192)
XmlData = msg.decode('utf-8')
root = ET.fromstring(XmlData)
p = Process(target=uecs_rec.db_write, args=(root,config,FLG_UP,FLG_DIFF))
p.start()
p.join()
# now time
nowh = int(datetime.now().strftime('%H'))
nowm = int(datetime.now().strftime('%M'))
nows = nowm * 60 + int(datetime.now().strftime('%S'))
# cloud up ccm_table only
if config['influx_db_cloud']['cloud_host'] != '':
if abs(nows - starts_c) > cloud_interval:
for s_table in FLG_UP:
c = Process(target=Exec_Influxdb.cloud_data_up, args=(config,s_table))
c.start()
c.join()
startm_c = int(datetime.now().strftime('%M'))
starts_c = startm_c * 60 + int(datetime.now().strftime('%S'))
print('cloud up process: ' + s_table)
#send ccm
if config['yahoo_weather']['App_ID'] !='':
# nowm = int(datetime.now().strftime('%M'))
# nows = nowm * 60 + int(datetime.now().strftime('%S'))
if abs(nowm-startm) >= 5 : # per 5m
del CCM
CCM = YOLP_weather3.get_yahoo_weather(config,ip_add,Coord)
startm = int(datetime.now().strftime('%M'))
starts = startm * 60 + int(datetime.now().strftime('%S'))
for msg in CCM :
s.sendto(msg.encode('utf-8'), (ADDRESS, PORT))
print(msg)
if abs(nows - starts) > ccm_interval and len(CCM)>0: # per X seconds send CCM
starts = int(datetime.now().strftime('%M'))*60 + int(datetime.now().strftime('%S'))
for msg in CCM :
s.sendto(msg.encode('utf-8'), (ADDRESS, PORT))
print(msg)
#per 1hour
if abs(nowh-starth) >= 1.0:
# downsampling
d = Process(target=uecs_downsampling.downsamling, args=(config,FLG_UP,FLG_DIFF,FLG_MAX,FLG_ABC))
d.start()
d.join()
# aggregate
e = Process(target=uecs_aggregate2.aggregate, args=(config,FLG_UP,FLG_DIFF,FLG_MAX,FLG_ABC))
e.start()
e.join()
starth = int(datetime.now().strftime('%H'))
s.close()
sys.exit