chtefi
11/21/2016 - 10:56 PM

Couchbase Observable Backpressure

// When fetching many documents from Couchbase, apply a backpressure/error policy: retry transient failures with an exponential delay.

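import java.util.concurrent.TimeUnit
import com.couchbase.client.core.BackpressureException
import com.couchbase.client.core.time.Delay
import com.couchbase.client.java.error.{CouchbaseOutOfMemoryException, TemporaryFailureException}
import com.couchbase.client.java.util.retry.RetryBuilder
import rx.lang.scala.JavaConversions.toScalaObservable
import rx.lang.scala.Observable

// Assumes `bucket` is an AsyncBucket, so bucket.get(id) returns an rx.Observable.
Observable
  .from(ids)
  .flatMap(id => toScalaObservable(bucket.get(id).retryWhen(
    RetryBuilder
      // retry only on transient, load-related failures
      .anyOf(classOf[TemporaryFailureException], classOf[BackpressureException], classOf[CouchbaseOutOfMemoryException])
      // Delay.exponential takes the upper bound first, then the lower bound
      .delay(Delay.exponential(TimeUnit.MILLISECONDS, maxDelay, minDelay))
      .max(maxRetries)
      .build())
  ))
  // block the calling thread and expose the results as a plain Iterator
  .toBlocking
  .toIterable
  .iterator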
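
// To pick maxDelay, minDelay and maxRetries, it can help to see how long a capped exponential schedule waits in the worst case. The sketch below is a generic doubling schedule clamped to [minDelay, maxDelay]. BackoffSketch and its methods are hypothetical helpers, not part of the Couchbase SDK, and the SDK's exact formula may differ.

```scala
// Hypothetical helper, not part of the Couchbase SDK: a capped doubling schedule.
object BackoffSketch {
  /** Delay in ms before retry number `attempt` (1-based), clamped to [minDelay, maxDelay]. */
  def delayMillis(attempt: Int, minDelay: Long, maxDelay: Long): Long = {
    require(attempt >= 1, "attempt is 1-based")
    // 2^(attempt - 1); the shift is capped at 62 so it cannot overflow a Long
    val raw = 1L << math.min(attempt - 1, 62)
    math.max(minDelay, math.min(maxDelay, raw))
  }

  /** Total wait in ms if every one of `maxRetries` retries is used. */
  def worstCaseMillis(maxRetries: Int, minDelay: Long, maxDelay: Long): Long =
    (1 to maxRetries).map(delayMillis(_, minDelay, maxDelay)).sum
}
```

// Under these assumptions, minDelay = 10, maxDelay = 500 and maxRetries = 10 give a worst case of 1036 ms spent waiting per document before the error is propagated.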