Node.js streams demystified
var Writable = require("stream").Writable
var inherits = require("util").inherits
/**
 * A writable sink that prints every chunk it is handed.
 *
 * @param {object} [options] - Passed through unchanged to the Writable
 *   constructor.
 */
function Sink(options) {
  Writable.call(this, options)
}

// Put Writable.prototype on Sink's prototype chain.
inherits(Sink, Writable)

/**
 * Handles a single incoming chunk: logs its string form, then reports
 * completion.
 *
 * @param {*} chunk - The data to write; only its toString() result is used.
 * @param {string} encoding - Not used by this implementation.
 * @param {function} callback - Invoked with no arguments once the chunk has
 *   been logged.
 */
Sink.prototype._write = function (chunk, encoding, callback) {
  var text = chunk.toString()
  console.log(text)
  callback()
}
// a simple source stream, preloaded with its entire payload
var Readable = require('stream').Readable;
var source = new Readable();
[
  'the quick brown fox ',
  'jumps over the lazy dog.\n',
  null // pushing null signals end-of-stream
].forEach(function (piece) {
  source.push(piece);
});

// drain the source into a fresh sink
var sink = new Sink();
source.pipe(sink);
{Readable, Writable} = require('stream')
# A writable sink that prints every chunk it receives.
class Sink extends Writable
  # Handles one incoming chunk: logs its string form, then calls `next`
  # to signal that the chunk has been dealt with. `enc` is not used.
  _write: (data, enc, next) ->
    console.log(data.toString())
    next()
# a simple source stream, preloaded with its entire payload
source = new Readable()
source.push piece for piece in ['the quick brown fox ', 'jumps over the lazy dog.\n', null]

# drain the source into a fresh sink
sink = new Sink()
source.pipe sink

# the quick brown fox
# jumps over the lazy dog.
###
Same example as above except that the source stream passes strings (instead of buffers) and the sink stream doesn't decode the strings to buffers before writing.
###
# A writable sink that receives strings as-is and prints them.
class Sink extends Writable
  # Configure the underlying Writable so written strings are handed to
  # `_write` without first being converted to Buffers.
  constructor: ->
    super(decodeStrings: false) # don't decode strings

  # Handles one incoming chunk: logs it unchanged, then calls `next` to
  # signal that the chunk has been dealt with. `enc` is not used.
  _write: (data, enc, next) ->
    console.log(data)
    next()
# a simple source stream that hands out strings rather than buffers
source = new Readable encoding: 'utf8' # buffers will be decoded to strings
source.push piece for piece in ['the quick brown fox ', 'jumps over the lazy dog.\n', null]

# drain the source into a fresh sink
sink = new Sink()
source.pipe sink

# the quick brown fox
# jumps over the lazy dog.
var Transform = require("stream").Transform
var inherits = require("util").inherits
/**
 * A transform stream that upper-cases everything passing through it.
 *
 * @param {object} [options] - Passed through unchanged to the Transform
 *   constructor.
 */
function ToUpper (options) {
  Transform.call(this, options)
}

// Put Transform.prototype on ToUpper's prototype chain.
inherits(ToUpper, Transform)

/**
 * Converts one chunk to an upper-cased string and forwards it downstream.
 *
 * @param {*} chunk - Incoming data; only its toString() result is used.
 * @param {string} encoding - Not used by this implementation.
 * @param {function} callback - Completion callback. The upper-cased text is
 *   handed over as its second argument, which Transform pushes to the
 *   readable side before finishing the chunk.
 */
ToUpper.prototype._transform = function (chunk, encoding, callback) {
  var upper = chunk.toString().toUpperCase()
  callback(null, upper)
}
// the upper-casing stage of the pipeline
var tx = new ToUpper();

// the source stage, preloaded with its entire payload
var Readable = require('stream').Readable;
var rs = new Readable();
[
  'the quick brown fox ',
  'jumps over the lazy dog.\n',
  null // pushing null signals end-of-stream
].forEach(function (piece) {
  rs.push(piece);
});

// source -> transform -> stdout
var upperCased = rs.pipe(tx);
upperCased.pipe(process.stdout);
// THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG.
{Readable, Transform} = require("stream")
# A transform stream that upper-cases everything passing through it.
class ToUpper extends Transform
  # Converts one chunk to an upper-cased string, pushes it downstream,
  # then calls `next` to signal the chunk is done. `enc` is not used.
  _transform: (data, enc, next) ->
    @push data.toString().toUpperCase()
    next()
# the upper-casing stage of the pipeline
tx = new ToUpper()

# the source stage, preloaded with its entire payload
rs = new Readable()
rs.push piece for piece in ['the quick brown fox jumps over the lazy dog!\n', null]

