// Taming Awful Legacy Data Feeds Using Akka (FSM)
// chtefi - 12/4/2016 - 9:13 AM
//
// From https://engineering.footballradar.com/taming-awful-legacy-data-feeds-using-akka/

/** Root of every message exchanged with the Acme feed. */
sealed trait AcmeMessage

/**
 * Messages that are serialised with `Serialiser.toString` and written to the connection.
 * NOTE(review): the name appears to be from the feed's point of view (incoming *to* Acme) - confirm.
 */
sealed trait Incoming extends AcmeMessage

/**
 * Messages that `Serialiser.toAcmeMessage` produces from strings read off the connection.
 * NOTE(review): the name appears to be from the feed's point of view (outgoing *from* Acme) - confirm.
 */
sealed trait Outgoing extends AcmeMessage

// --- Incoming: requests written to the connection ---

/** Login request carrying the credentials to authenticate with. */
final case class Login(username: String, password: String) extends Incoming

/** Logout request for the given user. */
final case class Logout(username: String) extends Incoming

// --- Outgoing: answers read from the connection ---

/** Answer indicating that a login was accepted. */
case object LoginSuccess extends Outgoing

/** Answer indicating that a login was rejected. */
case object LoginFailure extends Outgoing

/**
 * Converts between [[AcmeMessage]]s and the strings carried by a [[Connection]].
 *
 * Both methods are unimplemented placeholders: `???` throws `scala.NotImplementedError`
 * when called.
 */
object Serialiser {
  /**
   * Deserialises a string read from the connection.
   *
   * @param s the raw string
   * @return the decoded [[Outgoing]] message; callers treat `None` as "could not deserialise"
   */
  def toAcmeMessage(s: String): Option[Outgoing] = ???

  /**
   * Serialises an [[Incoming]] message into the string to write to the connection.
   *
   * @param input the message to encode
   * @return the string form of `input`
   */
  def toString(input: Incoming): String = ???
}

/** String-based connection that is polled for input and written to by the connection actor. */
trait Connection {
  /**
   * Reads the next string from the connection.
   *
   * @return `Some(data)` when something was read; the actor treats `None` as
   *         "nothing available yet" and polls again shortly afterwards
   *         (presumably non-blocking - confirm against implementations)
   */
  def read: Option[String]

  /**
   * Writes a serialised message to the connection.
   *
   * @param data the string to send
   */
  def write(data: String): Unit
}


/** Companion of the `ConnectionActor` class: its FSM state and data types plus the replies it sends. */
object ConnectionActor {
  // Raw string wrappers; not referenced by the actor in this file.
  final case class Input(data: String)
  final case class Output(data: String)

  /** Login state of the connection; used as the FSM state type. */
  sealed trait State
  object State {
    /** No login is active; this is the initial state. */
    case object LoggedOut extends State
    /** A login request was written and its answer has not been seen yet. */
    case object LoginPending extends State
    /** The login was confirmed. */
    case object LoggedIn extends State
  }

  /** Data the FSM carries alongside its state. */
  sealed trait Data
  object Data {
    /** Nothing to remember (used while logged out). */
    case object Empty extends Data
    /** Username of the pending or active login; used to log out when the actor stops. */
    final case class Username(value: String) extends Data
  }

  /** Error replies sent by the actor. */
  sealed trait Error
  /** Sent to the sender of a non-login request while no login is active. */
  case object NotLoggedIn extends Error
  /** Sent to the sender of a login request while already logged in. */
  case object AlreadyLoggedIn extends Error
  /** Sent to the receiver when a pending login is rejected or times out. */
  case object LoginFailed extends Error
  /** Sent to the receiver when a pending login is confirmed (not an [[Error]]). */
  case object LoginSucceeded
}





// Version without FSM (and without any login logic), kept commented out for comparison:

//class ConnectionActor(connection: Connection, receiver: ActorRef) extends Actor {
//
//  private case object ReadNow
//
//  self ! ReadNow
//
//  override def receive {
//    case ReadNow =>
//      connection.read match {
//        case None =>
//          context.system.scheduler.scheduleOnce(5 millis, self, ReadNow)
//        case Some(string) =>
//          Serialiser.toAcmeMessage(string) match {
//            case Some(outgoingMessage) =>
//              receiver ! outgoingMessage
//            case None =>
//              println(s"Oh dear. I couldn't deserialise '$string'")
//          }
//          self ! ReadNow
//      }
//    case incoming: Incoming =>
//      connection.write(Serialiser.toString(incoming))
//    case x: Any =>
//      println(s"Oh dear. Received unexpected message '$x'")
//  }
//}

