tadkim
6/24/2017 - 1:46 AM

RxJS Subjects and Multicasting Operators

RxJS Subjects and Multicasting Operators

/* todo: add styles */
ol,
ul, li {
  list-style: none;
  margin: 0px;
  padding: 0px;
}
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);
/*
function multipleBy(cb){
  const source = this;
  return Rx.Observable.create( (observer)=>{
    source.subscribe( (x) => {
      const result = cb(x)
      observer.next(result);
    }, (err)=> {}, () => dom.append('<li>done</li>'))
  });
}
  
Rx.Observable.prototype.multipleBy = multipleBy;
*/

// we use RxJS Map!!
// const transform = observable.multipleBy((x)=> x*2);


const transform = observable.map((x)=> x*2);
transform.subscribe((x) => dom.append('<li>' + x + '</li>'));
<!doctype html>
<html lang="en">
<head>
  <meta charset="utf-8">
  <title>GistRun</title>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.0/Rx.js"></script>
  <script src="https://code.jquery.com/jquery-2.2.4.min.js"></script>
  <link rel="stylesheet" href="styles.css">
</head>
<body>
  <h1>Subjects and Multicasting</h1>
  <div id="operator"></div>
  
  <!--<script src="13-sandbox.js"></script>-->
  <!--<script src="multiplyByTen.js"></script>-->
  <!--<script src="1-observer.js"></script>-->
  <!--<script src="2-bridge.js"></script>-->
  <!--<script src="3-subject.js"></script>-->
  <!--<script src="5-behavior.js"></script>-->
  <!--<script src="9-unsubscribe.js"></script>-->
  <!--<script src="10-refcount.js"></script>-->
  <!--<script src="11-publish.js"></script>-->
  <script src="12-factory.js"></script>
  
  
  
</body>
</html>


const dom = $('#operator');

 //source obs
const connectableObservable = Rx.Observable
  .interval(1000)
  .do((x)=> dom.append('<li>' + x + '</li>'))
  // .take(5)
  .multicast(new Rx.Subject());

// work samme as before process (like subject..)  
let sub = connectableObservable.connect(); 



const observerA = {
  next: (item) => dom.append('<li>A:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>A:done</li>'),
}

dom.append('<li>subscribe observerA</li>');
let subA = connectableObservable.subscribe(observerA);
let subB;


const observerB = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}

setTimeout(() => {
  // subject.subscribe(observerB);
  dom.append('<li>subscribe observerB</li>');
  subB = connectableObservable.subscribe(observerB);
}, 3000);


setTimeout(() => {
  // do do this. source obs is still work.
  // subA.unsubscribe();
  // subB.unsubscribe();
  
  sub.unsubscribe();
}, 4000);
// change subject to behavior subject

const dom = $('#operator');
 //source obs
const connectableObservable = Rx.Observable
  .interval(1000)
  .take(5)
  .multicast(new Rx.Subject());


// work samme as before process (like subject..)  
connectableObservable.connect(); 
/*
setTimeout(()=>{
  connectableObservable.connect();
},4000);
*/
// const subject = new Rx.Subject();



const observerA = {
  next: (item) => dom.append('<li>A:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>A:done</li>'),
}

dom.append('<li>subscribe observerA</li>');
// subject.subscribe(observerA);
// observable.subscribe(subject);
connectableObservable.subscribe(observerA);


const observerB = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}

setTimeout(() => {
  // subject.subscribe(observerB);
  connectableObservable.subscribe(observerB);
}, 4000);
// async if completion


const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);

const observerA = {
  next: (item) => dom.append('<li>A:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>A:done</li>'),
}

observable.subscribe(observerA);

const observerB = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}

setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// add buffer

const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);

const observerA = {
  next: (item) => dom.append('<li>A:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>A:done</li>'),
}

observable.subscribe(observerA);

const observerB = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}

setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// change subject to behavior subject

const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5); //source obs

// const subject = new Rx.Subject();
// const subject = new Rx.BehaviorSubject();
const subject = new Rx.ReplaySubject(3); //get three value from previous stream.
// const subject = new Rx.AsyncSubject(); // If source observable is not complete, this subject do not work.
// undefined : behaviorSuject have initial value


const observerA = {
  next: (item) => dom.append('<li>A:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>A:done</li>'),
}

dom.append('<li>subscribe observerA</li>');
subject.subscribe(observerA);
observable.subscribe(subject);
const observerB = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}

setTimeout(() => {
  subject.subscribe(observerB);
}, 4000);
// remove observable
// use just subject (next, subscribe)

const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);

const observerA = {
  next: (item) => dom.append('<li>A:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>A:done</li>'),
}

observable.subscribe(observerA);

const observerB = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}

setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);
// share multicast bridge observers

const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);

const observerA = {
  next: (item) => dom.append('<li>A:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>A:done</li>'),
}

//Change bridgeObserver object to new Subject()
const bridgeObservers = new Rx.Subject();

dom.append('<li>subscribe observerA</li>');
bridgeObservers.subscribe(observerA);
observable.subscribe(bridgeObservers);


