AnyISalIn
8/2/2017 - 2:35 AM

water_quality collect

water_quality collect

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, Boolean

Base = declarative_base()
engine = create_engine('mssql+pymssql://sa:passwd1Q@192.168.20.183/tempdb')
Session = sessionmaker(bind=engine)
session = Session()


class Metric(Base):

    __tablename__ = 'metric'

    id = Column(Integer, primary_key=True)
    date = Column(String(200))
    time = Column(String(200))
    Turbid = Column(Float)
    TOC = Column(Float)
    COD = Column(Float)
    DOC = Column(Float)
    NH3 = Column(Float)

    sended = Column(Boolean, default=False)

    def __repr__(self):
        return '<Metric {} - {}>'.format(self.time, self.date)
from db import session, Metric, engine, Base
import csv
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

TITLE = ['date', 'time', 'Turbid', 'TOC', 'COD', 'DOC', 'NH3']


def read_data():
    with open('./water_quality.csv', 'rU') as f:
        reader = csv.reader(f)
        items = [row for row in reader][1:]
        for item in items:
            yield dict(zip(TITLE, item))


def main():
    Base.metadata.create_all(engine)

    for item in read_data():
        m = Metric(**item)
        logging.info('Send {}'.format(m))
        session.add(m)
        session.commit()

if __name__ == '__main__':
    main()
from db import session, Metric
from time import sleep
from datetime import datetime
from water_quality.security import WaterCipher

import copy
import json
import logging
import requests
import threading
import traceback

logger = logging.getLogger()
logger.setLevel(logging.INFO)

METRIC_KEY = ['date', 'time', 'Turbid', 'TOC', 'COD', 'DOC', 'NH3']

event = threading.Event()


def read_data():
    while not event.is_set():
        items = session.query(Metric).filter_by(sended=False).all()

        if not items:
            sleep(5)
            continue

        for item in items:
            try:
                logger.info('process metric {}'.format(item))
                data = {k: v for k, v in copy.deepcopy(
                    item.__dict__.items()) if k in METRIC_KEY}
                date = data.pop('date')
                time = data.pop('time')
                data['TIMESTAMP'] = datetime.strptime('{} {}'.format(
                    date, time), '%d/%m/%Y %H:%M').strftime('%s')
            except Exception:
                logging.warning(traceback.format_exc())
            yield data, item


def main():
    w = WaterCipher()
    try:
        for data, item in read_data():
            enc_data = w.encrypt(json.dumps(data))
            res = requests.post(
                'http://10.0.0.92:5000/api/water_quality', data=enc_data)
            if res.status_code > 400:
                logging.warning('some error {}'.format(res.json()))
            else:
                logging.info('success')
                item.sended = True
                session.add(item)
                session.commit()
    except KeyboardInterrupt:
        event.set()

if __name__ == '__main__':
    main()