Wogan
12/19/2017 - 5:28 PM

Retry helper methods & DSL for Monix Tasks.

Retry helper methods & DSL for Monix Tasks.


import monix.eval.Task
import monix.execution.Scheduler

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

package object retry {

  /**
    * Maps an attempt number to the pause inserted before the following attempt.
    * The retry loop in this package numbers attempts starting at 1.
    */
  type Backoff = Int => FiniteDuration

  /**
    * Decides whether retrying should stop. It receives the number of the attempt that
    * just failed and the time since the start plus the upcoming backoff delay (i.e. the
    * elapsed time at which the next attempt would begin); `true` means give up.
    */
  type Termination = (Int, FiniteDuration) => Boolean

  /** Retry-related extension methods available on every `Task[A]`. */
  implicit class TaskRetrySyntax[A](task: Task[A]) {

    /**
      * Turns selected successful results into failures: when `pf` is defined for the
      * result, the task fails with the `Throwable` it produces; otherwise the result
      * is passed through unchanged.
      */
    @inline
    def reject(pf: PartialFunction[A, Throwable]): Task[A] =
      task.flatMap { a =>
        pf.lift(a) match {
          case Some(failure) => Task.raiseError[A](failure)
          case None          => Task.now(a)
        }
      }

    /** Runs the task, retrying on any error as dictated by the explicitly given `policy`. */
    @inline
    def retryWith(policy: RetryPolicy): Task[A] =
      Task.deferAction { scheduler =>
        // The start timestamp is captured when the resulting task is actually run.
        execute(task, policy, scheduler.currentTimeMillis)
      }

    /** Same as [[retryWith]], but picks the policy up from implicit scope. */
    @inline
    def retrying(implicit policy: RetryPolicy): Task[A] =
      retryWith(policy)

    /** Retries according to the implicit `policy`, but only for errors accepted by `error`. */
    @inline
    def retryWhen(error: Throwable => Boolean)(implicit policy: RetryPolicy): Task[A] =
      Task.deferAction { scheduler =>
        execute(task, policy, scheduler.currentTimeMillis, shouldRetry = error)
      }

    /** Retries only for errors on which the partial function `error` is defined. */
    @inline
    def retryOnly(error: PartialFunction[Throwable, Unit])(implicit policy: RetryPolicy): Task[A] =
      retryWhen(failure => error.isDefinedAt(failure))(policy)

    /** Retries for every error except those on which the partial function `error` is defined. */
    @inline
    def retryUnless(error: PartialFunction[Throwable, Unit])(implicit policy: RetryPolicy): Task[A] =
      retryWhen(failure => !error.isDefinedAt(failure))(policy)
  }

  /**
    * Core retry loop shared by the syntax methods above.
    *
    * @param task        the task to (re-)run
    * @param policy      supplies the backoff and termination functions
    * @param started     scheduler time in milliseconds at which the whole operation began
    * @param shouldRetry filter deciding whether a given error is worth retrying;
    *                    accepts every error by default
    * @return a task that yields the first successful result, or fails with the error of
    *         the last attempt once the policy terminates or the error is not retryable
    */
  private def execute[A](task: Task[A],
                         policy: RetryPolicy,
                         started: Long,
                         shouldRetry: Throwable => Boolean = _ => true): Task[A] = {

    // Invoked after attempt number `attempt` failed: either re-raise or schedule the next run.
    def decide(failure: Throwable, attempt: Int): Task[A] =
      Task.deferAction { scheduler =>
        val elapsed = (scheduler.currentTimeMillis - started).millis
        val nextDelay = policy.backoff(attempt)
        // The termination check sees the elapsed time projected to the start of the next attempt.
        val giveUp = policy.terminate(attempt, nextDelay + elapsed)
        if (giveUp || !shouldRetry(failure)) Task.raiseError[A](failure)
        else runAfter(nextDelay, attempt + 1)
      }

    // Runs the task after `pause`, routing any failure to `decide`.
    def runAfter(pause: FiniteDuration, attempt: Int): Task[A] =
      task.delayExecution(pause).onErrorHandleWith(failure => decide(failure, attempt))

    // First attempt: no delay, attempt number 1.
    runAfter(Duration.Zero, 1)
  }

}
package retry

import scala.concurrent.duration.{Duration, DurationConversions, FiniteDuration, TimeUnit}
import scala.language.implicitConversions

/**
  * A small DSL for assembling a `RetryPolicy` out of termination conditions and
  * backoff strategies.
  * {{{
  * retryFor(3.attempts or 2.seconds) using exponentialBackoff(10.millis).randomized(5.millis)
  * }}}
  */
object dsl {

  /** Creates a policy that stops retrying once `termination` holds; the backoff keeps its default. */
  def retryFor(termination: Termination): RetryPolicy =
    RetryPolicy(terminate = termination)

  /** Shorthand for `Backoff.exponential(duration)`. */
  def exponentialBackoff(duration: FiniteDuration): Backoff =
    Backoff.exponential(duration)

  /** Shorthand for `Backoff.linear(duration)`. */
  def linearBackoff(duration: FiniteDuration): Backoff =
    Backoff.linear(duration)

  /** Shorthand for `Backoff.constant(duration)`. */
  def constantBackoff(duration: FiniteDuration): Backoff =
    Backoff.constant(duration)

  /** Shorthand for `Backoff.none`. */
  def noBackoff: Backoff =
    Backoff.none

