casualjim
3/17/2011 - 5:40 PM

A Resque implementation in Scala with Akka actors and scala-redis

A Resque implementation in Scala with Akka actors and scala-redis

package com.mojolly.backchat
package redis
package resque

import com.mojolly.backchat.redis.resque.Resque.{ResqueWorkerActor}
import net.liftweb.json._
import JsonAST._
import JsonDSL._
import java.net.InetAddress
import com.redis.ds.{ RedisDeque, RedisDequeClient }
import com.redis.RedisClient
import java.util.{ Date, GregorianCalendar, Calendar }
import akka.dispatch.{ Dispatchers, HawtDispatcher }
import akka.actor._
import Actor._
import com.mojolly.backchat.redis.RedisNamespace
import actor.{ScheduledTask, Supervising, Supervised}
import java.util.concurrent.{TimeUnit, ScheduledFuture, ConcurrentHashMap}

object Resque {

  private object Meta {

    /*
    * Helpers for turning the values passed as job arguments into lift-json values.
    */
    object Reflection {

      /*
      * Classes whose instances primitive2jvalue maps straight onto a JSON scalar.
      */
      val primitives = Set[Class[_]](classOf[String], classOf[Int], classOf[Long], classOf[Double],
                                    classOf[Float], classOf[Byte], classOf[BigInt], classOf[Boolean],
                                    classOf[Short], classOf[java.lang.Integer], classOf[java.lang.Long],
                                    classOf[java.lang.Double], classOf[java.lang.Float],
                                    classOf[java.lang.Byte], classOf[java.lang.Boolean],
                                    classOf[java.lang.Short])

      /*
      * True when clazz is exactly one of the classes listed in primitives.
      */
      def primitive_?(clazz: Class[_]) = primitives contains clazz

      /*
      * Converts a single value into its JSON representation:
      *   - null becomes JNull (previously this ended in a NullPointerException)
      *   - strings, numbers and booleans become the matching JSON scalar
      *   - dates and calendars are delegated to datetype2jvalue
      * Anything else is rejected with an error naming the offending class.
      *
      * A value typed as Any is always boxed, and the primitive type patterns below
      * already match the boxed java.lang wrappers, so no separate cases are needed
      * for java.lang.Integer, java.lang.Long and friends.
      */
      def primitive2jvalue(a: Any) = a match {
        case null => JNull
        case x: String => JString(x)
        case x: Int => JInt(x)
        case x: Long => JInt(x)
        case x: Double => JDouble(x)
        case x: Float => JDouble(x)
        case x: Byte => JInt(BigInt(x))
        case x: BigInt => JInt(x)
        case x: Boolean => JBool(x)
        case x: Short => JInt(BigInt(x))
        case x if datetype_?(x.asInstanceOf[AnyRef].getClass) => datetype2jvalue(x)
        case _ => error("not a primitive " + a.asInstanceOf[AnyRef].getClass)
      }

      /*
      * Date types require formatting
      */
      val datetypes = Set[Class[_]](classOf[Calendar], classOf[Date], classOf[GregorianCalendar])

      /*
      * True when clazz is one of the date types or a subclass of one. Subclasses are
      * accepted because datetype2jvalue matches on Calendar and Date by type, so it can
      * format any of them (for example a non-Gregorian Calendar or a java.sql.Timestamp).
      */
      def datetype_?(clazz: Class[_]) = datetypes exists { _ isAssignableFrom clazz }

      /*
      * Formats a Calendar (through its time) or a Date with dateAsJValue.
      * Any other value fails with a MatchError.
      */
      def datetype2jvalue(a: Any)(implicit formats: Formats) = a match {
        case x: Calendar => dateAsJValue(x.getTime)
        case x: Date => dateAsJValue(x)
      }

      /*
      * Wraps the date, rendered with the date format of the implicit Formats,
      * in a JSON object holding a single "$dt" field.
      */
      def dateAsJValue(d: Date)(implicit formats: Formats) =
        JObject(JField("$dt", JString(formats.dateFormat.format(d))) :: Nil)
    }
  }

  /** Implicit view that lets a RedisNamespace stand in wherever a String is expected, using its toString. */
  implicit def namespaceToString(ns: RedisNamespace): String = ns.toString
  import messages._

