stream-capacitor
Throttles streams based on customizable throughput.
Purpose
The idea is to throttle a sequence of operations (i.e., Node.js streams) based on their throughput.
Where other implementations express their thresholds in bytes, this package uses an abstract number of items. This makes it suitable for use with object streams, where the raw payload size is either unknown or a less useful metric than a simple counter.
When you create a stream capacitor, it gives you access to two ports: the input port and the output port. When something flows through the input port, that increments the capacitor's queue size. When something flows through the output port, that decrements it again. Hence, you pipe the input port into the streams you want to throttle, and you pipe those back into the output port.
Here is a visualization of a typical setup:
Note that the transforms in the input and output section are entirely optional.
Usage
This snippet uses pumpify to easily pipe a list of streams together. You can also use streams' native .pipe() function if you prefer.
    // When waiting for at least 10,000 items on the output side, pause input
    const highWaterMark = 10000
    // When the queue size drops below 3,000 again, resume input
    const lowWaterMark = 3000
    // Options are passed to the internal streams
    const options = { objectMode: true }

    // Create a new capacitor
    const cap = …(highWaterMark, lowWaterMark, options)

    // Put it all together
    const flow = pumpify(…)
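The interplay between the queue size and the two thresholds can be sketched in isolation. This is a minimal illustration of the bookkeeping described above, not the package's actual implementation: the class name `QueueCounter`, its methods, and the `paused` flag are hypothetical, and the thresholds are kept tiny so the trace stays short.

```javascript
// Hypothetical bookkeeping class; a sketch, not the package's implementation
class QueueCounter {
  constructor (highWaterMark, lowWaterMark) {
    this.highWaterMark = highWaterMark
    this.lowWaterMark = lowWaterMark
    this.size = 0
    this.paused = false
  }

  // An item passed the input port: the queue grows
  itemIn () {
    this.size += 1
    if (this.size >= this.highWaterMark) this.paused = true
    return this.paused
  }

  // An item passed the output port: the queue shrinks
  itemOut () {
    this.size -= 1
    if (this.size < this.lowWaterMark) this.paused = false
    return this.paused
  }
}

// Small thresholds to keep the trace short
const counter = new QueueCounter(3, 1)
const trace = [
  counter.itemIn(), // size 1
  counter.itemIn(), // size 2
  counter.itemIn(), // size 3, reaches the high-water mark
  counter.itemOut(), // size 2, still paused
  counter.itemOut(), // size 1, not yet below the low-water mark
  counter.itemOut() // size 0, below the low-water mark
]
```

Using two thresholds instead of one keeps the input from rapidly toggling between paused and resumed when the queue size hovers around a single limit.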
Events
A capacitor is an EventEmitter that can emit two events:
close, when throttling starts, and
open, when throttling stops.
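Since a capacitor is an EventEmitter, listeners attach with the usual on() method. The sketch below uses a plain EventEmitter as a stand-in for a capacitor and counts how often throttling kicks in. Only the event names close and open come from the text above; the variable `cap` and the manual emit() calls are assumptions made so the example runs on its own.

```javascript
const { EventEmitter } = require('events')

// Stand-in for a real capacitor (assumption for illustration only)
const cap = new EventEmitter()

let throttleCount = 0
let throttled = false

cap.on('close', () => {
  // Throttling started: input is being held back
  throttled = true
  throttleCount += 1
})

cap.on('open', () => {
  // Throttling stopped: input flows again
  throttled = false
})

// Simulate two throttling cycles; a real capacitor would emit these itself
cap.emit('close')
cap.emit('open')
cap.emit('close')
const throttledAfterClose = throttled
cap.emit('open')
```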
Custom Counting
By default, each item that flows through the input port adds one to the queue size, and each item going through the output port subtracts one. To alter this behavior, use the count option. Its value is a function that maps an item to the number of items it represents.
For example, to parse log lines into data objects and then batch them into equally-sized arrays using batch-stream, you could write something like:
    const customCounter = function (chunk) {
        if (Array.isArray(chunk)) {
            // The output port receives arrays containing multiple items
            return chunk.length
        } else {
            // The input port receives one item at a time
            return 1
        }
    }

    const highWaterMark = 10000
    const lowWaterMark = 3000
    const options = { objectMode: true, count: customCounter } // See above
    const cap = …(highWaterMark, lowWaterMark, options)
    const flow = pumpify(…)
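To see why such a counting function keeps the queue balanced, the arithmetic can be traced by hand. The following sketch is independent of the package: `count`, `queueSize`, and the sample data are illustrative names, and the batching is done with slice() rather than batch-stream.

```javascript
// Same idea as the counter above: arrays count as their length, anything else as one
const count = (chunk) => (Array.isArray(chunk) ? chunk.length : 1)

let queueSize = 0

// Input side: six individual log lines, each counted as one item
const lines = ['a', 'b', 'c', 'd', 'e', 'f']
for (const line of lines) {
  queueSize += count(line)
}
const sizeAfterInput = queueSize

// Output side: the same lines arrive batched into two arrays of three
const batches = [lines.slice(0, 3), lines.slice(3)]
for (const batch of batches) {
  queueSize -= count(batch)
}
```

Without the custom function, the two batches would only subtract two from the queue size, leaving four items that never appear to drain.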
Adding and Removing Items
A throttled transform may need to add or remove items. For example, a transform might turn each input item into multiple output items, or it might drop invalid input objects.
In this case, you can set the delta property on the capacitor to the number of additional items that you're expecting on the output side. By default, it is zero, meaning the capacitor expects one output item for each input item.
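The arithmetic behind delta can be illustrated as follows. This is only a sketch under one reading of the description above: `cap` is a plain object standing in for a capacitor, the splitting and dropping rules are hypothetical, and adjusting delta incrementally from inside the transform is an assumption rather than documented behavior.

```javascript
// Plain object standing in for a capacitor (assumption); delta starts at zero
const cap = { delta: 0 }

// Hypothetical transform logic: arrays are split into their elements,
// null is dropped as invalid, anything else passes through unchanged
function transform (item) {
  if (Array.isArray(item)) {
    // One input becomes item.length outputs
    cap.delta += item.length - 1
    return item
  }
  if (item === null) {
    // One input becomes zero outputs
    cap.delta -= 1
    return []
  }
  return [item]
}

const inputs = ['a', ['b', 'c', 'd'], null, 'e']
const outputs = inputs.flatMap((item) => transform(item))
```

Here four inputs yield five outputs, so delta ends up at one: two extra items from the split, minus one for the dropped item.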
Maintainer
License
MIT