iniyanp
2/10/2017 - 10:18 PM

Describes about usage of contramap and changes happening in one thread will be visible to another thread.

Describes about usage of contramap and changes happening in one thread will be visible to another thread.

//contramap.
http://igstan.ro/posts/2013-10-31-contravariant-functors-an-intuition.html

def by[T, S](f: T => S)(implicit ord: Ordering[S]): Ordering[T] =
  new Ordering[T] {
    def compare(x:T, y:T) = ord.compare(f(x), f(y))
  }

we need Ordering[Money].

we have Ordering[Int]
If we have function, Int => Money, will get that.

val contramapFn: Money => Int = (money) => money.amount
implicit val moneyOrd: Ordering[Money] = Ordering.by(contramapFn)


package exercise
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.async
import scalaz.Scalaz._
import scalaz._
import scalaz.stream._
import scalaz.stream.async.mutable.Signal.Msg

/**
  * Created by paramin on 2/6/17. Updating in oneThread will be available to another thread.
  */
object exercise20 {

  val p:Process[Task, String] = Process.emitAll(List("Iniyan","Preethi","Thamarai","Kumar")).toSource
  val currentApp = async.signalUnset[String](Strategy.DefaultStrategy)
  val currentAppSignalChanges = currentApp.discrete

  def main(args: Array[String]): Unit = {
    createThread
    
    //String => Msg[String] like Money => Int, Ordering[Int] => Ordering[Money]
    //Then if we have context around, Msg[String], it will be applicable to String as well.
    //so Sink[Task, Msg[String]] => Sink[Task, String]
    
    val func:String => Msg[String] = s => async.mutable.Signal.Set(s)
    val sink:Sink[Task, String] = currentApp.sink.contramap(func)
    val transSink = p to sink
    transSink.run.run
  }

  def createThread: Unit = {

      val thread = new Thread {
        override def run {
          currentAppSignalChanges.map(x => println(s"Capturing values ${x}")).run.run
        }
      }
      thread.start
      Thread.sleep(1000) // slow the loop down a bit

  }
}

//Using Process.bracket(acquire)(free){
// usage
// }

package exercise
import scalaz.concurrent.{Strategy, Task}
import scalaz.stream.async
import scalaz.Scalaz._
import scalaz._
import scalaz.stream._
import scalaz.stream.async.mutable.Signal
import scalaz.stream.async.mutable.Signal.Msg

/**
  * Created by paramin on 2/6/17.
  */
object exercise20 {

  val p:Process[Task, String] = Process.emitAll(List("Iniyan","Preethi","Thamarai","Kumar")).toSource


  def main(args: Array[String]): Unit = {

    val signalProcess:Process[Task, Unit] = Process.bracket(Task.delay(async.signalUnset[String](Strategy.DefaultStrategy)))(signal =>
      Process.eval_(Task.delay(println("Signal is closed.")) *> signal.close)){
      signal => {
        Process.eval(Task.delay(signal))
        val func:String => Msg[String] = s => async.mutable.Signal.Set(s)
        createThread(signal.discrete)
        val sink = signal.sink.contramap(func)
        val transSink = p to sink
        transSink
      }
    }

    signalProcess.run.run
  }

  def createThread(currentAppSignalChanges: Process[Task, String]): Unit = {

    println("In create Thread.")
      val thread = new Thread {
        override def run {
          currentAppSignalChanges.map(x => println(s"Capturing values ${x}")).run.run
        }
      }
      thread.start
//      Thread.sleep(1000) // slow the loop down a bit

  }
}