iniyanp
3/10/2017 - 5:36 PM

Redirecting the elements of a Process to a Sink, using channel.lift to build the sink.

 import java.nio.channels.Channel
import java.util.concurrent.TimeUnit

import scala.concurrent.duration.{Duration, FiniteDuration}
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream._
import scalaz.stream.channel

 implicit val S = Strategy.DefaultStrategy
  implicit val scheduler = scalaz.stream.DefaultScheduler

  def method(elem:Duration, p:Process[Task, Int]):Process[Task, (Duration,Int)] = {
    p.flatMap(x => Process.eval(Task.now((elem,x))))
  }


  /**
    * This below code awakes once in 5 seconds, and emits a tuple (Duration and random integer)
    *
    * We are lifting a simple function to sink. (Using channel lift.)
    *
    * and then redirecting the tuples into sink.
    *
    */

  def sample():Unit = {

    //Why awakeEvery is not waking up in everyDuration when I attach it to the Sink.
    val p5:Process[Task,Int] = Process.eval(Task.delay(scala.util.Random.nextInt()))
    val p6 = time.awakeEvery(FiniteDuration(5, TimeUnit.SECONDS)).flatMap(x => method(x,p5))
    val func:((Duration,Int)) => Task[Unit] = s => Task.delay{
      println( "(" + s._1 + " " + s._2 + ") .....YAY")
    }
    val sink:Sink[Task, (Duration,Int)] = channel.lift[Task, (Duration,Int), Unit](func)

    val p7 = p6 to sink
    p7.run.run

//    val p7 = p6.toSignal.continuous to sink
//    p7.run.run

//    p6.run.run
//    val p8 = p6 to sink
//    p8.run.run



  }

//Output is:
(5027704285 nanoseconds -305343129) .....YAY
(10027563126 nanoseconds -1314383749) .....YAY
(15028087514 nanoseconds 558381731) .....YAY
(20025939296 nanoseconds -1292311819) .....YAY