  /** Enables `n.attempts` on `Int`. */
  implicit final class IntSyntax(private val int: Int) extends AnyVal {
    /** A termination condition built with `Termination.limitAttempts` using this number as the limit. */
    def attempts: Termination =
      Termination.limitAttempts(int)
  }

  /** Adds the `DurationConversions` unit methods (`millis`, `seconds`, ...) to `Int`. */
  implicit final class DurationInt(private val n: Int) extends AnyVal with DurationConversions {
    override protected def durationIn(unit: TimeUnit): FiniteDuration =
      Duration(n.toLong, unit)
  }

  /** Enables `policy using backoff`. */
  implicit final class RetryPolicySyntax(private val rp: RetryPolicy) extends AnyVal {
    /** Returns a copy of the policy with its backoff replaced by `backoff`. */
    def using(backoff: Backoff): RetryPolicy =
      rp.copy(backoff = backoff)
  }

  /** Enables combining termination conditions with `and` / `or`. */
  implicit final class TerminationSyntax(private val t: Termination) extends AnyVal {
    /** Terminates only when both this condition and `other` say so. */
    def and(other: Termination): Termination =
      Termination.all(t, other)

    /** Terminates as soon as either this condition or `other` says so. */
    def or(other: Termination): Termination =
      Termination.any(t, other)
  }

  /** Enables `backoff.randomized(...)`. */
  implicit final class BackoffSyntax(private val b: Backoff) extends AnyVal {
    /** Randomizes the backoff by passing `-jitter / 2` as the offset and `jitter` as the scale. */
    def randomized(jitter: FiniteDuration): Backoff = {
      val centredOffset: FiniteDuration = -jitter / 2
      Backoff.randomize(b, centredOffset, jitter)
    }

    /** Randomizes the backoff with an explicit `offset` and `scale`; see `Backoff.randomize`. */
    def randomized(offset: FiniteDuration, scale: FiniteDuration): Backoff =
      Backoff.randomize(b, offset, scale)
  }

  /** Lets a bare duration be used where a `Termination` is expected, as a duration limit. */
  implicit def durationToTermination(finiteDuration: FiniteDuration): Termination =
    Termination.limitDuration(finiteDuration)

}
package retry

import scala.concurrent.duration.FiniteDuration

/** Ready-made `Termination` conditions and combinators for building new ones. */
object Termination {

  /** Always signals that retrying should stop. */
  val always: Termination = (_: Int, _: FiniteDuration) => true

  /** Never signals that retrying should stop. */
  val never: Termination = (_: Int, _: FiniteDuration) => false

  /** Stops once the attempt number has reached `maxAttempts`. */
  def limitAttempts(maxAttempts: Int): Termination =
    (attempt, _) => attempt >= maxAttempts

  /** Stops once the duration handed to the condition has reached `maxDuration`. */
  def limitDuration(maxDuration: FiniteDuration): Termination =
    (_, elapsed) => elapsed >= maxDuration

  /** Stops as soon as at least one of the given conditions says so. */
  def any(first: Termination, rest: Termination*): Termination = {
    val conditions = first +: rest
    (attempt, elapsed) => conditions.exists(condition => condition(attempt, elapsed))
  }

  /** Stops only when every one of the given conditions says so. */
  def all(first: Termination, rest: Termination*): Termination = {
    val conditions = first +: rest
    (attempt, elapsed) => conditions.forall(condition => condition(attempt, elapsed))
  }

}
package retry

/**
  * Describes how a failing operation is retried.
  *
  * @param terminate decides when to stop retrying; defaults to `Termination.always`
  * @param backoff   delay between attempts; defaults to `Backoff.none`
  */
case class RetryPolicy(
  terminate: Termination = Termination.always,
  backoff: Backoff = Backoff.none
)
package retry

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.util.Random

import scala.concurrent.duration._

/** Factory methods and combinators for `Backoff` strategies. */
object Backoff {

  /** No delay at all between attempts. */
  val none: Backoff = _ => Duration.Zero

  /** The same delay `value` for every attempt. */
  def constant(value: FiniteDuration): Backoff =
    _ => value

  /**
    * Doubles the delay with each attempt: `initial * 2^(attempt - 1)`.
    *
    * NOTE(review): no upper bound is applied here, so very large attempt numbers can
    * overflow the multiplier or the resulting duration - confirm that callers bound
    * the number of attempts.
    */
  def exponential(initial: FiniteDuration): Backoff =
    attempt => initial * (1L << (attempt - 1))

  /** Grows the delay linearly: `initial * attempt`. */
  def linear(initial: FiniteDuration): Backoff =
    attempt => initial * attempt

  /**
    * Adds random jitter to another backoff strategy.
    *
    * Each computed delay is `policy(attempt) + offset + r * scale`, where `r` is a fresh
    * random number in `[0, 1)`. A negative `offset` (or `scale`) could push the sum below
    * zero; since a negative wait is meaningless, the result is clamped at `Duration.Zero`.
    *
    * @param policy the backoff being randomized
    * @param offset fixed amount added to every delay (may be negative)
    * @param scale  width of the random component
    */
  def randomize(policy: Backoff, offset: FiniteDuration, scale: FiniteDuration): Backoff =
    attempt => {
      val base = policy(attempt)
      val jitter = (scale.toNanos * Random.nextDouble()).nanos
      // Never hand a negative delay to the retry loop.
      (base + offset + jitter).max(Duration.Zero)
    }

}