  // The shared Resque actor; null until first requested or assigned through resque_=.
  private var _resque: ActorRef = null
  // The supervisor the Resque actor gets linked to; created on demand by addToSupervisor.
  private var _resqueSupervisor: ActorRef = null

  /**
   * Returns the shared Resque actor. When none has been set yet, one is created with the
   * default RedisConfig, handed to addToSupervisor and remembered for later calls.
   */
  private def resque: ActorRef =
    if (!_resque.isNull) _resque
    else {
      val created = actorOf(new Resque(RedisConfig()))
      _resque = addToSupervisor(created)
      _resque
    }

  /**
   * Starts the given actor, links it to the resque supervisor and returns it.
   *
   * The first call also creates and starts the supervisor and installs a JVM shutdown
   * hook that sends a StopWorker for every registered ResqueListener to the current
   * Resque actor (if one is set at that point).
   */
  private def addToSupervisor(actor: ActorRef) = {
    if (_resqueSupervisor.isNull) {
      _resqueSupervisor = actorOf(new Actor with Supervising { }).start
      val stopListenersOnExit = new Thread {
        override def run = {
          for (listener <- Actor.registry.actorsFor(classOf[ResqueListener]); master <- Option(_resque))
            master ! StopWorker(listener)
        }
      }
      Runtime.getRuntime.addShutdownHook(stopListenersOnExit)
    }
    actor.start
    _resqueSupervisor ! Link(actor)
    actor
  }

  /** Replaces the shared Resque actor with the given reference. */
  def resque_=(res: ActorRef) { _resque = res }

  /**
   * Registers a worker: wraps it in an actor, looks up the queue recorded for its class
   * and asks the Resque actor to reserve that queue for it.
   *
   * @param worker factory expression for the worker actor
   */
  def register[Worker <: ResqueWorker](worker: => Worker)(implicit mf: Manifest[Worker]): Unit = {
    val lookup = getForQueue(worker)
    val workerRef = lookup._1
    val queueSymbol = lookup._2
    resque ! Reserve(queueSymbol.name, workerRef)
  }

  /**
   * Wraps the worker in an actor reference and looks up the queue recorded for it in
   * queueJobMap, keyed by the simple name of the worker class.
   *
   * @param worker factory expression for the worker actor
   * @return the actor reference paired with the symbol of its queue
   * @throws IllegalStateException when no queue is recorded for the worker class; callers
   *         dereference the queue immediately, so this replaces a bare NullPointerException
   */
  private def getForQueue[Worker <: ResqueWorker](worker: => Worker)(implicit mf: Manifest[Worker]) = {
    val a = actorOf(worker)
    val jobName = mf.erasure.getSimpleName
    // ConcurrentHashMap.get answers null for a missing key.
    // NOTE(review): entries are written by the ResqueWorker constructor; this presumes the
    // worker has been instantiated by the time of the lookup - confirm against actorOf.
    val q = queueJobMap.get(Symbol(jobName))
    if (q == null)
      throw new IllegalStateException("No queue has been registered for worker [" + jobName + "]")
    (a, q)
  }

  /**
   * Queues a job for the given worker type: the queue is the one recorded for the worker
   * class, the job class is the id of the worker's actor reference, and args become the
   * job arguments.
   *
   * @param worker factory expression for the worker actor
   * @param args   arguments to store with the job
   */
  def enqueue[Worker <: ResqueWorker](worker: => Worker, args: Any*)(implicit mf: Manifest[Worker]) = {
    getForQueue(worker) match {
      case (workerRef, queueSymbol) =>
        resque ! Enqueue(queueSymbol.name, workerRef.id, args.toList)
    }
  }

  /** Builders for the redis keys and worker ids used by this implementation. */
  object Naming {

    // Joins the given segments with the shared key separator.
    private def key(parts: String*) = parts mkString keySeparator

    /** Key of the list holding the jobs of a queue: namespace, "queue" and the queue name. */
    object QueueName {
      def apply(name: String)(implicit resqueConfig: ResqueConfig) =
        key(resqueConfig.namespace, queue, name)
    }

    /** Keys of the statistics counters, either global or scoped to one worker id. */
    object StatKey {

      def apply(statistic: String)(implicit resqueConfig: ResqueConfig) =
        key(resqueConfig.namespace, stat, statistic)

      def apply(statistic: String, workerId: String)(implicit resqueConfig: ResqueConfig) =
        key(resqueConfig.namespace, stat, statistic, workerId)
    }

