waterpark

0.2.6 • Public • Published

waterpark

JavaScript Style Guide Build Status experimental

Stream toolbox library

While working with streams, these basic operations will ease your life.

Quickstart

npm i waterpark
const {range, take, reduce} = require('waterpark')
 
// sum even numbers from 0 to 100
range(0, 100)
  .pipe(take.obj({amount: 1, every: 2}))
  .pipe(reduce.obj((sum, val) => sum + val, 0))
  .on('data', console.log)

Supported Streaming Modes

Object Mode: stream with objectMode: true

Buffer Mode: stream with objectMode: false

Waterpark streams default to objectMode (exception: fromBuffer).

Types: R = Readable, T = Transform, W = Writable, D = Duplex

Name Type Object Mode Buffer Mode Shorthand
concurrent T concurrent (concurrency, transformHandler, options)
count R count (offset, options)
delay T delay (milliseconds, jitter, options)
filter T filter (filterHandler, options)
fromArray R fromArray (array, options)
fromBuffer R fromBuffer (buffer, options)
interval R interval (milliseconds, options)
multicore T multicore (cores, path, options)
random R random (size, options)
range R range (from, to, options)
reduce R reduce (reducer, initialValue, repeatAfter)
slice T slice (begin, end, every, options)
skip T skip (begin, every, options)
take T take (amount, every, options)
through T through (fn, options)

count (options)

  • offset (default = 0) offset will be the first number emitted.
  • ...options <ReadableOptions> options for a readable stream.
  • Returns: <Readable>

Creates a readable stream emitting incrementing numbers.

Example

const {count} = require('waterpark')
count().on('data', console.log)

Expected output:

0
1
2
...

fromArray (options)

  • array <Array> source for the readable stream
  • ...options <ReadableOptions> options for a readable stream.
  • Returns: <Readable> supporting object mode ✓ | buffer mode ✓

Creates a readable stream form an array.

Example

const {fromArray} = require('waterpark')
 
const array = ['streaming', 'is', 'awesome']
fromArray({array})
  .on('data', console.log)

Expected output:

streaming
is
awesome

fromBuffer (options)

  • buffer <Buffer> source for the readable stream
  • ...options <ReadableOptions> options for a readable stream.
  • Returns: <Readable> supporting object mode ✗ | buffer mode ✓

Creates a readable stream form a buffer.

Example

const {fromBuffer} = require('waterpark')
 
const buffer = Buffer.from('letters')
fromBuffer({buffer, highWaterMark: 3})
  .on('data', buf => console.log(buf.toString()))

Expected output:

let
ter
s

interval (options)

  • interval <Number> interval in milliseconds
  • ...options <ReadableOptions> options for a readable stream.
  • Returns: <Readable> supporting object mode ✓ | buffer mode ✗

Periodically emits the current unix timestamp.

Internally interval is using setInterval. Temporal jitter as well as drift within the order of milliseconds might occur.

Example

const {interval} = require('waterpark')
 
interval({interval: 500, objectMode: true})
  .on('data', console.log)

Expected output:

1520281268331
1520281269344
1520281270346
...

random (options)

  • size <Number> length of emitted strings.
  • ...options <ReadableOptions> options for a readable stream.
  • Returns: <Readable> supporting object mode ✓ | buffer mode ✓

Emits random hex-encoded strings / buffers with a given size.

Example

const {random} = require('waterpark')
random.obj({
    size: 32 * 3,
    encoding: 'hex',
    highWaterMark: 32
  })
  .on('data', console.log)

Example output

c55607c14da303103810c1d0e608f1275e20f7c72d1df4cd9f4b9a4daa48dc39
f52a5c53a3971c1d43713ce36c81723f09f9ae7f8170dec26545c5b1f8b6b272
8a9ffdd4f6a90ebae1364bede92fb19b428670af05b3fb184b4b39de582eb7ba

range (options)

  • from <Number> integer, included range start.
  • to <Number> integer, included range end.
  • ...options <ReadableOptions> optional stream options.
  • Returns: <Readable> supporting object mode ✓ | buffer mode ✗

Emits integers in sequence from the defined range. from may be smaller than to, but both must be integer.

Example

const {range} = require('waterpark')
 
range({from: 1, to: 3})
  .on('data', console.log)

Expected output:

1
2
3

concurrent (options)

  • concurrency <Number> integer, concurrent transform operations.
  • ...options <TransformOptions> optional stream options.
  • Returns: <Transform> supporting object mode ✓ | buffer mode ✓

Concurrent stream processing.

Example

const {range, concurrent} = require('waterpark')
range({from: 1, to: 100})
  .pipe(concurrent({
    concurrency: 10,
    transform: (data, encoding, cb) => {
      setTimeout(() => cb(null, data), 100)
    }
   }))
  .on('data', console.log)

Finishes in ~1s while through would take ~10s

delay (options)

  • milliseconds <Number> integer, included range start.
  • jitter <Number> integer, included range end.
  • ...options <TransformOptions> optional stream options.
  • Returns: <Transform> supporting object mode ✓ | buffer mode ✓

Emits data delayed by a specified amount of time.

Example

const {range, delay} = require('waterpark')
range({from: 1, to: 3})
  .pipe(delay({delay: 500}))
  .on('data', console.log)

Expected output:

1
2
3

Lines will be printed in sequence. Each one delayed by ~500ms.

filter (options)

  • filter <[Function](data) => [Boolean]> pipe data that meets the filter condition.
  • ...options <TransformOptions> optional stream options.
  • Returns: <Transform> supporting object mode ✓ | buffer mode ✗

