mickdelaney
9/23/2014 - 5:09 AM

OrderExampleReliable.scala

# Settings shared by every node. The entry points layer a node-specific
# section on top of this one and use it as the fallback.
common.akka {
  loglevel = INFO

  # Remoting-capable actor references.
  actor.provider = "akka.remote.RemoteActorRefProvider"

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp.hostname = "127.0.0.1"
  }
}

# Node-specific override: TCP port of the order-processor actor system.
processor.akka.remote.netty.tcp.port = 2553

# Node-specific override: TCP port of the credit-card-validator actor system.
validator.akka.remote.netty.tcp.port = 2552
// sbt build definition for the order-processing example.
name := "external-service-integration-akka-persistence"

version := "1.0"

scalaVersion := "2.11.1"

// Both Akka modules are pinned to the same release.
libraryDependencies ++= Seq("akka-persistence-experimental", "akka-remote").map { module =>
  "com.typesafe.akka" %% module % "2.3.6"
}
package org.iainhull.akka


import scala.concurrent.duration._

import akka.actor._
import akka.event.Logging
import akka.pattern.ask
import akka.util.Timeout
import akka.persistence.{PersistentView, AtLeastOnceDelivery, PersistentActor}
import akka.persistence.AtLeastOnceDelivery.{UnconfirmedDelivery, UnconfirmedWarning}

import com.typesafe.config.ConfigFactory

// ------------------------------------
// Domain object
// ------------------------------------

/**
 * An order as submitted by a client and tracked by the order processor.
 *
 * @param id                   order identifier; defaults to -1 and is replaced by the
 *                             processor (via `copy`) when the order is stored
 * @param details              description of what is being ordered
 * @param creditCardNumber     card number that is sent to the credit card validator
 * @param creditCardValidation outcome of the card validation; starts as
 *                             [[Validation.Pending]]
 */
case class Order(id: Int = -1, details: String,
                 creditCardNumber: String,
                 creditCardValidation: Validation = Validation.Pending)

/** Credit card validation status carried by an [[Order]]. */
sealed trait Validation

object Validation {

  /** No validation outcome recorded yet; the default for a new [[Order]]. */
  case object Pending extends Validation

  /** Set by the order processor when it accepts the order. */
  case object Success extends Validation

  /** Set by the order processor when it rejects the order. */
  case object Failure extends Validation

}

// ------------------------------------
// Domain commands and events
// ------------------------------------

/** Marker for commands sent to the order processor. */
sealed trait Command

/** Asks the order processor to store `order` and start its credit card validation. */
case class OrderSubmitted(order: Order) extends Command

/** Marker for the events the order processor persists and replays. */
sealed trait Event

/** The order was stored (with its assigned id); also sent back to the submitter. */
case class OrderStored(order: Order) extends Event

/** The order was accepted; `deliveryId` identifies the validation request being confirmed. */
case class OrderAccepted(order: Order, deliveryId: Long) extends Event

/** The order was rejected; `deliveryId` identifies the validation request being confirmed. */
case class OrderRejected(order: Order, deliveryId: Long) extends Event

/** Validation request delivered by the order processor to the credit card validator. */
case class CreditCardValidationRequested(deliveryId: Long, orderId: Int, creditCardNumber: String)

/** Validator reply: the card of order `orderId` passed validation. */
case class CreditCardValidated(deliveryId: Long, orderId: Int)

/** Validator reply: the card of order `orderId` failed validation. */
case class CreditCardValidationFailed(deliveryId: Long, orderId: Int)

// ------------------------------------
// Application commands/events
// ------------------------------------

/** Tells the order processor which actor path to deliver validation requests to. */
case class SetCreditCardValidator(validator: ActorPath)

// NOTE(review): the four messages below are not referenced anywhere in the visible
// source; they may be left over from an earlier version - confirm before relying on them.

/** Presumably configures where accepted orders are forwarded - TODO confirm. */
case class SetValidOrderDestination(destination: ActorRef)

/** Presumably configures where rejected orders are forwarded - TODO confirm. */
case class SetInvalidOrderDestination(destination: ActorRef)

/** Presumably requests a recovery bounded by `timeout` - TODO confirm. */
case class Recover(timeout: Timeout)

/** Presumably the acknowledgement for [[Recover]] - TODO confirm. */
case object Recovered

// ------------------------------------
// Event-sourced order processor
// ------------------------------------

/**
 * Event-sourced order processor with at-least-once delivery to a credit card validator.
 *
 * Command flow:
 *  - `OrderSubmitted` persists `OrderStored` (with an assigned id), replies with that
 *    event to the submitter and delivers a `CreditCardValidationRequested` to the validator.
 *  - `CreditCardValidated` / `CreditCardValidationFailed` persist `OrderAccepted` /
 *    `OrderRejected`, which also confirm the delivery.
 *  - `UnconfirmedWarning` rejects orders whose validation request is still unconfirmed.
 *  - `SetCreditCardValidator` changes the path validation requests are delivered to.
 *
 * An order's outcome is recorded at most once: all three outcome handlers act only while
 * the order is still `Pending`.
 */
class OrderProcessor extends PersistentActor with AtLeastOnceDelivery {

  import Validation._

  // Destination of validation requests; dead letters until SetCreditCardValidator arrives.
  // NOTE(review): this path is not persisted, so deliveries re-registered while replaying
  // OrderStored events use the dead-letters path - confirm whether that is intended.
  var validator: ActorPath = context.system.deadLetters.path
  var orders = Map.empty[Int, Order] // state

  override def persistenceId = "order-processor"

