recall704
9/5/2016 - 5:38 AM

main.py

# coding=utf-8
"""
说明: 采集日志文件 上送至数据库
作者:pengxin.wu 15645060726@163.com
创建时间:
"""

import input
import output
import filter
from common.dcoslog import logging
import gevent
from gevent.lock import BoundedSemaphore
import uuid
import monitor
import threading
from globalvar import g_theads, g_file_set
import sys
import os

# Put this script's own directory near the front of the module search path.
# NOTE(review): this runs *after* the local imports above (input, output,
# filter, monitor, globalvar), so it cannot influence how those resolve; if it
# is meant for them, it has to move above the import block.
sys.path.insert(1, os.path.dirname(os.path.abspath(__file__)))
# Create a semaphore; in this file it wraps the tagging of per-file greenlets,
# the kill of removed workers, and the registration of the monitor greenlet
# in g_theads.
# NOTE(review): an initial count of 2 lets two holders in at once, so this is
# not a mutual-exclusion lock — confirm whether BoundedSemaphore(1) was meant.
g_sem = BoundedSemaphore(2)

# Module logger (from common.dcoslog): a file handler restricted to ERROR and
# above, plus a stream handler; the logger itself passes INFO and above.
log = logging.getLogger("loooooo")
log.add_file_handler("error.logcrawler.log", level=logging.ERROR)
log.add_stream_handler()
log.setLevel(logging.INFO)


def worker_read_send(file_rd, es_w, retry_interval=1):
    """
    Read new content from one log file, transform it and send it to Elasticsearch.

    Meant to run as a gevent greenlet (monitor_dir_change spawns one per file).
    Each buffer read from the file is transformed once and then re-sent until
    ``es_w`` reports success; only then is the next buffer read, so no content
    is skipped while the output side is failing.

    :param file_rd: reader object (``input.FileRead`` in this file) exposing
        ``filename``, ``config`` and ``gen``, an iterator that yields a falsy
        value when the file has no new content.
    :param es_w: writer object (``output.EsWrite`` in this file) whose
        ``send_to_es(data)`` returns a sequence where ``[0]`` is the success
        flag and ``[1]`` an error description (presumably ``(ok, msg)``).
    :param retry_interval: seconds to wait before re-sending after a failed
        send, so an unreachable Elasticsearch does not turn this greenlet into
        a busy loop that floods the error log and starves other greenlets.
    :return: does not return normally; loops until the greenlet is killed or
        an exception escapes (e.g. ``StopIteration`` if ``gen`` is exhausted).
    """
    # Tag this worker's registry entry so it can be identified later.
    # ``with`` guarantees the permit is returned even if the lookup fails.
    with g_sem:
        current = g_theads[file_rd.filename]
        current.name = "read_file_to_es"
        current.id = uuid.uuid4().hex
        current.tid = threading.current_thread().ident

    uid = uuid.uuid4().hex  # request id attached to this worker's error logs
    tans = filter.Filter(file_rd.config)
    while True:
        # Builtin next() works on both Python 2.6+ and Python 3 iterators.
        ret_buf = next(file_rd.gen)
        if not ret_buf:
            # Nothing new in the file yet: yield the CPU and poll again later.
            gevent.sleep(2)
            continue

        # Transform once; the same payload is re-sent until it is accepted.
        payload = tans.trans_contant(ret_buf)
        while True:
            ret = es_w.send_to_es(payload)
            if ret[0]:
                break
            # Delivery failed: log it and retry the same payload, but back off
            # first. Wrap ret[1] in a tuple so a tuple-valued error message
            # cannot break the % formatting.
            log.error("%s" % (ret[1],), request_id=uid)
            gevent.sleep(retry_interval)


def monitor_dir_change(config, file_set, theads, interval=10):
    """
    Watch the configured directory and keep one worker greenlet per matching file.

    On every scan the directory is re-listed:

    - each newly matching file gets a reader/writer pair and a
      worker_read_send greenlet;
    - each file that disappeared has its greenlet killed and its reader
      state dropped via ``input.FileRead.delete_fileinfo``.

    :param config: parsed configuration; ``config["input"]["path"]`` and
        ``config["input"]["matchfile"]`` select the files to watch.
    :param file_set: set of files currently being processed; updated in place.
    :param theads: dict mapping filename -> worker greenlet; updated in place.
    :param interval: seconds to sleep between directory scans.
    :return: never returns; loops until this greenlet is killed.
    """
    while True:
        new_file_set = input.list_all_files(config["input"]["path"], config["input"]["matchfile"])
        add = new_file_set - file_set
        delete = file_set - new_file_set

        if add:
            # New files appeared: start a worker greenlet for each of them.
            log.info("new add %s" % add)
            for filename in add:
                try:
                    infd = input.FileRead(filename, config)
                    outws = output.EsWrite(infd.outconfig)
                except Exception as e:
                    # One bad file (e.g. removed between listing and opening)
                    # must not kill the only monitor greenlet. The file is not
                    # added to file_set, so the next scan retries it.
                    log.error("failed to start worker for %s: %s" % (filename, e))
                    continue
                theads[filename] = gevent.spawn(worker_read_send, infd, outws)
                file_set.add(filename)

        if delete:
            # Files were removed: stop their workers and forget their state.
            log.info("new delete %s" % delete)
            for filename in delete:
                if filename in theads:
                    # ``with`` returns the permit even if kill() raises.
                    with g_sem:
                        theads[filename].kill()
                    theads.pop(filename)
                    input.FileRead.delete_fileinfo(filename)
                file_set.remove(filename)

        # Yield the CPU until the next scan.
        gevent.sleep(interval)


if __name__ == "__main__":
    # Configuration is read relative to the current working directory.
    conf = input.read_conf("./conf.yml")
    log.info(id(g_file_set))

    # Start the directory watcher and tag it with the same name/id/tid
    # attributes that the per-file workers set on themselves.
    monitor_glet = gevent.spawn(monitor_dir_change, conf, g_file_set, g_theads)
    monitor_glet.name = "monitor file change"
    monitor_glet.id = uuid.uuid4().hex
    monitor_glet.tid = threading.current_thread().ident

    # The monitoring web server is currently disabled. To re-enable it:
    # web_server = monitor.start_monitor_task()
    # setattr(web_server, "name", "web server for monitor")
    # setattr(web_server, "id", uuid.uuid4().hex)
    # setattr(web_server, "tid", threading.current_thread().ident)

    # Register the watcher in the shared greenlet registry.
    with g_sem:
        g_theads["monitor_file_change"] = monitor_glet
        # g_theads["web_server_for_monitor"] = web_server

    # Hand control to the gevent hub; the watcher greenlet loops forever.
    gevent.wait()