fespino
8/11/2015 - 9:12 AM

kombu_adapter.py

# -*- coding: utf-8 -*-
import kombu
import kombu.mixins
import kombu.common
import transactional.controller

class Worker(kombu.mixins.ConsumerMixin):
    """Consumer that hands every received message body to a controller.

    The worker keeps the connection, the controller and the queues it is
    given as plain attributes; the consuming loop itself comes from the
    ``kombu.mixins.ConsumerMixin`` base class.
    """

    def __init__(self, connection, controller, queues):
        """Remember the collaborators this worker operates on.

        :param connection: stored as ``self.connection``.
        :param controller: object whose ``handle_request(body)`` is invoked
            once per message.
        :param queues: queues passed to the consumer built in
            :meth:`get_consumers`.
        """
        self.queues = queues
        self.controller = controller
        self.connection = connection

    def get_consumers(self, consumer_cls, channel):
        """Build the single consumer used by this worker.

        :param consumer_cls: callable used to create the consumer.
        :param channel: accepted for interface compatibility; not used here.
        :returns: a one-element list holding a consumer bound to
            ``self.queues`` with :meth:`handle_request` as its only callback.
        """
        consumer = consumer_cls(
            callbacks=[self.handle_request],
            queues=self.queues,
        )
        return [consumer]

    def handle_request(self, body, message):
        """Pass ``body`` to the controller and acknowledge ``message``.

        Any ``Exception`` raised while handling is printed rather than
        propagated, and the message is acknowledged whether or not the
        handling succeeded.
        """
        try:
            try:
                # The controller's return value is not used.
                self.controller.handle_request(body)
                print(message.properties)
            except Exception as error:
                print('Exception: {}'.format(error))
        finally:
            # Always ack, even after a failure.
            message.ack()


class KombuAMQPAdapter(object):
    """Expose a controller over AMQP through a single request queue.

    The adapter declares a durable topic exchange named
    ``'pleyade-exchange'`` and one queue whose name and routing key are both
    ``routing_key``. ``start_service`` then runs a worker on that queue.
    """

    def __init__(self, controller, routing_key, worker_cls,
                 host='amqp://guest:guest@localhost//'):
        """Prepare the exchange/queue definitions; no connection is opened.

        :param controller: object handed to the worker for request handling.
        :param routing_key: used both as the queue name and as its routing
            key on the exchange.
        :param worker_cls: callable invoked as
            ``worker_cls(connection, controller, queues)``; the result must
            provide ``run()``.
        :param host: broker URL passed to ``kombu.Connection``.
        """
        self.controller = controller
        self.exchange = kombu.Exchange('pleyade-exchange', 'topic', durable=True)
        self.request_queue = kombu.Queue(
            routing_key,
            exchange=self.exchange,
            routing_key=routing_key,
        )
        self.host = host
        self.worker_cls = worker_cls

    def start_service(self):
        """Connect, declare the request queue and run the worker.

        Blocks inside the worker's ``run()``. A ``KeyboardInterrupt`` raised
        while the worker is running is caught and routed to
        :meth:`stop_service`.
        """
        with kombu.Connection(self.host) as conn:
            # Report the broker through as_uri(), which masks the password by
            # default, instead of printing the raw URL with its credentials.
            print('Starting service at {}'.format(conn.as_uri()))
            self.request_queue(conn).declare()
            try:
                self.worker_cls(conn, self.controller, [self.request_queue]).run()
            except KeyboardInterrupt:
                self.stop_service()

    def stop_service(self):
        """Print the shutdown message."""
        print('Bye!!')


if __name__ == '__main__':
    # Script entry point: serve a FakeController on the
    # 'transactional.request' queue using the Worker consumer.
    service = KombuAMQPAdapter(
        controller=transactional.controller.FakeController(),
        routing_key='transactional.request',
        worker_cls=Worker,
    )
    service.start_service()