    /** Key under which the state of a single worker is stored. */
    object WorkerKey {
      def apply(workerId: String)(implicit resqueConfig: ResqueConfig) =
        key(resqueConfig.namespace, worker, workerId)
    }

    /** Identifier of a worker: host name, the supplied pid and the queue name. */
    object WorkerId {
      def apply(pid: String, queueName: String) =
        key(hostName, pid, queueName)
    }

    /** Key of the set that lists the known workers. */
    object WorkerSet {
      def apply()(implicit resqueConfig: ResqueConfig) =
        key(resqueConfig.namespace, workers)
    }

  }

  /** Mix-in that supplies the default ResqueConfig as an implicit value. */
  trait DefaultResqueConfig {
    implicit val resqueConfig: ResqueConfig = ResqueConfig.apply()
  }

  /** The messages exchanged between the Resque actor, its listeners and the workers. */
  private object messages {

    /** Base type of the control messages; each can be serialized to JSON. */
    sealed trait ResqueMessage {
      def toJson: String = Serialization.write(this)
    }

    /** Tells a listener to look for the next job on its queue. */
    case object Poll extends ResqueMessage

    /** Not handled by any actor in the code visible here. */
    case object StartResque extends ResqueMessage

    /** Tells the Resque actor to shut down its linked actors and itself. */
    case object Stop extends ResqueMessage

    /** Carries the raw JSON payload of a job to a worker. */
    case class Perform(data: String) extends ResqueMessage

    /** Asks the Resque actor to start listening on a queue on behalf of a worker. */
    case class Reserve(queue: String, worker: ActorRef) extends ResqueMessage

    /** Announces that a worker has started. */
    case class StartedWorker(worker: ActorRef) extends ResqueMessage

    /** Asks for a worker to be removed and stopped. */
    case class StopWorker(worker: ActorRef) extends ResqueMessage

    /** Asks for a job of class klass with the given arguments to be put on a queue. */
    case class Enqueue(queue: String, klass: String, args: List[Any]) extends ResqueMessage

    /** Base type of the outcomes reported for a job. */
    sealed trait JobResult

    /** A failed job: its payload, where it ran, and the error with its backtrace. */
    case class JobFailure(
        payload: String,
        queue: String,
        worker: ActorRef,
        error: String,
        backtrace: List[String],
        failed_at: DateTime = DateTime.now) extends JobResult {
      def toJson: String = Serialization.write(this)
    }

    object JobFailure {
      /** Builds a failure from an exception, taking its message and stack trace. */
      def apply(payload: String, queue: String, worker: ActorRef, exception: Throwable): JobFailure = {
        val trace = exception.getStackTrace.toList map { _.toString }
        apply(payload, queue, worker, exception.getMessage, trace)
      }
    }

    /** Not used by the code visible here. */
    object JobSuccess extends JobResult

    /** Reports that the given worker finished a job without an error. */
    case class Success(worker: ActorRef) extends JobResult

    /** Base type of the progress messages; each can be serialized to JSON. */
    sealed trait JobProcessingMessage {
      def toJson: String = Serialization.write(this)
    }

    /** Reports that a worker picked up the given payload from a queue at run_at. */
    case class WorkingOn(
        worker: ActorRef,
        queue: String,
        payload: JValue,
        run_at: DateTime = DateTime.now) extends JobProcessingMessage
  }

  // Dispatcher shared by the Resque actor and every ResqueListener.
  private val hawt: HawtDispatcher = new HawtDispatcher()

  /**
   * The coordinating actor. It creates a listener per reserved queue and forwards every
   * bookkeeping message (enqueue, working-on, success, failure, worker start/stop) to a
   * linked child actor that owns the redis connection used for those writes.
   *
   * @param config host and port of the redis server
   */
  class Resque(config: RedisConfig) extends Actor with DefaultResqueConfig with Supervising {

    self.dispatcher = hawt

    import Resque.Naming._
    // Client handed to the listeners to obtain the deque of their queue.
    protected val resqueClient = new RedisDequeClient(config.host, config.port)

