Ahmed3Elshaer
12/14/2018 - 5:08 PM

RxJava Subjects

ReplaySubject

        "Default ReplaySubject"
        
        // Demonstrates an unbounded ReplaySubject: every item emitted so far is buffered and
        // replayed to each subscriber, regardless of when that subscriber arrives.
        // NOTE(review): unsubscribe()/createSubscription() are defined outside this snippet;
        // presumably they reset compositeSubscription between demos - confirm.
        unsubscribe();
        createSubscription();

        ReplaySubject<Stock> stockReplaySubject = ReplaySubject.create();

        // These three items are emitted before anyone has subscribed; the subject buffers them.
        stockReplaySubject.onNext(new Stock(GOOG, 715.09));
        stockReplaySubject.onNext(new Stock(GOOG, 716.00));
        stockReplaySubject.onNext(new Stock(GOOG, 714.00));

        Subscription defaultSub = stockReplaySubject.subscribe(getDefaultSubscriber());
        compositeSubscription.add(defaultSub); // All three buffered values are replayed to defaultSub.

        // Emitted live to defaultSub, which is already subscribed at this point.
        stockReplaySubject.onNext(new Stock(GOOG, 720));
        stockReplaySubject.onCompleted(); // Terminate the stream with a completed event.

        // Subscribe again after completion: this subscriber receives all four items followed by
        // the terminal event right away. Items are "replayed" even though the stream has completed.
        Subscription tardySubscription = stockReplaySubject.subscribe(getTardySubscriber());
        compositeSubscription.add(tardySubscription);
        
        ==>OUTPUT<==
        2018-12-14 19:03:27.408 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 715.09
        2018-12-14 19:03:27.410 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 716.0
        2018-12-14 19:03:27.412 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 714.0
        2018-12-14 19:03:27.414 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 720.0
        2018-12-14 19:03:27.415 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Default subscriber completed called.
        2018-12-14 19:03:27.417 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 715.09
        2018-12-14 19:03:27.419 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 716.0
        2018-12-14 19:03:27.420 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 714.0
        2018-12-14 19:03:27.422 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 720.0
        2018-12-14 19:03:27.422 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Tardy subscriber completed called.
        
        ________________________________________________________________________________
        "Sized Replay Subject"
         // Demonstrates a size-bounded ReplaySubject: the buffer keeps only the two most recent
         // items, so a new subscriber is replayed at most two values.
         unsubscribe();
        createSubscription();

        // A replay subject that will only replay the last two items.
        ReplaySubject<Stock> stockReplaySubject = ReplaySubject.createWithSize(2);

        // Three items go in, but the buffer holds two: 715.09 is evicted when 714.00 arrives.
        stockReplaySubject.onNext(new Stock(GOOG, 715.09));
        stockReplaySubject.onNext(new Stock(GOOG, 716.00));
        stockReplaySubject.onNext(new Stock(GOOG, 714.00));

        // Only the last two items will be replayed to this subscriber (716 and 714)
        Subscription defaultSub = stockReplaySubject.subscribe(getDefaultSubscriber());
        compositeSubscription.add(defaultSub); // Only 716 and 714 are replayed; 715.09 was evicted.

        // This will also be emitted to the defaultSub above.
        stockReplaySubject.onNext(new Stock(GOOG, 720));
        stockReplaySubject.onCompleted(); // Terminate the stream with a completed event.

        // Subscribe again, this time the subscriber will get the last two events (714 and 720)
        // and the terminal event right away. The last two items are "replayed" even though the
        // stream has completed.
        Subscription tardySubscription = stockReplaySubject.subscribe(getTardySubscriber());
        compositeSubscription.add(tardySubscription);
        
         ==>OUTPUT<==
        2018-12-14 19:12:19.959 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 716.0
        2018-12-14 19:12:19.960 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 714.0
        2018-12-14 19:12:19.962 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 720.0
        2018-12-14 19:12:19.963 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Default subscriber completed called.
        2018-12-14 19:12:19.964 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 714.0
        2018-12-14 19:12:19.966 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 720.0
        2018-12-14 19:12:19.966 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Tardy subscriber completed called.
        
        ________________________________________________________________________________
        "Timed Replay Subject"
        // Demonstrates a time-bounded ReplaySubject: items older than the 250 ms window are
        // dropped from the replay buffer. The Thread.sleep calls space the emissions roughly
        // 100 ms apart so that older items age out before the subscribers arrive.
        unsubscribe();
            createSubscription();

            // A replay subject that will only replay items emitted within the last 250 ms.
            ReplaySubject<Stock> stockReplaySubject = ReplaySubject.createWithTime(250, TimeUnit.MILLISECONDS, Schedulers.immediate());

            stockReplaySubject.onNext(new Stock(GOOG, 715.09));
            Thread.sleep(100);
            stockReplaySubject.onNext(new Stock(GOOG, 716.00));
            Thread.sleep(100);
            stockReplaySubject.onNext(new Stock(GOOG, 714.00));
            Thread.sleep(100);

            // About 300 ms have passed since the first item, so it is outside the 250 ms window.
            // Only the last two items will be replayed to this subscriber (716 and 714)
            // because the first one has already expired.
            Subscription defaultSub = stockReplaySubject.subscribe(getDefaultSubscriber());
            compositeSubscription.add(defaultSub); // Only 716 and 714 are replayed; 715.09 has expired.

            // This will also be emitted to the defaultSub above.
            stockReplaySubject.onNext(new Stock(GOOG, 720));
            Thread.sleep(100);
            stockReplaySubject.onCompleted(); // Terminate the stream with a completed event.

            // Let's sleep for another 100 millis to simulate some time passing.
            Thread.sleep(100);

            // Subscribe again with a new subscriber. This time the only item that is valid is
            // the last item: '720' as all the others have expired.
            Subscription tardySubscription = stockReplaySubject.subscribe(getTardySubscriber());
            compositeSubscription.add(tardySubscription);
        
         ==>OUTPUT<==
        2018-12-14 19:26:17.449 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 716.0
        2018-12-14 19:26:17.453 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 714.0
        2018-12-14 19:26:17.458 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 720.0
        2018-12-14 19:26:17.560 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Default subscriber completed called.
        2018-12-14 19:26:17.665 432-432/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 720.0
        2018-12-14 19:26:17.667 432-432/io.caster.rxexamples D/ReplaySubjectFragment: Tardy subscriber completed called.
        
        
        _________________________________________
         Time and Size bound ReplaySubject
         
         // Demonstrates a ReplaySubject bounded by both age and count: an item is replayed only
         // if it is no older than 250 ms AND is one of the two most recent items.
         unsubscribe();
            createSubscription();

            // A replay subject that will only replay items emitted within the last 250 ms,
            // and never more than the last two of them.
            ReplaySubject<Stock> stockReplaySubject = ReplaySubject.createWithTimeAndSize(250, TimeUnit.MILLISECONDS,2, Schedulers.immediate());

            stockReplaySubject.onNext(new Stock(GOOG, 715.09));
            Thread.sleep(100);
            stockReplaySubject.onNext(new Stock(GOOG, 716.00));
            Thread.sleep(100);
            stockReplaySubject.onNext(new Stock(GOOG, 714.00));
            Thread.sleep(100);

            // Only the last two items will be replayed to this subscriber (716 and 714): the
            // first item has already expired, and the buffer is capped at two items anyway.
            Subscription defaultSub = stockReplaySubject.subscribe(getDefaultSubscriber());
            compositeSubscription.add(defaultSub); // Only 716 and 714 are replayed; 715.09 is gone.

            // This will also be emitted to the defaultSub above.
            stockReplaySubject.onNext(new Stock(GOOG, 720));
            Thread.sleep(100);
            stockReplaySubject.onCompleted(); // Terminate the stream with a completed event.

            // Let's sleep for another 100 millis to simulate some time passing.
            Thread.sleep(100);

            // Subscribe again with a new subscriber. This time the only item that is valid is
            // the last item: '720'. All the others have either expired or were pushed out of
            // the buffer by the size limit (2).
            Subscription tardySubscription = stockReplaySubject.subscribe(getTardySubscriber());
            compositeSubscription.add(tardySubscription);
            
            
          ==>OUTPUT<==
         2018-12-14 19:31:56.589 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 716.0
        2018-12-14 19:31:56.593 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 714.0
        2018-12-14 19:31:56.598 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: onNext on default subscriber: GOOG - 720.0
        2018-12-14 19:31:56.700 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: Default subscriber completed called.
        2018-12-14 19:31:56.806 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: onNext on tardy subscriber: GOOG - 720.0
        2018-12-14 19:31:56.807 6482-6482/io.caster.rxexamples D/ReplaySubjectFragment: Tardy subscriber completed called.


