Easy stream transformation and composition for node.js that integrates seamlessly with Q promises
One of the most useful feature of streams is back-pressure: if the bottom of the stream pipeline is slow (for example the Web client), then the top will automatically push slowly (for example your database and/or Web server). As a result memory and CPU consumption in node are optimal.
Streamee.js allows you to build very easily pipelines that compose and transform streams, so that you can keep back-pressure all along the way in a nice functional programming style. All transformation functions can return Q promises instead of direct values, which makes asynchronous operations less verbose and more functional (less callback hell!).
var ee = require'streamee';var stream1 = // Readable stream, for example from a HTTP chunked response, a MongoDB response, ...var promiseOfStream = // sometimes, because of callbacks, we can only get a Q.Promise[Readable] instead of a Readablevar stream2 = eeflattenReadablepromiseOfStream // well, now you can flatten it!eepipeAndRun // create a pipelineeeinterleavestream1 stream2 // interleave the streamseemapeeobj // 'ee.obj' means that we want to handle the chunk as a json objectobjnewField = 'something useful'return obj; // return directly a value, so this is a sync mapeecollecteeobj // collect is filter + mapif objintField > 3 && objintField < 10 // filterreturn getPromiseOfDataobj; // async map by returning a Q promise. The promise can contain either an object// or a string or a bufferdestination // Writable stream, for example a HTTP chunked response toward a Web client or a Websocket connection;
Inspired from Play Framework Enumeratee.
Streamee.js uses node 1.0+ streams, so if you use an API that returns node 0.8 streams, you have to wrap them like this:
var stream = require'stream';var newStream = wrapoldStream;
If each chunk of your stream is a logical independent unit (for example a stream of json strings), you should create an 'objectMode' stream so that node's stream buffers does not automatically concatenate the chunks:
var objectModeStream = objectMode: truewrapnonObjectModeStream;
For example, here is a function that returns a chunked http response as an objectMode stream:
var http = require'http';var Q = require'q';// GET a http chunked stream (for example a stream of json strings)var deferred = Qdefer;httpgeturldeferredresolveobjectMode: truewrapres;;return eeflattenReadabledeferredpromise; // flatten a Promise[stream.Readable] to a stream.Readable
All transformers (map, filter, collect...) take as first parameter the type in which you want to handle the chunk in the transformation function.
ee.bin is buffer (binary data),
ee.str is string and
ee.obj is an object. If a chunk is not
convertible to the asked type, it will be dropped.
Also, all transformation functions can return either buffer or string or object, as well as Promise[string] or Promise[buffer] or Promise[object].
Default encoding for all transformers is utf8. If a source or a destination has a different encoding, you can use
ee.encode(fromEncoding, toEncoding) at the begin or the end of the stream pipeline.
Map each chunk.
eemapeestrreturn str + ' is mapped to this message';
Example with a promise
var request = require'request';// Helper function that does a http GET request and returns a promise of the response bodyvar deferred = Qdefer;requesturlif !err && resstatusCode == 200 deferredresolvebodyelse deferredrejecterr;;return deferredpromise;eemapeeobjreturn GETobjurlthenreturn obj + ' is mapped to ' + body;;
Keep only the chunks that pass the truth test f.
eemapeeobjreturn objaField === 'someValue';
Collect is filter + map.
eecollecteestrif strlength > 10 return 'We keep ' + str + ' and map it to this message';
Encode the chunks that were encoded in 'fromEncoding' to 'toEncoding'.
var utf8stream = eeencode'utf16le' 'utf8';
Interleave the readable streams passed in the array.
var mixedStream = eeinterleavestream1 stream2;
Flatten a Q.Promise[Readable] to a Readable stream.
var aStream = eeflattenReadablepromiseOfReadableStream;
Take the streams passed in parameter and sequentially pipe them. Equivalent to stream1.pipe(stream2).pipe(...) ...
eepipeAndRunsrcStreameemapeeobjvar mappedObj = // ...return mappedObj;destinationStream;
This software is licensed under the Apache 2 license, quoted below.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.