stream-capacitor

0.2.0 • Public • Published

stream-capacitor

npm Dependencies Build Status Coverage Status JavaScript Standard Style

Throttles streams based on customizable throughput.

Purpose

The idea is to throttle a sequence of operations (i.e., Node.js streams) based on their throughput.

Where other implementations express their thresholds in bytes, this package uses an abstract number of items. This makes it suitable for use with object streams, where the raw payload size is either unknown or a less useful metric than a simple counter.

When you create a stream capacitor, it gives you access to two ports: the input port and the output port. When something flows through the input port, that increments the capacitor's queue size. When something flows through the output port, that decrements it again. Hence, you pipe the input port into the streams you want to throttle, and you pipe those back into the output port.

Here is a visualization of a typical setup:

Diagram

Note that the transforms in the input and output section are entirely optional.

Usage

This snippet uses pumpify to easily pipe a list of streams together. You can also use streams' native .pipe() function if you prefer.

import StreamCapacitor from 'stream-capacitor'
import pumpify from 'pumpify'
 
// When waiting for at least 10,000 items on the output side, pause input
const highWaterMark = 10000
// When the queue size drops below 3,000 again, resume input
const lowWaterMark = 3000
// Options are passed to the internal streams
const options = {objectMode: true}
 
// Create a new capacitor
const cap = new StreamCapacitor(highWaterMark, lowWaterMark, options)
 
// Put it all together
const flow = pumpify.obj(
  someReadable,         // Produce input
  inputTransform,       // Unthrottled input transform
  cap.input,            // Throttle every stream that follows
  transform1,           // Throttled transforms
  transform2,           // ...
  transform3,           // ...
  cap.output,           // End throttling
  outputTransform,      // Unthrottled output transform
  someWritable          // Consume output
)

Events

A capacitor is an EventEmitter that can emit two events:

  • close when throttling starts, and
  • open when throttling stops.

Custom Counting

By default, each item that flow through the input port will add one to the queue size, and each item going through the output port will subtract one. If you want to alter this behavior, you can use the count option. Its value is a function that maps an item to the number of items it represents.

For example, to parse log lines into data objects and then batch them into equally-sized arrays using batch-stream, you could write something like:

import StreamCapacitor from 'stream-capacitor'
import BatchStream from 'batch-stream'
import pumpify from 'pumpify'
 
const customCounter = (chunk, encoding) => {
  if (Array.isArray(chunk)) {
    // The output port receives arrays containing multiple items
    return chunk.length
  } else {
    // The input port receives one item at a time
    return 1
  }
}
 
const highWaterMark = 10000
const lowWaterMark = 3000
const options = {
  objectMode: true,
  count: customCounter  // See above
}
const cap = new StreamCapacitor(highWaterMark, lowWaterMark, options)
const flow = pumpify.obj(
  createLogLineReader(),
  cap.input,            // Adds one per log string
  createLogLineToDataObjectTransformer(),
  new BatchStream({size}),
  cap.output,           // Subtracts batch array length per batch
  createDataObjectWriter()
)

Adding and Removing Items

A throttled transform may need to add or remove items. For example, you might have a transform that transforms each input item into multiple output items, or it might drop invalid input objects.

In this case, you can set the delta property on the capacitor to the number of additional items that you're expecting on the output side. By default, it is zero, meaning the capacitor expects one output item for each input item.

Maintainer

Tim De Pauw

License

MIT

Readme

Keywords

none

Package Sidebar

Install

npm i stream-capacitor

Weekly Downloads

0

Version

0.2.0

License

MIT

Last publish

Collaborators

  • timdp