node package manager
Share your code. npm Orgs help your team discover, share, and reuse code. Create a free org »


stream-worker build status

Execute an async function per stream data event, pausing the stream when a concurrency limit is saturated. Inspired by async.queue, optimized for streams.

Supports promises and callbacks.

The Basics

npm install stream-worker


var streamWorker = require('stream-worker');

Promise style:

function doWork(data){
  /* ... do some work with data ... */
  return Promise.resolve();
streamWorker(stream, doWork, {promises : true, concurrency : 10})
.then(function() {
  /* ... the stream is exhausted and all workers are finished ... */
}, function(err) {
  /* ... there was an error processing the stream ... */

Callback style:

function doWork(data, done){
  /* ... do some work with data ... */
  return done(err);;
streamWorker(stream, doWork, {promises : false, concurrency :10},
  function(err) {
    /* ... the stream is exhauseted and all workers are finished ... */


streamWorker(stream, work, options, done)

Where options is an object with 2 optional parameters:

Parameter Default Description
promises false true if you want to use the above promises style
concurrency 10 specifies how many concurrent workers you want doing work in the stream

And done is a callback function if you use the callback workflow.