    // Child actor performing the redis writes; created and linked on first use.
    private lazy val redisActor = {
      val a = actorOf(new Actor {

        self.dispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher

        protected val redis = new RedisClient(config.host, config.port)

        protected def receive = {
          case m: JobFailure => {
            val msg: JValue = ("failed_at" -> m.failed_at.toString(ISO8601_DATE)) ~
              ("payload" -> m.payload) ~
              ("error" -> m.error) ~
              ("backtrace" -> m.backtrace) ~
              ("worker" -> m.worker.id) ~
              ("queue" -> m.queue)
            // Build the failure list key from the configured namespace like every other
            // key; with the default configuration this is still "resque:failed".
            val failedList = (resqueConfig.namespace :: "failed" :: Nil) mkString keySeparator
            redis.rpush(failedList, Printer.compact(render(msg)))
            redis.incr(StatKey("failed", m.worker.id))
            redis.incr(StatKey("failed"))
            // The worker is done with the job, so drop its "working on" entry; otherwise
            // it stays behind until the worker happens to pick up another job.
            redis.del(WorkerKey(m.worker.id))
          }
          case Success(worker) => {
            redis.incr(StatKey("processed", worker.id))
            redis.incr(StatKey("processed"))
            redis.del(WorkerKey(worker.id))
          }
          case WorkingOn(worker, queue, payload, runAt) => {
              val msg: JValue = ("queue" -> QueueName(queue)) ~ ("payload" -> payload) ~ ("run_at" -> runAt.toString(ISO8601_DATE))
              redis.set(WorkerKey(worker.id), Printer.compact(render(msg)))
          }
          case StopWorker(worker) => {
            redis.srem(WorkerSet(), worker.id)
            redis.del(WorkerKey(worker.id) + ":started")
          }
          case StartedWorker(worker) => {
            redis.sadd(WorkerSet(), worker.id)
            redis.set(WorkerKey(worker.id + ":started"), DateTime.now.toString(ISO8601_DATE))
          }
          case Enqueue(queueName, job, data) => {
            val json: JValue = ("class" -> job) ~ ("args" -> data.map(Meta.Reflection.primitive2jvalue _))
            log debug ("Enqueuing [%s] on [%s] with key [%s]:\n%s", job, queueName, QueueName(queueName), json.toPrettyJson)
            redis.sadd(ResqueNamespace("queues").toString, queueName)
            redis.rpush(QueueName(queueName), Printer.compact(render(json)) )

          }

        }
      })
      self startLink a
      a
    }

    override protected def receive = {
      // Start a listener on the queue, link listener and worker, kick off polling and
      // record the listener as a started worker.
      case Reserve(queueName, worker) => {
        val queueListener = actorOf(new ResqueListener(queueName, resqueClient.getDeque(QueueName(queueName))))
        self startLink queueListener
        self.startLink(worker)
        queueListener ! Poll
        redisActor ! StartedWorker(queueListener)
      }
      case m: JobFailure => {
        redisActor ! m
      }
      case m: Success => {
        redisActor ! m
      }
      case m: WorkingOn => {
        redisActor ! m
      }
      case m @ StopWorker(worker) => {
        redisActor ! m
        self ! UnlinkAndStop(worker)
      }
      case m: StartedWorker => {
        redisActor ! m
      }
      case m: Enqueue => redisActor ! m
      case Stop => {
        self.shutdownLinkedActors
        self.supervisor foreach { _ ! UnlinkAndStop(self) }
      }
    }
  }

  /** Maps the symbol of a worker's actor id to the symbol of its queue; filled by the ResqueWorker constructor. */
  val queueJobMap: ConcurrentHashMap[Symbol, Symbol] = new ConcurrentHashMap[Symbol, Symbol]()

  /**
   * Polls one queue and hands every job it finds to the actors registered under the
   * job's class name. The next poll happens when a worker reports back (success or
   * failure), or after a short delay when the queue was empty.
   *
   * @param queue      name of the queue being served
   * @param redisDeque deque the jobs are taken from
   */
  private class ResqueListener(queue: String, redisDeque: RedisDeque[String]) extends Actor with Supervised {

    self.dispatcher = hawt

    // Delay before looking again after finding the queue empty.
    protected val timeout = akka.util.duration.pairIntToDuration(250 -> TimeUnit.MILLISECONDS)

    import Naming.{ WorkerId}
    self.id = WorkerId(self.uuid.toString, queue)

    // Schedules a single Poll to self after the timeout and remembers the handle so
    // postStop can cancel it.
    protected def schedulePoll = {
      currentScheduler = Scheduler.scheduleOnce(() => self ! Poll, timeout.length, timeout.unit)
    }

