Learning from Frida
To convert an asynchronous (callback-based) API into a monadic API, use Task.async.
/* Note: an interesting question here is what the behavior should be on "Task.retry".
In theory, a Task retry should re-run the IO, and we do not expect to get back a "cached" value.
On the other hand, this is a wrapper around a callback operation, and sometimes it may not be possible to "retry/rerun" it.
We let the caller decide whether a retry is meaningful.
Note that this is an internal conversion and this task is not exposed to the client. It is always created
as part of processing some other task inside the DAL code.
*/
/**
 * Adapts a Guava `ListenableFuture` to a `Task`.
 *
 * @param computeLf by-name expression producing the future; it is evaluated inside a task
 *                  that is run on `pool`
 * @param pool      executor used both to evaluate `computeLf` and to run the completion callback
 * @tparam T        type of the value produced by the future
 * @return a task that completes with the future's result, or fails with the future's error
 */
def toTask[T](computeLf: => ListenableFuture[T])(pool: ExecutorService): Task[T] = {
  // Bridges the future's completion callback to the registration function expected by Task.async.
  def awaitCompletion(future: ListenableFuture[T]): Task[T] =
    Task.async[T] { callback =>
      val listener = new FutureCallback[T] {
        override def onSuccess(result: T) = callback(result.right)
        override def onFailure(t: Throwable) = callback(t.left)
      }
      Futures.addCallback(future, listener, pool)
    }

  // The pool is passed explicitly so that the driver code behind computeLf is executed on the
  // DAL pool (note: Task.delay() would use a non-DAL pool).
  Task(computeLf)(pool).flatMap(awaitCompletion)
}
// How to convert a result set into a stream of Items.
/**
 * Exposes the rows of `rs` as a stream of `Item`s.
 *
 * When `rs` holds a result set, its rows are emitted one at a time; when no more rows can be
 * served locally but further pages remain, the stream waits on a task that fetches the next
 * page. When `rs` holds nothing, the stream is empty.
 *
 * NOTE(review): `rs`, `hasNext` and `defaultMinPrefetched` are defined outside this block;
 * their exact semantics should be confirmed against the enclosing definition.
 *
 * @param pool executor handed to `ListenableFutureHelpers.toTask` when waiting for the next page
 * @return a process emitting one `Item` per row
 */
def process(implicit pool: ExecutorService): Process[Task, Item] = rs.map { rows =>
  // Deliberately a def: the fetch state can change between checks, so it is re-read each time.
  def morePagesPending: Boolean = !rows.isFullyFetched

  def drain(): Process[Task, Item] = {
    // Optimization: start fetching the next page in the background while rows are still buffered.
    val runningLow = morePagesPending && rows.getAvailableWithoutFetching == defaultMinPrefetched
    if (runningLow) {
      CassandraDatastore.log.debug(s"Prefetch next batch for $rows as we only have $defaultMinPrefetched prefetched")
      rows.fetchMoreResults()
    }

    if (hasNext) {
      val bufferDrained = morePagesPending && rows.getAvailableWithoutFetching == 0
      if (bufferDrained) {
        val nextPage: Task[ResultSet] = ListenableFutureHelpers.toTask(rows.fetchMoreResults())(pool)
        CassandraDatastore.log.debug(s"Wait for next batch for $rows as there are no rows available")
        // Awaiting the Task[ResultSet] is how the stream waits for the asynchronous fetch;
        // the fetched value itself is ignored and draining simply resumes.
        Process.await(nextPage)(_ => drain())
      } else {
        Process.emit(new Item(rows.one())) ++ drain()
      }
    } else {
      Process.halt
    }
  }

  drain()
}.getOrElse(Process.halt)