Taming Awful Legacy Data Feeds Using Akka (FSM)
// From https://engineering.footballradar.com/taming-awful-legacy-data-feeds-using-akka/
sealed trait AcmeMessage
sealed trait Incoming extends AcmeMessage
sealed trait Outgoing extends AcmeMessage
case class Login(username: String, password: String) extends Incoming
case object LoginSuccess extends Outgoing
case object LoginFailure extends Outgoing
case class Logout(username: String) extends Incoming
object Serialiser {
def toAcmeMessage(s: String): Option[Outgoing] = ???
def toString(input: Incoming): String = ???
}
trait Connection {
def read: Option[String]
def write(data: String): Unit
}
object ConnectionActor {
case class Input(data: String)
case class Output(data: String)
sealed trait State
object State {
case object LoggedOut extends State
case object LoginPending extends State
case object LoggedIn extends State
}
sealed trait Data
object Data {
case object Empty extends Data
case class Username(value: String) extends Data
}
sealed trait Error
case object NotLoggedIn extends Error
case object AlreadyLoggedIn extends Error
case object LoginFailed extends Error
case object LoginSucceeded
}
// Without FSM (but without Login logic)
//class ConnectionActor(connection: Connection, receiver: ActorRef) extends Actor {
//
// private case object ReadNow
//
// self ! ReadNow
//
// override def receive {
// case ReadNow =>
// connection.read match {
// case None =>
// context.system.scheduler.scheduleOnce(5 millis, self, ReadNow)
// case Some(string) =>
// Serialiser.toAcmeMessage(string) match {
// case Some(outgoingMessage) =>
// receiver ! outgoingMessage
// case None =>
// println(s"Oh dear. I couldn't deserialise '$string'")
// }
// self ! ReadNow
// }
// case incoming: Incoming =>
// connection.write(Serialiser.toString(incoming))
// case x: Any =>
// println(s"Oh dear. Received unexpected message '$x'")
// }
//}
class ConnectionActor(connection: Connection, receiver: ActorRef)
extends FSM[ConnectionActor.State, ConnectionActor.Data] {
import ConnectionActor._
import ConnectionActor.State._
private case object ReadNow
startWith(LoggedOut, Data.Empty)
////////////////////////////////////////////////////////////////////
whenUnhandled {
case Event(ReadNow, _) =>
connection.read match {
case None =>
context.system.scheduler.scheduleOnce(5 millis, self, ReadNow)
case Some(string) =>
Serialiser.toAcmeMessage(string) match {
case Some(outgoingMessage) =>
self ! outgoingMessage
case None =>
println(s"Oh dear. I couldn't deserialise '$string'")
}
self ! ReadNow
}
stay
case Event(x: Any, _) =>
println(s"Oh dear. Received unexpected message '$x' while in state '$stateName'")
stay
}
////////////////////////////////////////////////////////////////////
when(LoggedIn) {
case Event(logout: Logout, _) =>
connection.write(Serialiser.toString(logout))
goto(LoggedOut) using Data.Empty
case Event(Login(_, _), _) =>
sender ! AlreadyLoggedIn
stay
case Event(incoming: Incoming, _) =>
connection.write(Serialiser.toString(incoming))
stay
case Event(outgoing: Outgoing, _) =>
receiver ! outgoing
stay
}
////////////////////////////////////////////////////////////////////
when(LoggedOut) {
case Event(login: Login, _) =>
connection.write(Serialiser.toString(login))
goto(LoginPending) using Data.Username(login.username) forMax(2 minutes)
case Event(incoming: Incoming, _) =>
sender ! NotLoggedIn
stay
case Event(outgoing: Outgoing, _) =>
receiver ! outgoing
stay
}
////////////////////////////////////////////////////////////////////
when(LoginPending) {
case Event(LoginSuccess, _) =>
receiver ! LoginSucceeded
goto(LoggedIn)
case Event(LoginFailure, _) =>
receiver ! LoginFailed
goto(LoggedOut) using Data.Empty
case Event(incoming: Incoming, _) =>
sender ! NotLoggedIn
stay
case Event(outgoing: Outgoing, _) =>
receiver ! outgoing
stay
case Event(StateTimeout, _) =>
receiver ! LoginFailed
goto(LoggedOut) using Data.Empty
}
////////////////////////////////////////////////////////////////////
override def postStop() = {
(stateName, stateData) match {
case (LoggedIn, Data.Username(username)) =>
connection.write(Serialiser.toString(Logout(username)))
case (_, _) =>
}
}
}