streamworks

Run a nested collection of streams in pipe or merge configurations

streamworks

Combine merge and pipe streams into stream architectures that can bend packets to your will.

A merge stream will duplicate the input across all members and merge the output from all members (based on combine-stream.

A pipe stream will pass the output from each member as the input to the next (based on stream-combiner.

Both types accepts streams in their list - you can combine merge and pipes with each other - it's stream inception!

$ npm install streamworks

There are 2 main methods:

  • merge
  • pipe

Create each type of stream by passing an array of either:

  • functions - these are turned into streams using through2
  • streams - these are used as is
var from = require('from');
var streamworks = require('streamworks');
 
 
// a merge is a stream that splits the input across each function and merges the output back into one stream 
var mergestream = streamworks.merge([
 
    // order lots 
    function(chunkenccallback){
        this.push(chunk + ':10')
        callback()
    },
 
    // order some 
    function(chunkenccallback){
        this.push(chunk + ':2')
        callback()
    }
])
 
 
// a pipe is a stream that passes values through each function 
var pipestream = streamworks.pipe([
 
    // filter anything that does not start with p 
    function(chunkenccallback){
        if(chunk.toString().indexOf('p')!=0){
            this.push(chunk);
        }
        callback();
    },
 
    // uppercase to input 
    function(chunkenccallback){
 
        this.push(chunk.toString().toUpperCase())
    }
])
 
// run some data through the merge stream (which will duplicate it) and then through the pipe stream (which will filter it) 
from(['apple', 'pie', 'custard'])
    .pipe(mergestream)
    .pipe(pipestream)
    .pipe(process.stdout)

Because streamworks streams are, umm, streams - you can create complex nested stream-structures:

 
var bigAssStream = streamworks.pipe([
  function(chunkenccallback){
    if(chunk!='world'){
      this.push(chunk);
    }
    callback();
  },
 
  streamworks.merge([
    function(chunkenccallback){
      this.push('A1:' + chunk);
      callback();
    },
    function(chunkenccallback){
      this.push('A2:' + chunk);
      callback();
    },
    function(chunkenccallback){
      this.push('A3:' + chunk);
      callback();
    }
  ]),
 
  streamworks.pipe([
 
    function(chunkenccallback){
      if(chunk.toString().indexOf('A2')!=0){
        this.push(chunk);
      }
      callback();
    },
    streamworks.merge([
      function(chunkenccallback){
        this.push('sub1:' + chunk);
        callback(); 
      },
      function(chunkenccallback){
        this.push('sub2:' + chunk);
        callback(); 
      }
    ]),
    function(chunkenccallback){
      if(chunk.toString().indexOf('sub2:A1:')!=0){
        this.push(chunk);
      }
      
      callback()
    },
  ])
])
 
var arr = [];
 
from(['hello','world','apple']).pipe(bigAssStream)
.on('data', function(chunk){
  arr.push(chunk.toString())
}).on('end', function(){
    console.dir(arr);
})
 
/*
 
sub1:A1:hello
sub1:A3:hello
sub2:A3:hello
sub1:A1:apple
sub1:A3:apple
sub2:A3:apple
    
*/
 

If you pass true or:

{
    objectMode:true
}

as the first argument to pipe or merge - the stream will be in object mode.

this means that the 'chunks' will be what you sent and not buffers/strings.

You can also use:

  • pipeObjects
  • mergeObjects

Which are shortcuts for:

  • pipe(true, [])
  • merge(true, [])

create a new readable/writable stream that will pipe each value through the array of streams/functions

shorthand for pipe(true, [])

create a new readable/writable stream that will duplicate each value into each of the streams/functions and merge the results back into the output

shorthand for merge(true, [])

MIT