Streams Whack-a-Mole
@mafintosh
P2P Node.js Core Streams
A depressing story about why you probably all have bugs in your streams
A Brief History of Node.js Streams
Streams 1 - Node.js 0.6
Basically DIY streams.

var Stream = require('stream')
var s = new Stream() // just has .pipe
s.emit('data', 'hi') // you manually emit data
// (no buffering: nobody is listening yet at this point, so this 'hi' is simply lost)

s.on('data', data => console.log('got data:', data))
if (s.pause) s.pause() // let's hope someone impl'ed pause
    
Simple!
Push based
But ... hard, as you had to implement ... everything yourself
Streams 2 - Node.js 0.10

var { Readable } = require('stream')
// Readable is a proper abstract class: you supply _read, it does the rest
var s = new Readable()
s._read = function (size) {
  // size is only a hint; every call queues another 'hi', so this stream never ends
  return s.push('hi')
}

s.on('readable', () => {
  // pull based: data only comes out when you ask for it with read()
  var chunk = s.read()
})
Complex, batteries included, pull based
Streams 3 - Node.js ~0.11
Small iteration on 2.
Roughly: makes s.on('data') and s.read() work together
Streams are still evolving
Every time we fix a bug, we break tons of things
Streams are probably the most complex objects returned from Node.js core

var stream = require('stream')
var rs = new stream.Readable()
// the same two lines behave differently depending on the Node.js version:
rs.destroy()

var stream = require('stream')
var rs = new stream.Readable()
// fails in Node.js 6
rs.destroy()

var stream = require('stream')
var rs = new stream.Readable()
// triggers rs.push(null) in Node.js 8
rs.destroy()

var stream = require('stream')
var rs = new stream.Readable()
// emits 'close' in Node.js 10
rs.destroy()
    
readable-stream is an official npm mirror of the Node.js core streams

// Same API as require('stream'), but the implementation is whichever
// readable-stream version you depend on, not whatever Node.js you run on
var stream = require('readable-stream')
var rs = new stream.Readable()
If you are writing stream modules, always use readable-stream — then you know what you get

var s = fs.createReadStream('filename.txt')
// assumes fs = require('fs') (elided on the slide)
// nothing below ever reads from s
s.on('end', function () {
  console.log('stream ended')
})

var s = fs.createReadStream('filename.txt')
// 'end' is never triggered
// as the stream is never "drained"
s.on('end', function () {
  console.log('stream ended')
})
    
Memory leak. Always drain your streams.

var s = fs.createReadStream('filename.txt')
// drain option 1: switch to flowing mode and discard the data
s.resume()
s.on('end', function () {
  console.log('stream ended')
})

var s = fs.createReadStream('filename.txt')
// drain option 2: on every 'readable', read until the buffer is empty
// (read() returns null once nothing is buffered)
s.on('readable', () => {
  while (s.read() !== null) {}
})
s.on('end', function () {
  console.log('stream ended')
})

var s = fs.createReadStream('filename.txt')
// drain option 3: let pipe() consume it
// (destination is assumed to be a writable defined elsewhere)
s.pipe(destination)
s.on('end', function () {
  console.log('stream ended')
})

var s = fs.createReadStream('filename.txt')
// drain option 4: attaching a 'data' listener starts the flow
s.on('data', data => console.log(data))
s.on('end', function () {
  console.log('stream ended')
})
    
Yes, on('data') has magic behavior
(basically triggers s.resume() if s.pause() was not called)
on('end') does not mean end
on('end') means the readable side succeeded and is done

activeStreams++
// '...' is the data handler, elided on the slide; the counter is only released on 'end'
stream.on('data', ...)
stream.on('end', function () {
  activeStreams--
})

activeStreams++
// 'end' might never fire
// (the stream can stop via 'error' or 'close' instead), and then the counter never goes down
stream.on('data', ...)
stream.on('end', function () {
  activeStreams--
})
    
Streams can end with either 'close', 'error', 'finish', and/or 'end'

// Release the activeStreams slot exactly once, on whichever terminal event fires first.
activeStreams++
stream.on('data', ...) // '...' is the data handler, elided on the slide
let once = false // several of the events below can fire for the same stream

stream.on('end', done)
stream.on('finish', done)
stream.on('close', done)
stream.on('error', done) // failures also release the slot

