"""Simple buffer for running arbitrary operations in batches."""
import logging
from datetime import datetime
from typing import Callable, Generic, List, TypeVar
T = TypeVar('T')
# Signature of the callback that consumes one batch of buffered items.
Inserter = Callable[[List[T]], None]


class Buffer(Generic[T]):
    """Buffer items until either max_size or max_time has been reached.

    params:
        inserter: function that takes a list of items and actually does
            something with them. It is called automatically with currently
            buffered items when conditions for flushing are met.
        max_size: flush as soon as this many items are buffered.
        max_time: flush when an item is added and at least this many seconds
            have passed since the last flush. The time condition is only
            evaluated inside ``add`` (there is no background timer), so items
            can stay buffered longer than ``max_time`` if nothing else is
            added. Call ``flush`` explicitly when you are done adding.
        log_level: level for the ``'Buffer'`` logger, either a level name
            (case-insensitive, e.g. ``'debug'``) or a numeric level such as
            ``logging.DEBUG``. The logger is shared by every instance, so the
            most recently constructed Buffer determines its level.
    """

    def __init__(self, inserter: Inserter, max_size: int = 100, max_time: int = 300, log_level='INFO') -> None:
        self.inserter = inserter
        self.max_size = max_size
        self.max_time = max_time
        self.items = []  # type: List[T]
        self.logger = logging.getLogger('Buffer')
        # Logger.setLevel accepts a level name or an int; only names need
        # normalising (calling .upper() on an int would raise AttributeError).
        if isinstance(log_level, str):
            log_level = log_level.upper()
        self.logger.setLevel(log_level)
        # initialize to current date so that we don't need to handle Nones
        # separately
        self.last_flushed = self._now()
        self.logger.debug("Buffer created with max_size %d, max_time %ds", max_size, max_time)

    def add(self, item: T) -> None:
        """Append ``item`` and flush if the size or time limit is reached."""
        self.items.append(item)
        if self._needs_flush():
            self.logger.debug("Max time/size exceeded, flushing")
            self.flush()

    def _needs_flush(self) -> bool:
        """Return True when the buffer is full or max_time has elapsed."""
        delta = self._now() - self.last_flushed
        return len(self.items) >= self.max_size or delta.total_seconds() >= self.max_time

    @staticmethod
    def _now():
        """Return the current time used for the max_time check."""
        # NOTE(review): datetime.now() is naive local wall-clock time, so DST
        # transitions or clock adjustments can make the elapsed time jump or
        # go negative. time.monotonic() would avoid that, but it would change
        # the type of the public ``last_flushed`` attribute.
        return datetime.now()

    def flush(self) -> None:
        """Pass all buffered items to ``inserter`` and start a new batch.

        When the buffer is empty the inserter is not called (only the timer
        is reset), so a final ``flush()`` after the last ``add`` is always
        safe, even if that ``add`` already triggered an automatic flush.

        If ``inserter`` raises, the items stay buffered and ``last_flushed``
        is left unchanged, so they are included in a later flush.
        """
        if self.items:
            self.logger.debug("Flushing %d items", len(self.items))
            self.inserter(self.items)
            # Rebind rather than clear(): the inserter may keep a reference
            # to the list it was given.
            self.items = []
        self.last_flushed = self._now()
# Usage:
#
# buf = Buffer(inserter)
#
# for i in iterable:
#     buf.add(i)
# buf.flush()  # flush at the end to ensure remaining items are processed
#
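# `inserter` should be a function that takes a single argument, a list, and does whatever you want to do
# with the list. Buffer passes the list to `inserter` as soon as it holds `max_size` items, or when an item is
# added at least `max_time` seconds after the previous flush. The time limit is only checked inside `add`,
# so always call `flush()` at the end to process any remaining items.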