# A writer thread that serializes database writes through a queue, so that
# many concurrent requests can be handled.
class AsyncCursor(object):
    """Handle for a write query that is executed later by the worker thread.

    The caller receives this object immediately. Iterating over it, or
    reading ``lastrowid`` / ``rowcount``, blocks until the worker has called
    ``set_result()`` or until the timeout expires.
    """

    def __init__(self, event, sql, params, timeout):
        """Store the query and the event used to signal its completion.

        :param event: object exposing ``wait(timeout=...)`` and ``set()``.
        :param sql: SQL text to be executed by the worker.
        :param params: parameters accompanying ``sql``.
        :param timeout: default wait limit used by ``_wait()``; ``None``
            means block until the result is available.
        """
        self._event = event  # Used to signal when results are ready.
        self.sql = sql
        self.params = params
        self.timeout = timeout
        self._cursor = None
        self._rows = None
        self._ready = False

    def set_result(self, cursor):
        """Record the executed cursor and wake up any waiting caller.

        Called once the worker thread has executed the query (``self.sql``).
        The rows are fetched eagerly here, before the event is set.

        :param cursor: executed cursor exposing ``fetchall()``,
            ``lastrowid`` and ``rowcount``.
        :returns: this ``AsyncCursor``.
        """
        self._cursor = cursor
        self._rows = cursor.fetchall()
        self._event.set()  # Wake up the thread that's waiting on the event.
        return self

    def _wait(self, timeout=None):
        """Block until the results are ready.

        :param timeout: wait limit for this call; falls back to
            ``self.timeout`` when ``None``.
        :raises ResultTimeout: if a timeout was given and the event was not
            set before it elapsed.
        """
        timeout = timeout if timeout is not None else self.timeout
        # Compare against None rather than relying on truthiness: a timeout
        # of 0 is a valid "do not block" request and must still raise when
        # the results are missing, instead of marking the cursor as ready.
        if not self._event.wait(timeout=timeout) and timeout is not None:
            raise ResultTimeout('results not ready, timed out.')
        self._ready = True

    def __iter__(self):
        """Return an iterator over the fetched rows, waiting if necessary."""
        if not self._ready:
            self._wait()
        return iter(self._rows)

    @property
    def lastrowid(self):
        """``lastrowid`` of the underlying cursor, waiting for execution."""
        # The value is only meaningful once the query has actually run.
        if not self._ready:
            self._wait()
        return self._cursor.lastrowid

    @property
    def rowcount(self):
        """``rowcount`` of the underlying cursor, waiting for execution."""
        # Like lastrowid, make sure the query was executed first.
        if not self._ready:
            self._wait()
        return self._cursor.rowcount
class Writer(object):
    """Worker that pulls queued write queries and executes them in order.

    ``run()`` is the worker's main loop; it keeps calling ``loop()`` until
    the ``SHUTDOWN`` sentinel is taken off the queue.
    """

    def __init__(self, database, queue):
        """
        :param database: database object the queries are executed against;
            must provide ``get_conn()``.
        :param queue: queue-like object whose ``get()`` yields
            ``AsyncCursor`` instances or the ``SHUTDOWN`` sentinel.
        """
        self.database = database
        self.queue = queue

    def run(self):
        """Process queue items until ``loop()`` reports shutdown."""
        conn = self.database.get_conn()
        while self.loop(conn):
            pass

    def loop(self, conn):
        """Handle a single queue item.

        :param conn: connection obtained in ``run()``; not used directly
            here.
        :returns: ``False`` when the ``SHUTDOWN`` sentinel was received,
            ``True`` otherwise.
        """
        obj = self.queue.get()
        if obj is SHUTDOWN:
            return False
        if isinstance(obj, AsyncCursor):
            self.execute(obj)
        # Unrecognised items are ignored rather than implicitly returning
        # None, which would silently terminate the writer loop.
        return True

    def execute(self, async_cursor):
        """Execute the queued query and hand the cursor back to the caller.

        :param async_cursor: the ``AsyncCursor`` describing the query.
        :returns: ``async_cursor`` after its result has been set.
        """
        # Call the base-class implementation of execute_sql to avoid entering
        # an endless chain of recursion. The receiver must be the database,
        # not this Writer instance.
        # NOTE(review): an exception raised here propagates out of run() and
        # the waiting caller is never signalled -- confirm whether errors
        # should be delivered through the AsyncCursor instead.
        db_cursor = SqliteExtDatabase.execute_sql(
            self.database,
            async_cursor.sql,
            async_cursor.params)
        return async_cursor.set_result(db_cursor)
class SqliteQueueDatabase(SqliteExtDatabase):
    """Database whose write queries are queued instead of run directly."""

    def execute_sql(self, sql, params=None, require_commit=True, timeout=None):
        """Run a read query directly, or enqueue a write query.

        :param sql: SQL text.
        :param params: parameters accompanying ``sql``.
        :param require_commit: when true the query is treated as a write.
        :param timeout: wait limit stored on the returned ``AsyncCursor``;
            not used for read queries.
        :returns: the parent class's ``execute_sql`` result for reads, or an
            ``AsyncCursor`` wrapper for writes.
        """
        if not require_commit:
            # Regular read query: delegate straight to the parent class.
            return super(SqliteQueueDatabase, self).execute_sql(
                sql, params, require_commit)

        # Write query: wrap it in an AsyncCursor, put the wrapper on
        # ``self.write_queue`` (assumed to be set up elsewhere -- not shown
        # in this section), and return the wrapper to the caller.
        ready = threading.Event()
        pending = AsyncCursor(
            event=ready,
            sql=sql,
            params=params,
            timeout=timeout)
        self.write_queue.put(pending)
        return pending