EdvardM
7/18/2017 - 2:06 PM

Simple buffer for running arbitrary operations in batches

Simple buffer for running arbitrary operations in batches

import logging
from datetime import datetime
from typing import Callable, Generic, List, TypeVar, Union

T = TypeVar('T')
# Signature of the callback that receives each flushed batch of items.
Inserter = Callable[[List[T]], None]


class Buffer(Generic[T]):
    """Buffer items until either max_size or max_time has been reached.

    Items passed to :meth:`add` are collected in a list. As soon as the list
    holds ``max_size`` items, or at least ``max_time`` seconds have passed
    since the previous flush, the whole list is handed to ``inserter`` and a
    new, empty list is started.

    The time limit is only checked inside :meth:`add`; nothing flushes the
    buffer in the background, so call :meth:`flush` once more after the last
    item has been added.

    params:
        inserter: function that takes a list of items and actually does
            something with them. It is called automatically with currently
            buffered items when conditions for flushing are met.
        max_size: number of buffered items that triggers a flush.
        max_time: seconds since the previous flush after which the next
            ``add`` triggers a flush.
        log_level: level for the shared ``'Buffer'`` logger, either a level
            name (case-insensitive, e.g. ``'debug'``) or a ``logging``
            constant such as ``logging.DEBUG``. ``None`` leaves the logger's
            current level untouched.
    """

    def __init__(self, inserter: Callable[[List[T]], None], max_size: int = 100,
                 max_time: int = 300,
                 log_level: Union[int, str, None] = 'INFO') -> None:
        self.inserter = inserter
        self.max_size = max_size
        self.max_time = max_time
        self.items = []  # type: List[T]
        # Every instance gets the same named logger, so setting its level here
        # affects all Buffer instances in the process.
        self.logger = logging.getLogger('Buffer')
        if log_level is not None:
            # Logger.setLevel only recognises upper-case level names; integer
            # levels are passed through unchanged.
            if isinstance(log_level, str):
                log_level = log_level.upper()
            self.logger.setLevel(log_level)

        # initialize to current date so that we don't need to handle Nones
        # separately
        self.last_flushed = self._now()
        self.logger.debug("Buffer created with max_size %d, max_time %ds", max_size, max_time)

    def add(self, item: T) -> None:
        """Buffer ``item`` and flush if the size or time limit is now reached."""
        self.items.append(item)

        if self._needs_flush():
            self.logger.debug("Max time/size exceeded, flushing")
            self.flush()

    def _needs_flush(self) -> bool:
        """Return True when the size limit or the time limit has been reached."""
        # NOTE(review): _now() is naive local wall-clock time, so clock
        # adjustments (e.g. DST changes) shift the measured interval;
        # time.monotonic() would avoid that but would change the type of
        # ``last_flushed``.
        delta = self._now() - self.last_flushed
        return len(self.items) >= self.max_size or delta.total_seconds() >= self.max_time

    @staticmethod
    def _now() -> datetime:
        """Return the current local time as a naive datetime."""
        return datetime.now()

    def flush(self) -> None:
        """Hand all buffered items to the inserter and start a new batch.

        When the buffer is empty the inserter is not called (so a final
        ``flush()`` never sends an empty list); the timer is still reset.
        If the inserter raises, the items stay buffered and the timer is not
        reset, so a later flush can retry them.
        """
        batch = self.items
        if batch:
            self.logger.debug("Flushing %d items", len(batch))
            self.inserter(batch)
        self.last_flushed = self._now()
        # Rebind instead of clearing so an inserter that kept a reference to
        # the batch does not see it emptied.
        self.items = []


# Usage:
#
# buf = Buffer(inserter)
# 
# for i in iterable:
#   buf.add(i)
# buf.flush() # flush in the end to ensure remaining items are processed
#
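# `inserter` should be a function that takes a single argument, a list, and does whatever you want to do
# with the list. Buffer passes the list to `inserter` as soon as it holds `max_size` items, or on the first
# `add` after at least `max_time` seconds have passed since the previous flush. The time limit is only
# checked inside `add`, so call `flush()` at the end to process any remaining items.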