loicdescotte
8/23/2013 - 9:56 AM

Application.scala

package models

import play.api.libs.json.{JsUndefined, JsValue, Json}
import play.api.libs.iteratee.Enumeratee

import play.api.libs.concurrent.Execution.Implicits._

/** Marker type for everything that travels through the stock event streams. */
trait Event

/**
 * A single stock operation.
 *
 * @param level     visibility of the operation; the feed generator and the security filter in this
 *                  code base use the values "public" and "private"
 * @param amount    amount of the operation (the fake feed produces values in [0, 1000))
 * @param from      name of the originating feed, e.g. "New-York"
 * @param customer  customer identifier (the fake feed uses a random UUID string)
 * @param timestamp creation time in epoch milliseconds
 */
case class Operation(level: String, amount: Int, from: String, customer: String, timestamp: Long) extends Event
/**
 * A technical status message emitted alongside the operations.
 *
 * @param message   human-readable text
 * @param typeOf    kind of status; the fake feed emits "ERROR" or "FAILURE"
 * @param timestamp creation time in epoch milliseconds
 */
case class SystemStatus(message: String, typeOf: String, timestamp: Long) extends Event

/**
 * JSON conversion helpers for [[Event]] streams.
 *
 * Provides the macro-generated JSON formats for both event kinds plus two enumeratees that
 * translate a stream of `JsValue`s into `Event`s and back.
 */
object EventUtils {
  implicit val OperationFmt = Json.format[Operation]
  implicit val SystemStatusFmt = Json.format[SystemStatus]

  /**
   * Turns incoming JSON values into events.
   *
   * The presence of a "level" field selects the [[Operation]] format; otherwise the value is read
   * as a [[SystemStatus]]. A value that does not validate against the selected format is logged
   * and dropped instead of throwing, so that a single malformed message cannot terminate the
   * whole stream.
   */
  lazy val decoder: Enumeratee[JsValue, Event] = Enumeratee.mapConcat[JsValue] { jsValue =>
    val parsed: Option[Event] = (jsValue \ "level") match {
      case _: JsUndefined => SystemStatusFmt.reads(jsValue).asOpt
      case _ => OperationFmt.reads(jsValue).asOpt
    }
    if (parsed.isEmpty) {
      play.api.Logger.warn(s"Dropping undecodable event: $jsValue")
    }
    // Zero or one element: mapConcat simply emits nothing for an invalid input.
    parsed.toList
  }

  /**
   * Serialises events to JSON using the format matching their concrete type.
   *
   * Only [[Operation]] and [[SystemStatus]] are handled; any other `Event` implementation
   * results in a `MatchError`.
   */
  lazy val encoder: Enumeratee[Event, JsValue] = Enumeratee.map[Event] {
    case o: Operation => OperationFmt.writes(o)
    case s: SystemStatus => SystemStatusFmt.writes(s)
  }
}
package controllers

import play.api.mvc.{Controller, Action}
import play.api.libs.iteratee.{Enumerator}
import play.api.libs.concurrent.Execution.Implicits._
import models._
import play.api.libs.concurrent.Promise
import scala.util.Random
import java.util.UUID

/**
 * Fake stock web service: each action streams an endless, randomly timed mix of
 * operations and system status messages as chunked JSON.
 */
object StocksWS extends Controller {

  /** Builds one randomly populated operation attributed to `origin`. */
  private def randomOperation(origin: String): Event = {
    val level = if (Random.nextBoolean) "public" else "private"
    val amount = Random.nextInt(1000)
    val customer = UUID.randomUUID().toString
    Operation(level, amount, origin, customer, System.currentTimeMillis())
  }

  /** Builds one status message with a random severity attributed to `origin`. */
  private def randomStatus(origin: String): Event = {
    val severity = if (Random.nextBoolean) "ERROR" else "FAILURE"
    SystemStatus(s"System message from $origin", severity, System.currentTimeMillis())
  }

