pelias-parallel-stream

0.1.0 • Public • Published

pelias-parallel-stream

Sometimes you want to call an async function inside a transform stream and be able to take full advantage of the magical asynchronicity of node.js.

In order to do this, we must decouple the this.push(data) from the next().

This module allows you to do that in a clean and simple way. See below.

usage

var parallelStream = require('pelias-parallel-stream');
 
var maxInFlight = 2;
 
var seeYouLaterStream = parallelStream(maxInFlight,
  function (doc, enc, next) {
    console.log('I see you, ' + doc.name);
 
    setTimeout(function () {
      doc.msg = 'Oh hey there, ' + doc.name;
      next(null, doc);
    }, 1000);
  },
  function () {
     console.log('Ooh, looks like the stream is finished');
  });

NOTE: the end function is optional

Once you've made your new parallel stream, you can use it just like you would any normal transform stream. Just throw it into a .pipe() call, like so.

var streamArray = require('stream-array');
var sink = require('through2-sink');
 
streamArray([{name:'Diana'}, {name:'Julian'}, {name:'Stephen'}, {name:'Peter'}])
  .pipe(seeYouLaterStream)  // <--- there it is
  .pipe(sink.obj(function (doc) {
    console.log(doc.msg);
  }));

You can see the full example here

Now let's play around with this maxInFlight parameter. Let's first see how setting the maxInFlight to 1 results in a strandard serial transform stream. So the output will look like this...

time npm run example -- 1
I see you, Diana
Oh hey there again, Diana
I see you, Julian
Oh hey there again, Julian
I see you, Stephen
Oh hey there again, Stephen
I see you, Peter
Oh hey there again, Peter
 
real 0m4.256s
user 0m0.114s
sys     0m0.021s

Now let's set it to 2 and see how different the output looks, and if performance has improved.

time npm run example -- 2
I see you, Diana
I see you, Julian
Oh hey there again, Diana
I see you, Stephen
Oh hey there again, Julian
I see you, Peter
Oh hey there again, Stephen
Oh hey there again, Peter
 
real 0m2.258s
user 0m0.128s
sys     0m0.025s

You can see when we allow 2 requests in flight, we get the first 2 requests back-to-back, send then off for async handling and then pause to wait for one of them to return and make room for the next incoming request. As soon as we've seen one of the first 2 requests come back (Oh hey there again, Diana), another incoming requests comes in (I see you, Stephen). And let's note that the amount of time it took to get through all the data has been cut in half, because... asynchronous!

Finally, let's run it with maxInFlight set to 200, which is just a number larger than the length of the input data array.

time npm run example -- 200
I see you, Diana
I see you, Julian
I see you, Stephen
I see you, Peter
Oh hey there again, Diana
Oh hey there again, Julian
Oh hey there again, Stephen
Oh hey there again, Peter
 
real 0m1.159s
user 0m0.121s
sys     0m0.022s

You can see that all the requests were sent out at once, and all the responses came in shortly thereafter. Note how quickly it all happened, too.

Versioning

We rely on semantic-release and Greenkeeper to maintain our module and dependency versions.

Greenkeeper badge

Dependencies (1)

Dev Dependencies (8)

Package Sidebar

Install

npm i pelias-parallel-stream

Weekly Downloads

3

Version

0.1.0

License

MIT

Last publish

Collaborators

  • missinglink
  • dianashk
  • julian_mapzen
  • trescube