node package manager


Concurrent transform stream

speculum - transform concurrently

The speculum Node package provides a Readable stream that combines a readable input stream and a configurable number of Transform stream instances to concurrently transform data from a single source (the input stream). If result order is not paramount, speculum can reduce run time.

An IO-heavy transform stream's run time T grows linearly with the number N of chunks (units of IO work) C:

T = N * C

speculum divides the run time by the number of concurrent streams X:

T = N * C / X

Here is a—somewhat contrived but runnable—example comparing the run time of a single stream with five concurrent streams:

var speculum = require('speculum')
var stream = require('stream')
var util = require('util')
util.inherits(Echo, stream.Transform)
function Echo (opts) {, opts)
Echo.prototype._transform = function (chunk, enc, cb) {
  var me = this
  setTimeout(function () {
  }, 100)
util.inherits(Count, stream.Readable)
function Count (opts, max) {, opts)
  this.count = 0
  this.max = max
Count.prototype._read = function () {
  var str = String(this.count++)
  this.push(new Buffer(str))
  if (this.count > MAX) {
function run (x, cb) {
  var opts = null
  var reader = new Count(opts, 10)
  function create () {
    return new Echo()
  var s = speculum(opts, reader, create, x)
  s.on('end', cb)
  s.on('error', cb)
function measure (x, cb) {
  function time (t) {
    return t[0] * 1e9 + t[1]
  var t = process.hrtime()
  run(x, function (er) {
    var lat = time(process.hrtime(t))
    console.log(+ ' X took ' + (lat / 1e6).toFixed(2) + ' ms')
measure(1, function (er) {
  measure(5, function (er) {})

Run it with:

$ node example.js
  • opts Object | null Options passed to this stream constructor
  • reader stream.Readable The input stream
  • create Function Factory function to create transform streams
  • x Number | 5 The number of concurrent transform streams to use

The speculum module exports a function that returns an instance of the Speculum class which extends stream.Readable. To access the Speculum class require('speculum'). The speculum stream round-robins the transform instances while avoiding to overflow its own and the transformers' buffers.

With npm do:

$ npm install speculum

MIT License