RxJS Subjects and Multicasting Operators
/* todo: add styles */
ol,
ul, li {
list-style: none;
margin: 0px;
padding: 0px;
}const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);
/*
function multipleBy(cb){
const source = this;
return Rx.Observable.create( (observer)=>{
source.subscribe( (x) => {
const result = cb(x)
observer.next(result);
}, (err)=> {}, () => dom.append('<li>done</li>'))
});
}
Rx.Observable.prototype.multipleBy = multipleBy;
*/
// we use RxJS Map!!
// const transform = observable.multipleBy((x)=> x*2);
const transform = observable.map((x)=> x*2);
transform.subscribe((x) => dom.append('<li>' + x + '</li>'));
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>GistRun</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.0/Rx.js"></script>
<script src="https://code.jquery.com/jquery-2.2.4.min.js"></script>
<link rel="stylesheet" href="styles.css">
</head>
<body>
<h1>Subjects and Multicasting</h1>
<div id="operator"></div>
<!--<script src="13-sandbox.js"></script>-->
<!--<script src="multiplyByTen.js"></script>-->
<!--<script src="1-observer.js"></script>-->
<!--<script src="2-bridge.js"></script>-->
<!--<script src="3-subject.js"></script>-->
<!--<script src="5-behavior.js"></script>-->
<!--<script src="9-unsubscribe.js"></script>-->
<!--<script src="10-refcount.js"></script>-->
<!--<script src="11-publish.js"></script>-->
<script src="12-factory.js"></script>
</body>
</html>
const dom = $('#operator');
//source obs
const connectableObservable = Rx.Observable
.interval(1000)
.do((x)=> dom.append('<li>' + x + '</li>'))
// .take(5)
.multicast(new Rx.Subject());
// work samme as before process (like subject..)
let sub = connectableObservable.connect();
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
dom.append('<li>subscribe observerA</li>');
let subA = connectableObservable.subscribe(observerA);
let subB;
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
setTimeout(() => {
// subject.subscribe(observerB);
dom.append('<li>subscribe observerB</li>');
subB = connectableObservable.subscribe(observerB);
}, 3000);
setTimeout(() => {
// do do this. source obs is still work.
// subA.unsubscribe();
// subB.unsubscribe();
sub.unsubscribe();
}, 4000);// change subject to behavior subject
const dom = $('#operator');
//source obs
const connectableObservable = Rx.Observable
.interval(1000)
.take(5)
.multicast(new Rx.Subject());
// work samme as before process (like subject..)
connectableObservable.connect();
/*
setTimeout(()=>{
connectableObservable.connect();
},4000);
*/
// const subject = new Rx.Subject();
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
dom.append('<li>subscribe observerA</li>');
// subject.subscribe(observerA);
// observable.subscribe(subject);
connectableObservable.subscribe(observerA);
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
setTimeout(() => {
// subject.subscribe(observerB);
connectableObservable.subscribe(observerB);
}, 4000);// async if completion
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
observable.subscribe(observerA);
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
setTimeout(() => {
observable.subscribe(observerB);
}, 3000);// add buffer
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
observable.subscribe(observerA);
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
setTimeout(() => {
observable.subscribe(observerB);
}, 3000);// change subject to behavior subject
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5); //source obs
// const subject = new Rx.Subject();
// const subject = new Rx.BehaviorSubject();
const subject = new Rx.ReplaySubject(3); //get three value from previous stream.
// const subject = new Rx.AsyncSubject(); // If source observable is not complete, this subject do not work.
// undefined : behaviorSuject have initial value
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
dom.append('<li>subscribe observerA</li>');
subject.subscribe(observerA);
observable.subscribe(subject);
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
setTimeout(() => {
subject.subscribe(observerB);
}, 4000);// remove observable
// use just subject (next, subscribe)
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
observable.subscribe(observerA);
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
setTimeout(() => {
observable.subscribe(observerB);
}, 3000);// share multicast bridge observers
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
//Change bridgeObserver object to new Subject()
const bridgeObservers = new Rx.Subject();
dom.append('<li>subscribe observerA</li>');
bridgeObservers.subscribe(observerA);
observable.subscribe(bridgeObservers);
// observable.subscribe(observerA);
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
const observerC = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
setTimeout(() => {
dom.append('<li>subscribe observerB</li>');
bridgeObservers.subscribe(observerB);
// observable.subscribe(observerB);
}, 3000);
setTimeout(()=>{
dom.append('<li>subscribe observerC</li>');
bridgeObservers.subscribe(observerC);
}, 7000);
// share multicast bridge observers
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
const bridgeObservers = {
observers: [],
addObserver: function(observer){
this.observers.push(observer);
},
next: function(x){
this.observers.forEach((observer)=> observer.next(x))
},
error: function(err){
this.observers.forEach((observer)=> observer.error(err))
},
complete: function(){
this.observers.forEach((observer)=> observer.complete())
}
};
dom.append('<li>subscribe observerA</li>');
bridgeObservers.addObserver(observerA);
observable.subscribe(bridgeObservers);
// observable.subscribe(observerA);
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
setTimeout(() => {
dom.append('<li>subscribe observerB</li>');
bridgeObservers.addObserver(observerB);
// observable.subscribe(observerB);
}, 3000);
// If you want to manipulation and then share
const dom = $('#operator');
function subjectFactory() {
return Rx.Subject();
}
const result = Rx.Observable.interval(1000).take(8)
.do(x => dom.append('<li>source' + x + '</li>'))
.map(x => Math.random())
.multicast(subjectFactory, function selector(shared) {
const sharedDelayed = shared.delay(500);
const merged = shared.merge(sharedDelayed);
return merged;
});
result.subscribe(x => dom.append('<li>' + x + '</li>'));
const subject = new Rx.Subject();
const dom = $('#operator');
const observable = Rx.Observable.interval(1000)
// .take(7)
.do(x => dom.append('<li>source ' + x + '</li>'))
.multicast(new Rx.Subject())
.refCount();
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
dom.append('--subscribe A');
let subA = observable.subscribe(observerA);
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
let subB;
setTimeout(() => {
dom.append('--subscribe B');
subB = observable.subscribe(observerB);
}, 3000);
setTimeout(() => {
subA.unsubscribe();
}, 4000);
setTimeout(() => {
subB.unsubscribe();
}, 4100);
setTimeout(() => {
subA = observable.subscribe(observerA);
}, 6000);const dom = $('#operator');
//source obs
const autoConnectableObservable = Rx.Observable
.interval(1000)
// .take(2)
// .do((x)=> dom.append('<li>source' + x + '(with do())</li>'))
// .publish() // .multicast(new Rx.Subject())
// .refCount();
.share(); // === publish().refCount();
// .publishBehavior(1)
// .publishLast(1)
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
dom.append('<li>subscribe observerA</li>');
let subA = autoConnectableObservable.subscribe(observerA);
//refCount 0 -> 1
let subB;
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
setTimeout(() => {
dom.append('<li>subscribe observerB</li>');
subB = autoConnectableObservable.subscribe(observerB);
//refCount 1 -> 2
}, 3000);
setTimeout(() => {
subA.unsubscribe(); //refCount 2 -> 1
}, 2000);
setTimeout(() => {
subB.unsubscribe(); //refCount 1 -> 0
}, 4000);const dom = $('#operator');
//source obs
const autoConnectableObservable = Rx.Observable
.interval(1000)
.do((x)=> dom.append('<li>source' + x + '(with do())</li>'))
.multicast(new Rx.Subject())
.refCount();
// let sub = connectableObservable.connect();
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
dom.append('<li>subscribe observerA</li>');
let subA = autoConnectableObservable.subscribe(observerA);
//refCount 0 -> 1
let subB;
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
setTimeout(() => {
dom.append('<li>subscribe observerB</li>');
subB = autoConnectableObservable.subscribe(observerB);
//refCount 1 -> 2
}, 3000);
setTimeout(() => {
subA.unsubscribe(); //refCount 2 -> 1
}, 2000);
setTimeout(() => {
subB.unsubscribe(); //refCount 1 -> 0
}, 4000);const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
observable.subscribe(observerA);
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
setTimeout(() => {
observable.subscribe(observerB);
}, 3000);