*Async Subject*
        
        // Demonstrates AsyncSubject with normal completion: subscribers receive nothing until
        // onCompleted() is called, and then only the final item emitted before completion.
        unsubscribe();
        createSubscription();

        AsyncSubject<Stock> stockAsyncSubject = AsyncSubject.create();

        stockAsyncSubject.onNext(new Stock(GOOG, 722));

        // Receives nothing at subscription time. It will get only the final value (GOOG, 699)
        // followed by the completed event, once onCompleted() is called below.
        Subscription subscription = stockAsyncSubject.subscribe(getDefaultSubscriber());
        compositeSubscription.add(subscription);

        // None of these intermediate values is delivered on its own; each one just replaces
        // the "last value" the subject is holding.
        stockAsyncSubject.onNext(new Stock(GOOG, 723));
        stockAsyncSubject.onNext(new Stock(GOOG, 100));
        stockAsyncSubject.onNext(new Stock(GOOG, 699));
        stockAsyncSubject.onCompleted(); // Releases the final value (699) and the completed event.

        // A subscriber arriving after completion also gets the final value (699) and the
        // completed event right away.
        Subscription tardySubscription = stockAsyncSubject.subscribe(getTardySubscriber());
        compositeSubscription.add(tardySubscription);
        
        ==>OUTPUT<==
        2018-12-14 19:45:40.991 6482-6482/io.caster.rxexamples D/AsyncSubjectFragment: onNext on default subscriber: GOOG - 699.0
        2018-12-14 19:45:40.992 6482-6482/io.caster.rxexamples D/AsyncSubjectFragment: Default subscriber completed called.
        2018-12-14 19:45:40.995 6482-6482/io.caster.rxexamples D/AsyncSubjectFragment: onNext on tardy subscriber: GOOG - 699.0
        2018-12-14 19:45:40.996 6482-6482/io.caster.rxexamples D/AsyncSubjectFragment: Tardy subscriber completed called.
        ________________________________________________________________________________
        "with error"
        // Demonstrates AsyncSubject terminating with an error: no item is ever delivered,
        // subscribers only see the error.
        unsubscribe();
        createSubscription();

        AsyncSubject<Stock> stockAsyncSubject = AsyncSubject.create();

        stockAsyncSubject.onNext(new Stock(GOOG, 722));

        // Receives nothing at subscription time. Because the stream ends with onError below,
        // this subscriber will never get an item - only the error.
        Subscription subscription = stockAsyncSubject.subscribe(getDefaultSubscriber());
        compositeSubscription.add(subscription);

        stockAsyncSubject.onNext(new Stock(GOOG, 723));
        stockAsyncSubject.onNext(new Stock(GOOG, 100));
        stockAsyncSubject.onNext(new Stock(GOOG, 699));
        stockAsyncSubject.onError(new Exception("Boom!")); // current and future subscribers will only receive this, with NO items emitted.

        // Subscribes after the error; per the note above it should receive only the error too.
        Subscription tardySubscription = stockAsyncSubject.subscribe(getTardySubscriber());
        compositeSubscription.add(tardySubscription);
         ==>OUTPUT<==
         2018-12-14 19:47:02.844 6482-6482/io.caster.rxexamples E/AsyncSubjectFragment: Error called on default subscriber.
         java.lang.Exception: Boom!
         
         


