Sawtaytoes
12/14/2018 - 1:04 PM

WriteStream-capable RxJS Transform Stream

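This example shows how to use an RxJS observable as the transform step between a Node.js read stream and a Node.js write stream: each chunk read from the source is emitted on an observable, processed there, and the result is handed back to the transform stream so it can be written out.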

const fs = require('fs')
const { map, take, tap } = require('rxjs/operators')
const { Subject } = require('rxjs')
const { Transform } = require('stream')

/**
 * Routes every chunk of a Node.js readable stream through an RxJS observable
 * before letting it continue down the Node.js pipeline.
 *
 * `readStream` is piped into an internal Transform stream. For each chunk the
 * Transform receives, it emits that chunk on `stream$` and then waits until
 * `pullNextChunk(value)` is called. `value` is pushed to the Transform's
 * readable side and only then is the next chunk accepted, so exactly one
 * `pullNextChunk` call is consumed per incoming chunk.
 *
 * `stream$` completes when the Transform emits 'finish'. If `readStream` or
 * the Transform emits 'error', that error is delivered as an `error`
 * notification on `stream$` instead (and is also emitted on
 * `transformedReadStream` for native consumers).
 *
 * NOTE: Node.js treats a pushed `null` as end-of-stream, so do not hand `null`
 * to `pullNextChunk` unless ending the readable side is intended.
 *
 * @param {import('stream').Readable} readStream Source stream to read chunks from.
 * @returns {{
 *   pullNextChunk: function(*): void,
 *   stream$: Subject,
 *   transformedReadStream: Transform,
 * }} `pullNextChunk` hands a processed value back to the Transform, `stream$`
 *   emits each incoming chunk, and `transformedReadStream` is the Transform
 *   itself so it can be piped further with native Node.js streams.
 */
const createStreamObservable = (
  readStream,
) => {
  const chunk$ = new Subject() // processed values handed back by the consumer
  const stream$ = new Subject() // raw chunks handed out to the consumer

  // 'finish' and 'error' both end the subjects; only the first one may win.
  let isTerminated = false

  const terminate = (
    notifyConsumer,
  ) => {
    if (isTerminated) {
      return
    }

    isTerminated = true

    // `chunk$` is internal and always completed, never errored: the pending
    // `take(1)` subscription inside `transform` has no error handler.
    chunk$.complete()
    notifyConsumer()
  }

  const pullNextChunk = (
    value,
  ) => {
    chunk$.next(value)
  }

  const transformStream = new Transform({
    readableObjectMode: true,
    writableObjectMode: true,

    transform(
      chunk,
      encoding,
      callback,
    ) {
      // Subscribe before emitting: a consumer may call `pullNextChunk`
      // synchronously from inside its `stream$` handler.
      chunk$
      .pipe(take(1))
      .subscribe((value) => {
        this.push(value)
        callback()
      })

      stream$.next(chunk)
    },
  })

  // `pipe` does not forward source errors. Tear the Transform down with the
  // same error; its 'error' listener below then notifies `stream$`.
  readStream.on('error', (error) => {
    transformStream.destroy(error)
  })

  const transformedReadStream = readStream
    .pipe(transformStream)
    .on('finish', () => {
      terminate(() => {
        stream$.complete()
      })
    })
    .on('error', (error) => {
      terminate(() => {
        stream$.error(error)
      })
    })

  return {
    pullNextChunk,
    stream$,
    transformedReadStream, // Give access to the Node.js stream in case more native piping is required
  }
}

// Source file whose chunks are fed into the observable.
const readStream = fs.createReadStream('./novel.txt')

// Destination file for whatever values are pulled back into the transform stream.
const writeStream = fs.createWriteStream('./uppercaseNovel.txt')

const {
  pullNextChunk,
  stream$,
  transformedReadStream,
} = createStreamObservable(readStream)

// The transformed stream has to be piped into a write stream;
// without that, `stream$` will never complete.
transformedReadStream.pipe(writeStream)

stream$
  .pipe(
    // Chunks are converted to strings first.
    map((chunk) => chunk.toString()),
    tap(() => {
      console.info('INCOMING CHUNK')
    }),
    // The actual "work" of this example: uppercase the text.
    map((text) => text.toUpperCase()),
    tap(() => {
      console.info('DONE PROCESSING CHUNK')
    }),
  )
  .subscribe({
    // Hand the processed text back so the transform stream can move on to the next chunk.
    next: (uppercasedText) => {
      pullNextChunk(uppercasedText)
    },
    complete: () => {
      console.info('DONE PROCESSING ALL CHUNKS')
    },
  })

// INCOMING CHUNK
// DONE PROCESSING CHUNK
// ...
// INCOMING CHUNK
// DONE PROCESSING CHUNK
// DONE PROCESSING ALL CHUNKS

// novel.txt (input)
// ... (lots of text) lorem ipsum
//
// uppercaseNovel.txt (output)
// ... (lots of uppercase text) LOREM IPSUM