iniyanp
3/4/2017 - 2:08 AM

using sleep retry.

using sleep retry.

package exercise

import java.util.concurrent._

import scala.concurrent.duration._
import journal.Logger

import scala.concurrent.duration.Duration
import scalaz.{-\/, \/-}
import scalaz.concurrent.{Strategy, Task}
import scalaz.concurrent.Task._
import scalaz.stream.Process
import scalaz.stream.Process._

/**
  * Created by paramin on 3/3/17.
  *
  * Experiments with retrying and periodically repeating scalaz `Task`s and
  * scalaz-stream `Process`es on a `ScheduledExecutorService`.
  */
object exercise34 {

  private val logger = journal.Logger[exercise34.type]

  /**
    * A process that emits nothing and halts after `time` has elapsed.
    *
    * The wait does not block a thread: a callback is scheduled on `E`, and when it
    * fires the task is completed through the strategy `S`.
    *
    * @param time how long to wait
    * @param E    scheduler on which the wake-up callback is scheduled
    * @param S    strategy used to complete the underlying task
    */
  def sleep(time: FiniteDuration)(
    implicit E: ScheduledExecutorService,
    S: Strategy): Process[Task, Nothing] = {
    val t: Task[Unit] = async { cb =>
      val callable = new Callable[Unit] {
        def call(): Unit = {
          S { cb(\/-(())) }
          ()
        }
      }

      E.schedule(callable, time.toMillis, TimeUnit.MILLISECONDS)

      ()
    }

    // eval_ runs the task for its effect only and discards the Unit result.
    Process eval_ t
  }

  /**
    * Runs `p`, retrying with a randomized, linearly growing back-off when it fails.
    *
    * On failure number `n` (0-based) the process waits a random duration in
    * `[0, seed * (n + 1))`, capped at `bound` when one is given, reports
    * `"Retrying (n)"` through `log`, and tries again. After `iterations`
    * consecutive failures it fails with a `TimeoutException`.
    *
    * On a successful element the element is emitted, the process waits `seed`,
    * and then starts over with the failure counter reset to 0, so the result
    * keeps going for as long as `p` keeps emitting.
    *
    * @param p          the process to run and retry
    * @param seed       base back-off unit, also the pause after a successful element
    * @param iterations number of consecutive failures after which to give up
    * @param bound      optional upper limit for a single back-off wait
    * @param log        receives a message before each retry; defaults to a no-op
    * @param S          scheduler used for the waits
    */
  def retry[A](
                p: Process[Task, A],
                seed: FiniteDuration,
                iterations: Int,
                bound: Option[FiniteDuration] = None,
                log: String => Task[Unit] = Function.const(now(())))(
                implicit S: ScheduledExecutorService): Process[Task, A] = {

    // Local import so that `fail`, `emit` and `eval` resolve to the Process versions.
    import Process._

    def loop(n: Int): Process[Task, A] = {
      if (n >= iterations) {
        fail(new TimeoutException("retry failed (max iterations exceeded)"))
      } else {
        p.attempt() flatMap {
          case -\/(_) =>
            for {
              r <- eval(Task.delay(math.random))
              weighted = r * (seed.toMillis * (n + 1))
              d = weighted.toLong.millis
              bounded = bound map { _ min d } getOrElse d
              // sleep emits nothing, so append a unit to let the comprehension continue.
              _ <- sleep(bounded) append emit(())
              // Sequence the log task as part of the stream instead of running it unsafely.
              _ <- eval(log(s"Retrying ($n)"))
              back <- loop(n + 1)
            } yield back

          case \/-(a) =>
            // Start over after a pause; `log` is passed along so the restart keeps logging.
            emit(a) append sleep(seed) append retry(p, seed, iterations, bound, log)
        }
      }
    }

    loop(0)
  }

  /**
    * Builds a task that runs `task`, waits `d`, and then repeats without end.
    *
    * Each run of `task` is limited to 10 seconds; a failure or timeout is logged
    * and does not stop the repetition. Nothing is executed until the returned
    * task is itself run.
    *
    * @param task the action to repeat
    * @param d    pause between the end of one run and the start of the next
    * @param ec   scheduler used for the pause
    */
  def repeatEvery(task: Task[Unit], d: Duration)(implicit ec: ScheduledExecutorService): Task[Unit] = {
    // attemptRunFor executes the task synchronously, so it is suspended in Task.delay
    // to keep construction of the returned Task free of side effects.
    val runOnce: Task[Unit] = Task.delay {
      task.attemptRunFor(10.seconds).swap.toOption
        .foreach(ex => logger.error("Error in executing delete task", ex))
    }

    // The recursive call stays in the last flatMap so no continuation piles up per cycle.
    runOnce.flatMap { _ =>
      Task.schedule((), d)(ec).flatMap(_ => repeatEvery(task, d)(ec))
    }
  }

