tadkim
6/24/2017 - 1:46 AM

RxJS Subjects and Multicasting Operators

RxJS Subjects and Multicasting Operators

/* todo: add styles */
/* Reset lists and list items: no bullets, no outer or inner spacing. */
ol,
ul,
li {
  list-style: none;
  margin: 0;
  padding: 0;
}
<!doctype html>
<!-- Host page for the sandbox: loads RxJS and jQuery, the stylesheet, and then 13-sandbox.js. -->
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>GistRun</title>
  <!-- Third-party libraries pulled from CDNs: RxJS 5.4.0 and jQuery 2.2.4. -->
  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.0/Rx.js"></script>
  <script src="https://code.jquery.com/jquery-2.2.4.min.js"></script>
  <link rel="stylesheet" href="styles.css">
</head>
<body>
  <h1>Subjects and Multicasting</h1>
  <!-- Empty container with id "operator"; presumably the sandbox script renders into it - TODO confirm. -->
  <div id="operator"></div>
  
  <!-- Placed at the end of the body, after the #operator element above. -->
  <script src="13-sandbox.js"></script>
</body>
</html>
// when to unsubscribe per subscription, or unsubscribe from the connectableObservable
// use do operator

// Render target shared by both observers.
const dom = $('#operator');

// Source: a 1000 ms interval limited to its first five emissions.
const observable = Rx.Observable.interval(1000).take(5);

