minimal pull stream


Minimal Pipeable Pull-stream

In classic-streams, streams push data to the next stream in the pipeline. In new-streams, data is pulled out of the source stream, into the destination.

pull-stream is a minimal take on pull streams, optimized for "object" streams, but still supporting text streams.

Stat some files:

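pull(
  pull.values(['file1', 'file2', 'file3']),
  pull.asyncMap(fs.stat), // assumed step: stat each file, with fs = require('fs')
  pull.collect(function (err, array) { console.log(err || array) }))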

Note that pull(a, b, c) is basically the same as a.pipe(b).pipe(c).

The best thing about pull-stream is that it can be completely lazy. This is perfect for async traversals where you might want to stop early.
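A minimal sketch of that laziness, written with plain functions instead of the library so the protocol stays visible. The helpers `count`, `take`, and `collect` here are hypothetical stand-ins for illustration, not the library's own implementations. The source is infinite, yet it only produces the three values the sink asks for.

```javascript
// Hypothetical helpers for illustration; these are not the library's own code.

// Source: an infinite counter. It only does work when read is called.
var produced = 0
function count () {
  var n = 0
  return function read (end, cb) {
    if (end) return cb(end) // aborted from downstream: stop
    produced++
    cb(null, n++)
  }
}

// Through: pass along at most `max` items, then abort the source.
function take (max) {
  return function (read) {
    var seen = 0
    return function (end, cb) {
      if (end) return read(end, cb)
      if (seen++ >= max) return read(true, cb) // abort upstream, which ends us
      read(null, cb)
    }
  }
}

// Sink: gather everything into an array and hand it to `done`.
function collect (done) {
  return function (read) {
    var items = []
    read(null, function next (end, data) {
      if (end === true) return done(null, items)
      if (end) return done(end)
      items.push(data)
      read(null, next)
    })
  }
}

var result
collect(function (err, items) { result = items })(take(3)(count()))
console.log(result)   // [ 0, 1, 2 ]
console.log(produced) // 3
```

Nothing upstream runs ahead of demand: `produced` stops at 3 because the counter is never called again once `take` aborts it.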

pull-streams are not directly compatible with node streams, but pull-streams can be converted into node streams with pull-stream-to-stream, and node streams can be converted into pull-streams using stream-to-pull-stream.

What if implementing a stream was this simple:

pull.{Source,Through,Sink} each wrap a function and add a type property to signify what type of pull-stream it is.

var pull = require('pull-stream')
// source: every read calls back with a random number
var createSourceStream = pull.Source(function () {
  return function (end, cb) {
    return cb(end, Math.random())
  }
})
// through: passes each read straight on to the source
var createThroughStream = pull.Through(function (read) {
  return function (end, cb) {
    read(end, cb)
  }
})
// sink: keeps reading until the stream ends
var createSinkStream = pull.Sink(function (read) {
  read(null, function next (end, data) {
    if(end) return
    read(null, next)
  })
})
pull(createSourceStream(), createThroughStream(), createSinkStream())

Instead of a readable stream, and a writable stream, there is a readable stream, and a reader stream.

The readable stream is just a function(end, cb), that may be called many times, and will (asynchronously) callback(null, data) once for each call.

The readable stream eventually calls back with cb(err) if there was an error, or cb(true) if the stream has no more data. In both cases a second argument will be ignored. That is, the readable stream either provides data or indicates an error or end-of-data condition, never both.

If the user passes in end = true, the readable should stop getting data from whatever it reads from.

All Sources and Throughs are readable streams.

var i = 100
var randomReadable = pull.Source(function () {
  return function (end, cb) {
    if(end) return cb(end)
    //only read 100 times
    if(i-- <= 0) return cb(true)
    cb(null, Math.random())
  }
})

A reader is just a function that calls a readable until it decides to stop, or until the readable calls back with cb(err || true).

All Throughs and Sinks are reader streams.

var logger = pull.Sink(function (read) {
  read(null, function next(end, data) {
    if(end === true) return
    if(end) throw end
    console.log(data)
    read(null, next)
  })
})

These can be connected together by passing the readable to the reader:

logger()(randomReadable())

Or, if you prefer to read things left-to-right:

pull(randomReadable(), logger())

A duplex/through stream is a reader that is also readable.

A duplex/through stream is just a function that takes a read function, and returns another read function.

var map = pull.Through(function (read, map) {
  //return a readable function!
  return function (end, cb) {
    read(end, function (end, data) {
      cb(end, data != null ? map(data) : null)
    })
  }
})

Every pipeline must go from a source to a sink. Data will not start moving until the whole thing is connected.

pull(source, through, sink)

Sometimes it's simplest to describe a stream in terms of other streams. pull can detect what sort of stream it starts with (by counting arguments), and if you pull together through streams, it gives you a new through stream.

var tripleThrough =
  pull(through1(), through2(), through3())
pull(source(), tripleThrough, sink())
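The argument-counting idea can be sketched with plain functions. This is a hypothetical `miniPull` written for illustration under the assumption that a readable is declared as `function (end, cb)` and a reader as `function (read)`; it is not the library's implementation. The helpers `values`, `map`, and `collect` are likewise stand-ins.

```javascript
// Hypothetical sketch of composing by argument count; not the library's code.
function miniPull () {
  var streams = Array.prototype.slice.call(arguments)
  // A readable takes (end, cb): two arguments. A reader takes (read): one.
  if (streams[0].length === 1) {
    // Starts with a reader, so there is no source yet:
    // return a new reader that waits for one.
    return function (read) {
      return miniPull.apply(null, [read].concat(streams))
    }
  }
  // Starts with a readable: hand it to each reader in turn.
  var read = streams.shift()
  while (streams.length) read = streams.shift()(read)
  return read
}

// Source: emit each array item, then end.
function values (array) {
  var i = 0
  return function (end, cb) {
    if (end) return cb(end)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

// Through: apply fn to each item.
function map (fn) {
  return function (read) {
    return function (end, cb) {
      read(end, function (end, data) {
        if (end) return cb(end)
        cb(null, fn(data))
      })
    }
  }
}

// Sink: gather everything into an array.
function collect (done) {
  return function (read) {
    var items = []
    read(null, function next (end, data) {
      if (end === true) return done(null, items)
      if (end) return done(end)
      items.push(data)
      read(null, next)
    })
  }
}

// Two throughs pulled together give a new through.
var addOneThenDouble = miniPull(
  map(function (x) { return x + 1 }),
  map(function (x) { return x * 2 })
)

var output
miniPull(
  values([1, 2, 3]),
  addOneThenDouble,
  collect(function (err, items) { output = items })
)
console.log(output) // [ 4, 6, 8 ]
```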

Duplex streams, which are used to communicate between two things (e.g. over a network), are a little different. In a duplex stream, messages go both ways, so instead of a single function that represents the stream, you need a pair of streams: {source: sourceStream, sink: sinkStream}

Pipe duplex streams like this:

var a = duplex()
var b = duplex()
pull(a.source, b.sink)
pull(b.source, a.sink)
//which is the same as 
b.sink(a.source); a.sink(b.source)
//but the easiest way is to allow pull to handle this 
pull(a, b, a)
//"pull from a to b and then back to a" 
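To make the {source, sink} shape concrete, here is a minimal sketch using plain functions. The `endpoint` helper is hypothetical and exists only for this illustration: each endpoint sends a fixed list of messages and remembers whatever it receives.

```javascript
// Hypothetical helper for illustration: one side of a conversation.
// It sends the `outgoing` messages and remembers whatever it receives.
function endpoint (outgoing) {
  var i = 0
  var received = []
  return {
    received: received,
    // source: a readable that emits each outgoing message, then ends
    source: function (end, cb) {
      if (end) return cb(end)
      if (i >= outgoing.length) return cb(true)
      cb(null, outgoing[i++])
    },
    // sink: a reader that stores every message it is given
    sink: function (read) {
      read(null, function next (end, data) {
        if (end) return
        received.push(data)
        read(null, next)
      })
    }
  }
}

var a = endpoint(['ping', 'are you there?'])
var b = endpoint(['pong'])

// wire each source into the other side's sink
b.sink(a.source)
a.sink(b.source)

console.log(b.received) // [ 'ping', 'are you there?' ]
console.log(a.received) // [ 'pong' ]
```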

There is a deeper, platonic abstraction, where a stream is just an array in time, instead of in space. And all the various streaming "abstractions" are just crude implementations of this abstract idea.

classic-streams, new-streams, reducers

The objective here is to find a simple realization of the best features of the above.

A stream abstraction should be able to handle both streams of text and streams of objects.

Something like this should work: a.pipe(x.pipe(y).pipe(z)).pipe(b). This makes it possible to write a custom stream simply by combining a few available streams.

If a stream ends in an unexpected way (error), then other streams in the pipeline should be notified. (This is a problem in node streams: when an error occurs, the stream is disconnected, and the user must handle that specially.)

Also, the stream should be able to be ended from either end.
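As a rough sketch of error notification, the example below uses plain functions in the pull-stream callback style. The `failAfter` source and the inline `sink` are hypothetical and made up for this illustration. The error raised by the source travels through the transform and reaches the sink as the `end` argument, with no extra wiring.

```javascript
// Hypothetical source: emits n numbers, then fails.
function failAfter (n) {
  var i = 0
  return function (end, cb) {
    if (end) return cb(end)
    if (i >= n) return cb(new Error('source failed'))
    cb(null, i++)
  }
}

// Through: apply fn to each item, pass end and errors along untouched.
function map (fn) {
  return function (read) {
    return function (end, cb) {
      read(end, function (end, data) {
        if (end) return cb(end)
        cb(null, fn(data))
      })
    }
  }
}

var seen = []
var failure = null

// Sink: record data, and record the error if the stream fails.
function sink (read) {
  read(null, function next (end, data) {
    if (end === true) return  // normal end
    if (end) {                // error from anywhere upstream
      failure = end
      return
    }
    seen.push(data)
    read(null, next)
  })
}

sink(map(function (x) { return x * 10 })(failAfter(2)))

console.log(seen)            // [ 0, 10 ]
console.log(failure.message) // source failed
```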

Very simple transform streams must be able to transfer back pressure instantly.

This is a problem in node streams: pause is only transferred on write, so on a long chain (a.pipe(b).pipe(c)), if c pauses, b will have to write to c before b pauses, and then a will have to write to b before a pauses. If b only transforms a's output, then a will have to write to b twice to find out that c is paused.

reducers has an interesting method, where synchronous transformations propagate back pressure instantly!

This means you can have two "smart" streams doing io at the ends, and lots of dumb streams in the middle, and back pressure will work perfectly, as if the dumb streams are not there.

This makes laziness work right.
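A small sketch of that behavior, using plain functions in the pull-stream callback style. The `manual` sink is a hypothetical helper that reads only when `step()` is called, standing in for a slow consumer. The source is read exactly once per step, no matter how many synchronous transforms sit in between.

```javascript
// Count how often the source is actually asked for data.
var reads = 0
function counter () {
  var n = 0
  return function (end, cb) {
    if (end) return cb(end)
    reads++
    cb(null, n++)
  }
}

// A "dumb" synchronous transform: no buffering, no state.
function map (fn) {
  return function (read) {
    return function (end, cb) {
      read(end, function (end, data) {
        if (end) return cb(end)
        cb(null, fn(data))
      })
    }
  }
}

// Hypothetical sink that reads only when step() is called,
// standing in for a slow consumer.
function manual () {
  var read
  var got = []
  function sink (r) { read = r }
  sink.got = got
  sink.step = function () {
    read(null, function (end, data) {
      if (!end) got.push(data)
    })
  }
  return sink
}

var double = map(function (x) { return x * 2 })
var inc = map(function (x) { return x + 1 })
var slow = manual()

slow(inc(double(counter())))
var before = reads   // nothing has been read yet
slow.step()
var afterOne = reads
slow.step()
var afterTwo = reads

console.log(before, afterOne, afterTwo) // 0 1 2
console.log(slow.got)                   // [ 1, 3 ]
```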

In pull streams, any part of the stream (source, sink, or through) may terminate the stream. (This is the case with node streams too, but it's not handled well.)

A source may end (cb(true) after read) or error (cb(error) after read). After ending, the source must never cb(null, data).

Sinks do not normally end the stream, but if they decide they do not need any more data they may "abort" the source by calling read(true, cb). An abort (read(true, cb)) may be called before a preceding read call has called back.
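A sketch of a sink that aborts, written with plain functions. The `find` sink and the `aborted` flag are hypothetical and exist only for this illustration. The sink stops at the first match and tells the source to stop by calling read(true, cb).

```javascript
// Source: emit each array item, and note when it is aborted early.
var aborted = false
function values (array) {
  var i = 0
  return function (end, cb) {
    if (end) {
      aborted = true // the sink asked us to stop early
      return cb(end)
    }
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

// Hypothetical sink: report the first item that passes `test`.
function find (test, done) {
  return function (read) {
    read(null, function next (end, data) {
      if (end === true) return done(null, undefined) // ended without a match
      if (end) return done(end)
      if (test(data)) {
        // abort the source, then report the match
        return read(true, function () { done(null, data) })
      }
      read(null, next)
    })
  }
}

var found
find(
  function (x) { return x > 2 },
  function (err, x) { found = x }
)(values([1, 2, 3, 4, 5]))

console.log(found)   // 3
console.log(aborted) // true
```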

Rules for implementing read in a through stream:

  1. Sink wants to stop. The sink aborts the through.

    Just forward the exact read() call to your source. Any future read calls should cb(true).

  2. We want to stop. (abort from the middle of the stream)

    Abort your source, and then cb(true) to tell the sink we have ended. If the source errored during the abort, end the sink by calling back its read with cb(err). (This will be an ordinary end/error for the sink.)

  3. Source wants to stop. (read(null, cb) -> cb(err||true))

    Forward that exact callback towards the sink chain. We must respond to any future read calls with cb(err||true).

In none of the above cases is data flowing!

  4. If data is flowing (normal operation: read(null, cb) -> cb(null, data))

    Forward data downstream (towards the sink). Do none of the above!

There either is data flowing (4) OR you have the error/abort cases (1-3), never both.
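The four cases can be sketched in one through stream. The `takeUntil` through below is hypothetical, written with plain functions for illustration and not taken from the library: it passes data along until `test` matches, then aborts its source. The `values` and `collect` helpers are likewise stand-ins.

```javascript
// Hypothetical through: pass data until test(data) is true, then stop.
function takeUntil (test) {
  return function (read) {
    var ended = null // set to true or an error once the stream is over
    return function (end, cb) {
      if (ended) return cb(ended) // already over: keep saying so
      if (end) {                  // case 1: the sink aborted
        ended = true
        return read(end, cb)      // forward the exact call to the source
      }
      read(null, function (end, data) {
        if (end) {                // case 3: the source ended or errored
          ended = end
          return cb(end)
        }
        if (test(data)) {         // case 2: we decide to stop
          ended = true
          return read(true, function (err) {
            // if the source errored during the abort, pass the error on
            if (err && err !== true) ended = err
            cb(ended)
          })
        }
        cb(null, data)            // case 4: data is flowing
      })
    }
  }
}

// Source: emit each array item, then end.
function values (array) {
  var i = 0
  return function (end, cb) {
    if (end) return cb(end)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

// Sink: gather everything into an array.
function collect (done) {
  return function (read) {
    var items = []
    read(null, function next (end, data) {
      if (end === true) return done(null, items)
      if (end) return done(end)
      items.push(data)
      read(null, next)
    })
  }
}

var stopped, all
collect(function (err, items) { stopped = items })(
  takeUntil(function (x) { return x === 3 })(values([1, 2, 3, 4])))
collect(function (err, items) { all = items })(
  takeUntil(function () { return false })(values([1, 2])))

console.log(stopped) // [ 1, 2 ]
console.log(all)     // [ 1, 2 ]
```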