A meta-package for min-stream helper modules.
A meta-package for min-stream helper modules. This contains several useful and related modules for working with min-streams. Note that implementations of min-stream should not need to depend on this library.
The min-stream system is an interface more than anything. This has three main types: sources, filters, and sinks.
// If close is truthy, that means to clean up any resources and close the stream// call callback with an END event when done.// Otherwise, get some data and when ready, callback(err, item)// DATA is encoded as (falsy, item)// END is encoded as (falsy, undefined) or (falsy) or ()// ERROR is encoded as (err, undefined) or (err)
Sources are usually things like the readable end of TCP sockets and readable file streams. They can really be anything that emits events in a stream though.
A filter is a function that accepts a source and returns a new transformed source. This is where protocols are implemented. It has the signature:
// Set up per-stream state herereturn// Handle per event logic here, reading from upstream `read` when needed.// Close is often just forwarded to the upstream `read`.;
There are also technically two other filter types supported by the
chain helper described later on. They are regular map functions and push filters. They have less power than normal pull filters; but are in many cases much easier to write.
A sink represents something like the writable end of a TCP socket or a writable file. You can't write directly to it. Rather, you hand it a source function and it pulls at the rate it can handle. This way backpressure works automatically without having to deal with pause, resume, drain, and ready.
// Usage is simple.sinksource;
Or in the likely case you have a filter
Now that you know what min-streams are, you'll find that working with them directly is sometimes challenging. Their goal was to be minimal and easy to implement. This library makes them easy to use and very powerful as well!
Any module can be loaded either directly as
require('min-stream/module.js') or as a reference to index as
module is (
Manually connecting sink to filter to source ends up being a reverse pyramid. Often it's preferred to write it as a chain from source to sink in the same direction the data flows.
chain module helps with this. Here is a simple example.
chainsourcesocketsourcepullmyappsinksocketsink;return// Implement app logic as pull filter...;
Wrap a source function adding in the
Returns the function for easy chaining.
Wrap a pull filter by adding in the
Returns the function for easy chaining.
Wrap a push filter, converting it to a pull filter. Push filters accept an emit function and return a new emit function. There is no way to control back-pressure from within a push filter. Also close events skip push filters.
// Set up per-stream statereturn// handle this event and call `emit` 0 or more times as required by protocol.;
Wrap a map function, converting it to a pull filter. Map functions can't see backpressure, close events, or even end or error events. They only see items. Map functions are stateless. If they throw an exception it will be caught and sent as an error event.
// return transformed item.
Existing examples of map functions are
When chaining off a wrapped source, you can add pull, push, or map filters and a new wrapped source will be returned every time.
var parsedSource = chainsourcesocketsourcepushdeframermapJSONparse;
Attaching a sink to a source completes the chain and starts the action. Doesn't return anything and can't be chained from.
When chaining off a wrapped pull filter, you can add pull, push, or map filters and a new composite and wrapped pull filter will be returned.
var combined = chainpushpushFiltermapmyMap;
A composite sink can be built by chaining a sink call from a wrapped pull filter.
var newsink = chainmapJSONstringifysinksocketsink;
Accepts a variable number of source functions and returns a combined source. The input sources will be read in sequential order. Arrays of items may be used in place of sources for convenience.
var combined = cat"header" stream "tail";
Accepts a variable number of source functions and returns a combined source. The input sources will be read in parallel.
num copies. Returns the copies as an array of sources. This does not buffer, so all the duplicates must be read in parallel.
// Split a stream to go to both file and socketvar sources = dup2 input;logfilesinksources0;tcpClientsinksources1;
Multiplex several streams into one stream. This is like merge, except it annotates the data events so that you know which source they came from.
It accepts either an array of streams or a hash of streams. In the case of an array, the items will be tagged with the numerical index of the stream. In case of a hash object, they will be tagged with the object keys.
Items will be tagged by wrapping them in an array. For example, the item
"Hello" from the stream
words would be
["words", "Hello"] in the combined stream.
// tag using 0 and 1var combined = muxinput output;// or use wordsvar combined = muxinput: inputoutput: output;
De-multiplex a stream. This module undoes what the
mux module did. You have to give it either the number of array streams to expect or the hash keys as an array. Any keys you leave out will be dropped in the stream.
var sources = demux"words" "colors" combined;sourceswords // -> a stream of anything with the pattern ["words", value]sourcescolors // -> another stream of data
Just like dup, all the output streams must be read in parallel. There is no internal buffering.
Here is an example of array mode:
var sources = demux2 combined;sources0 // items matching [0, item] as just itemsources1 // items matching [1, item] as just item
This helper will consume all the events in a stream and
callback(null, items) with an array of items when done. If there was an error, it will be in the
Consume is also available as a sink for use with
This is like
consume, except it accepts an array of streams of a hash of streams. It will consume them all in parallel and
callback(null, result) when all are done. The
result will have the same structure and type of the original
consumealldemux5 source// result is an array of arrays;
A simple module that converts an array of items into a source function.
A version of the conversion that produces a slow source that waits a random amount of time before calling the read callback each time. Useful in testing.
There are many packaged and modules out there that implement this interface. Some interesting ones are: