leafsummer
7/17/2017 - 8:11 AM

A writer thread for the database, with a queue for handling many requests

A writer thread for the database, with a queue for handling many requests

class AsyncCursor(object):
    """Stand-in for the result of a query that is executed elsewhere.

    The object is handed back to the caller before the query has run.
    Iterating it, or reading ``lastrowid`` / ``rowcount``, blocks on the
    supplied event until ``set_result()`` has been called, and raises
    ``ResultTimeout`` if the wait times out first.
    """

    def __init__(self, event, sql, params, timeout):
        """Store the query and the event used to signal completion.

        :param event: object with ``set()`` and ``wait(timeout=...)``
            (e.g. ``threading.Event``); set once results are available.
        :param sql: SQL text to be executed by the worker.
        :param params: parameters for ``sql``.
        :param timeout: default number of seconds to wait for results;
            ``None`` means wait indefinitely.
        """
        self._event = event  # Used to signal when results are ready.
        self.sql = sql
        self.params = params
        self.timeout = timeout
        self._cursor = None
        self._rows = None
        self._ready = False

    def set_result(self, cursor):
        """Record the executed cursor and wake up any waiting caller.

        Called once the worker thread has executed the query (``self.sql``).
        All rows are fetched eagerly so the waiting side never touches the
        live cursor for row data.

        :param cursor: executed DB cursor exposing ``fetchall()``,
            ``lastrowid`` and ``rowcount``.
        :returns: ``self``.
        """
        self._cursor = cursor
        # NOTE: if fetchall() raises, the event below is never set and a
        # caller waiting without a timeout will keep blocking.
        self._rows = cursor.fetchall()
        self._event.set()  # Wake up the thread that's waiting on the event.
        return self

    def _wait(self, timeout=None):
        """Block until results are ready.

        :param timeout: seconds to wait; falls back to ``self.timeout``
            when ``None``.
        :raises ResultTimeout: if the event was not set within the timeout.
        """
        timeout = timeout if timeout is not None else self.timeout

        # Event.wait() returns False only when it gave up before the event
        # was set.  That includes a non-blocking poll (timeout == 0), which
        # must not be mistaken for "results are ready".
        if not self._event.wait(timeout=timeout):
            raise ResultTimeout('results not ready, timed out.')
        self._ready = True

    def __iter__(self):
        """Return an iterator over the fetched rows, waiting if necessary."""
        if not self._ready:
            self._wait()
        return iter(self._rows)

    @property
    def lastrowid(self):
        """ID of the most recently inserted row, waiting for execution."""
        if not self._ready:
            self._wait()
        return self._cursor.lastrowid

    @property
    def rowcount(self):
        """Number of affected rows, waiting for execution if necessary."""
        if not self._ready:
            self._wait()
        return self._cursor.rowcount

class Writer(object):
    """Consumer that pulls queued write queries and executes them serially.

    Items are taken from ``queue`` one at a time.  ``AsyncCursor`` items are
    executed against ``database``; the ``SHUTDOWN`` sentinel stops the loop.
    """

    def __init__(self, database, queue):
        """
        :param database: database object the queries are executed against.
        :param queue: queue yielding ``AsyncCursor`` objects or ``SHUTDOWN``.
        """
        self.database = database
        self.queue = queue

    def run(self):
        """Obtain a connection and process queue items until told to stop."""
        conn = self.database.get_conn()
        while self.loop(conn):
            pass

    def loop(self, conn):
        """Handle a single queue item.

        :param conn: connection obtained in ``run()`` (not used directly
            here).
        :returns: ``True`` to keep running, a falsy value to stop.
        """
        obj = self.queue.get()
        if isinstance(obj, AsyncCursor):
            self.execute(obj)
            return True
        if obj is SHUTDOWN:
            return False
        # Any other item is unrecognised; as before, a falsy result ends
        # the run() loop.
        return None

    def execute(self, async_cursor):
        """Run the queued query and publish its result.

        :param async_cursor: the ``AsyncCursor`` describing the query.
        :returns: ``async_cursor`` after its result has been set.
        """
        # Call the base-class implementation of execute_sql to avoid entering
        # an endless chain of recursion.  The receiver must be the database
        # (not this Writer), since execute_sql is a database method.
        db_cursor = SqliteExtDatabase.execute_sql(
            self.database,
            async_cursor.sql,
            async_cursor.params)
        return async_cursor.set_result(db_cursor)
 
class SqliteQueueDatabase(SqliteExtDatabase):
    """Database whose write queries are pushed onto ``self.write_queue``."""

    def execute_sql(self, sql, params=None, require_commit=True, timeout=None):
        """Execute ``sql``, deferring it to the write queue when it is a write.

        :param sql: SQL text.
        :param params: parameters for ``sql``.
        :param require_commit: when true the query is treated as a write and
            queued; otherwise it is executed directly by the parent class.
        :param timeout: default wait timeout stored on the returned
            ``AsyncCursor`` (only used for queued queries).
        :returns: an ``AsyncCursor`` for queued queries, otherwise whatever
            the parent ``execute_sql`` returns.
        """
        # Reads bypass the queue and go straight to the parent class.
        if not require_commit:
            parent = super(SqliteQueueDatabase, self)
            return parent.execute_sql(sql, params, require_commit)

        # Writes: wrap the query, enqueue it, and hand the wrapper back so
        # the caller can wait on the result.
        done_event = threading.Event()
        pending = AsyncCursor(
            event=done_event, sql=sql, params=params, timeout=timeout)
        self.write_queue.put(pending)
        return pending