  /**
    * Demo entry point: runs the `retry` process around a task that prints "Iniyan",
    * with a 10-second seed and at most 10 consecutive failures, blocking the caller.
    *
    * To try `repeatEvery` instead:
    * `repeatEvery(Task.delay(println("Printing me")), 10.seconds)(schex).run`
    */
  def main(args: Array[String]): Unit = {
    implicit val schex: ScheduledExecutorService =
      Executors.newScheduledThreadPool(2, scalaz.concurrent.Strategy.DefaultDaemonThreadFactory)

    val p = retry(Process.eval(Task.delay(println("Iniyan"))), 10.seconds, 10)(schex)
    p.run.run
  }

}

// Another way to retry: instead of calling sleep to wait between runs, use Task.schedule.
// Calling Task.schedule repeatedly (once per cycle) gives the same periodic behaviour.


 /**
   * Builds a task that runs `task`, waits `d`, and then repeats without end.
   *
   * Each run of `task` is limited to 10 seconds; a failure or timeout is logged
   * and does not stop the repetition. Nothing is executed until the returned
   * task is itself run.
   *
   * @param task the action to repeat
   * @param d    pause between the end of one run and the start of the next
   * @param ec   scheduler used for the pause
   */
 def repeatEvery(task: Task[Unit], d: Duration)(implicit ec: ScheduledExecutorService): Task[Unit] = {
    // attemptRunFor executes the task synchronously, so it is suspended in Task.delay
    // to keep construction of the returned Task free of side effects.
    val runOnce: Task[Unit] = Task.delay {
      task.attemptRunFor(10.seconds).swap.toOption
        .foreach(ex => logger.error("Error in executing delete task", ex))
    }

    // The recursive call stays in the last flatMap so no continuation piles up per cycle.
    runOnce.flatMap { _ =>
      Task.schedule((), d)(ec).flatMap(_ => repeatEvery(task, d)(ec))
    }
  }


  /**
    * Demo entry point: hands a task that prints "Printing me" to `repeatEvery`
    * with a 10-second interval and runs the result, blocking the caller.
    */
  def main(args: Array[String]): Unit = {
    // Two daemon threads are enough for the scheduled wake-ups.
    implicit val schex: ScheduledExecutorService =
      Executors.newScheduledThreadPool(2, scalaz.concurrent.Strategy.DefaultDaemonThreadFactory)

    val printer = Task.delay(println("Printing me"))
    repeatEvery(printer, 10.seconds)(schex).run
  }
  
    
  //import scalaz.stream.{Process, time}
  //Retry using awakeEvery.
  
   /** A task that prints "Iniyan" each time it is run. */
   def myTask: Task[Unit] =
     Task.delay(println("Iniyan"))
  
  // p1 ticks on a 1-second period; p2 evaluates `myTask` on every tick.
  val p1 = time.awakeEvery(1.seconds)
  val p2 = p1 evalMap { _ => myTask }
  // Compile the stream to a Task and run it, blocking the caller.
  p2.run.run
  
  // A more polished version of the above: attach an error handler to the task.
  
  val task = Task.delay{println("Print me!!!!")}

  // Run the task once. A failure is reported together with its cause and turned
  // into an empty stream, so one bad run cannot terminate the periodic schedule.
  val p = Process.eval(task.attempt).flatMap(attempted => attempted.fold(
    err => {
      println(s"Error: $err")
      Process.empty
    },
    a => Process.emit(a)
  ))

  // Explicit type so implicit resolution does not depend on the inferred type.
  implicit val scheduler: ScheduledExecutorService = scalaz.stream.DefaultScheduler
  val p1 = time.awakeEvery(FiniteDuration(10, TimeUnit.MINUTES))

  // Prepend p so the first run happens immediately;
  // there is no need to wait 10 minutes for the first tick.
  val p2 = p ++ p1.flatMap(_ => p)

  p2.run.run