caipivara
7/3/2016 - 9:23 PM

Android - retry if timeout with compose for RXJava (http://blog.makingiants.com/retry-on-timeout/)

Android - retry if timeout with compose for RXJava (http://blog.makingiants.com/retry-on-timeout/)

myObservable.composeForIoTasks() // <- IO, MainThread and Retry on Timeout management ready
            .subscribe(...)

import rx.Observable
import rx.functions.Func1
import java.net.SocketTimeoutException
import java.util.concurrent.TimeUnit

/**
 * Retry observable subscription if timeout.
 *
 * For every retry it will wait delay + delayAmount so we wait more and more every retry.
 *
 * @param maxRetries number of retries
 * @param delay milliseconds of wait between each try
 * @param delayAmount  delay + delayAmount
 */
class RetryAfterTimeoutWithDelay(val maxRetries: Int, var delay: Long, val delayAmount: Long = 100)
  : Func1<Observable<out Throwable>, Observable<*>> {

  internal var retryCount = 0

  override fun call(attempts: Observable<out Throwable>): Observable<*> {
    return attempts.flatMap({
      if (++retryCount < maxRetries && it is SocketTimeoutException) {
        delay += delayAmount
        Observable.timer(delay, TimeUnit.MILLISECONDS)
      } else {
        Observable.error(it as Throwable)
      }
    })
  }
}
import rx.Observable
import rx.Observable.Transformer
import rx.Subscription
import rx.android.schedulers.AndroidSchedulers.mainThread
import rx.schedulers.Schedulers.io

/**
 * Shorthand to set [subscribeOn] and [observeOn] thread for observables
 */
fun <T> Observable<T>.composeForIoTasks(): Observable<T> = compose<T>(Transformer {
  it.subscribeOn(io()).observeOn(mainThread()).retryWhen(RetryAfterTimeoutWithDelay(3, 2))
})