*Publish Subject*
        
        // Demonstrates PublishSubject: nothing is buffered, so a subscriber only receives items
        // emitted after it subscribed.
        unsubscribe();
        createSubscription();

        PublishSubject<Stock> publishSubject = PublishSubject.create();

        // Emitted before anyone has subscribed, so this item is never delivered to a subscriber.
        publishSubject.onNext(new Stock(GOOG, 722));

        Subscription subscription = publishSubject.subscribe(getDefaultSubscriber());
        compositeSubscription.add(subscription);

        // The default subscriber receives these three items live, then the completed event.
        publishSubject.onNext(new Stock(GOOG, 723));
        publishSubject.onNext(new Stock(GOOG, 100));
        publishSubject.onNext(new Stock(GOOG, 699));
        publishSubject.onCompleted();

        // Subscribing after completion: no items are replayed, the tardy subscriber only gets
        // the completed event.
        Subscription tardySubscription = publishSubject.subscribe(getTardySubscriber());
        compositeSubscription.add(tardySubscription);
        
         ==>OUTPUT<==
        2018-12-15 00:36:41.482 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: onNext on default subscriber: GOOG - 723.0
        2018-12-15 00:36:41.484 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: onNext on default subscriber: GOOG - 100.0
        2018-12-15 00:36:41.485 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: onNext on default subscriber: GOOG - 699.0
        2018-12-15 00:36:41.485 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: Default subscriber completed called.
        2018-12-15 00:36:41.486 10757-10757/io.caster.rxexamples D/PublishSubjectFragment: Tardy subscriber completed called.