6qat
6/28/2018 - 12:07 PM

inputstream-to-observable.scala

import java.io.{File, FileInputStream}
import java.util
import monix.execution.Cancelable
import monix.reactive.Observable
import scala.util.control.NonFatal

/** Streams the bytes of `in` as an `Observable` of chunks.
  *
  * Every emitted array is a fresh copy holding exactly the bytes returned by one
  * `in.read(buffer)` call, so it is at most `chunkSize` bytes long and may be shorter.
  *
  * The observable is backed by a single iterator created when this method is called,
  * and that iterator reads directly from `in`. Bytes consumed by one subscription are
  * therefore not replayed for a later one. This method never closes `in`; closing it
  * stays the caller's responsibility.
  *
  * @param in        stream to read from
  * @param chunkSize size in bytes of the internal read buffer; must be positive
  * @return an observable emitting the chunks read from `in` until `read` signals
  *         end of stream with a negative count
  * @throws IllegalArgumentException if `chunkSize` is not positive
  */
def fromInputStream(in: java.io.InputStream, chunkSize: Int = 256): Observable[Array[Byte]] = {
  // With a zero-length buffer `read` returns 0 on every call, so the iterator
  // would report `hasNext` forever and emit empty chunks without terminating.
  require(chunkSize > 0, "chunkSize must be positive")

  val iterator = new Iterator[Array[Byte]] {
    private[this] val buffer = new Array[Byte](chunkSize)
    // 0  => nothing buffered, a read is needed
    // >0 => that many unread bytes are waiting in `buffer`
    // <0 => `read` reported end of stream
    private[this] var lastCount = 0

    def hasNext: Boolean = {
      if (lastCount == 0) lastCount = in.read(buffer)
      lastCount >= 0
    }

    def next(): Array[Byte] =
      // Going through `hasNext` fills the buffer when the caller did not call
      // `hasNext` first, instead of handing back an empty array.
      if (!hasNext)
        throw new NoSuchElementException("next")
      else {
        val result = util.Arrays.copyOf(buffer, lastCount)
        lastCount = 0
        result
      }
  }

  Observable.fromIterator(iterator)
}

/** Streams the contents of `file` as an `Observable` of byte chunks.
  *
  * A new `FileInputStream` is opened for each subscription and read through
  * `fromInputStream`. The stream is closed when reading completes or fails, when
  * the downstream subscriber answers with `Stop` (or a failed acknowledgement),
  * when the downstream `onNext` throws, and when the subscription is cancelled.
  *
  * A failure to open the file is delivered to the subscriber through `onError`.
  * A failure while setting up the subscription is reported to the subscriber's
  * scheduler instead, because the stream may already have started emitting.
  *
  * @param file      file to read
  * @param chunkSize read buffer size in bytes, forwarded to `fromInputStream`
  * @return an observable emitting the bytes of `file` in chunks
  */
def fromFile(file: File, chunkSize: Int = 256): Observable[Array[Byte]] = {
  import java.util.concurrent.atomic.AtomicBoolean
  import monix.execution.{Ack, Scheduler}
  import monix.reactive.observers.Subscriber
  import scala.concurrent.Future
  import scala.util.{Failure, Success, Try}

  Observable.unsafeCreate { subscriber =>
    Try(new FileInputStream(file)) match {
      case Failure(ex) =>
        // Nothing was emitted yet, so the error can safely go downstream.
        subscriber.onError(ex)
        Cancelable.empty

      case Success(in) =>
        // Set on cancellation so the read failure provoked by closing the stream
        // under a running read is not forwarded to a subscriber that has left.
        val canceled = new AtomicBoolean(false)

        // `FileInputStream.close` may be called more than once, so every
        // termination path can call this without coordination.
        def closeStream(): Unit =
          try in.close()
          catch { case NonFatal(ex) => subscriber.scheduler.reportFailure(ex) }

        // Forwards every event to `subscriber` and releases the file handle as
        // soon as the stream is known to be finished.
        val closing = new Subscriber[Array[Byte]] {
          implicit val scheduler: Scheduler = subscriber.scheduler

          def onNext(elem: Array[Byte]): Future[Ack] = {
            val ack =
              try subscriber.onNext(elem)
              catch { case NonFatal(ex) => closeStream(); throw ex }

            // Fast path for the synchronous acknowledgements; only a genuinely
            // asynchronous one needs a completion callback.
            if (ack eq Ack.Stop) closeStream()
            else if (ack ne Ack.Continue)
              ack.onComplete {
                case Success(Ack.Continue) => ()
                case _ => closeStream()
              }
            ack
          }

          def onError(ex: Throwable): Unit = {
            closeStream()
            if (!canceled.get) subscriber.onError(ex)
          }

          def onComplete(): Unit = {
            closeStream()
            subscriber.onComplete()
          }
        }

        try {
          val upstream = fromInputStream(in, chunkSize).unsafeSubscribeFn(closing)
          Cancelable { () =>
            canceled.set(true)
            upstream.cancel()
            closeStream()
          }
        } catch {
          case NonFatal(ex) =>
            closeStream()
            subscriber.scheduler.reportFailure(ex)
            Cancelable.empty
        }
    }
  }
}