iniyanp
2/22/2017 - 12:22 AM

Grouping in a Process. How cool this is!!!! yay :)

Grouping the elements of a scalaz-stream Process into batches. How cool is this!!!! yay :)

import scalaz.Scalaz._
import scalaz._
import scalaz.concurrent.Task
import scalaz.stream.Writer1
import scalaz.stream.Process._
import scalaz.stream.Process

/** Builds a `Writer1` that batches the elements whose derived key equals `key`.
  *
  * Every incoming element is classified with `f`:
  *  - if `f(item) == key`, the element is appended to a pending batch; as soon as
  *    the batch holds `batchSize` elements it is emitted on the write side (`-\/`)
  *    and a fresh batch is started;
  *  - otherwise the element is passed straight through on the output side (`\/-`)
  *    and the pending batch is left untouched.
  *
  * Note that, despite the name, matching elements are grouped across the whole
  * stream: a non-matching element does NOT close the pending batch. (The
  * adjacent-only variant would instead flush the pending batch before emitting a
  * non-matching element.)
  *
  * When the input ends, any non-empty pending batch is emitted through the
  * fallback process handed to `receive1Or`.
  *
  * @param key       the key value that selects elements for batching
  * @param batchSize maximum number of elements per emitted batch; must be positive
  *                  (defaults to 3, the previously hard-coded size)
  * @param f         extracts the grouping key from an element
  * @tparam A        element type of the stream
  * @tparam B        key type, compared with `==`
  * @return a process emitting `-\/(batch)` for grouped elements and `\/-(item)` for the rest
  */
private def groupAdjacent[A, B](key: B, batchSize: Int = 3)(
  f: A => B): Writer1[Vector[A], A, A] = {
  import Process._

  require(batchSize > 0, "batchSize must be positive")

  def loop(acc: Vector[A]): Writer1[Vector[A], A, A] = {
    // Fallback used when no further input is available: flush what is pending.
    val term = if (acc.nonEmpty) emit(-\/(acc)) else halt

    receive1Or[A, Vector[A] \/ A](term) { item =>
      if (f(item) == key) {
        val grown = acc :+ item
        // Emit the batch the moment it is full instead of waiting for the next
        // element to arrive; the resulting output order is unchanged.
        if (grown.size >= batchSize) emit(-\/(grown)) ++ loop(Vector.empty)
        else loop(grown)
      } else {
        // Non-matching elements pass through; the pending batch keeps collecting.
        emit(\/-(item)) ++ loop(acc)
      }
    }
  }

  loop(Vector.empty)
}

// Sample input: (id, type) pairs mixing "device" and "service" entries.
val list: List[(String, String)] = List(
  ("1", "device"),
  ("2", "service"),
  ("3", "device"),
  ("4", "device"),
  ("5", "service"),
  ("6", "device")
)

// Expose the sample list as a Process over Task.
val p: Process[Task, (String, String)] = Process.emitAll(list)

// Batch the elements whose second tuple component is "device".
val func = groupAdjacent[(String, String), String]("device")(_._2)

// Pipe the source through the grouping process, run it and collect the results.
val p1 = p.pipe(func)
p1.runLog.run.toList

//Inputs and outputs.
val list:List[(String,String)] = List(("1","device"),("2","service"),("3","device"),("4","device"),("5","service"),("6","device"))

Batch size is 3.

Why this? // Output when only adjacent matching elements are grouped (a non-matching element closes the pending batch):
List(-\/(Vector((1,device))), \/-((2,service)), -\/(Vector((3,device), (4,device))), \/-((5,service)), -\/(Vector((6,device))))

Why not this? // Output when grouping applies across the entire stream (this is what the code above produces):
List(\/-((2,service)), -\/(Vector((1,device), (3,device), (4,device))), \/-((5,service)), -\/(Vector((6,device))))