import java.nio.charset.Charset
import java.util.concurrent.{TimeUnit, Executors}
import java.util.Date
import org.zeromq.ZMQ
trait ZeroMQTrials
object Server {
def main(args: Array[String]) {
println("the server")
val context = ZMQ context 1
val socket = context socket ZMQ.REP
socket bind "tcp://*:5555"
while(true) {
val req = socket recv 0
println("Receiving request: [" + new String(req) + "]")
try {
Thread.sleep(1000)
} catch {
case e: InterruptedException => {
e.printStackTrace()
}
}
socket.send("World".getBytes("UTF-8"), 0)
}
}
}
object Client {
def main(args: Array[String]) {
println("the client")
val context = ZMQ context 1
val socket = context socket ZMQ.REQ
println("Connecting to hello world server...")
socket connect "tcp://localhost:5555"
(1 to 10) foreach { i =>
println("Sending request %s...".format(i))
socket.send("Hello".getBytes("UTF-8"), 0)
val reply = socket recv 0
println("Received reply %s: [%s]" format (i, new String(reply, "UTF-8")))
}
}
}
object Pusher {
val executor = Executors.newSingleThreadScheduledExecutor
def main(args: Array[String]) {
println("starting pusher")
val context = ZMQ context 1
val socket = context socket ZMQ.PUSH
println("Starting pusher...")
socket bind "tcp://127.0.0.1:5566"
executor.scheduleAtFixedRate(new Runnable {
def run() {
print(".")
socket.send("0\u0000".getBytes(Charset.forName("UTF-8")), 0)
socket.send("Hello it's now: %s".format(new Date().toString).getBytes("UTF-8"), 0)
}
}, 500, 500, TimeUnit.MILLISECONDS)
}
}
object Puller {
def main(args: Array[String]) {
println("Starting puller")
val context = ZMQ context 1
val socket = context socket ZMQ.PULL
println("Connecting to pusher...")
socket connect "tcp://127.0.0.1:5566"
println("connected")
while(true) {
print(".")
val msg = new String(socket.recv(0), "UTF-8")
print("#")
if(msg != null) {
println(msg)
}
}
}
}
object Publisher {
val executor = Executors.newSingleThreadScheduledExecutor
def main(args: Array[String]) {
val context = ZMQ context 1
val socket = context socket ZMQ.PUB
println("Starting publisher...")
socket bind "tcp://127.0.0.1:5577"
executor.scheduleAtFixedRate(new Runnable {
def run() {
print(".")
socket.send("Hello it's now: %s".format(new Date().toString).getBytes("UTF-8"), 0)
}
}, 500, 500, TimeUnit.MILLISECONDS)
}
}
object Subscriber {
def main(args: Array[String]) {
val context = ZMQ context 1
val socket = context socket ZMQ.SUB
println("Starting subscriber...")
socket connect "tcp://127.0.0.1:5577"
socket subscribe "".getBytes("UTF-8")
while(true) {
val msgBytes = socket recv 0
val msg = new String(msgBytes, "UTF-8")
print("RECV: ")
println(msg)
}
}
}