Grouping in a Process. How cool this is!!!! yay :)
import scalaz.Scalaz._
import scalaz._
import scalaz.concurrent.Task
import scalaz.stream.Writer1
import scalaz.stream.Process._
import scalaz.stream.Process
private def groupAdjacent[A, B](key: B)(
f: A => B): Writer1[Vector[A], A, A] = {
import Process._
def loop(acc: Vector[A]): Writer1[Vector[A], A, A] = {
val term = if (!acc.isEmpty) emit(-\/(acc)) else halt
receive1Or[A, Vector[A] \/ A](term) { item =>
if (acc.size >= 3) {
val e = emit(-\/(acc))
if (f(item) == key)
e ++ loop(Vector(item))
else
e ++ emit(\/-(item)) ++ loop(Vector.empty)
}
else if (f(item) == key) {
loop(acc :+ item)
}
else if (acc.isEmpty) {
emit(\/-(item)) ++ loop(acc)
}
else {
// emit(-\/(acc)) ++ emit(\/-(item)) ++ loop(Vector.empty) //grouping only in adjacent.
emit(\/-(item)) ++ loop(acc) //grouping in entire stream.
}
}
}
loop(Vector.empty)
}
val list:List[(String,String)] = List(("1","device"),("2","service"),("3","device"),("4","device"),("5","service"),("6","device"))
val p:Process[Task, (String,String)] = Process.emitAll(list)
val func = groupAdjacent[(String,String),String]("device"){
case (_, tpe) => tpe
}
val p1 = p pipe func
p1.runLog.run.toList
//Inputs and outputs.
val list:List[(String,String)] = List(("1","device"),("2","service"),("3","device"),("4","device"),("5","service"),("6","device"))
Batch size is 3.
why this? //Grouping only in adjacent.
List(-\/(Vector((1,device))), \/-((2,service)), -\/(Vector((3,device), (4,device))), \/-((5,service)), -\/(Vector((6,device))))
why not this? //Grouping applies in entire stream.
List(\/-((2,service)), -\/(Vector((1,device), (3,device), (4,device))), \/-((5,service)), -\/(Vector((6,device))))