# -*- coding: utf-8 -*-
import kombu
import kombu.mixins
import kombu.common
import transactional.controller
class Worker(kombu.mixins.ConsumerMixin):
    """Consumer that hands every incoming message body to a controller.

    The consume loop itself is provided by ``kombu.mixins.ConsumerMixin``;
    this class only supplies the connection, the queues to read from and the
    per-message callback.
    """

    def __init__(self, connection, controller, queues):
        """Store the collaborators used while consuming.

        :param connection: connection object kept on ``self.connection``.
        :param controller: object exposing ``handle_request(body)``.
        :param queues: queues passed to the consumer in ``get_consumers``.
        """
        self.connection = connection
        self.controller = controller
        self.queues = queues

    def get_consumers(self, consumer_cls, channel):
        """Return a one-element list with a consumer over ``self.queues``.

        :param consumer_cls: callable used to build the consumer.
        :param channel: accepted for the hook signature; not used here.
        :returns: list containing the consumer wired to ``handle_request``.
        """
        return [
            consumer_cls(queues=self.queues, callbacks=[self.handle_request]),
        ]

    def handle_request(self, body, message):
        """Process one message and always acknowledge it.

        Any ``Exception`` raised while handling is printed and swallowed, and
        the message is acknowledged regardless of the outcome, so a failing
        message is not left unacknowledged.

        :param body: message payload forwarded to the controller.
        :param message: message object; its ``properties`` are printed and
            ``ack()`` is called on it.
        """
        try:
            # The controller's return value is intentionally not bound: this
            # worker does not publish any reply.
            self.controller.handle_request(body)
            print(message.properties)
        except Exception as e:
            print('Exception: {}'.format(e))
        finally:
            # Acknowledge on success and on failure alike.
            message.ack()
class KombuAMQPAdapter(object):
    """Wire a controller to an AMQP request queue and run a worker on it."""

    def __init__(self, controller, routing_key, worker_cls, host='amqp://guest:guest@localhost//'):
        """Prepare the exchange and request queue definitions.

        :param controller: object passed through to the worker.
        :param routing_key: used both as the queue name and its routing key.
        :param worker_cls: callable invoked as
            ``worker_cls(connection, controller, queues)``; the result must
            provide ``run()``.
        :param host: broker URL handed to ``kombu.Connection``.
        """
        self.controller = controller
        topic_exchange = kombu.Exchange('pleyade-exchange', 'topic', durable=True)
        self.exchange = topic_exchange
        self.request_queue = kombu.Queue(
            routing_key,
            exchange=topic_exchange,
            routing_key=routing_key,
        )
        self.host = host
        self.worker_cls = worker_cls

    def start_service(self):
        """Connect, declare the request queue and run the worker.

        A ``KeyboardInterrupt`` raised while building or running the worker
        is caught and routed to ``stop_service``.
        """
        banner = 'Starting service at {}'.format(self.host)
        print(banner)
        with kombu.Connection(self.host) as connection:
            # Bind the queue definition to this connection before declaring.
            bound_queue = self.request_queue(connection)
            bound_queue.declare()
            try:
                worker = self.worker_cls(
                    connection,
                    self.controller,
                    [self.request_queue],
                )
                worker.run()
            except KeyboardInterrupt:
                self.stop_service()

    def stop_service(self):
        """Print the farewell message."""
        print('Bye!!')
if __name__ == '__main__':
    # Script entry point: serve 'transactional.request' with the fake
    # controller on the adapter's default broker URL.
    controller = transactional.controller.FakeController()
    adapter = KombuAMQPAdapter(
        controller=controller,
        routing_key='transactional.request',
        worker_cls=Worker,
    )
    adapter.start_service()