// observable.subscribe(observerA);

const observerB = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}


const observerC = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}

setTimeout(() => {
  dom.append('<li>subscribe observerB</li>');
  bridgeObservers.subscribe(observerB);
  // observable.subscribe(observerB);
}, 3000);


setTimeout(()=>{
  dom.append('<li>subscribe observerC</li>');
  bridgeObservers.subscribe(observerC);
}, 7000);

// share multicast bridge observers

const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);

const observerA = {
  next: (item) => dom.append('<li>A:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>A:done</li>'),
}


const bridgeObservers = {
  observers: [],
  addObserver: function(observer){
    this.observers.push(observer);
  },
  next: function(x){
    this.observers.forEach((observer)=> observer.next(x))
  },
  error: function(err){
    this.observers.forEach((observer)=> observer.error(err))
  },
  complete: function(){
    this.observers.forEach((observer)=> observer.complete())
  }
};

dom.append('<li>subscribe observerA</li>');
bridgeObservers.addObserver(observerA);
observable.subscribe(bridgeObservers);


// observable.subscribe(observerA);

const observerB = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}

setTimeout(() => {
  dom.append('<li>subscribe observerB</li>');
  bridgeObservers.addObserver(observerB);
  // observable.subscribe(observerB);
}, 3000);

// If you want to manipulation and then share
const dom = $('#operator');

function subjectFactory() {
  return Rx.Subject();
}

const result = Rx.Observable.interval(1000).take(8)
  .do(x => dom.append('<li>source' + x + '</li>'))
  .map(x => Math.random())
  .multicast(subjectFactory, function selector(shared) {
    const sharedDelayed = shared.delay(500);
    const merged = shared.merge(sharedDelayed);
    return merged;
  });
  
result.subscribe(x => dom.append('<li>' + x + '</li>'));

const subject = new Rx.Subject();

const dom = $('#operator');


const observable = Rx.Observable.interval(1000)
  // .take(7)
  .do(x => dom.append('<li>source ' + x + '</li>'))
  .multicast(new Rx.Subject())
  .refCount();

const observerA = {
  next: (item) => dom.append('<li>A:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>A:done</li>'),
}

dom.append('--subscribe A');
let subA = observable.subscribe(observerA);

const observerB = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}

let subB;

setTimeout(() => {
  dom.append('--subscribe B');
  subB = observable.subscribe(observerB);
}, 3000);

setTimeout(() => {
  subA.unsubscribe();
}, 4000);

setTimeout(() => {
  subB.unsubscribe();
}, 4100);

setTimeout(() => {
  subA = observable.subscribe(observerA);
}, 6000);
const dom = $('#operator');

 //source obs
const autoConnectableObservable = Rx.Observable
  .interval(1000)
  // .take(2)
  // .do((x)=> dom.append('<li>source' + x + '(with do())</li>'))
  // .publish() // .multicast(new Rx.Subject())
  // .refCount();
  .share(); // === publish().refCount();


// .publishBehavior(1) 
// .publishLast(1) 



const observerA = {
  next: (item) => dom.append('<li>A:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>A:done</li>'),
}

dom.append('<li>subscribe observerA</li>');
let subA = autoConnectableObservable.subscribe(observerA);
//refCount 0 -> 1

let subB;



const observerB = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}

setTimeout(() => {
  dom.append('<li>subscribe observerB</li>');
  subB = autoConnectableObservable.subscribe(observerB);
  //refCount 1 -> 2
}, 3000);


setTimeout(() => {
  subA.unsubscribe(); //refCount 2 -> 1
}, 2000);

setTimeout(() => {
  subB.unsubscribe(); //refCount 1 -> 0
}, 4000);
const dom = $('#operator');

 //source obs
const autoConnectableObservable = Rx.Observable
  .interval(1000)
  .do((x)=> dom.append('<li>source' + x + '(with do())</li>'))
  .multicast(new Rx.Subject())
  .refCount();



// let sub = connectableObservable.connect(); 


const observerA = {
  next: (item) => dom.append('<li>A:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>A:done</li>'),
}

dom.append('<li>subscribe observerA</li>');
let subA = autoConnectableObservable.subscribe(observerA);
//refCount 0 -> 1

let subB;



const observerB = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}

setTimeout(() => {
  dom.append('<li>subscribe observerB</li>');
  subB = autoConnectableObservable.subscribe(observerB);
  //refCount 1 -> 2
}, 3000);


setTimeout(() => {
  subA.unsubscribe(); //refCount 2 -> 1
}, 2000);

setTimeout(() => {
  subB.unsubscribe(); //refCount 1 -> 0
}, 4000);
const dom = $('#operator');
const observable = Rx.Observable.interval(1000).take(5);

const observerA = {
  next: (item) => dom.append('<li>A:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>A:done</li>'),
}

observable.subscribe(observerA);

const observerB = {
  next: (item) => dom.append('<li>B:' + item + '</li>'),
  error: (err) => dom.append(err),
  complete: () => dom.append('<li>B:done</li>'),
}

setTimeout(() => {
  observable.subscribe(observerB);
}, 3000);