  /**
   * Creates the chunked response carrying the simulated feed of one market place.
   *
   * @param from name of the market place stamped on every generated event
   * @return a chunked result whose body is the JSON-encoded, interleaved event stream
   */
  def createFakeFeed(from: String) = {
    // The event is passed by name to Promise.timeout, so it is only built (and time-stamped)
    // once the delay has elapsed.
    val operationFeed: Enumerator[Event] = Enumerator.generateM[Event] {
      val delayMs = Random.nextInt(500) + Random.nextInt(300)
      Promise.timeout(Some(randomOperation(from)), delayMs)
    }
    val statusFeed: Enumerator[Event] = Enumerator.generateM[Event] {
      val delayMs = Random.nextInt(5000)
      Promise.timeout(Some(randomStatus(from)), delayMs)
    }
    val mixedFeed: Enumerator[Event] = operationFeed.interleave(statusFeed)
    Ok.chunked(mixedFeed &> EventUtils.encoder)
  }

  /** Simulated feed for New-York. */
  def ny = Action { createFakeFeed("New-York") }

  /** Simulated feed for London. */
  def london = Action { createFakeFeed("London") }

  /** Simulated feed for Tokyo. */
  def tokyo = Action { createFakeFeed("Tokyo") }
}
package controllers

import play.api.mvc.{Action, Controller}
import play.api.libs.iteratee.{Concurrent, Enumeratee}
import play.api.libs.json.{Json, JsValue}
import play.api.libs.EventSource
import play.api.libs.ws.WSEnumerator
import models._

import play.api.libs.concurrent.Execution.Implicits._

/**
 * Front-end controller: merges the three remote stock feeds, filters them per user and
 * pushes the result to the browser as Server-Sent Events.
 */
object StocksApplication extends Controller {

  // One lazily opened stream per remote feed, parsed chunk by chunk as JSON.
  // NOTE(review): WSEnumerator is not visible here; the for-comprehension in `userEvent`
  // suggests each value is a Future of an Enumerator[JsValue] - confirm.
  lazy val nyEnumerator = WSEnumerator.getStream[JsValue]("http://localhost:9000/stocks/api/ny")(Json.parse(_))
  lazy val londonEnumerator = WSEnumerator.getStream[JsValue]("http://localhost:9000/stocks/api/ldn")(Json.parse(_))
  lazy val tokyoEnumerator = WSEnumerator.getStream[JsValue]("http://localhost:9000/stocks/api/tok")(Json.parse(_))

  /**
   * Renders the stocks page.
   *
   * @param role role of the current user, handed to the view
   */
  def index(role: String) = Action {
    Ok(views.html.stocks.index(role))
  }

  /**
   * Streams the merged feeds as `text/event-stream`, restricted to what the user may see.
   *
   * @param role        user role; only "MANAGER" receives private operations and system status
   *                    messages
   * @param lowerBound  exclusive lower limit on the amount of forwarded operations
   * @param higherBound exclusive upper limit on the amount of forwarded operations
   */
  def userEvent(role: String, lowerBound: Int, higherBound: Int) = Action.async {
    // Role-based filter: public operations for everyone, everything else for managers only.
    val secure: Enumeratee[Event, Event] = Enumeratee.collect[Event] {
      case e@SystemStatus(_, _, _) if role == "MANAGER" => e
      case e@Operation("private", _, _, _, _) if role == "MANAGER" => e
      case e@Operation("public", _, _, _, _) => e
    }
    // Amount filter: operations strictly inside the bounds pass, status messages always pass.
    // The explicit result type makes each branch widen to Event without any cast.
    val inBounds: Enumeratee[Event, Event] = Enumeratee.collect[Event] {
      case e@Operation(_, amount, _, _, _) if amount > lowerBound && amount < higherBound => e
      case e: SystemStatus => e
    }

    for {
      ny <- nyEnumerator
      london <- londonEnumerator
      tokyo <- tokyoEnumerator
    } yield Ok.feed(
      Concurrent.broadcast(ny >- london >- tokyo)._1 &> EventUtils.decoder &> secure &> inBounds &> EventUtils.encoder &> EventSource()
    ).as("text/event-stream")
  }
}