// NOTE(review): 'end' is the readable side and 'finish' the writable side, so for a
// duplex stream this can decrement while the other side is still active.
function done () {
  if (once) return
  once = true
  activeStreams--
}
    
'finish' means the writable side succeeded and is done
triggered by stream.end()
stream.push(null) -> stream.on('end'); stream.end() -> stream.on('finish')
Backwards compat

// end-of-stream: the npm module, for backwards compat with older Node.js
var finished = require('end-of-stream')
activeStreams++
// the callback runs once, whichever way the stream stops;
// err is set if it errored or closed prematurely
finished(stream, function (err) {
  activeStreams--
})

// the same helper, built into Node.js core
var finished = require('stream').finished
activeStreams++
finished(stream, function (err) {
  activeStreams--
})
  

http.createServer(function (req, res) {
  // pipe() only moves data; nothing here reacts if either stream fails
  fs.createReadStream('file.data').pipe(res)
})

http.createServer(function (req, res) {
  // Will memory leak if res closes
  fs.createReadStream('file.data').pipe(res)
})
pipe() does not do error handling

const { finished } = require('stream')
http.createServer((req, res) => {
  const source = fs.createReadStream('file.data')
  // builds a finished() callback that destroys `other` when the watched
  // stream ends with an error; pipe() itself would not do this
  const destroyOnError = (other) => (err) => {
    if (err) other.destroy()
  }
  finished(source, destroyOnError(res))
  finished(res, destroyOnError(source))
  source.pipe(res)
})
Never use pipe()

const pump = require('pump')
http.createServer(function (req, res) {
  // pump(source, destination): pipes, handles errors and shuts both streams down
  pump(fs.createReadStream('file.data'), res)
})

const pump = require('pump')
http.createServer(function (req, res) {
  // handles errors, shuts down both streams, and supports a completion callback
  pump(fs.createReadStream('file.data'), res, (err) => {
    /* ... handle err here (set if either stream failed) */
  })
})

const pump = require('stream').pipeline
http.createServer(function (req, res) {
  // core's pipeline(): handles errors, shuts down both streams, and supports a callback
  pump(fs.createReadStream('file.data'), res, (err) => {
    /* ... handle err here (set if either stream failed) */
  })
})

var s = new stream.Duplex({ read () {} }) // push() needs a readable side

s._write = (data, enc, cb) => {
  if (isHandshake(data)) {
    s.emit('handshake') // arrow function: `this` is not the stream, so use s
    // stream might be destroyed so this might error
    s.push(data.slice(offset)) // offset: assumed defined by the surrounding code
  }
  cb() // signal this write is done; later writes are not delivered until then
}


var s = new stream.Duplex({ read () {} })

s._write = (data, enc, cb) => {
  if (isHandshake(data)) {
    s.emit('handshake') // can have side-effects!
    // a 'handshake' listener may have destroyed s, so this push might error
    s.push(data.slice(offset))
  }
  cb()
}


var s = new stream.Duplex({ read () {} })

s._write = (data, enc, cb) => {
  if (isHandshake(data)) {
    s.emit('handshake')
    // listeners run synchronously inside emit() and may have destroyed s
    if (s.destroyed) return
    s.push(data.slice(offset))
  }
  cb()
}
    
Every time you emit('event'), you have to re-check any public state
This usually means checking if you were destroyed
*phew*
There are more, honestly ... read the source :/
The Future
Stream 4?
Stream 5?
Stream X!
https://github.com/mafintosh/streamx
Makes streams easier to implement
Error handling pipe, proper life cycle support, backwards compat, and more

      // A file writer on streamx: open() acquires the fd, write() uses it, destroy() releases it.
      // filename is assumed to be defined by the surrounding code.
      const w = new streamx.Writable({
        open (cb) {
          // 'w' (as fs.createWriteStream uses): fs.open defaults to 'r', and
          // fs.write on a read-only descriptor fails
          fs.open(filename, 'w', (err, fd) => {
            if (err) return cb(err)
            w.fd = fd
            cb()
          })
        },
        write (data, cb) {
          fs.write(w.fd, data, 0, data.length, null, cb)
        },
        destroy (cb) {
          // fd 0 is a valid descriptor, so test for "never opened" explicitly
          if (w.fd == null) return cb()
          fs.close(w.fd, cb)
        }
      })
    
Thank you!