node package manager
Loosely couple your services. Use Orgs to version and reuse your code. Create a free org »


Simple Stream

Build Status


Useful stream sources, transforms and sinks for Simple Streams.






### fromArray(array) -> stream
Creates a stream from an array.
var arrayStream = stream.fromArray(numbers)
### fromReadableStream(readableStream) -> stream
Creates a stream from a Readable Stream.
var readStream = fs.createReadStream('input.txt', {encoding: 'utf8'})
var streamStream = stream.fromReadableStream(readStream)


### map(stream, mapFn) -> stream
Creates a stream that applies a map function to transform each value of the source stream.
var mapStream = stream.map(someNumberStream, function(each) {
  return each * 2
// pipe the stream to an array: 
stream.toArray(mapStream)(function(err, res) {
### mapAsync(stream, mapFn) -> stream
var mapStream = stream.mapAsync(someNumberStream, function(each, cb) {
  cb(null, each * 2)
Creates a stream that filters the values of the source stream using a filter function.

filter(stream, filterFn) -> stream

var evenNumbersStream = stream.filter(someNumberStream, function(each) {
  return (each % 2) == 0
### filterAsync(stream, filterFn) -> stream
var evenNumbersStream = stream.filterAsync(someNumberStream, function(each, cb) {
  cb(null, (each % 2) == 0)
### range(stream, range) -> stream
Creates a stream that only streams over the specified range.

range is specified as {from: startIndex, to: endIndex} where from and to are both inclusive.

var rangeStream = stream.range(stream, {from: 10, to: 19})
### buffer(stream, bufferSize) -> stream
Creates a stream with an internal buffer that is always filled up to `bufferSize`. The buffer can obviously only grow if the buffered stream is read more slowly than the underlying stream source can return data.

The current buffer fill ratio can be inspected at any time using bufferFillRatio() which returns a number between 0..1.

The buffer size can be changed using setBufferSize(bufferSize).

var bufferedStream = stream.buffer(someStream, 10)
// inspect buffer size 
// change the buffer size later 


### toArray(stream) -> continuable
Reads the source stream and writes the results to an array.
stream.toArray(someStream)(function(err, array) {
### toWritableStream(stream, writeStream, encoding) -> continuable
Reads the source stream and writes the result to a Writable Stream.
var writeStream = fs.createWriteStream('output.txt')
stream.toWritableStream(stream, writeStream, 'utf8')(function(err) {
### forEach(stream, fn) -> continuable
Reads the source stream and invokes `fn` for each value of the stream.
stream.forEach(someStream, function(data) {
})(function(err) {
### forEachAsync(stream, fn) -> continuable
Reads the source stream and invokes `fn` for each value of the stream. The next value is read from the source stream only once the callback has been invoked.
stream.forEachAsync(someStream, function(data, cb) {
  setTimeout(cb, 100)
})(function(err) {


This project was created by Mirko Kiefer (@mirkokiefer).