krebernisak
11/23/2015 - 11:04 AM

RxJava Bindings for Firebase

RxJava Bindings for Firebase

package com.firebase.client;

import com.firebase.client.AuthData;
import com.firebase.client.ChildEventListener;
import com.firebase.client.DataSnapshot;
import com.firebase.client.Firebase;
import com.firebase.client.FirebaseError;
import com.firebase.client.Query;
import com.firebase.client.ValueEventListener;
import com.firebase.client.core.view.Event;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;

import static com.firebase.client.core.view.Event.EventType.CHILD_ADDED;
import static com.firebase.client.core.view.Event.EventType.CHILD_CHANGED;
import static com.firebase.client.core.view.Event.EventType.CHILD_MOVED;
import static com.firebase.client.core.view.Event.EventType.CHILD_REMOVED;

public class RxFirebase {

    /**
     * Essentially a struct so that we can pass all children events through as a single object.
     */
    public static class FirebaseChildEvent {
        public final DataSnapshot snapshot;
        public final Event.EventType eventType;
        public final String prevName;

        public FirebaseChildEvent(DataSnapshot snapshot, Event.EventType eventType, String prevName) {
            this.snapshot = snapshot;
            this.eventType = eventType;
            this.prevName = prevName;
        }
    }

    public static class RxFirebaseException extends RuntimeException {

        public final FirebaseError error;

        private RxFirebaseException(FirebaseError error) {
            super(error.getMessage(), error.toException());
            this.error = error;
        }

        public static RxFirebaseException from(FirebaseError error) {
            return new RxFirebaseException(error);
        }
    }

    public static Observable<FirebaseChildEvent> observeChildren(final Query ref) {
        return Observable.create(new Observable.OnSubscribe<FirebaseChildEvent>() {

            @Override
            public void call(final Subscriber<? super FirebaseChildEvent> subscriber) {
                final ChildEventListener listener = ref.addChildEventListener(new ChildEventListener() {
                    @Override
                    public void onChildAdded(DataSnapshot dataSnapshot, String prevName) {
                        subscriber.onNext(new FirebaseChildEvent(dataSnapshot, CHILD_ADDED, prevName));
                    }

                    @Override
                    public void onChildChanged(DataSnapshot dataSnapshot, String prevName) {
                        subscriber.onNext(new FirebaseChildEvent(dataSnapshot, CHILD_CHANGED, prevName));
                    }

                    @Override
                    public void onChildRemoved(DataSnapshot dataSnapshot) {
                        subscriber.onNext(new FirebaseChildEvent(dataSnapshot, CHILD_REMOVED, null));
                    }

                    @Override
                    public void onChildMoved(DataSnapshot dataSnapshot, String prevName) {
                        subscriber.onNext(new FirebaseChildEvent(dataSnapshot, CHILD_MOVED, prevName));
                    }

                    @Override
                    public void onCancelled(FirebaseError error) {
                        // Turn the FirebaseError into a throwable to conform to the API
                        subscriber.onError(RxFirebaseException.from(error));
                    }
                });

                // When the subscription is cancelled, remove the listener
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        ref.removeEventListener(listener);
                    }
                }));
            }
        });
    }

    private static Func1<FirebaseChildEvent, Boolean> makeEventFilter(final Event.EventType eventType) {
        return new Func1<FirebaseChildEvent, Boolean>() {

            @Override
            public Boolean call(FirebaseChildEvent firebaseChildEvent) {
                return firebaseChildEvent.eventType == eventType;
            }
        };
    }

    public static Observable<FirebaseChildEvent> observeChildAdded(Query ref) {
        return observeChildren(ref).filter(makeEventFilter(CHILD_ADDED));
    }

    public static Observable<FirebaseChildEvent> observeChildChanged(Query ref) {
        return observeChildren(ref).filter(makeEventFilter(CHILD_CHANGED));
    }

    public static Observable<FirebaseChildEvent> observeChildMoved(Query ref) {
        return observeChildren(ref).filter(makeEventFilter(CHILD_MOVED));
    }

    public static Observable<FirebaseChildEvent> observeChildRemoved(Query ref) {
        return observeChildren(ref).filter(makeEventFilter(CHILD_REMOVED));
    }

    public static Observable<DataSnapshot> observe(final Query ref) {

        return Observable.create(new Observable.OnSubscribe<DataSnapshot>() {

            @Override
            public void call(final Subscriber<? super DataSnapshot> subscriber) {
                final ValueEventListener listener = ref.addValueEventListener(new ValueEventListener() {
                    @Override
                    public void onDataChange(DataSnapshot dataSnapshot) {
                        subscriber.onNext(dataSnapshot);
                    }

                    @Override
                    public void onCancelled(FirebaseError error) {
                        // Turn the FirebaseError into a throwable to conform to the API
                        subscriber.onError(RxFirebaseException.from(error));
                    }
                });

                // When the subscription is cancelled, remove the listener
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        ref.removeEventListener(listener);
                    }
                }));
            }
        });
    }

    public static Observable<DataSnapshot> once(final Query ref) {

        return Observable.create(new Observable.OnSubscribe<DataSnapshot>() {

            @Override
            public void call(final Subscriber<? super DataSnapshot> subscriber) {
                ref.addListenerForSingleValueEvent(new ValueEventListener() {
                    @Override
                    public void onDataChange(DataSnapshot dataSnapshot) {
                        subscriber.onNext(dataSnapshot);
                    }

                    @Override
                    public void onCancelled(FirebaseError error) {
                        // Turn the FirebaseError into a throwable to conform to the API
                        subscriber.onError(RxFirebaseException.from(error));
                    }
                });

            }
        });
    }

    public static Observable<Firebase> setValue(final Firebase ref, final Object value) {

        return Observable.create(new Observable.OnSubscribe<Firebase>() {

            @Override
            public void call(final Subscriber<? super Firebase> subscriber) {
                ref.setValue(value, new Firebase.CompletionListener() {

                    @Override
                    public void onComplete(FirebaseError error, Firebase firebase) {

                        if (error != null) {
                            subscriber.onError(RxFirebaseException.from(error));
                            return;
                        }

                        subscriber.onNext(firebase);
                        subscriber.onCompleted();
                    }
                });
            }
        });
    }

    public static Observable<AuthData> observeAuthState(final Firebase ref) {

        return Observable.create(new Observable.OnSubscribe<AuthData>() {

            @Override
            public void call(final Subscriber<? super AuthData> subscriber) {

                final Firebase.AuthStateListener listener = ref.addAuthStateListener(new Firebase.AuthStateListener() {
                    @Override
                    public void onAuthStateChanged (AuthData authData) {
                        subscriber.onNext(authData);
                    }
                });

                // When the subscription is cancelled, remove the listener
                subscriber.add(Subscriptions.create(new Action0() {
                    @Override
                    public void call() {
                        ref.removeAuthStateListener(listener);
                    }
                }));
            }
        });
    }

}