Redirecting elements to sink. //Use channel.lift
import java.nio.channels.Channel
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.{Duration, FiniteDuration}
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream._
import scalaz.stream.channel
implicit val S = Strategy.DefaultStrategy
implicit val scheduler = scalaz.stream.DefaultScheduler
def method(elem:Duration, p:Process[Task, Int]):Process[Task, (Duration,Int)] = {
p.flatMap(x => Process.eval(Task.now((elem,x))))
}
/**
* This below code awakes once in 5 seconds, and emits a tuple (Duration and random integer)
*
* We are lifting a simple function to sink. (Using channel lift.)
*
* and then redirecting the tuples into sink.
*
*/
def sample():Unit = {
//Why awakeEvery is not waking up in everyDuration when I attach it to the Sink.
val p5:Process[Task,Int] = Process.eval(Task.delay(scala.util.Random.nextInt()))
val p6 = time.awakeEvery(FiniteDuration(5, TimeUnit.SECONDS)).flatMap(x => method(x,p5))
val func:((Duration,Int)) => Task[Unit] = s => Task.delay{
println( "(" + s._1 + " " + s._2 + ") .....YAY")
}
val sink:Sink[Task, (Duration,Int)] = channel.lift[Task, (Duration,Int), Unit](func)
val p7 = p6 to sink
p7.run.run
// val p7 = p6.toSignal.continuous to sink
// p7.run.run
// p6.run.run
// val p8 = p6 to sink
// p8.run.run
}
//Output is:
(5027704285 nanoseconds -305343129) .....YAY
(10027563126 nanoseconds -1314383749) .....YAY
(15028087514 nanoseconds 558381731) .....YAY
(20025939296 nanoseconds -1292311819) .....YAY