    protected var currentScheduler: ScheduledFuture[AnyRef] = null

    protected def receive = {
      case Poll => {
        val polled = redisDeque.pollFirst
        // NOTE(review): a payload that fails to parse throws out of receive; what happens
        // next depends on the supervision set-up - confirm the listener keeps polling.
        polled foreach { x =>
          val json = JsonParser.parse(x)
          val jn = (json \ "class").extract[String]
          val actors = Actor.registry.actorsFor(jn)
          if (actors.isEmpty) {
            // Nobody can run this job, so nobody would ever answer and trigger the next
            // poll. Record the job as failed and keep the listener going.
            val missing = new IllegalStateException("No worker registered for job class [" + jn + "]")
            self.supervisor foreach { _ ! JobFailure(x, queue, self, missing) }
            self ! Poll
          } else {
            val args = Perform(x)
            actors foreach { _ ! args }
            self.supervisor foreach { _ ! WorkingOn(self, queue, json)}
          }
        }
        if (polled.isEmpty) schedulePoll
      }
      // Results are passed on with this listener as the worker, then polling resumes.
      case f: JobFailure => {
        self.supervisor foreach { _ ! f.copy(worker = self)}
        self ! Poll
      }
      case s: Success => {
        self.supervisor foreach { _ ! s.copy(worker = self)}
        self ! Poll
      }
    }


    // Cancels a scheduled poll that has not run yet.
    override def postStop = {
      if(currentScheduler != null && !(currentScheduler.isCancelled || currentScheduler.isDone)) currentScheduler.cancel(false)
    }
  }


  /** Host name of the local machine, resolved on first use; part of every worker id. */
  lazy val hostName: String = InetAddress.getLocalHost.getHostName

  // Fixed segments of the redis keys built in Naming.
  private val queue: String = "queue".intern
  private val stat: String = "stat".intern
  private val worker: String = "worker".intern
  // Separator placed between the segments of a key.
  private val keySeparator: String = ":".intern
  private val queueSeparator: String = "$".intern
  private val workers: String = "workers".intern
  private val payload: String = "payload".intern

  /**
   * Base actor for job implementations. It answers every Perform message by parsing the
   * payload, invoking perform with the job arguments and replying to the sender with
   * either Success or a JobFailure describing what was thrown.
   */
  private[resque] trait ResqueWorkerActor extends Actor with Supervised {

    /** The queue this worker takes its jobs from. */
    protected val queue: Symbol
    self.id = getClass.getSimpleName

    final protected def receive = {
      case Perform(raw) =>
        log info ("Worker [%s] received:\n%s", self.id, raw)
        try {
          val arguments = (JsonParser.parse(raw) \ "args").children.head.values.asInstanceOf[List[Any]]
          perform(arguments: _*)
          replyToSender(Success(self))
        } catch {
          // Everything is caught on purpose: the sender waits for an answer either way.
          case failure => {
            replyToSender(JobFailure(raw, queue.name, self, failure))
            log.error(failure, "There was an error in job: %s", self.id)
          }
        }
    }

    // Sends msg to whoever sent the message being handled, if there is such a sender.
    private def replyToSender[T](msg: T) {
      for (origin <- self.sender) origin ! msg
    }

    /** Runs the job with the arguments found in the payload. */
    protected def perform(data: Any*): Unit

  }
}

/** The RedisNamespace created with the name "resque". */
object ResqueNamespace extends RedisNamespace("resque")


/** Resque settings; namespace is the prefix used when building queue, stat and worker keys. */
case class ResqueConfig(namespace: String)

object ResqueConfig {
  /** The default configuration, which uses the "resque" namespace. */
  def apply(): ResqueConfig = ResqueConfig("resque")
}

/**
 * Base class for jobs. Constructing a worker records, in Resque.queueJobMap, the queue
 * it belongs to under the symbol of its actor id.
 *
 * @param queue the queue this worker takes its jobs from
 */
abstract class ResqueWorker(protected val queue: Symbol) extends ResqueWorkerActor {
  Resque.queueJobMap put (Symbol(self.id), queue)
}
/**
 * Location of the redis server.
 *
 * @param host host name, "localhost" unless given
 * @param port port number, 6379 unless given
 */
case class RedisConfig(
  host: String = "localhost",
  port: Int = 6379)