RxJS Subjects and Multicasting Operators
/* todo: add styles */
ol,
ul, li {
list-style: none;
margin: 0px;
padding: 0px;
}<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>GistRun</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.0/Rx.js"></script>
<script src="https://code.jquery.com/jquery-2.2.4.min.js"></script>
<link rel="stylesheet" href="styles.css">
</head>
<body>
<h1>Subjects and Multicasting</h1>
<div id="operator"></div>
<script src="13-sandbox.js"></script>
</body>
</html>// when to unsubscribe per subscription or unsubscribe the connectableObservable
// use do operator
// Container element that receives every rendered notification.
const dom = $('#operator');

// Source stream: interval(1000) limited to five values by take(5).
const observable = Rx.Observable.interval(1000).take(5);

// Observer A appends each notification it receives, labelled "A:".
const observerA = {
  next(value) {
    return dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

// Observer B does the same as observer A, labelled "B:".
const observerB = {
  next(value) {
    return dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// A subscribes immediately; B subscribes after a three-second delay.
observable.subscribe(observerA);

setTimeout(function subscribeObserverB() {
  observable.subscribe(observerB);
}, 3000);
// add multicast syntax with new subject
// connectable observable use connect method
// Container element that receives every rendered notification.
const dom = $('#operator');

// Source stream: interval(1000) limited to five values by take(5).
const observable = Rx.Observable.interval(1000).take(5);

// Observer A appends each notification it receives, labelled "A:".
const observerA = {
  next(value) {
    return dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

// Observer B does the same as observer A, labelled "B:".
const observerB = {
  next(value) {
    return dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// A subscribes immediately; B subscribes after a three-second delay.
observable.subscribe(observerA);

setTimeout(function subscribeObserverB() {
  observable.subscribe(observerB);
}, 3000);
// async if completion
// Container element that receives every rendered notification.
const dom = $('#operator');

// Source stream: interval(1000) limited to five values by take(5).
const observable = Rx.Observable.interval(1000).take(5);

// Observer A appends each notification it receives, labelled "A:".
const observerA = {
  next(value) {
    return dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

// Observer B does the same as observer A, labelled "B:".
const observerB = {
  next(value) {
    return dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// A subscribes immediately; B subscribes after a three-second delay.
observable.subscribe(observerA);

setTimeout(function subscribeObserverB() {
  observable.subscribe(observerB);
}, 3000);
// add buffer
// Container element that receives every rendered notification.
const dom = $('#operator');

// Source stream: interval(1000) limited to five values by take(5).
const observable = Rx.Observable.interval(1000).take(5);

// Observer A appends each notification it receives, labelled "A:".
const observerA = {
  next(value) {
    return dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

// Observer B does the same as observer A, labelled "B:".
const observerB = {
  next(value) {
    return dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// A subscribes immediately; B subscribes after a three-second delay.
observable.subscribe(observerA);

setTimeout(function subscribeObserverB() {
  observable.subscribe(observerB);
}, 3000);
// change subject to behavior subject
// Container element that receives every rendered notification.
const dom = $('#operator');

// Source stream: interval(1000) limited to five values by take(5).
const observable = Rx.Observable.interval(1000).take(5);

// Observer A appends each notification it receives, labelled "A:".
const observerA = {
  next(value) {
    return dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

// Observer B does the same as observer A, labelled "B:".
const observerB = {
  next(value) {
    return dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// A subscribes immediately; B subscribes after a three-second delay.
observable.subscribe(observerA);

setTimeout(function subscribeObserverB() {
  observable.subscribe(observerB);
}, 3000);
// remove observable
// use just subject (next, subscribe)
// Container element that receives every rendered notification.
const dom = $('#operator');

// Source stream: interval(1000) limited to five values by take(5).
const observable = Rx.Observable.interval(1000).take(5);

// Observer A appends each notification it receives, labelled "A:".
const observerA = {
  next(value) {
    return dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

// Observer B does the same as observer A, labelled "B:".
const observerB = {
  next(value) {
    return dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// A subscribes immediately; B subscribes after a three-second delay.
observable.subscribe(observerA);

setTimeout(function subscribeObserverB() {
  observable.subscribe(observerB);
}, 3000);
// change bridge observer to subject
// change addObservers to subscribe
// Container element that receives every rendered notification.
const dom = $('#operator');

// Source stream: interval(1000) limited to five values by take(5).
const observable = Rx.Observable.interval(1000).take(5);

// Observer A appends each notification it receives, labelled "A:".
const observerA = {
  next(value) {
    return dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

// Observer B does the same as observer A, labelled "B:".
const observerB = {
  next(value) {
    return dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// A subscribes immediately; B subscribes after a three-second delay.
observable.subscribe(observerA);

setTimeout(function subscribeObserverB() {
  observable.subscribe(observerB);
}, 3000);
// share multicast bridge observers
// Container element that receives every rendered notification.
const dom = $('#operator');

// Source stream: interval(1000) limited to five values by take(5).
const observable = Rx.Observable.interval(1000).take(5);

// Observer A appends each notification it receives, labelled "A:".
const observerA = {
  next(value) {
    return dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

// Observer B does the same as observer A, labelled "B:".
const observerB = {
  next(value) {
    return dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// A subscribes immediately; B subscribes after a three-second delay.
observable.subscribe(observerA);

setTimeout(function subscribeObserverB() {
  observable.subscribe(observerB);
}, 3000);
// If you want to manipulate the stream and then share it
// Container element that receives every rendered line.
const dom = $('#operator');

/**
 * Factory handed to multicast(); called to obtain the subject that
 * bridges the source and the selector's shared stream.
 *
 * @returns {Rx.Subject} a fresh Subject instance on every call
 */
function subjectFactory() {
  // Subject is a constructor: it must be invoked with `new`, otherwise no
  // instance is produced and multicast() has nothing to subscribe with.
  return new Rx.Subject();
}

// Source: eight ticks, each logged via do() and then replaced by a random
// number. The selector receives the shared stream and merges it with a copy
// of itself delayed by 500 ms.
const result = Rx.Observable.interval(1000).take(8)
  .do(x => dom.append('<li>source' + x + '</li>'))
  .map(() => Math.random())
  .multicast(subjectFactory, function selector(shared) {
    const sharedDelayed = shared.delay(500);
    const merged = shared.merge(sharedDelayed);
    return merged;
  });

// Render every value coming out of the merged stream.
result.subscribe(x => dom.append('<li>' + x + '</li>'));
// NOTE(review): `subject` is not referenced again within this snippet.
const subject = new Rx.Subject();

/**
 * Factory handed to multicast().
 *
 * @returns {Rx.Subject} a fresh Subject instance on every call
 */
function subjectFactory() {
  return new Rx.Subject();
}

// Container element that receives every rendered line.
const dom = $('#operator');

// Source: seven ticks, each logged via do(), multicast through subjects
// produced by subjectFactory and connected automatically by refCount().
const observable = Rx.Observable.interval(1000)
  .take(7)
  .do((tick) => {
    dom.append('<li>source ' + tick + '</li>');
  })
  .multicast(subjectFactory)
  .refCount();

// Observer A appends each notification it receives, labelled "A:".
const observerA = {
  next(value) {
    return dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

dom.append('--subscribe A');
let subA = observable.subscribe(observerA);

// Observer B does the same as observer A, labelled "B:".
const observerB = {
  next(value) {
    return dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

let subB;

// t = 3000 ms: B subscribes as a second subscriber.
setTimeout(function subscribeObserverB() {
  dom.append('--subscribe B');
  subB = observable.subscribe(observerB);
}, 3000);

// t = 4000 ms: A unsubscribes.
setTimeout(function unsubscribeObserverA() {
  subA.unsubscribe();
}, 4000);

// t = 4100 ms: B unsubscribes, leaving no subscribers.
setTimeout(function unsubscribeObserverB() {
  subB.unsubscribe();
}, 4100);

// t = 6000 ms: A subscribes again.
setTimeout(function resubscribeObserverA() {
  subA = observable.subscribe(observerA);
}, 6000);
// use publish for multicast(Subject)
// use publish().refCount()
// use share()
// Container element that receives every rendered notification.
const dom = $('#operator');

// Source stream: interval(1000) limited to five values by take(5).
const observable = Rx.Observable.interval(1000).take(5);

// Observer A appends each notification it receives, labelled "A:".
const observerA = {
  next(value) {
    return dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

// Observer B does the same as observer A, labelled "B:".
const observerB = {
  next(value) {
    return dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// A subscribes immediately; B subscribes after a three-second delay.
observable.subscribe(observerA);

setTimeout(function subscribeObserverB() {
  observable.subscribe(observerB);
}, 3000);
// reference count 0 -> 1, 1 -> 2, 2 -> 1, 1 -> 0
// use autoConnectableObservable
// use refCount()
// Container element that receives every rendered notification.
const dom = $('#operator');
// Source stream: interval(1000) limited to five values by take(5).
const observable = Rx.Observable.interval(1000).take(5);
// Observer A appends each notification it receives, labelled "A:".
const observerA = {
next: (item) => dom.append('<li>A:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>A:done</li>'),
}
// A subscribes immediately.
observable.subscribe(observerA);
// Observer B does the same as observer A, labelled "B:".
const observerB = {
next: (item) => dom.append('<li>B:' + item + '</li>'),
error: (err) => dom.append(err),
complete: () => dom.append('<li>B:done</li>'),
}
// B subscribes after a three-second delay.
// NOTE(review): the `const dom = ...` fused onto the closing line below looks
// like the first statement of the next concatenated snippet; confirm and split.
setTimeout(() => {
observable.subscribe(observerB);
}, 3000);const dom = $('#operator');
// Source stream: interval(1000) limited to five values by take(5).
const observable = Rx.Observable.interval(1000).take(5);

// Observer A appends each notification it receives to `dom`, labelled "A:".
const observerA = {
  next(value) {
    return dom.append('<li>A:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

// Observer B does the same as observer A, labelled "B:".
const observerB = {
  next(value) {
    return dom.append('<li>B:' + value + '</li>');
  },
  error(failure) {
    return dom.append(failure);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// A subscribes immediately; B subscribes after a three-second delay.
observable.subscribe(observerA);

setTimeout(function subscribeObserverB() {
  observable.subscribe(observerB);
}, 3000);