casualjim
4/25/2011 - 10:48 PM

A poller that provides a more event-driven interface to jzmq.

A poller that provides a more event-driven interface to jzmq.

/** Companion holding the type aliases used by the `ZeroMQPoller` class. */
object ZeroMQPoller {

  /** A callback that consumes a `ZMessage` and produces no result. */
  type ZMessageHandler = Function1[ZMessage, Unit]
}

/**
 * Event-style wrapper around a jzmq `Poller`.
 *
 * Sockets are registered together with a handler; each call to [[poll]]
 * polls all registered sockets for `POLLIN` and invokes the handler of every
 * socket that is readable, passing it `ZMessage(socket)`.
 *
 * The underlying poller is created lazily (by [[init]] or by the first
 * [[poll]]), so sockets may be registered before it exists.
 *
 * NOTE(review): nothing in this class synchronizes access to its mutable
 * state; it presumably is meant to be used from a single thread — confirm.
 *
 * @param context the context used to create the underlying poller
 */
class ZeroMQPoller(context: Context) {

  import ZeroMQPoller._

  // None until init() has run; avoids null dereferences in dispose() and -=.
  private var poller: Option[Poller] = None
  // Parallel buffers: pollinHandlers(i) is the handler registered for sockets(i).
  private val pollinHandlers = ListBuffer[ZMessageHandler]()
  private val sockets = ListBuffer[Socket]()

  /**
   * Records the socket/handler pair and, if the underlying poller already
   * exists, registers the socket with it for `POLLIN`.
   *
   * @param socket         the socket to watch
   * @param messageHandler the callback to run when the socket is readable
   */
  protected def register(socket: Socket, messageHandler: ZMessageHandler): Unit = {
    sockets += socket
    pollinHandlers += messageHandler
    poller.foreach(_.register(socket, Poller.POLLIN))
  }

  /**
   * Creates the underlying poller, sized for the currently known sockets,
   * and registers each of them for `POLLIN`.
   */
  def init(): Unit = {
    val created = context.poller(sockets.size)
    poller = Some(created)
    sockets.foreach(created.register(_, Poller.POLLIN))
  }

  /**
   * Unregisters every known socket from the underlying poller (when one has
   * been created) and forgets all sockets and handlers.
   */
  def dispose(): Unit = {
    // Guarded: dispose() before init()/poll() must not dereference a missing poller.
    poller.foreach(p => sockets.foreach(p.unregister(_)))
    sockets.clear()
    pollinHandlers.clear()
  }

  /**
   * Stops watching the given socket and drops its handler.
   * Does nothing when the socket was never registered.
   *
   * @param socket the socket to remove
   */
  def -=(socket: Socket): Unit = {
    val idx = sockets.indexOf(socket)
    // indexOf yields -1 for an unknown socket; removing at -1 would throw.
    if (idx >= 0) {
      poller.foreach(_.unregister(socket))
      sockets.remove(idx)
      pollinHandlers.remove(idx)
    }
  }

  /**
   * Registers a `(socket, handler)` pair; see [[register]].
   *
   * @param handler the socket to watch and the callback to run for it
   */
  def +=(handler: (Socket, ZMessageHandler)): Unit = {
    register(handler._1, handler._2)
  }

  /**
   * Polls the registered sockets once and dispatches to the handler of each
   * socket reported readable. Creates the underlying poller first if needed.
   *
   * @param timeout passed to the underlying poll; positive values are
   *                multiplied by 1000 first (presumably milliseconds to
   *                microseconds — confirm against the jzmq version in use),
   *                non-positive values are passed through unchanged
   */
  def poll(timeout: Long = -1): Unit = {
    if (poller.isEmpty) init()
    poller.foreach { active =>
      val timo = if (timeout > 0) timeout * 1000 else timeout
      active.poll(timo)
      (0 until active.getSize) foreach { slot =>
        if (active.pollin(slot)) {
          val readable = active.getSocket(slot)
          // Resolve the handler via the socket rather than the poller slot:
          // once a socket has been removed, the buffers above shift down and
          // their indices no longer have to line up with the poller's slots.
          val handlerIdx = sockets.indexOf(readable)
          if (handlerIdx >= 0) {
            pollinHandlers(handlerIdx)(ZMessage(readable))
          }
        }
      }
    }
  }
}