  /**
   * Applies a persisted or replayed event to the in-memory state and to the
   * at-least-once-delivery bookkeeping.
   */
  def updateState(event: Event): Unit = event match {
    case OrderStored(order) =>
      orders = orders + (order.id -> order)
      deliver(validator, deliveryId => CreditCardValidationRequested(deliveryId, order.id, order.creditCardNumber))
    case OrderAccepted(order, deliveryId) =>
      orders = orders + (order.id -> order)
      confirmDelivery(deliveryId)
    case OrderRejected(order, deliveryId) =>
      orders = orders + (order.id -> order)
      confirmDelivery(deliveryId)
  }

  /** Runs `action` on the stored order only while its validation outcome is still pending. */
  private def ifPending(orderId: Int)(action: Order => Unit): Unit =
    orders.get(orderId).filter(_.creditCardValidation == Pending).foreach(action)

  val receiveCommand: Receive = {
    case OrderSubmitted(order) =>
      // Ids are assigned sequentially from the number of orders stored so far.
      val id = orders.size
      persist(OrderStored(order.copy(id = id))) { event =>
        updateState(event)
        sender() ! event
      }
    case CreditCardValidated(deliveryId, orderId) =>
      ifPending(orderId) { order =>
        persist(OrderAccepted(order.copy(creditCardValidation = Success), deliveryId))(updateState)
      }
    case CreditCardValidationFailed(deliveryId, orderId) =>
      ifPending(orderId) { order =>
        persist(OrderRejected(order.copy(creditCardValidation = Failure), deliveryId))(updateState)
      }
    case UnconfirmedWarning(unconfirmedDeliveries) =>
      unconfirmedDeliveries.foreach {
        case UnconfirmedDelivery(deliveryId, _, CreditCardValidationRequested(_, orderId, _)) =>
          // The warning may have been queued behind a validator reply that has already
          // settled this order; without the pending check it would overwrite an accepted
          // order with a rejection.
          ifPending(orderId) { order =>
            persist(OrderRejected(order.copy(creditCardValidation = Failure), deliveryId))(updateState)
          }
        case _ => // not a validation request: nothing to reject
      }

    case SetCreditCardValidator(v) =>
      validator = v
  }

  val receiveRecover: Receive = {
    case event: Event => updateState(event)
  }
}


/**
 * Entry point of the order-processor node.
 *
 * Starts the "example" actor system from the `processor` section of the "order"
 * configuration (falling back to `common`), creates the order destination view and the
 * order processor, points the processor at the remote validator and submits two sample
 * orders, logging the replies or the failure to obtain them.
 */
object OrderProcessor extends App {
  val config = ConfigFactory.load("order")
  val configCommon = config.getConfig("common")
  // Node-specific settings win; anything missing falls back to the shared section.
  val processorConfig = config.getConfig("processor").withFallback(configCommon)

  println(processorConfig)

  implicit val system: ActorSystem = ActorSystem("example", processorConfig)
  implicit val timeout: Timeout = Timeout(5.seconds)

  import system.dispatcher

  val log = Logging(system, this.getClass)

  val destination = system.actorOf(Props[OrderDestination], "destination")
  val processor = system.actorOf(Props[OrderProcessor], "processor")
  val validator = ActorPath.fromString("akka.tcp://example@127.0.0.1:2552/user/validator")

  processor ! SetCreditCardValidator(validator)

  val f1 = processor ? OrderSubmitted(Order(details = "jelly beans", creditCardNumber = "1234-5678-1234-5678"))
  val f2 = processor ? OrderSubmitted(Order(details = "jelly beans", creditCardNumber = "1234-5678-1234-0000"))

  private val replies = for (r1 <- f1; r2 <- f2) yield (r1, r2)

  replies.foreach { case (r1, r2) =>
    log.info("Reply 1: {}", r1)
    log.info("Reply 2: {}", r2)
  }
  // An ask that times out (or otherwise fails) would previously vanish without a trace.
  replies.failed.foreach { cause =>
    log.error(cause, "Order submission was not acknowledged")
  }
}

// ------------------------------------
//  Local receiver of orders after
//  credit card validation
// ------------------------------------

/**
 * Persistent view with id "order-processor-view" over the events stored under the
 * persistence id "order-processor"; logs every accepted and every rejected order it sees.
 */
class OrderDestination extends PersistentView with ActorLogging {

  override def viewId = "order-processor-view"
  override def persistenceId = "order-processor"

  def receive = {
    case accepted: OrderAccepted =>
      log.info("Received accepted order: {}", accepted.order)
    case rejected: OrderRejected =>
      log.info("Received rejected order: {}", rejected.order)
  }
}

// ------------------------------------
//  Remote credit card validator
// ------------------------------------

/**
 * Answers each `CreditCardValidationRequested` with `CreditCardValidationFailed` when the
 * card number contains "0000", and with `CreditCardValidated` otherwise. The reply echoes
 * the request's delivery id and order id and goes back to the sender.
 */
class CreditCardValidator extends Actor {
  def receive = {
    case CreditCardValidationRequested(deliveryId, orderId, creditCardNumber) =>
      val rejected = creditCardNumber.contains("0000")
      val verdict =
        if (rejected) CreditCardValidationFailed(deliveryId, orderId)
        else CreditCardValidated(deliveryId, orderId)
      sender() ! verdict
  }
}

/**
 * Entry point of the validator node: starts the "example" actor system from the
 * `validator` section of the "order" configuration (falling back to `common`) and
 * creates the validator actor under the name "validator".
 */
object CreditCardValidator extends App {
  val config = ConfigFactory.load("order")
  val configCommon = config.getConfig("common")
  val system = {
    val validatorSettings = config.getConfig("validator")
    ActorSystem("example", validatorSettings.withFallback(configCommon))
  }
  system.actorOf(Props[CreditCardValidator], name = "validator")
}