akellehe
4/25/2017 - 10:37 PM

A queuereader with a little problem.

A queuereader with a little problem.

import logging
import time
import random
import json

from nsq import Reader, run
from tornado.autoreload import start as autoreload

settings = {
  'nsq_channel': 'andrew-kelleher',
  'topic': 'memory_leak',
  'nsq_lookupd_http_addresses': '',
  'max_in_flight': 10  
}

logging.basicConfig(
    level=logging.DEBUG if settings.get('debug') else logging.INFO)


class MyClass:

    def __init__(self, task):
        cardinality = task['cardinality']
        self.state = [random.random() for i in range(int(cardinality))]

    def work_a_while(self):
        time.sleep(random.random() * 10)
        return sum(self.state)


def message_handler(msg):
    """
    Takes message of the form

    .. code-block:: json

        {
            "cardinality": 1000
        }
    """
    msg.enable_async()
    task = json.loads(msg.body)
    myclass = MyClass(task)
    myclass.work_a_while()
    msg.finish()


if __name__ == '__main__':
    if settings.get('debug'):
        autoreload()

    Reader(
        channel=settings.get('nsq_channel'),
        lookupd_http_addresses=settings.get('nsq_lookupd_http_addresses'),
        max_in_flight=settings.get('max_in_flight'),
        message_handler=message_handler,
        topic='memory_leak',
        max_tries=5
    )

    run()