casualjim
4/19/2011 - 9:44 AM

zeromq_fun.scala

import java.nio.charset.Charset
import java.util.concurrent.{TimeUnit, Executors}
import java.util.Date
import org.zeromq.ZMQ

/** Empty marker trait; none of the objects visible in this file extend or reference it. */
trait ZeroMQTrials


/**
 * Reply half of the request/reply "hello world" pair.
 *
 * Binds a `ZMQ.REP` socket on `tcp://*:5555` and answers every incoming
 * request with the UTF-8 bytes of `"World"`.
 */
object Server {
  /**
   * Runs the reply loop. The loop has no exit condition, so this method only
   * terminates if one of the socket calls throws.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    println("the server")
    val context = ZMQ context 1
    val socket = context socket ZMQ.REP
    socket bind "tcp://*:5555"

    while (true) {
      val req = socket recv 0
      // Decode explicitly as UTF-8: the client encodes its request as UTF-8 and
      // the reply below is UTF-8 as well, so relying on the platform default
      // charset here could garble non-ASCII payloads.
      println("Receiving request: [" + new String(req, "UTF-8") + "]")
      // One-second pause before replying.
      try {
        Thread.sleep(1000)
      } catch {
        case e: InterruptedException =>
          e.printStackTrace()
      }
      socket.send("World".getBytes("UTF-8"), 0)
    }
  }
}

/**
 * Request half of the request/reply "hello world" pair.
 *
 * Connects a `ZMQ.REQ` socket to `tcp://localhost:5555`, sends `"Hello"` ten
 * times and prints each reply.
 */
object Client {
  /**
   * Performs ten request/reply round trips and then releases the socket and
   * the context.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    println("the client")
    val context = ZMQ context 1
    try {
      val socket = context socket ZMQ.REQ
      try {
        println("Connecting to hello world server...")
        socket connect "tcp://localhost:5555"

        (1 to 10) foreach { i =>
          println("Sending request %s...".format(i))
          socket.send("Hello".getBytes("UTF-8"), 0)
          val reply = socket recv 0
          println("Received reply %s: [%s]" format (i, new String(reply, "UTF-8")))
        }
      } finally {
        // This method returns normally after the loop, so release the socket
        // explicitly instead of leaving it open.
        socket.close()
      }
    } finally {
      // Terminate the context only after its socket has been closed.
      context.term()
    }
  }
}

/**
 * Push side of the push/pull demo.
 *
 * Binds a `ZMQ.PUSH` socket on `tcp://127.0.0.1:5566` and, every 500 ms,
 * sends two messages: the bytes of `"0"` followed by a NUL character, then a
 * timestamped greeting.
 */
object Pusher {
  /** Single-threaded scheduler that drives the periodic sends. */
  val executor = Executors.newSingleThreadScheduledExecutor

  /**
   * Binds the socket and schedules the periodic send task. The method itself
   * returns right after scheduling; the sends happen on the executor thread.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    println("starting pusher")
    val context = ZMQ context 1
    val socket = context socket ZMQ.PUSH
    println("Starting pusher...")
    socket bind "tcp://127.0.0.1:5566"
    executor.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        try {
          print(".")
          socket.send("0\u0000".getBytes(Charset.forName("UTF-8")), 0)
          socket.send("Hello it's now: %s".format(new Date().toString).getBytes("UTF-8"), 0)
        } catch {
          // The future returned by scheduleAtFixedRate is discarded, so a
          // failure here would otherwise go unreported while the executor
          // quietly suppresses all further runs. Report it, then rethrow so
          // the scheduling behaviour itself is unchanged.
          case e: Exception =>
            e.printStackTrace()
            throw e
        }
      }
    }, 500, 500, TimeUnit.MILLISECONDS)

  }
}

/**
 * Pull side of the push/pull demo.
 *
 * Connects a `ZMQ.PULL` socket to `tcp://127.0.0.1:5566` and prints every
 * message it receives, decoded as UTF-8.
 */
object Puller {
  /**
   * Runs the receive loop. The loop has no exit condition, so this method only
   * terminates if one of the socket calls throws.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    println("Starting puller")
    val context = ZMQ context 1
    val socket = context socket ZMQ.PULL
    println("Connecting to pusher...")
    socket connect "tcp://127.0.0.1:5566"
    println("connected")
    while (true) {
      print(".")
      // Guard the raw bytes rather than the decoded String: a String produced
      // by `new String(...)` is never null, so the previous check was dead
      // code, and a null result from recv would have thrown a
      // NullPointerException during decoding instead of being skipped.
      val payload = Option(socket.recv(0))
      print("#")
      payload.foreach { bytes =>
        println(new String(bytes, "UTF-8"))
      }
    }

  }
}

/**
 * Publish side of the pub/sub demo.
 *
 * Binds a `ZMQ.PUB` socket on `tcp://127.0.0.1:5577` and publishes a
 * timestamped greeting every 500 ms.
 */
object Publisher {
  /** Single-threaded scheduler that drives the periodic publishes. */
  val executor = Executors.newSingleThreadScheduledExecutor

  /**
   * Binds the socket and schedules the periodic publish task. The method
   * itself returns right after scheduling; the sends happen on the executor
   * thread.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val context = ZMQ context 1
    val socket = context socket ZMQ.PUB
    println("Starting publisher...")
    socket bind "tcp://127.0.0.1:5577"
    executor.scheduleAtFixedRate(new Runnable {
      def run(): Unit = {
        try {
          print(".")
          socket.send("Hello it's now: %s".format(new Date().toString).getBytes("UTF-8"), 0)
        } catch {
          // The future returned by scheduleAtFixedRate is discarded, so a
          // failure here would otherwise go unreported while the executor
          // quietly suppresses all further runs. Report it, then rethrow so
          // the scheduling behaviour itself is unchanged.
          case e: Exception =>
            e.printStackTrace()
            throw e
        }
      }
    }, 500, 500, TimeUnit.MILLISECONDS)
  }
}

/**
 * Subscribe side of the pub/sub demo.
 *
 * Connects a `ZMQ.SUB` socket to `tcp://127.0.0.1:5577`, subscribes with an
 * empty prefix and prints each received message after a `RECV: ` tag.
 */
object Subscriber {

  /**
   * Connects, subscribes and then prints incoming messages in an endless loop.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val ctx = ZMQ.context(1)
    val sub = ctx.socket(ZMQ.SUB)
    println("Starting subscriber...")
    sub.connect("tcp://127.0.0.1:5577")
    sub.subscribe("".getBytes("UTF-8"))
    // Endless stream of blocking receives; each element is decoded and printed
    // before the next receive is issued.
    Iterator.continually(sub.recv(0)).foreach { raw =>
      val text = new String(raw, "UTF-8")
      print("RECV: ")
      println(text)
    }
  }
}