// A simple version of an RxJS transform-stream observable built on Node.js streams.
const fs = require('fs')
const { delay, map, tap } = require('rxjs/operators')
const { Subject } = require('rxjs')
const { Transform } = require('stream')
/**
 * Exposes the chunks of a readable stream as an RxJS Subject with manual,
 * one-chunk-at-a-time flow control.
 *
 * Every chunk written to the internal Transform is emitted on `stream$`. The
 * Transform then waits until `pullNextChunk(value)` is called; `value` is
 * pushed to the Transform's readable side and only then is the next chunk
 * accepted.
 *
 * @param {import('stream').Readable} readStream - Source stream to consume.
 * @returns {{
 *   pullNextChunk: function(*): void,
 *   stream$: Subject,
 *   transformStream: Transform,
 * }} `stream$` emits each incoming chunk, completes when the Transform emits
 *   'finish', and errors when either stream emits 'error'. `pullNextChunk`
 *   releases the chunk currently being held. `transformStream` carries the
 *   values passed to `pullNextChunk`; attach 'data' listeners or pipe it
 *   right away if you need them, otherwise they are discarded.
 */
const createStreamObservable = (
  readStream,
) => {
  const chunk$ = new Subject()
  const stream$ = new Subject()

  const pullNextChunk = (
    value,
  ) => {
    chunk$.next(value)
  }

  const transformStream = new Transform({
    readableObjectMode: true,
    transform(
      chunk,
      encoding,
      callback,
    ) {
      // Listen for the reply BEFORE announcing the chunk so that a consumer
      // answering synchronously is not missed.
      const subscription = chunk$.subscribe(value => {
        // One reply per chunk: without unsubscribing, listeners from earlier
        // chunks would push the value again and re-invoke stale callbacks.
        subscription.unsubscribe()
        this.push(value)
        callback()
      })

      stream$.next(chunk)
    },
    writableObjectMode: true,
  })

  // Ends both subjects with the failure; chunk$ is completed rather than
  // errored because its internal listener has no error handler.
  const fail = error => {
    chunk$.complete()
    stream$.error(error)
  }

  // pipe() does not forward source errors, so surface them explicitly and
  // tear down the transform that would otherwise wait forever.
  readStream.on('error', error => {
    transformStream.destroy()
    fail(error)
  })

  readStream
    .pipe(transformStream)
    .on('error', fail)
    .on('finish', () => {
      chunk$.complete()
      stream$.complete()
    })

  // Nothing in here reads the transform's output. Without flowing mode its
  // readable buffer fills up and backpressure stops further chunks from
  // being transformed, so 'finish' would never fire on larger inputs.
  transformStream.resume()

  return {
    pullNextChunk,
    stream$,
    transformStream,
  }
}
// Demo: feed ./novel.txt through the observable, one chunk at a time.
const readStream = fs.createReadStream('./novel.txt')
const { pullNextChunk, stream$ } = createStreamObservable(readStream)

// Decode each chunk, hold it for a second to stand in for slow work, then
// upper-case it, logging on the way in and on the way out.
const processedChunk$ = stream$.pipe(
  map(rawChunk => rawChunk.toString()),
  tap(() => console.info('INCOMING CHUNK')),
  delay(1000), // This could be an AJAX call
  map(text => text.toUpperCase()),
  tap(() => console.info('DONE PROCESSING CHUNK')),
)

// Each processed chunk is handed back through pullNextChunk.
processedChunk$.subscribe({
  next: pullNextChunk,
  complete: () => console.info('DONE PROCESSING ALL CHUNKS'),
})
// INCOMING CHUNK
// DONE PROCESSING CHUNK
// ...
// INCOMING CHUNK
// DONE PROCESSING CHUNK
// DONE PROCESSING ALL CHUNKS