alexfu
7/4/2015 - 12:25 PM

Helpful RxJava functions for daily use

Helpful RxJava functions for daily use

/**
 * Unwraps a list of items and emits them one by one.
 */
public static <T> Func1<List<T>, Observable<T>> unwrapList() {
 return new Func1<List<T>, Observable<T>>() {
   @Override
   public Observable<T> call(List<T> list) {
     return Observable.from(list);
   }
 };
}
/**
 * Throws an exception; for testing purposes.
 */
public static <T> Observable<T> throwsException() {
 return Observable.create(new Observable.OnSubscribe<T>() {
   @Override
   public void call(Subscriber<? super T> subscriber) {
     subscriber.onError(new Exception("Mock exception"));
   }
 });
}
  /**
   * Provides an equivalent of mapWithIndex operation.
   * 
   * Usage:
   *   List<String> names = Arrays.asList("Joe", "Bob", "Dave");
   *   mapWithIndex(names, new Func2<String, Integer, String>() {
   *     @Override
   *     public String call(String name, Integer index) {
   *       if (index > 0) {
   *         return name + " !!";
   *       } else {
   *         return name;
   *       }
   *     }
   *   });
   */
  public static <T,R> Observable<R> mapWithIndex(Collection<T> collection, Func2<T, Integer, R> mapFunc) {
    return Observable.from(collection).zipWith(Observable.range(0, collection.size()), mapFunc);
  }
/**
 * Transforms an observable to run on a new thread
 * and to be observed on Androids main thread.
 */
public static <T> Observable.Transformer<T, T> androidAsync() {
 return new Observable.Transformer<T, T>() {
   @Override
   public Observable<T> call(Observable<T> observable) {
     return observable
       .subscribeOn(Schedulers.newThread())
       .observeOn(AndroidSchedulers.mainThread());
   }
 };
}