// A poller that provides a more event-driven interface to jzmq.
object ZeroMQPoller {
  /** Callback type used by the poller: a function that takes a `ZMessage` and returns nothing. */
  type ZMessageHandler = Function1[ZMessage, Unit]
}
/**
 * Polls a set of sockets for readability and hands each readable socket to the
 * handler it was registered with.
 *
 * Sockets and handlers are kept in two parallel buffers (`sockets(i)` is handled
 * by `pollinHandlers(i)`), and dispatch in [[poll]] relies on slot `i` of the
 * underlying `Poller` holding `sockets(i)`. To keep that alignment, the poller
 * is discarded whenever a socket is removed and rebuilt from the buffers on the
 * next [[poll]].
 *
 * @param context context used to create the underlying `Poller`
 */
class ZeroMQPoller(context: Context) {
  import ZeroMQPoller._

  // None until init() runs, and again after `-=` / dispose() invalidate it.
  private var poller: Option[Poller] = None
  // Parallel buffers: pollinHandlers(i) is the handler for sockets(i).
  private val pollinHandlers = ListBuffer[ZMessageHandler]()
  private val sockets = ListBuffer[Socket]()

  /**
   * Records a socket/handler pair and, if a poller currently exists, registers
   * the socket with it for `Poller.POLLIN`.
   *
   * @param socket         socket to watch
   * @param messageHandler callback to invoke when `socket` is reported readable
   */
  protected def register(socket: Socket, messageHandler: ZMessageHandler): Unit = {
    sockets += socket
    pollinHandlers += messageHandler
    // A live poller only ever has sockets appended to it, so the new slot
    // lines up with the new buffer index.
    poller.foreach(_.register(socket, Poller.POLLIN))
  }

  /**
   * Creates a new poller sized to the current number of sockets and registers
   * every known socket with it for `Poller.POLLIN`, in buffer order.
   */
  def init(): Unit = {
    val fresh = context.poller(sockets.size)
    poller = Some(fresh)
    sockets.foreach(fresh.register(_, Poller.POLLIN))
  }

  /**
   * Unregisters every socket from the current poller (if any) and forgets all
   * sockets and handlers. Safe to call before the poller has been created.
   */
  def dispose(): Unit = {
    poller.foreach(active => sockets.foreach(active.unregister(_)))
    sockets.clear()
    pollinHandlers.clear()
    // Drop the emptied poller so later registrations start from a fresh one.
    poller = None
  }

  /**
   * Stops watching `socket` and drops its handler. Does nothing if the socket
   * was never registered.
   *
   * @param socket socket to remove
   */
  def -=(socket: Socket): Unit = {
    val idx = sockets.indexOf(socket)
    if (idx >= 0) {
      poller.foreach(_.unregister(socket))
      sockets.remove(idx)
      pollinHandlers.remove(idx)
      // Removing an entry shifts the buffer indices, while the poller keeps its
      // own slot layout. NOTE(review): jzmq's Poller appears to reuse freed
      // slots - confirm. Discard the poller so the next poll() rebuilds it in
      // buffer order.
      poller = None
    }
  }

  /**
   * Registers a `(socket, handler)` pair; equivalent to calling [[register]]
   * with the two tuple elements.
   *
   * @param handler the socket to watch and the callback to run for it
   */
  def +=(handler: (Socket, ZMessageHandler)): Unit = {
    val (socket, messageHandler) = handler
    register(socket, messageHandler)
  }

  /**
   * Polls the registered sockets once and invokes the handler of every socket
   * reported readable, passing it `ZMessage(socket)`.
   *
   * The poller is created via [[init]] first if it does not exist yet.
   *
   * @param timeout value handed to `Poller.poll`. Positive values are multiplied
   *                by 1000 first (presumably milliseconds to microseconds -
   *                TODO confirm against the jzmq version in use); zero and
   *                negative values are passed through unchanged. Defaults to -1.
   */
  def poll(timeout: Long = -1): Unit = {
    if (poller.isEmpty) init()
    poller.foreach { active =>
      val timo = if (timeout > 0) timeout * 1000 else timeout
      active.poll(timo)
      // Dispatch from a snapshot: a handler may call `+=` / `-=`, which mutates
      // the buffers and may replace the poller during this round.
      val handlers = pollinHandlers.toVector
      handlers.indices.foreach { idx =>
        if (active.pollin(idx)) {
          handlers(idx)(ZMessage(active.getSocket(idx)))
        }
      }
    }
  }
}