Couchbase Observable Backpressure
// when getting a lot of doc from couchbase, need a backpressure/error policy (retry exponential)
Observable
.from(ids)
.flatMap(id => toScalaObservable(bucket.get(id).retryWhen(
RetryBuilder
.anyOf(classOf[TemporaryFailureException], classOf[BackpressureException], classOf[CouchbaseOutOfMemoryException])
.delay(Delay.exponential(TimeUnit.MILLISECONDS, maxDelay, minDelay))
.max(maxRetries)
.build())
))
.toBlocking
.toIterable
.iterator