Emits data that passes the filter filter condition. This stream operates in object mode per default.

Example

const {range, filter} = require('waterpark')
range(1, 5)
  .pipe(filter(n => n % 2))
  .on('data', console.log)

Expected output:

1
3
5

The inital range 1 to 5 is filtered for odd numbers

multicore (options)

  • path <String> path to module that will be used for clustering.
  • cores <Number> number of cores used in parallel.
  • ...options <TransformOptions> optional stream options.
  • Returns: <Transform> supporting object mode ✓ | buffer mode ✓

Stream operations in parallel on multiple cores. ★

Forks the module referenced by path, core times, spreads the previous stream data to these child processes, computes their transform handlers in parallel and collects their digests while preserving order.

Communication between the main process and child processes is done via JSON encoding. Include serialization and deserialization of data that needs to be communicated to the worker and back into your performance estimation. If your work is mostly I/O bound you might be looking for concurrent which is used in multicore as scheduler.

For optimal performance, use the number of physical cores. Exceeding that amount is possible on machines with hyper threading, yet might yield actually less performance due to thrashing.

Example

Let's do some cpu intense calculation and compute the 1e6-fold SHA256 hash of multiple messages.

./main.js

const {range, multicore} = require('waterpark')
range(1, 12)
  .pipe(multicore(4, require.resolve('./worker.js')))
  .on('data', console.log)

./worker.js

const {createHash} = require('crypto')
 
process.on('message', (msg) => {
  for (let i = 1e6; i > 0; i--) {
    msg = createHash('sha256').update(msg.toString()).digest('hex')
  }
  process.send(msg)
})

Then execute node main

Expected output:

d6c3110abae572a3ce11a696068dca0f01961fbbf9f2c08bdfdde3640b79db0b
3a2ae473ab4a5fc533adb7367af8b1ffdd5a5a78fafb51945a0869021b07bb14
945a76e4ef3a32651ffde16b90d26c24bbadc9bdf50ff5f580f869108d6bff86
60ca5d721a66d84bfcfab6e0b79a8f5e83bb7a7cd24dcf11dcf4b8a348cf5fe8
...

Each line represents the outcome of a CPU intense calculation. range runs on the main process and sequentially pipes numbers into 4 child process each running on a separate core which are therefore able to calculate the expensive hash function in parallel.

slice (options)

  • begin <Number> zero based index at which to begin extraction. Default is 0
  • end <Number> pass elements up to but not including (zero based index).
  • every <Number> repeat slice operation after every elements.
  • ...options <TransformOptions> optional stream options.
  • Returns: <Transform> supporting object mode ✓ | buffer mode ✓

Similar to Array.slice() and Buffer.slice() slice is acting as range filter on its source.

In respect to streams potential infinite nature, the every parameter has been introduced.

Example Every 5 elements, pass the 2nd to the 4th element.

const {range, slice} = require('waterpark')
range(0, 9)
  .pipe(slice.obj(1, 4, 5))
  .on('data', console.log)

Expected output:

1
2
3
6
7
8

See: skip, take

reduce (reducer[, initalValue][, every])

  • options <TransformOptions> optional stream options.
  • reducer <Function (accumulator, currentValue, currentIndex)>
    • accumulator <any> accumulates the callbacks return values.
    • currentValue <any> the current element being processed.
    • currentIndex <Number> zero based index of the current element.
  • initialValue <any> Initial accumulator value. Default is 0
  • every <Number> repeat reduction after every steps. Default is infinity which reduces only once at the end of the source.
  • Returns: <Transform> supporting object mode ✓ | buffer mode ✗

Reduces stream emissions to one (source stream must be finite) or many if every is set.

Example Every 4 numbers emit the sum of the last 4 numbers.

const {range, reduce} = require('waterpark')
 
range.obj(1, 100)
  .pipe(reduce.obj((sum, val) => sum + val, 0, 4))
  .on('data', console.log)

Expected output:

10
26
42
58
74
...

skip (amount[, every][, options])

  • amount <Number> skip this amount of objects / bytes. Default is 0
  • every <Number> repeat skip operation after every elements. Default is infinity. every must be bigger than amount.
  • ...options <TransformOptions> optional stream options.
  • Returns: <Transform> supporting object mode ✓ | buffer mode ✓

See also: slice

In respect to streams potential infinite nature, the every parameter has been introduced.

Example Every 5 elements, pass the 2nd to the 4th element.

const {range, slice} = require('waterpark')
range(0, 9)
  .pipe(slice(1, 4, 5))
  .on('data', console.log)

Expected output:

1
2
3
6
7
8

take (amount[, every][, options])

  • amount <Number> only take this amount of objects / bytes. Default is 0
  • every <Number> repeat take operation after every elements. Default is infinity, which will cause take to end after amount elements have been processed.
  • ...options <TransformOptions> optional stream options.
  • Returns: <Transform> supporting object mode ✓ | buffer mode ✓

In respect to streams potential infinite nature, the every parameter has been introduced.

See also: slice

Example Take first 3 elements of a stream.

const {count, take} = require('waterpark')
count()
  .pipe(take(3))
  .on('data', console.log)

Expected output:

0
1
2

through ([options, ]fn(data, encoding, cb))

// TODO: write docs

Dependents (1)

Package Sidebar

Install

npm i waterpark

Weekly Downloads

0

Version

0.2.6

License

ISC

Unpacked Size

38.7 kB

Total Files

24

Last publish

Collaborators

  • ablay