# source -> transform -> stdout
rs.pipe(tx).pipe(process.stdout) # THE QUICK BROWN FOX JUMPS OVER THE LAZY DOG!
var Readable = require("stream").Readable
var inherits = require("util").inherits
/**
 * A readable stream that serves up a fixed piece of content.
 *
 * @param {string} content - The data to hand out; stored on the instance and
 *   consumed from the front as the stream is read.
 * @param {object} [options] - Passed through unchanged to the Readable
 *   constructor.
 */
function Source(content, options) {
  Readable.call(this, options)
  this.content = content
}

// Put Readable.prototype on Source's prototype chain.
inherits(Source, Readable)

/**
 * Supplies the next piece of content, or ends the stream once nothing is left.
 *
 * @param {number} size - How much of the remaining content to push.
 */
Source.prototype._read = function (size) {
  // Nothing left (or nothing given): signal end-of-stream and stop.
  if (!this.content) {
    this.push(null)
    return
  }

  var head = this.content.slice(0, size)
  this.push(head)
  // Keep whatever was not pushed for the next call.
  this.content = this.content.slice(size)
}
// Pull the content out by hand, asking for ten bytes per read() call.
var s = new Source("The quick brown fox jumps over the lazy dog.")
for (var readCount = 0; readCount < 5; readCount++) {
  var piece = s.read(10)
  console.log(piece.toString())
}
// The quick
// brown fox
// jumps over
// the lazy
// dog.

// Alternatively, let pipe() do the reading.
var q = new Source("How now brown cow?")
q.pipe(process.stdout)
// How now brown cow?
Readable = require("stream").Readable
# A readable stream that serves up a fixed piece of content.
class Source extends Readable
  # Stores `content` on the instance and forwards `options` unchanged to
  # the Readable constructor.
  constructor: (@content, options) ->
    super options

  # Supplies the next piece of content: pushes up to `size` units from the
  # front of what remains, or pushes null (end-of-stream) once it is empty.
  _read: (size) ->
    if not @content
      @push null
    else
      @push(@content.slice(0, size))
      @content = @content.slice(size)
# Pull the content out by hand until read() has nothing more to give.
s = new Source("The quick brown fox jumps over the lazy dog.")
while chunk = s.read(10) # print in 10 byte chunks
  console.log chunk.toString()
# The quick
# brown fox
# jumps over
# the lazy
# dog.

# Alternatively, let pipe() do the reading.
s = new Source("How now brown cow?")
s.pipe process.stdout
# How now brown cow?
A quick overview of the node.js streams interface with basic examples.
This is based on @brycebaril's presentation, Node.js Streams2 Demystified
Streams are a first-class construct in Node.js for handling data.
Think of them as lazy evaluation applied to data.
There are essentially three major concepts:
Benefits in using streams:
Five classes of streams:
Readable - sources
Writable - sinks
Duplex - both source and sink
Transform - in-flight stream operations
PassThrough - stream spy
Below is a quick overview of Readable, Writable, and Transform streams.
See also:
Use a Readable stream when supplying data as a stream.
Think: spigot/faucet.
Subclass stream.Readable.
Implement a _read(size)
method.
_read(size)
size is in bytes, but can be ignored (especially for objectMode streams).
_read(size) must call this.push(chunk) to send a chunk to the consumer.
highWaterMark
number: maximum number of bytes to store in the internal
buffer before ceasing to read (default: 16kb)
encoding
string: if set, buffers will be decoded to strings instead of
passing buffers (default: null
)
objectMode
boolean: instead of using buffers/strings, use javascript objects
(default: false
)
readable.pipe(target)
readable.read(size)
readable.on("data", ... )
Use a Writable stream when collecting data from a stream.
Think: drain/collect.
Subclass stream.Writable.
Implement a _write(chunk, encoding, callback)
method.
_write(chunk, encoding, callback)
chunk is the content to write.
Call callback() when you're done with this chunk.
highWaterMark
number: buffer level (in bytes) at which write() starts
returning false (default: 16kb)
decodeStrings
boolean: whether to decode strings to Buffers before passing
them to _write()
(default: true)
source.pipe(sink)
writable.write(chunk [,encoding] [,callback])
Use a Transform stream when you want to operate on a stream in transit. This is a special kind of Duplex stream where the input and output stream are the same stream.
Think: filter/map.
Implement a _transform(chunk, encoding, callback) method.
Optionally implement a _flush(callback) method.
_transform(chunk, encoding, callback)
Call this.push(something)
to forward it to the next consumer.
You don't have to push anything; not pushing simply drops that chunk from the output.
You must call callback
one time per _transform
call.
_flush(callback)
When the stream ends, this is your chance to do any cleanup or last-minute this.push()
calls to clear any buffers or work. Call callback()
when done.
Superset of Readable and Writable options.
source.pipe(transform).pipe(drain)
transform.on("data", ... )
objectMode
)