Sawtaytoes
12/14/2018 - 1:02 PM

Simple RxJS Stream Observable

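A minimal example of wrapping a Node.js Transform stream in RxJS subjects, so that each chunk read from a stream is processed by an Observable pipeline before the next chunk is released.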

const fs = require('fs')
const { delay, map, tap } = require('rxjs/operators')
const { Subject } = require('rxjs')
const { Transform } = require('stream')

/**
 * Wraps a readable stream in a pair of RxJS subjects so that every chunk is
 * handed to an Observable pipeline, and the next chunk is only released once
 * the consumer answers through `pullNextChunk`.
 *
 * @param {import('stream').Readable} readStream - Source stream; it is piped
 *   into an internal object-mode Transform stream.
 * @returns {{ pullNextChunk: function(*): void, stream$: Subject }}
 *   `stream$` emits each incoming chunk, completes when the internal
 *   Transform stream emits `finish`, and errors when either stream emits
 *   `error`. `pullNextChunk(value)` pushes `value` onto the Transform
 *   stream's readable side and signals that the pending chunk is done.
 */
const createStreamObservable = (
  readStream,
) => {
  const chunk$ = new Subject()
  const stream$ = new Subject()

  const pullNextChunk = (
    value,
  ) => {
    chunk$
    .next(value)
  }

  const transformStream = (
    new Transform({
      readableObjectMode: true,

      transform(
        chunk,
        encoding,
        callback,
      ) {
        // Listen for the reply BEFORE announcing the chunk: a consumer that
        // answers synchronously would otherwise emit on `chunk$` while nobody
        // is subscribed, and the stream would wait forever.
        const subscription = (
          chunk$
          .subscribe(value => {
            // Exactly one reply belongs to each chunk. Without unsubscribing,
            // listeners from earlier chunks stay attached, push the value
            // again and invoke their stale callbacks a second time.
            subscription
            .unsubscribe()

            this
            .push(value)

            callback()
          })
        )

        stream$
        .next(chunk)
      },

      writableObjectMode: true,
    })
  )

  // Terminates both subjects when the source or the Transform stream fails,
  // so subscribers are notified instead of waiting on a dead stream.
  const failWith = (
    error,
  ) => {
    stream$
    .error(error)

    chunk$
    .complete()
  }

  readStream
  .on(
    'error',
    failWith,
  )

  transformStream
  .on(
    'error',
    failWith,
  )
  .on(
    'finish',
    () => {
      chunk$
      .complete()

      stream$
      .complete()
    }
  )
  // Nothing outside this function can read the Transform stream's output.
  // Keep it flowing so the pushed values are drained; otherwise the readable
  // buffer fills up and backpressure stops further `transform` calls.
  .resume()

  readStream
  .pipe(transformStream)

  return {
    pullNextChunk,
    stream$,
  }
}

// Demo driver: stream a file through the observable pipeline, one chunk at a
// time, and release the next chunk once the current one has been processed.
const readStream = fs.createReadStream('./novel.txt')

const { pullNextChunk, stream$ } = createStreamObservable(readStream)

stream$
  .pipe(
    // Work with text rather than the raw chunk from here on.
    map((chunk) => chunk.toString()),
    tap(() => {
      console.info('INCOMING CHUNK')
    }),
    delay(1000), // This could be an AJAX call
    map((text) => text.toUpperCase()),
    tap(() => {
      console.info('DONE PROCESSING CHUNK')
    }),
  )
  .subscribe({
    // Handing the processed value back lets the next chunk through.
    next: pullNextChunk,
    complete() {
      console.info('DONE PROCESSING ALL CHUNKS')
    },
  })

// INCOMING CHUNK
// DONE PROCESSING CHUNK
// ...
// INCOMING CHUNK
// DONE PROCESSING CHUNK
// DONE PROCESSING ALL CHUNKS