// Retry using sleep.
package exercise
import java.util.concurrent._
import scala.concurrent.duration._
import journal.Logger
import scala.concurrent.duration.Duration
import scalaz.{-\/, \/-}
import scalaz.concurrent.{Strategy, Task}
import scalaz.concurrent.Task._
import scalaz.stream.Process
import scalaz.stream.Process._
/**
* Created by paramin on 3/3/17.
*/
object exercise34 {

  private val logger = journal.Logger[exercise34.type]

  /**
   * A process that emits nothing and halts once `time` has elapsed.
   *
   * The delay is implemented by scheduling a callback on `E`; the Task's
   * completion callback is then invoked through the strategy `S`.
   *
   * @param time how long to wait before the process halts
   * @param E    scheduler used to time the delay
   * @param S    strategy through which the completion callback is invoked
   */
  def sleep(time: FiniteDuration)(
      implicit E: ScheduledExecutorService,
      S: Strategy): Process[Task, Nothing] = {
    val wakeUp: Task[Unit] = async { cb =>
      val callable = new Callable[Unit] {
        def call(): Unit = {
          S { cb(\/-(())) }
          ()
        }
      }
      E.schedule(callable, time.toMillis, TimeUnit.MILLISECONDS)
      ()
    }
    // eval_ runs the task for its effect only and discards the Unit result.
    Process.eval_(wakeUp)
  }

  /**
   * Runs `p`, retrying with a randomised, linearly growing back-off when it fails.
   *
   * After failure number `n` (counting from 0) the process sleeps for a random
   * fraction of `seed * (n + 1)`, capped at `bound` when one is given, then tries
   * again. Once `iterations` attempts in a row have failed, the process fails with
   * a `TimeoutException`.
   *
   * After a success the value is emitted, the process sleeps for `seed`, and the
   * whole retry starts over with a fresh attempt budget, so a `p` that keeps
   * succeeding is re-run indefinitely.
   *
   * @param p          the process to run
   * @param seed       base back-off delay, also the pause after a success
   * @param iterations maximum number of consecutive failed attempts
   * @param bound      optional upper limit for a single back-off delay
   * @param log        called with a message before each retry; no-op by default
   * @param S          scheduler used for the sleeps
   */
  def retry[A](
      p: Process[Task, A],
      seed: FiniteDuration,
      iterations: Int,
      bound: Option[FiniteDuration] = None,
      log: String => Task[Unit] = Function.const(now(())))(
      implicit S: ScheduledExecutorService): Process[Task, A] = {
    import Process._

    def loop(n: Int): Process[Task, A] =
      if (n >= iterations) {
        fail(new TimeoutException("retry failed (max iterations exceeded)"))
      } else {
        p.attempt() flatMap {
          case -\/(_) =>
            for {
              r <- eval(delay { math.random })
              weighted = r * (seed.toMillis * (n + 1))
              d = weighted.toLong.millis
              bounded = bound map { _ min d } getOrElse d
              // sleep emits nothing, so append a unit to let the comprehension continue.
              _ <- sleep(bounded) append emit(())
              // Sequence the log effect into the process instead of running it eagerly.
              _ <- eval(log(s"Retrying ($n)"))
              back <- loop(n + 1)
            } yield back
          case \/-(a) =>
            // Pass `log` along so a caller-supplied logger survives the restart.
            emit(a) append sleep(seed) append retry(p, seed, iterations, bound, log)
        }
      }

    loop(0)
  }

  /**
   * Builds a Task that runs `task`, waits `d`, and then repeats forever.
   *
   * Each run of `task` is limited to 10 seconds; a failure or timeout is logged
   * and does not stop the loop. Nothing is executed until the returned Task is run.
   *
   * @param task the unit of work to repeat
   * @param d    pause between the end of one run and the start of the next
   * @param ec   scheduler used to time the pause
   */
  def repeatEvery(task: Task[Unit], d: Duration)(implicit ec: ScheduledExecutorService): Task[Unit] = {
    // Deferred with Task.delay so the work happens when the Task is run, not when it is built.
    // attemptRunFor blocks the thread that runs this step until `task` finishes or times out.
    val runOnce: Task[Unit] = Task.delay {
      println("In repeatEvery")
      task.attemptRunFor(10.seconds)
        .swap
        .toOption
        .foreach(ex => logger.error("Error in executing delete task", ex))
    }

    runOnce
      .flatMap(_ => Task.schedule((), d)(ec))
      .flatMap { _ =>
        println("In flatMap....")
        repeatEvery(task, d)(ec)
      }
  }

