casualjim
12/14/2010 - 5:57 PM

jetty continuation issues

jetty continuation issues

<?xml version="1.0"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         id="Backchat Streams"
         version="2.5">
    <listener>
      <listener-class>akka.servlet.Initializer</listener-class>
    </listener>


    <servlet>
        <servlet-name>StreamServlet</servlet-name>
        <servlet-class>com.mojolly.websocket.CometServlet</servlet-class>
    </servlet>

    <servlet-mapping>
        <servlet-name>StreamServlet</servlet-name>
        <url-pattern>/stream</url-pattern>
    </servlet-mapping>

    <servlet-mapping>
      <servlet-name>default</servlet-name>
      <url-pattern>/images/*</url-pattern>
      <url-pattern>/css/*</url-pattern>
      <url-pattern>/js/*</url-pattern>
    </servlet-mapping>

</web-app>
package com.mojolly.websocket

import org.eclipse.jetty.server.Server
import javax.servlet.http.{HttpServlet, HttpServletResponse, HttpServletRequest}
import org.eclipse.jetty.continuation.ContinuationSupport
import java.lang.String
import org.eclipse.jetty.webapp.WebAppContext
import akka.actor.ActorRegistry

class CometServlet extends HttpServlet with Logging {

  override protected def doGet(req: HttpServletRequest, resp: HttpServletResponse) {
    println("Got request for continuation")
    val cont = ContinuationSupport.getContinuation(req)
    cont.setTimeout(5000)
    println(cont.toString)
    if (cont.isInitial) {
      cont.getServletResponse.getWriter.println("hello...")
      cont.getServletResponse.flushBuffer
      ActorRegistry.actorsFor(classOf[PollerActor]) foreach { _ ! RegisterCallback(
        msg => {
          cont.setAttribute("messages", msg)
          cont.resume
        }
      )}
    }
    if(cont.isResumed && cont.getAttribute("messages") != null) {
      cont.getServletResponse.getWriter.println(cont.getAttribute("messages").toString)
      cont.getServletResponse.flushBuffer
      cont.setAttribute("messages", null)
    }
    if(cont.isExpired) {
      try {
        cont.getServletResponse.getWriter.println("")
        cont.getServletResponse.flushBuffer
      } catch {
        case e => {
          log.warn(e, "A problem writing to the stream on timeout")
          cont.suspend
          cont.complete
        }
      }
    }
    cont.suspend()
  }

}

object WebServer {


  def main(args: Array[String]) {
    val server = new Server(8888)

    val context = new WebAppContext
    context.setDescriptor("src/main/webapp/WEB-INF/web.xml")
    context.setResourceBase("src/main/webapp")
    context.setContextPath("/")

    context.setParentLoaderPriority(true)

    server.setHandler(context)

    server.start
    server.join
  }
}
with complete:


(master)» curl -iv http://localhost:8888/stream
* About to connect() to localhost port 8888 (#0)
*   Trying ::1... connected
* Connected to localhost (::1) port 8888 (#0)
> GET /stream HTTP/1.1
> User-Agent: curl/7.19.7 (universal-apple-darwin10.0) libcurl/7.19.7 OpenSSL/0.9.8l zlib/1.2.3
> Host: localhost:8888
> Accept: */*
> 
< HTTP/1.1 200 OK
HTTP/1.1 200 OK
< Transfer-Encoding: chunked
Transfer-Encoding: chunked
< Server: Jetty(7.2.2.v20101205)
Server: Jetty(7.2.2.v20101205)

< 
hello...
message from console
* Connection #0 to host localhost left intact
* Closing connection #0
package com.mojolly.websocket

import akka.actor.{Scheduler, Actor}
import java.util.concurrent.TimeUnit
import Actor._
import org.jredis.ri.alphazero.JRedisClient

/*
 * Gets executed by the listener defined in web.xml
 */

case class RegisterCallback(cb: String => Unit)
case object Poll

class PollerActor extends Actor {

  val redis = new JRedisClient("localhost", 6379)

  private var _callback: Option[String => Unit] = None

  protected def receive = {
    case Poll => _callback foreach { cb =>
      val msg = Option(redis.lpop("message_queue"))
      msg foreach { bytes =>
        cb(new String(bytes, "UTF-8"))
      }
    }
    case RegisterCallback(cb) => _callback = Some(cb)
  }
}

class Boot {

  val poller = actorOf[PollerActor].start

  Scheduler.schedule(poller, Poll, 500, 500, TimeUnit.MILLISECONDS)
}