"""A queue reader with a little problem."""
import logging
import time
import random
import json
from nsq import Reader, run
from tornado.autoreload import start as autoreload
# Module-level configuration consumed by the NSQ reader set up at the bottom
# of this file.
# NOTE(review): 'nsq_lookupd_http_addresses' is an empty string here --
# presumably it must be filled in with a real nsqlookupd address; confirm.
settings = dict(
    nsq_channel='andrew-kelleher',
    topic='memory_leak',
    nsq_lookupd_http_addresses='',
    max_in_flight=10,
)

# There is no 'debug' key in the settings above, so logging runs at INFO
# unless one is added with a truthy value.
if settings.get('debug'):
    logging.basicConfig(level=logging.DEBUG)
else:
    logging.basicConfig(level=logging.INFO)
class MyClass:
    """Hold a list of random floats and sum it after a random delay."""

    def __init__(self, task):
        """Build ``self.state`` from ``task['cardinality']``.

        The cardinality is coerced with ``int()`` and that many values from
        ``random.random()`` are stored in ``self.state``.
        """
        size = int(task['cardinality'])
        self.state = [random.random() for _ in range(size)]

    def work_a_while(self):
        """Sleep for up to ten seconds, then return the sum of ``self.state``."""
        delay = random.random() * 10
        time.sleep(delay)
        total = sum(self.state)
        return total
def message_handler(msg):
    """Process one queue message.

    The message body is expected to be a JSON object such as::

        {
            "cardinality": 1000
        }

    The decoded object is handed to ``MyClass``, whose ``work_a_while()``
    is run to completion before the message is finished.
    """
    # Switch the message to async mode first; per the pynsq docs the reader
    # then leaves it to us to respond, which we do with finish() below.
    msg.enable_async()

    payload = json.loads(msg.body)
    worker = MyClass(payload)

    # This call sleeps (time.sleep) before returning; its result is unused.
    worker.work_a_while()

    msg.finish()
if __name__ == '__main__':
    # Script entry point: optionally enable tornado's autoreload, register a
    # reader for the configured topic/channel, then start the event loop.
    if settings.get('debug'):
        autoreload()

    # The Reader instance is not kept; nsq.run() below starts the loop that
    # drives it (this mirrors the usage shown in the pynsq docs).
    Reader(
        channel=settings.get('nsq_channel'),
        lookupd_http_addresses=settings.get('nsq_lookupd_http_addresses'),
        max_in_flight=settings.get('max_in_flight'),
        message_handler=message_handler,
        # Read the topic from settings like every other option, instead of
        # repeating the 'memory_leak' literal, so changing it in one place
        # actually takes effect.
        topic=settings.get('topic'),
        max_tries=5
    )
    run()