A resque implementation in scala with akka actors and scala-redis
package com.mojolly.backchat
package redis
package resque
import com.mojolly.backchat.redis.resque.Resque.{ResqueWorkerActor}
import net.liftweb.json._
import JsonAST._
import JsonDSL._
import java.net.InetAddress
import com.redis.ds.{ RedisDeque, RedisDequeClient }
import com.redis.RedisClient
import java.util.{ Date, GregorianCalendar, Calendar }
import akka.dispatch.{ Dispatchers, HawtDispatcher }
import akka.actor._
import Actor._
import com.mojolly.backchat.redis.RedisNamespace
import actor.{ScheduledTask, Supervising, Supervised}
import java.util.concurrent.{TimeUnit, ScheduledFuture, ConcurrentHashMap}
object Resque {
private object Meta {
/*
* For converting scala objects into DBObject values
*/
object Reflection {
/*
* These don't require a conversion and can be put directly into a DBObject
*/
val primitives = Set[Class[_]](classOf[String], classOf[Int], classOf[Long], classOf[Double],
classOf[Float], classOf[Byte], classOf[BigInt], classOf[Boolean],
classOf[Short], classOf[java.lang.Integer], classOf[java.lang.Long],
classOf[java.lang.Double], classOf[java.lang.Float],
classOf[java.lang.Byte], classOf[java.lang.Boolean],
classOf[java.lang.Short])
def primitive_?(clazz: Class[_]) = primitives contains clazz
/*
* This is used to convert DBObjects into JObjects
*/
def primitive2jvalue(a: Any) = a match {
case x: String => JString(x)
case x: Int => JInt(x)
case x: Long => JInt(x)
case x: Double => JDouble(x)
case x: Float => JDouble(x)
case x: Byte => JInt(BigInt(x))
case x: BigInt => JInt(x)
case x: Boolean => JBool(x)
case x: Short => JInt(BigInt(x))
case x: java.lang.Integer => JInt(BigInt(x.asInstanceOf[Int]))
case x: java.lang.Long => JInt(BigInt(x.asInstanceOf[Long]))
case x: java.lang.Double => JDouble(x.asInstanceOf[Double])
case x: java.lang.Float => JDouble(x.asInstanceOf[Float])
case x: java.lang.Byte => JInt(BigInt(x.asInstanceOf[Byte]))
case x: java.lang.Boolean => JBool(x.asInstanceOf[Boolean])
case x: java.lang.Short => JInt(BigInt(x.asInstanceOf[Short]))
case x if datetype_?(x.asInstanceOf[AnyRef].getClass) => datetype2jvalue(x)
case _ => error("not a primitive " + a.asInstanceOf[AnyRef].getClass)
}
/*
* Date types require formatting
*/
val datetypes = Set[Class[_]](classOf[Calendar], classOf[Date], classOf[GregorianCalendar])
def datetype_?(clazz: Class[_]) = datetypes contains clazz
def datetype2jvalue(a: Any)(implicit formats: Formats) = a match {
case x: Calendar => dateAsJValue(x.getTime)
case x: Date => dateAsJValue(x)
}
def dateAsJValue(d: Date)(implicit formats: Formats) =
JObject(JField("$dt", JString(formats.dateFormat.format(d))) :: Nil)
}
}
implicit def namespaceToString(ns: RedisNamespace): String = ns.toString
import messages._
private var _resque: ActorRef = null
private var _resqueSupervisor: ActorRef = null
private def resque: ActorRef = {
if (_resque.isNull) {
_resque = addToSupervisor(actorOf(new Resque(RedisConfig())))
}
_resque
}
private def addToSupervisor(actor: ActorRef) = {
if (_resqueSupervisor.isNull) {
_resqueSupervisor = actorOf(new Actor with Supervising { }).start
Runtime.getRuntime.addShutdownHook(new Thread {
override def run = {
Actor.registry.actorsFor(classOf[ResqueListener]) foreach { worker =>
Option(_resque) foreach { _ ! StopWorker(worker) }
}
}
})
}
actor.start
_resqueSupervisor ! Link(actor)
actor
}
def resque_=(res: ActorRef) = _resque = res
def register[Worker <: ResqueWorker](worker: => Worker)(implicit mf: Manifest[Worker]){
val (job, q) = getForQueue(worker)
resque ! Reserve(q.name, job)
}
private def getForQueue[Worker <: ResqueWorker](worker: => Worker)(implicit mf: Manifest[Worker]) = {
val a = actorOf(worker)
val q = queueJobMap.get(Symbol(mf.erasure.getSimpleName))
(a, q)
}
def enqueue[Worker <: ResqueWorker](worker: => Worker, args: Any*)(implicit mf: Manifest[Worker]) = {
val (job, q) = getForQueue(worker)
resque ! Enqueue(q.name, job.id, args.toList)
}
object Naming {
object QueueName {
def apply(name: String)(implicit resqueConfig: ResqueConfig) = {
( resqueConfig.namespace :: queue :: name :: Nil) mkString keySeparator
}
}
object StatKey {
def apply(statistic: String)(implicit resqueConfig: ResqueConfig) = {
(resqueConfig.namespace :: stat :: statistic :: Nil) mkString keySeparator
}
def apply(statistic: String, workerId: String)(implicit resqueConfig: ResqueConfig) = {
(resqueConfig.namespace :: stat :: statistic :: workerId :: Nil) mkString keySeparator
}
}
object WorkerKey {
def apply(workerId: String)(implicit resqueConfig: ResqueConfig) = {
(resqueConfig.namespace :: worker :: workerId :: Nil) mkString keySeparator
}
}
object WorkerId extends {
def apply(pid: String, queueName: String) = {
(hostName :: pid :: queueName :: Nil) mkString keySeparator
}
}
object WorkerSet {
def apply()(implicit resqueConfig: ResqueConfig) = (resqueConfig.namespace :: workers :: Nil) mkString keySeparator
}
}
trait DefaultResqueConfig {
implicit val resqueConfig = ResqueConfig()
}
private object messages {
sealed trait ResqueMessage {
def toJson = Serialization.write(this)
}
case object Poll extends ResqueMessage
case class Perform(data: String) extends ResqueMessage
case class Reserve(queue: String, worker: ActorRef) extends ResqueMessage
case class StopWorker(worker: ActorRef) extends ResqueMessage
case object StartResque extends ResqueMessage
case class StartedWorker(worker: ActorRef) extends ResqueMessage
case object Stop extends ResqueMessage
case class Enqueue(queue: String, klass: String, args: List[Any]) extends ResqueMessage
sealed trait JobResult
case class JobFailure(
payload: String, queue: String, worker: ActorRef,
error: String, backtrace: List[String], failed_at: DateTime = DateTime.now) extends JobResult {
def toJson = Serialization.write(this)
}
object JobFailure {
def apply(payload: String, queue: String, worker: ActorRef, exception: Throwable): JobFailure =
apply(payload, queue, worker, exception.getMessage, exception.getStackTrace.map(_.toString).toList)
}
object JobSuccess extends JobResult
case class Success(worker: ActorRef) extends JobResult
sealed trait JobProcessingMessage {
def toJson = Serialization.write(this)
}
case class WorkingOn(worker: ActorRef, queue: String, payload: JValue, run_at: DateTime = DateTime.now) extends JobProcessingMessage
}
private val hawt = new HawtDispatcher
class Resque(config: RedisConfig) extends Actor with DefaultResqueConfig with Supervising {
self.dispatcher = hawt
import Resque.Naming._
protected val resqueClient = new RedisDequeClient(config.host, config.port)
private lazy val redisActor = {
val a = actorOf(new Actor {
self.dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
protected val redis = new RedisClient(config.host, config.port)
protected def receive = {
case m: JobFailure => {
val msg: JValue = ("failed_at" -> m.failed_at.toString(ISO8601_DATE)) ~
("payload" -> m.payload) ~
("error" -> m.error) ~
("backtrace" -> m.backtrace) ~
("worker" -> m.worker.id) ~
("queue" -> m.queue)
redis.rpush("resque:failed", Printer.compact(render(msg)))
}
case Success(worker) => {
redis.incr(StatKey("processed", worker.id))
redis.incr(StatKey("processed"))
redis.del(WorkerKey(worker.id))
}
case m @ WorkingOn(worker, queue, payload, runAt) => {
val msg: JValue = ("queue" -> QueueName(queue)) ~ ("payload" -> payload) ~ ("run_at" -> runAt.toString(ISO8601_DATE))
redis.set(WorkerKey(worker.id), Printer.compact(render(msg)))
}
case StopWorker(worker) => {
redis.srem(WorkerSet(), worker.id)
redis.del(WorkerKey(worker.id) + ":started")
}
case StartedWorker(worker) => {
redis.sadd(WorkerSet(), worker.id)
redis.set(WorkerKey(worker.id + ":started"), DateTime.now.toString(ISO8601_DATE))
}
case Enqueue(queueName, job, data) => {
val json: JValue = ("class" -> job) ~ ("args" -> data.map(Meta.Reflection.primitive2jvalue _))
log debug ("Enqueuing [%s] on [%s] with key [%s]:\n%s", job, queueName, QueueName(queueName), json.toPrettyJson)
redis.sadd(ResqueNamespace("queues").toString, queueName)
redis.rpush(QueueName(queueName), Printer.compact(render(json)) )
}
}
})
self startLink a
a
}
override protected def receive = {
case Reserve(queueName, worker) => {
val queueListener = actorOf(new ResqueListener(queueName, resqueClient.getDeque(QueueName(queueName))))
self startLink queueListener
self.startLink(worker)
queueListener ! Poll
redisActor ! StartedWorker(queueListener)
}
case m: JobFailure => {
redisActor ! m
}
case m: Success => {
redisActor ! m
}
case m: WorkingOn => {
redisActor ! m
}
case m @ StopWorker(worker) => {
redisActor ! m
self ! UnlinkAndStop(worker)
}
case m: StartedWorker => {
redisActor ! m
}
case m: Enqueue => redisActor ! m
case Stop => {
self.shutdownLinkedActors
self.supervisor foreach { _ ! UnlinkAndStop(self) }
}
}
}
val queueJobMap = new ConcurrentHashMap[Symbol, Symbol]
private class ResqueListener(queue: String, redisDeque: RedisDeque[String]) extends Actor with Supervised {
self.dispatcher = hawt
protected val timeout = akka.util.duration.pairIntToDuration(250 -> TimeUnit.MILLISECONDS)
import Naming.{ WorkerId}
self.id = WorkerId(self.uuid.toString, queue)
protected def schedulePoll = {
currentScheduler = Scheduler.scheduleOnce(() => self ! Poll, timeout.length, timeout.unit)
}
protected var currentScheduler: ScheduledFuture[AnyRef] = null
protected def receive = {
case Poll => {
val polled = redisDeque.pollFirst
polled foreach { x =>
val json = JsonParser.parse(x)
val jn = (json \ "class").extract[String]
val actors = Actor.registry.actorsFor(jn)
val args = Perform(x)
actors foreach { _ ! args }
self.supervisor foreach { _ ! WorkingOn(self, queue, json)}
}
if (polled.isEmpty) schedulePoll
}
case f: JobFailure => {
self.supervisor foreach { _ ! f.copy(worker = self)}
self ! Poll
}
case s: Success => {
self.supervisor foreach { _ ! s.copy(worker = self)}
self ! Poll
}
}
override def postStop = {
if(currentScheduler != null && !(currentScheduler.isCancelled || currentScheduler.isDone)) currentScheduler.cancel(false)
}
}
lazy val hostName = InetAddress.getLocalHost.getHostName
private val queue = "queue".intern
private val stat = "stat".intern
private val worker = "worker".intern
private val keySeparator = ":".intern
private val queueSeparator = "$".intern
private val workers = "workers".intern
private val payload = "payload".intern
private[resque] trait ResqueWorkerActor extends Actor with Supervised {
protected val queue: Symbol
self.id = getClass.getSimpleName
final protected def receive = {
case Perform(data) => {
log info ("Worker [%s] received:\n%s", self.id, data)
try {
val json = JsonParser.parse(data)
perform((json \ "args").children.head.values.asInstanceOf[List[Any]]:_ *)
notifyOther(Success(self))
} catch {
case e => {
notifyOther(JobFailure(data, queue.name, self, e))
log.error(e, "There was an error in job: %s", self.id)
}
}
}
}
private def notifyOther[T](msg: T) {
self.sender foreach { _ ! msg }
}
protected def perform(data: Any*): Unit
}
}
object ResqueNamespace extends RedisNamespace("resque")
case class ResqueConfig(namespace: String)
object ResqueConfig { def apply(): ResqueConfig = apply("resque") }
abstract class ResqueWorker(protected val queue: Symbol) extends ResqueWorkerActor {
Resque.queueJobMap.put(Symbol(self.id), queue)
}
case class RedisConfig(host: String = "localhost", port: Int = 6379)