/**
 * FSM actor that polls a [[Connection]] and enforces the login protocol on top of it.
 *
 * Behaviour per state:
 *  - `LoggedOut`: a [[Login]] is written to the connection and the actor moves to
 *    `LoginPending`; any other [[Incoming]] message is answered with `NotLoggedIn`.
 *  - `LoginPending`: `LoginSuccess` moves to `LoggedIn` (receiver gets `LoginSucceeded`);
 *    `LoginFailure` or the login timeout moves back to `LoggedOut` (receiver gets
 *    `LoginFailed`); [[Incoming]] messages are answered with `NotLoggedIn`.
 *  - `LoggedIn`: [[Incoming]] messages are serialised and written; a [[Logout]] additionally
 *    moves to `LoggedOut`; a second [[Login]] is answered with `AlreadyLoggedIn`.
 *
 * In every state, other [[Outgoing]] messages read from the connection are forwarded to
 * `receiver`.
 *
 * @param connection the connection that is polled for input and written to
 * @param receiver   actor that receives decoded [[Outgoing]] messages and login results
 */
class ConnectionActor(connection: Connection, receiver: ActorRef)
  extends FSM[ConnectionActor.State, ConnectionActor.Data] {

  import ConnectionActor._
  import ConnectionActor.State._
  // Duration DSL (`5.millis`, `2.minutes`), imported locally so this class does not depend
  // on a file-level import.
  import scala.concurrent.duration._

  /** Internal tick: poll the connection once. */
  private case object ReadNow

  /** Internal message: the pending login was not answered in time. */
  private case object LoginTimedOut

  private val PollTimer = "connection-poll"
  private val LoginTimer = "login-timeout"
  private val PollInterval = 5.millis
  private val LoginTimeout = 2.minutes

  startWith(LoggedOut, Data.Empty)

  ////////////////////////////////////////////////////////////////////

  whenUnhandled {
    case Event(ReadNow, _) =>
      connection.read match {
        case None =>
          // Nothing to read: retry later. An FSM timer needs no ExecutionContext and is
          // cancelled automatically when the FSM terminates.
          setTimer(PollTimer, ReadNow, PollInterval, repeat = false)
        case Some(string) =>
          Serialiser.toAcmeMessage(string) match {
            case Some(outgoingMessage) =>
              // Route through the mailbox so the current state's handler decides what to do.
              self ! outgoingMessage
            case None =>
              println(s"Oh dear. I couldn't deserialise '$string'")
          }
          // More data may be waiting: poll again right after the decoded message.
          self ! ReadNow
      }
      stay()
    case Event(x: Any, _) =>
      println(s"Oh dear. Received unexpected message '$x' while in state '$stateName'")
      stay()
  }

  ////////////////////////////////////////////////////////////////////

  when(LoggedIn) {
    case Event(logout: Logout, _) =>
      connection.write(Serialiser.toString(logout))
      goto(LoggedOut) using Data.Empty
    case Event(Login(_, _), _) =>
      sender() ! AlreadyLoggedIn
      stay()
    case Event(incoming: Incoming, _) =>
      connection.write(Serialiser.toString(incoming))
      stay()
    case Event(outgoing: Outgoing, _) =>
      receiver ! outgoing
      stay()
  }

  ////////////////////////////////////////////////////////////////////

  when(LoggedOut) {
    case Event(login: Login, _) =>
      connection.write(Serialiser.toString(login))
      // A named timer is used instead of `forMax`: a state timeout is cancelled by every
      // received message, so the frequent ReadNow ticks would keep it from ever firing.
      setTimer(LoginTimer, LoginTimedOut, LoginTimeout, repeat = false)
      goto(LoginPending) using Data.Username(login.username)
    case Event(_: Incoming, _) =>
      sender() ! NotLoggedIn
      stay()
    case Event(outgoing: Outgoing, _) =>
      receiver ! outgoing
      stay()
  }

  ////////////////////////////////////////////////////////////////////

  when(LoginPending) {
    case Event(LoginSuccess, _) =>
      receiver ! LoginSucceeded
      // Keeps the Data.Username stored when the login was sent.
      goto(LoggedIn)
    case Event(LoginFailure, _) =>
      receiver ! LoginFailed
      goto(LoggedOut) using Data.Empty
    case Event(_: Incoming, _) =>
      sender() ! NotLoggedIn
      stay()
    case Event(outgoing: Outgoing, _) =>
      receiver ! outgoing
      stay()
    case Event(LoginTimedOut, _) =>
      receiver ! LoginFailed
      goto(LoggedOut) using Data.Empty
  }

  ////////////////////////////////////////////////////////////////////

  onTransition {
    // Whatever ended the pending login, its timeout is no longer relevant.
    case LoginPending -> _ => cancelTimer(LoginTimer)
  }

  initialize()

  ////////////////////////////////////////////////////////////////////

  /** Starts the polling loop; without this first tick the connection is never read. */
  override def preStart(): Unit = {
    super.preStart()
    self ! ReadNow
  }

  /** Logs out of the connection if a login is still active, then lets the FSM clean up. */
  override def postStop(): Unit = {
    try {
      (stateName, stateData) match {
        case (LoggedIn, Data.Username(username)) =>
          connection.write(Serialiser.toString(Logout(username)))
        case _ => // nothing to undo
      }
    } finally {
      // FSM.postStop terminates the state machine and cancels its timers.
      super.postStop()
    }
  }
}