  /** Demo entry point: prints "Iniyan" repeatedly through `retry`, pausing 10 seconds between runs. */
  def main(args: Array[String]): Unit = {
    implicit val schex: ScheduledExecutorService =
      Executors.newScheduledThreadPool(2, scalaz.concurrent.Strategy.DefaultDaemonThreadFactory)

    // Alternative driver based on repeatEvery:
    //   val p = repeatEvery(Task.delay(println("Printing me")), 10.seconds)(schex)
    //   val q = p.runAsync(_.swap.toOption.foreach(err => logger.error(err.toString)))

    val p = retry(Process.eval(Task.delay(println("Iniyan"))), 10.seconds, 10)(schex)
    // First `run` builds the Task that drains the process; the second executes it.
    p.run.run
  }
}
//Another nice retry: instead of calling sleep to schedule a run once a minute, we can use Task.schedule.
//Calling Task.schedule repeatedly achieves the same goal.
/**
 * Builds a Task that runs `task`, waits `d`, and then repeats forever.
 *
 * Each run of `task` is limited to 10 seconds; a failure or timeout is logged
 * and does not stop the loop. Nothing is executed until the returned Task is run.
 *
 * @param task the unit of work to repeat
 * @param d    pause between the end of one run and the start of the next
 * @param ec   scheduler used to time the pause
 */
def repeatEvery(task: Task[Unit], d: Duration)(implicit ec: ScheduledExecutorService): Task[Unit] = {
  // Deferred with Task.delay so the work happens when the Task is run, not when it is built.
  // attemptRunFor blocks the thread that runs this step until `task` finishes or times out.
  val runOnce: Task[Unit] = Task.delay {
    task.attemptRunFor(10.seconds)
      .swap
      .toOption
      .foreach(ex => logger.error("Error in executing delete task", ex))
  }

  runOnce
    .flatMap(_ => Task.schedule((), d)(ec))
    .flatMap(_ => repeatEvery(task, d)(ec))
}
/** Demo entry point: repeats a task that prints "Printing me", pausing 10 seconds between runs. */
def main(args: Array[String]): Unit = {
  implicit val schex: ScheduledExecutorService =
    Executors.newScheduledThreadPool(2, scalaz.concurrent.Strategy.DefaultDaemonThreadFactory)
  val printer = Task.delay(println("Printing me"))
  val repeated = repeatEvery(printer, 10.seconds)(schex)
  // Runs the repeating task on the calling thread.
  repeated.run
}
//import scalaz.stream.{Process, time}
//Retry using awakeEvery.
/** A task that prints "Iniyan" to standard output each time it is run. */
def myTask: Task[Unit] =
  Task.delay(println("Iniyan"))
// Tick once per second and run `myTask` on every tick, ignoring the tick's value.
// NOTE(review): `time.awakeEvery` needs `scalaz.stream.time` imported and an implicit
// ScheduledExecutorService in scope; neither is set up in this snippet — confirm.
val p1 = time.awakeEvery(1.second)
val p2 = p1 evalMap { _ => myTask }
// First `run` builds the Task that drains the process; the second executes it.
(p2.run).run
//More polished version of the above: attach an error handler to the task.
// The unit of work to run on every tick.
val task = Task.delay{println("Print me!!!!")}
// Run the task once. On failure, report the cause and emit nothing, so a failed
// run does not terminate the surrounding stream; on success, emit the result.
val p = Process.eval(task.attempt).flatMap(attempted => attempted.fold(
  err => {
    // Include the cause so a failure is not silently reduced to a bare "Error".
    println(s"Error: $err")
    Process.empty
  },
  a => Process.emit(a)
))
// Scheduler required implicitly by time.awakeEvery; annotated so the implicit's type is explicit.
implicit val scheduler: ScheduledExecutorService = scalaz.stream.DefaultScheduler
val p1 = time.awakeEvery(10.minutes)
// Prepend p so that the first element is emitted immediately;
// there is no need to wait 10 minutes for the first tick.
val p2 = p ++ p1.flatMap(_ => p)
// First `run` builds the Task that drains the process; the second executes it.
p2.run.run