// Observer A: appends every notification to the render target, tagged "A".
const observerA = {
  next(item) {
    return dom.append(`<li>A:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

observable.subscribe(observerA);

// Observer B: same rendering as A, tagged "B".
const observerB = {
  next(item) {
    return dom.append(`<li>B:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// B subscribes to the same observable three seconds after A.
setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// add multicast syntax with new subject
// connectable observable use connect method

// Render target shared by both observers.
const dom = $('#operator');

// Source: a 1000 ms interval limited to its first five emissions.
const observable = Rx.Observable.interval(1000).take(5);

// Observer A: appends every notification to the render target, tagged "A".
const observerA = {
  next(item) {
    return dom.append(`<li>A:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

observable.subscribe(observerA);

// Observer B: same rendering as A, tagged "B".
const observerB = {
  next(item) {
    return dom.append(`<li>B:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// B subscribes to the same observable three seconds after A.
setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// async if completion


// Render target shared by both observers.
const dom = $('#operator');

// Source: a 1000 ms interval limited to its first five emissions.
const observable = Rx.Observable.interval(1000).take(5);

// Observer A: appends every notification to the render target, tagged "A".
const observerA = {
  next(item) {
    return dom.append(`<li>A:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

observable.subscribe(observerA);

// Observer B: same rendering as A, tagged "B".
const observerB = {
  next(item) {
    return dom.append(`<li>B:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// B subscribes to the same observable three seconds after A.
setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// add buffer

// Render target shared by both observers.
const dom = $('#operator');

// Source: a 1000 ms interval limited to its first five emissions.
const observable = Rx.Observable.interval(1000).take(5);

// Observer A: appends every notification to the render target, tagged "A".
const observerA = {
  next(item) {
    return dom.append(`<li>A:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

observable.subscribe(observerA);

// Observer B: same rendering as A, tagged "B".
const observerB = {
  next(item) {
    return dom.append(`<li>B:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// B subscribes to the same observable three seconds after A.
setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// change subject to behavior subject

// Render target shared by both observers.
const dom = $('#operator');

// Source: a 1000 ms interval limited to its first five emissions.
const observable = Rx.Observable.interval(1000).take(5);

// Observer A: appends every notification to the render target, tagged "A".
const observerA = {
  next(item) {
    return dom.append(`<li>A:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

observable.subscribe(observerA);

// Observer B: same rendering as A, tagged "B".
const observerB = {
  next(item) {
    return dom.append(`<li>B:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// B subscribes to the same observable three seconds after A.
setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// remove observable
// use just subject (next, subscribe)

// Render target shared by both observers.
const dom = $('#operator');

// Source: a 1000 ms interval limited to its first five emissions.
const observable = Rx.Observable.interval(1000).take(5);

// Observer A: appends every notification to the render target, tagged "A".
const observerA = {
  next(item) {
    return dom.append(`<li>A:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

observable.subscribe(observerA);

// Observer B: same rendering as A, tagged "B".
const observerB = {
  next(item) {
    return dom.append(`<li>B:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// B subscribes to the same observable three seconds after A.
setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// change bridge observer to subject
// change addObservers to subscribe

// Render target shared by both observers.
const dom = $('#operator');

// Source: a 1000 ms interval limited to its first five emissions.
const observable = Rx.Observable.interval(1000).take(5);

// Observer A: appends every notification to the render target, tagged "A".
const observerA = {
  next(item) {
    return dom.append(`<li>A:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

observable.subscribe(observerA);

// Observer B: same rendering as A, tagged "B".
const observerB = {
  next(item) {
    return dom.append(`<li>B:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// B subscribes to the same observable three seconds after A.
setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// share multicast bridge observers

// Render target shared by both observers.
const dom = $('#operator');

// Source: a 1000 ms interval limited to its first five emissions.
const observable = Rx.Observable.interval(1000).take(5);

// Observer A: appends every notification to the render target, tagged "A".
const observerA = {
  next(item) {
    return dom.append(`<li>A:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

observable.subscribe(observerA);

// Observer B: same rendering as A, tagged "B".
const observerB = {
  next(item) {
    return dom.append(`<li>B:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// B subscribes to the same observable three seconds after A.
setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// If you want to manipulate the shared stream and then share the result
// jQuery handle for the element with id "operator"; the script below appends its output to it.
const dom = $('#operator');

/**
 * Factory handed to `multicast`: builds a fresh Subject each time it is called.
 *
 * `Rx.Subject` is a constructor and has to be invoked with `new`; calling it
 * as a plain function does not produce a Subject instance.
 *
 * @returns {Rx.Subject} a new, independent Subject instance.
 */
function subjectFactory() {
  return new Rx.Subject();
}

// Pipeline: an 8-tick, 1000 ms interval is logged, replaced by random numbers,
// and multicast through a subject created by subjectFactory. The selector
// merges the shared stream with a copy of itself delayed by 500 ms.
const result = Rx.Observable.interval(1000)
  .take(8)
  .do((x) => dom.append(`<li>source${x}</li>`))
  .map(() => Math.random())
  .multicast(subjectFactory, (shared) => shared.merge(shared.delay(500)));

// Render every value that comes out of the merged stream.
result.subscribe((x) => dom.append(`<li>${x}</li>`));

// NOTE(review): `subject` is not referenced anywhere else in the visible script - confirm whether it is still needed.
const subject = new Rx.Subject();

/**
 * Factory passed to `multicast`; every call yields a brand-new Subject.
 *
 * @returns {Rx.Subject} a newly constructed Subject instance.
 */
function subjectFactory() {
  const freshSubject = new Rx.Subject();
  return freshSubject;
}

// Render target shared by the source logger and both observers.
const dom = $('#operator');

// Source: a 7-tick, 1000 ms interval whose values are logged as "source N",
// multicast through subjectFactory and wrapped with refCount().
const observable = Rx.Observable.interval(1000)
  .take(7)
  .do((x) => dom.append(`<li>source ${x}</li>`))
  .multicast(subjectFactory)
  .refCount();

// Observer A: appends every notification to the render target, tagged "A".
const observerA = {
  next(item) {
    return dom.append(`<li>A:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

dom.append('--subscribe A');
let subA = observable.subscribe(observerA);

// Observer B: same rendering as A, tagged "B".
const observerB = {
  next(item) {
    return dom.append(`<li>B:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// Timeline: B joins at 3000 ms, A leaves at 4000 ms, B leaves at 4100 ms,
// and A subscribes again at 6000 ms.
let subB;
setTimeout(() => {
  dom.append('--subscribe B');
  subB = observable.subscribe(observerB);
}, 3000);

setTimeout(() => subA.unsubscribe(), 4000);

setTimeout(() => subB.unsubscribe(), 4100);

setTimeout(() => {
  subA = observable.subscribe(observerA);
}, 6000);
// use publish for multicast(Subject)
// use publish().refCount()
// use share()

// Render target shared by both observers.
const dom = $('#operator');

// Source: a 1000 ms interval limited to its first five emissions.
const observable = Rx.Observable.interval(1000).take(5);

// Observer A: appends every notification to the render target, tagged "A".
const observerA = {
  next(item) {
    return dom.append(`<li>A:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

observable.subscribe(observerA);

// Observer B: same rendering as A, tagged "B".
const observerB = {
  next(item) {
    return dom.append(`<li>B:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// B subscribes to the same observable three seconds after A.
setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// reference count 0 -> 1, 1 -> 2, 2 -> 1, 1 -> 0
// use autoConnectableObservable
// use refCount()

// Render target shared by both observers.
const dom = $('#operator');

// Source: a 1000 ms interval limited to its first five emissions.
const observable = Rx.Observable.interval(1000).take(5);

// Observer A: appends every notification to the render target, tagged "A".
const observerA = {
  next(item) {
    return dom.append(`<li>A:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

observable.subscribe(observerA);

// Observer B: same rendering as A, tagged "B".
const observerB = {
  next(item) {
    return dom.append(`<li>B:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// B subscribes to the same observable three seconds after A.
setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// Render target shared by both observers.
const dom = $('#operator');

// Source: a 1000 ms interval limited to its first five emissions.
const observable = Rx.Observable.interval(1000).take(5);

// Observer A: appends every notification to the render target, tagged "A".
const observerA = {
  next(item) {
    return dom.append(`<li>A:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>A:done</li>');
  },
};

observable.subscribe(observerA);

// Observer B: same rendering as A, tagged "B".
const observerB = {
  next(item) {
    return dom.append(`<li>B:${item}</li>`);
  },
  error(err) {
    return dom.append(err);
  },
  complete() {
    return dom.append('<li>B:done</li>');
  },
};

// B subscribes to the same observable three seconds after A.
setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);