szaydel
5/7/2013 - 1:43 AM

asyncsrv.py

import zmq
import threading
import time
from random import choice

class ClientTask(threading.Thread):
    """ClientTask"""
    def __init__(self):
        threading.Thread.__init__ (self)

    def run(self):
        context = zmq.Context()
        socket = context.socket(zmq.XREQ)
        identity = 'worker-%d' % (choice([0,1,2,3,4,5,6,7,8,9]))
        socket.setsockopt(zmq.IDENTITY, identity )
        socket.connect('tcp://localhost:5570')
        print 'Client %s started' % (identity)
        poll = zmq.Poller()
        poll.register(socket, zmq.POLLIN)
        reqs = 0
        while True:
            for i in xrange(5):
                sockets = dict(poll.poll(1000))
                if socket in sockets:
                    if sockets[socket] == zmq.POLLIN:
                        msg = socket.recv()
                        print '%s: %s\n' % (identity, msg)
                        del msg
            reqs = reqs + 1
            print 'Req #%d sent..' % (reqs)
            socket.send('request #%d' % (reqs))

        socket.close()
        context.term()

class ServerTask(threading.Thread):
    """ServerTask"""
    def __init__(self):
        threading.Thread.__init__ (self)

    def run(self):
        context = zmq.Context()
        frontend = context.socket(zmq.XREP)
        frontend.bind('tcp://*:5570')

        backend = context.socket(zmq.XREQ)
        backend.bind('inproc://backend')

        workers = []
        for i in xrange(5):
            worker = ServerWorker(context)
            worker.start()
            workers.append(worker)

        poll = zmq.Poller()
        poll.register(frontend, zmq.POLLIN)
        poll.register(backend,  zmq.POLLIN)

        while True:
            sockets = dict(poll.poll())
            if frontend in sockets:
                if sockets[frontend] == zmq.POLLIN:
                    msg = frontend.recv()
                    print 'Server received %s' % (msg)
                    backend.send(msg)
            if backend in sockets:
                if sockets[backend] == zmq.POLLIN:
                    msg = backend.recv()
                    frontend.send(msg)

        frontend.close()
        backend.close()
        context.term()
 
class ServerWorker(threading.Thread):
    """ServerWorker"""
    def __init__(self, context):
        threading.Thread.__init__ (self)
        self.context = context

    def run(self):
        worker = self.context.socket(zmq.XREQ)
        worker.connect('inproc://backend')
        print 'Worker started'
        while True:
            msg = worker.recv()
            print 'Worker received %s' % (msg)
            replies = choice(xrange(5))
            for i in xrange(replies):
                time.sleep(1/choice(range(1,10)))
                worker.send(msg)
            del msg

        worker.close()


def main():
    """main function"""
    server = ServerTask()
    server.start()
    for i in xrange(3):
        client = ClientTask()
        client.start()
    
    server.join()
    

if __name__ == "__main__":
    main()