macrovve
5/15/2018 - 2:53 PM

MultiProcess RotatingFileHandler in Python

MultiProcess RotatingFileHandler in Python

import logging
import os
logger = logging.getLogger(__name__)


def test(value):
    msg = '[{}] value {}'.format(os.getpid(), value)
    logger.info(msg)
import logging
from multiprocessing import Pool
import os
from subproc import test

logger = logging.getLogger(__name__)

if __name__ == '__main__':
    import logging.config
    import yaml

    path = 'logging.yaml'
    if os.path.exists(path):
        with open(path, 'rt') as f:
            config = yaml.load(f.read())
        logging.config.dictConfig(config)

    test('begin')

    p = Pool(4)
    p.map(test, [1, 2, 3, 4])

    test('end')
subproc             INFO    [5432] value begin
subproc             INFO    [5432] value end
subproc             INFO    [20728] value begin
subproc             INFO    [20731] value 1
subproc             INFO    [20732] value 2
subproc             INFO    [20733] value 3
subproc             INFO    [20734] value 4
subproc             INFO    [20728] value end
from logging.handlers import TimedRotatingFileHandler
import multiprocessing, threading, logging, sys, traceback
import os


class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = TimedRotatingFileHandler(name, when, interval, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
                print('received on pid {}'.format(os.getpid()))
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args have been stringified. Removes any
        # chance of unpickleable things inside and possibly reduces message size
        # sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback
import os


class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
                print('received on pid {}'.format(os.getpid()))
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args have been stringified. Removes any
        # chance of unpickleable things inside and possibly reduces message size
        # sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
---
version: 1
disable_existing_loggers: False
formatters:
    simple:
        format: "%(name)-20s%(levelname)-8s%(message)s"
handlers:
    console:
        class: logging.StreamHandler
        level: DEBUG
        formatter: simple
        stream: ext://sys.stdout
    mplog:
        class: mplog.MultiProcessingLog
        level: DEBUG
        formatter: simple
        name: mplog.log
        mode: a
        maxsize: 1024
        rotate: 0
root:
    level: DEBUG
    handlers: [console, mplog]