pipe-segment

0.1.2 • Public • Published

node-pipe-segment

Think of a pipe-segment as a bundle of streams that do one thing together. See the discussion here: https://github.com/jbenet/random-ideas/issues/24. Pipe segments are designed to be joined together into more complicated pipelines. It's similar in spirit to substack/labeled-stream-splicer, but rather than making one big pipeline, it uses segments that expose different streams for different things.

Inherit from this module and follow the pattern to make reusable components that can be joined together.

Install

npm install pipe-segment

Examples

Integrity Checksum pipe segment

Let's make a stream that wraps raw data with checksums (into a checksum-buffer). This is just a simple transform stream.

var multihash = require('multihashes')
var CkBuffer = require('checksum-buffer')
var through2 = require('through2')
 
/**
 * Make an object-mode transform stream that wraps every incoming chunk
 * in a checksum-buffer.
 *
 * @param func - multihash function identifier; normalized with
 *   `multihash.coerceCode` before use.
 * @returns the stream created by `through2.obj`; for each chunk written
 *   it emits the `.buffer` of `CkBuffer(chunk, func)`.
 */
function cksumWrap(func) {
  func = multihash.coerceCode(func) // ensure valid func code
  return through2.obj(cksumAdd)

  function cksumAdd(buf, enc, next) {
    // through2 ignores the transform function's return value: the result
    // has to be handed to the callback, otherwise nothing is ever emitted
    // and the stream stalls waiting for this chunk to finish.
    next(null, CkBuffer(buf, func).buffer)
  }
}

And one that unwraps a checksum-buffer. This is just a simple transform stream again.

var CkBuffer = require('checksum-buffer')
var through2 = require('through2')
 
/**
 * Make an object-mode transform stream that unwraps checksum-buffers.
 *
 * Takes no hash function argument: each chunk is passed to `CkBuffer`
 * as-is and the result of its `data()` call is emitted.
 *
 * @returns the stream created by `through2.obj`.
 */
function cksumUnwrap() { // no func needed here
  return through2.obj(cksumRemove)

  function cksumRemove(ckbuf, enc, next) {
    // through2 ignores the transform function's return value: pass the
    // unwrapped data to the callback so it is actually pushed downstream.
    next(null, CkBuffer(ckbuf).data())
  }
}

Now, let's make a segment that checks the checksum on a checksum-buffer. A segment is like a bundle of streams that are all related. This one is basically a pipe-segment-filter, so its stream interfaces will be input, output, and filtered.

var CkBuffer = require('checksum-buffer')
var multihash = require('multihashes')
var filterSegment = require('pipe-segment-filter')
// filterSegment is like a T pipe segment.
// it returns a segment object, with three streams:
// - segment.input
// - segment.output
// - segment.filtered
 
/**
 * Build a filter segment whose predicate validates checksum-buffers.
 *
 * @param func - optional multihash function identifier. When truthy it is
 *   normalized with `multihash.coerceCode`, and buffers whose
 *   `mhparams.code` differs from it are rejected. A falsy value (before
 *   or after coercion) disables this extra comparison.
 * @returns whatever `filterSegment` returns for the predicate.
 */
function cksumCheckSegment(func) {
  var wantedCode = func ? multihash.coerceCode(func) : func

  var isValid = function (buf) {
    var wrapped = CkBuffer(buf)
    var wrongFunction = Boolean(wantedCode) && wrapped.mhparams.code !== wantedCode
    // Only run the checksum verification when the hash function is acceptable.
    return wrongFunction ? false : wrapped.check()
  }

  return filterSegment(isValid)
}

Now, let's make a checksum integrity checked pipe-segment.

// we'll reuse the two functions above
var segment = require('pipe-segment') // makes pipe segments
var duplexer2 = require('duplexer2')
 
/**
 * Build a pipe segment that adds checksums in one direction and
 * checks/strips them in the other.
 *
 * @param cksumFunc - hash function identifier, forwarded to
 *   `cksumCheckSegment` and `cksumWrap`.
 * @returns the result of `segment(...)` over three streams:
 *   'raw', 'checksum' and 'filtered'.
 */
function integrityTransportSegment(cksumFunc) {
  //  raw   ----------- wrap ------->  checksum
  //  side  <--- unwrap --- check ---  side

  // internal streams
  var checker = cksumCheckSegment(cksumFunc)
  var wrapper = cksumWrap(cksumFunc)
  var unwrapper = cksumUnwrap()

  // whatever leaves the checker's output gets unwrapped
  checker.output.pipe(unwrapper)

  // one duplex per "side"; duplexer2 is given (stream written to, stream read from)
  var streams = {}
  streams.raw = duplexer2(wrapper, unwrapper) // write to wrap, read from unwrap
  streams.checksum = duplexer2(checker.input, wrapper) // write to check.input, read from wrap
  streams.filtered = checker.filtered

  return segment(streams)
}

Boom, now we have a T pipe that wraps/unwraps/checks multihash checksums.

Dependencies (1)

Dev Dependencies (4)

Package Sidebar

Install

npm i pipe-segment

Weekly Downloads

1

Version

0.1.2

License

MIT

Last publish

Collaborators

  • jbenet