Streams Whack-a-Mole
@mafintosh
P2P
Node.js Core
Streams
A depressing story on why you probably all have bugs in your streams
A Brief History of Node.js Streams
Streams 1 - Node.js 0.6
Basically DIY streams.
var Stream = require('stream')
var s = new Stream() // just has .pipe
s.emit('data', 'hi') // you manually emit data
s.on('data', data => console.log('got data:', data))
if (s.pause) s.pause() // let's hope someone impl'ed pause
Simple!
Push based
But ... hard as you had to impl ... everything
Streams 2 - Node.js 0.10
var { Readable } = require('stream')
var s = new Readable() // proper abstract class
s._read = data => s.push('hi')
s.on('readable', function () {
var data = s.read()
})
Complex
Batteries included
Pull based
Streams 3 - Node.js ~0.11
Small iteration on 2.
~ Makes the s.on('data') work together with the s.read() stuff
Streams are still evolving
Every time we fix a bug, we break tons of things
Streams are probably the most complex objects returned from Node.js core
var stream = require('stream')
var rs = new stream.Readable()
.
rs.destroy()
var stream = require('stream')
var rs = new stream.Readable()
// fails in Node.js 6
rs.destroy()
var stream = require('stream')
var rs = new stream.Readable()
// triggers rs.push(null) in Node.js 8.
rs.destroy()
var stream = require('stream')
var rs = new stream.Readable()
// emits 'close' in Node.js 10
rs.destroy()
readable-stream is an official npm mirror of streams
var stream = require('readable-stream')
var rs = new stream.Readable()
If you are writing stream modules always use readable-stream
- then you know what you get
var s = fs.createReadStream('filename.txt')
s.on('end', function () {
console.log('stream ended')
})
var s = fs.createReadStream('filename.txt')
// 'end' is never triggered
// as the stream is never "drained"
s.on('end', function () {
console.log('stream ended')
})
Memory leak. Always drain your streams.
var s = fs.createReadStream('filename.txt')
s.resume()
s.on('end', function () {
console.log('stream ended')
})
var s = fs.createReadStream('filename.txt')
s.on('readable', () => { while (s.read()); })
s.on('end', function () {
console.log('stream ended')
})
var s = fs.createReadStream('filename.txt')
s.pipe(destination)
s.on('end', function () {
console.log('stream ended')
})
var s = fs.createReadStream('filename.txt')
s.on('data', data => console.log(data))
s.on('end', function () {
console.log('stream ended')
})
Yes, on('data') has magic behavior
(basically triggers s.resume() if s.pause() was not called)
on('end') does not mean end
on('end') means the readable side succeeded and is done
activeStreams++
stream.on('data', ...)
stream.on('end', function () {
activeStreams--
})
activeStreams++
// 'end' might never fire
stream.on('data', ...)
stream.on('end', function () {
activeStreams--
})
A stream can terminate with any combination of 'close', 'error', 'finish', and 'end'
activeStreams++
stream.on('data', ...)
let once = false
stream.on('end', done)
stream.on('finish', done)
stream.on('close', done)
stream.on('error', done)
function done () {
if (once) return
once = true
activeStreams--
}
'finish' means the writable side succeeded and is done
triggered by stream.end()
stream.push(null) -> stream.on('end')
stream.end() -> stream.on('finish')
Backwards compat
var finished = require('end-of-stream')
activeStreams++
finished(stream, function (err) {
activeStreams--
})
var finished = require('stream').finished
activeStreams++
finished(stream, function (err) {
activeStreams--
})
http.createServer(function (req, res) {
fs.createReadStream('file.data').pipe(res)
})
http.createServer(function (req, res) {
// Will memory leak if res closes
fs.createReadStream('file.data').pipe(res)
})
pipe() does not do error handling
const { finished } = require('stream')
http.createServer(function (req, res) {
const s = fs.createReadStream('file.data')
finished(s, (err) => { if (err) res.destroy() })
finished(res, (err) => { if (err) s.destroy() })
s.pipe(res)
})
Never use pipe()
const pump = require('pump')
http.createServer(function (req, res) {
pump(fs.createReadStream('file.data'), res)
})
const pump = require('pump')
http.createServer(function (req, res) {
// handles errors, shuts down streams, etc.; supports a callback
pump(fs.createReadStream('file.data'), res, (err) => { ... })
})
const pump = require('stream').pipeline
http.createServer(function (req, res) {
// handles errors, shuts down streams, etc.; supports a callback
pump(fs.createReadStream('file.data'), res, (err) => { ... })
})
var s = new stream.Writable()
s._write = (data, enc, cb) => {
if (isHandshake(data)) {
this.emit('handshake')
// stream might be destroyed so this might error
s.push(data.slice(offset))
}
}
var s = new stream.Writable()
s._write = (data, enc, cb) => {
if (isHandshake(data)) {
this.emit('handshake') // can have side-effects!
// stream might be destroyed so this might error
s.push(data.slice(offset))
}
}
var s = new stream.Writable()
s._write = (data, enc, cb) => {
if (isHandshake(data)) {
this.emit('handshake')
if (s.destroyed) return
s.push(data.slice(offset))
}
}
Every time you emit('event') you have
to re-check any public state
This usually means checking if you were destroyed
*phew*
There are more, honestly ... read the source :/
The Future
Stream 4?
Stream 5?
Stream X!
Makes streams easier to implement
Error handling pipe, proper life cycle support, backwards compat, and more
const w = new streamx.Writable({
open (cb) {
fs.open(filename, (err, fd) => {
if (err) return cb(err)
w.fd = fd
cb()
})
},
write (data, cb) {
fs.write(this.fd, data, 0, data.length, null, cb)
},
destroy (cb) {
if (!w.fd) return cb()
fs.close(w.fd, cb)
}
})
Thank you!