8/5/2012 - 5:59 PM

How to push 2 mixed searches from Twitter with Scala, Play2, Iteratee and Comet


We will see how to mix 2 Twitter searches and push the results to the browser in real time.

First, check out this mini project.

To try it, you need to install Play 2.0


We define a comet method in our Controller :

def comet(query1: String, query2: String) = Action {

      // One stream of search results per query
      lazy val results1 = getStream(query1)
      lazy val results2 = getStream(query2)

      // Interleave results 1 and results 2, then push them to the Comet socket
      Ok.stream(results1 >- results2 &> Comet(callback = "parent.messageChanged"))
}

query1 and query2 are just some query strings, like "java" or "ruby"

results1 and results2 will contain the Twitter search results for these queries.

In the last line, Ok.stream sends a chunked HTTP response. That means that instead of receiving the full response at once, the browser receives it piece by piece.

results1 >- results2 interleaves (mixes) the responses of both searches, and &> Comet(callback = "parent.messageChanged") pushes the result to a Comet socket.
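To make the idea of mixing two streams concrete without Play, here is a minimal sketch that uses only the Scala standard library and java.util.concurrent. Two producers push into one shared queue, so items come out in arrival order, which is the spirit of >-. The names InterleaveSketch and interleave are hypothetical, and this is not how Play implements the operator.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object InterleaveSketch {
  // Hypothetical helper: emit both sequences concurrently into one queue
  // and return everything in the order it arrived.
  def interleave(a: Seq[String], b: Seq[String], delayA: Long, delayB: Long): List[String] = {
    val queue = new LinkedBlockingQueue[String]()

    // Each producer sleeps between items to simulate a slow source.
    def produce(items: Seq[String], delayMs: Long): Future[Unit] = Future {
      items.foreach { item =>
        blocking(Thread.sleep(delayMs))
        queue.put(item)
      }
    }

    // Wait until both producers have finished.
    Await.result(Future.sequence(Seq(produce(a, delayA), produce(b, delayB))), 10.seconds)

    // Drain the queue into a list.
    val out = ListBuffer.empty[String]
    var next = queue.poll()
    while (next != null) {
      out += next
      next = queue.poll()
    }
    out.toList
  }
}
```

The exact mix depends on timing, but each source keeps its own order: the "java" items stay in sequence among themselves, and so do the "ruby" items.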

Now, let's see how we get the responses from Twitter. For this we will use some enumerators. Enumerators are part of the Play iteratees API. An enumerator feeds data to an iteratee, which consumes it. Iteratees are able to consume data in a non-blocking and asynchronous way. But don't worry, the framework will do all this work for you when you combine the enumerators with the Comet object.
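If the producer/consumer split feels abstract, the following toy model may help. It is a deliberately simplified, synchronous sketch in plain Scala and does not use Play's types: ToyIteratee, Input, El, EOF, Done, Cont, collect and feed are all hypothetical names chosen for illustration. A consumer is either finished with a result or waiting for the next input, and the producer keeps feeding it until the data runs out.

```scala
object ToyIteratee {
  // What a producer can hand to a consumer: one element, or end of stream.
  sealed trait Input[+E]
  final case class El[+E](value: E) extends Input[E]
  case object EOF extends Input[Nothing]

  // A consumer is either done with a result, or a function awaiting more input.
  sealed trait Iteratee[E, A]
  final case class Done[E, A](result: A) extends Iteratee[E, A]
  final case class Cont[E, A](next: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]

  // A consumer that collects every element it receives into a list.
  def collect[E](acc: List[E] = Nil): Iteratee[E, List[E]] =
    Cont[E, List[E]] {
      case El(e) => collect(e :: acc)
      case EOF   => Done[E, List[E]](acc.reverse)
    }

  // A toy producer: feeds the items one by one, then signals EOF.
  // Returns None if the consumer still wants input after EOF.
  def feed[E, A](items: List[E], it: Iteratee[E, A]): Option[A] = it match {
    case Done(result) => Some(result)
    case Cont(next) =>
      items match {
        case head :: tail => feed(tail, next(El(head)))
        case Nil =>
          next(EOF) match {
            case Done(result) => Some(result)
            case Cont(_)      => None
          }
      }
  }
}
```

For example, ToyIteratee.feed(List("a", "b"), ToyIteratee.collect[String]()) feeds two elements and then EOF to the collecting consumer. Play's real API adds asynchrony on top of this shape.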

private def getStream(query: String) = {
        Enumerator.fromCallback[String](() => 
            // Wait one second, run the search, then flatten the nested promise
            Promise.timeout(WS.url("http://search.twitter.com/search.json?q="+query+"&rpp=1").get(), 1000 milliseconds).flatMap(_.map { response =>
                // First "text" field found in the JSON, prefixed with the query
                (response.json \\ "text").headOption.map(query + " : " + _.as[String])
            })
        )
}

This code says that every second, we ask Twitter for new tweets matching our query. Enumerator.fromCallback expects a function that returns a promise of a result. When this result is ready, it is pushed (asynchronously) to the Comet socket. We combine it with Promise.timeout to query Twitter again every second. Then we have a promise of a promise of a response! D'oh! That's why we use flatMap to get at the result inside the promise.
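The "promise of a promise" problem is not specific to Play. Here is a small sketch of the same situation using scala.concurrent.Future from the standard library as a stand-in for Play 2.0's Promise. The helpers fakeSearch and after are hypothetical substitutes for the WS call and for Promise.timeout; no real HTTP request is made.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FlattenSketch {
  // Hypothetical stand-in for the HTTP search: completes with a fake tweet text.
  def fakeSearch(query: String): Future[String] =
    Future(query + " : hello")

  // Hypothetical stand-in for Promise.timeout: evaluates `value` after a delay.
  def after[A](delayMs: Long)(value: => A): Future[A] = Future {
    blocking(Thread.sleep(delayMs))
    value
  }

  // Delaying an asynchronous call yields a future of a future.
  def nested(query: String): Future[Future[String]] =
    after(100)(fakeSearch(query))

  // flatMap collapses the two layers into one.
  def flat(query: String): Future[String] =
    nested(query).flatMap(inner => inner)

  // Blocking helper for demonstration only.
  def run(query: String): String =
    Await.result(flat(query), 5.seconds)
}
```

Calling FlattenSketch.run("java") waits for the delay and the fake search, then returns the single flattened value.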

Another trick: response.json \\ "text" searches the JSON response recursively and extracts every "text" value.
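To show what a recursive lookup like \\ does, here is a toy version over a tiny hand-written JSON model. It does not use Play's JSON library: Json, JStr, JArr, JObj and deep are hypothetical names, and the sample document in the usage note is an invented shape, not a real Twitter response.

```scala
object DeepLookupSketch {
  // A minimal JSON model, just enough for the illustration.
  sealed trait Json
  final case class JStr(value: String) extends Json
  final case class JArr(items: List[Json]) extends Json
  final case class JObj(fields: List[(String, Json)]) extends Json

  // Collect every value stored under `key`, at any depth, in document order.
  def deep(json: Json, key: String): List[Json] = json match {
    case JObj(fields) =>
      fields.flatMap { case (k, v) =>
        val here = if (k == key) List(v) else Nil
        here ++ deep(v, key)
      }
    case JArr(items) => items.flatMap(item => deep(item, key))
    case JStr(_)     => Nil
  }

  // Invented sample: one result object nested inside an array.
  val sample: Json = JObj(List(
    "results" -> JArr(List(
      JObj(List("user" -> JStr("someone"), "text" -> JStr("hi there")))
    ))
  ))
}
```

With this model, DeepLookupSketch.deep(DeepLookupSketch.sample, "text").headOption finds the nested "text" value without spelling out the path through "results", which is why the original code can stay so short.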

Adapt the content with an enumeratee

An enumeratee is a kind of adapter in the iteratee API. We will use this to transform the results sent to the browser. Let's see a very simple example : send all tweets in upper case.

val upperCase = Enumeratee.map[String] {
    tweet => tweet.toUpperCase
}

To insert this transformation in the pipe just before sending the content to the Comet socket, we just need to modify our code like this :

Ok.stream(results1 >- results2 &> upperCase &> Comet(callback = "parent.messageChanged"))

NB : &> is an alias for "through"
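The adapter idea can be sketched in plain Scala too. In this toy version a stream is just an Iterator and an adapter is a function from one iterator to another; Adapter, mapAdapter and through are hypothetical names and have nothing to do with Play's actual types.

```scala
object AdapterSketch {
  // A toy adapter: turns a stream of A into a stream of B.
  type Adapter[A, B] = Iterator[A] => Iterator[B]

  // Build an adapter that applies `f` to every element.
  def mapAdapter[A, B](f: A => B): Adapter[A, B] =
    source => source.map(f)

  // The same transformation as in the article: upper-case every tweet.
  val upperCase: Adapter[String, String] =
    mapAdapter[String, String](tweet => tweet.toUpperCase)

  // Send a source "through" an adapter, in the spirit of &>.
  def through[A, B](source: Iterator[A], adapter: Adapter[A, B]): Iterator[B] =
    adapter(source)
}
```

Because an adapter only sits between the source and the sink, several of them can be chained without touching either end.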

Finally, we just have to use the "iframe hack" to keep the stream alive in the browser (see index.scala.html).


Link to the Iteratee documentation