Merging promises with functional reactive map/reduce (+ debounce and throttle). Pipes run on backpressure and can be used in backends as well.
// ...
const readFile = pipe
const writeFile = pipe
// ...
creating pipes with deferrer and generator
pipe(<deferrerFn>) -> <generatorFn> creates a pipe.
<deferrerFn> is a callback that will be called with 2 arguments:
- Behaves like its Promise counterpart.
<generatorFn> can be returned from the <deferrerFn>.
- Will be called repeatedly with a <nextFn> until the <deferrerFn> has resolved or rejected.
- <nextFn> must be called only once with a value each time <generatorFn> is called, so that values are emitted as fast as subsequent consumption is performed.
Example of a generator emitting keys of an object literal as fast as subsequent consumers can process:
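The contract described above can be sketched without the library itself. This is a minimal, synchronous stand-in and not the library's implementation: `drive` and `emitKeys` are hypothetical names, and the two deferrer arguments are assumed to be `resolve` and `reject`, as with a Promise executor.

```javascript
// Hypothetical deferrer factory: emits the keys of `obj`, one per generator call.
// Assumption: the deferrer receives (resolve, reject) like a Promise executor.
const emitKeys = (obj) => (resolve, reject) => {
  const keys = Object.keys(obj);
  let index = 0;
  // The returned generator calls `next` at most once per invocation.
  return (next) => {
    if (index < keys.length) next(keys[index++]);
    else resolve(keys.length); // nothing left: settle instead of emitting
  };
};

// Hypothetical synchronous driver standing in for the library: it asks for the
// next value only after the consumer has handled the previous one.
function drive(deferrer, onValue) {
  let settled = false;
  let result;
  const resolve = (value) => { settled = true; result = { status: 'resolved', value }; };
  const reject = (error) => { settled = true; result = { status: 'rejected', error }; };
  const generator = deferrer(resolve, reject);
  while (!settled) {
    let called = false;
    generator((value) => {
      if (called) throw new Error('next must be called only once per generator call');
      called = true;
      onValue(value); // consumption happens before the next value is requested
    });
    if (!called && !settled) throw new Error('generator neither emitted nor settled');
  }
  return result;
}

const seenKeys = [];
const outcome = drive(emitKeys({ a: 1, b: 2, c: 3 }), (key) => seenKeys.push(key));
console.log(seenKeys); // [ 'a', 'b', 'c' ]
console.log(outcome);  // { status: 'resolved', value: 3 }
```

Because the driver only calls the generator again once the consumer has returned, the producer can never run ahead of consumption, which is the backpressure idea in miniature.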
.map(<operate-fn>) -> <pipe>
.filter(<operate-fn>) -> <pipe>
.forEach(<operate-fn>) -> <pipe>
.reduce(<operate-fn>) -> <pipe> behave like their array counterparts.
.reduce(<operate-fn>) -> <pipe> the reduce result is retrieved by chaining a .then(<operate-fn>).
.then(<operate-fn>) -> <pipe>
.catch(<fn>) behave like their Promise counterparts.
.debounce(<delay>) -> <pipe> continues the stream of operations only after the previous operation has been silent (not firing) for at least <delay>.
.throttle(<delay>) -> <pipe> limits the events coming from the previous operation to firing at most once per interval of the given <delay>.
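The difference between the two is easiest to see on a list of timestamped events. The following is a sketch of the selection logic only, not of the library's timer handling: `debounceEvents`, `throttleEvents` and the `{ time, value }` event shape are hypothetical, and the throttle variant shown lets the first event of each interval through, which is an assumption.

```javascript
// Debounce: an event passes only if no other event follows within `delay`.
function debounceEvents(events, delay) {
  return events.filter((event, i) => {
    const following = events[i + 1];
    return following === undefined || following.time - event.time >= delay;
  });
}

// Throttle: an event passes only if at least `delay` has elapsed
// since the last event that passed.
function throttleEvents(events, delay) {
  const kept = [];
  let lastTime = -Infinity;
  for (const event of events) {
    if (event.time - lastTime >= delay) {
      kept.push(event);
      lastTime = event.time;
    }
  }
  return kept;
}

// Events sorted by time; the time unit is whatever <delay> is measured in.
const sample = [
  { time: 0, value: 'a' },
  { time: 10, value: 'b' },
  { time: 20, value: 'c' },
  { time: 200, value: 'd' },
  { time: 210, value: 'e' },
  { time: 500, value: 'f' },
];

console.log(debounceEvents(sample, 100).map((e) => e.value)); // [ 'c', 'e', 'f' ]
console.log(throttleEvents(sample, 100).map((e) => e.value)); // [ 'a', 'd', 'f' ]
```

Debounce keeps the last event of each burst, while throttle keeps the first event of each interval, so the two pass different values from the same input.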
pipe.resolve(<value>) -> <pipe> returns a pipe that will resolve with the given value.
pipe.reject(<error>) -> <pipe> returns a pipe that will reject with the given error.
pipe.all(<Array[Thenable]>) -> <pipe> resolves after all thenables (Promise-compatible asynchronous computations) in the given array have resolved.
- Passes on an array of results.
pipe.race(<Array[Thenable]>) -> <pipe> resolves as soon as the first of the thenables (Promise-compatible asynchronous computations) has resolved.
- Passes on the respective result.
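For readers unfamiliar with thenables, the standard `Promise.all` and `Promise.race` show the same idea. The sketch below uses only built-in Promise functions. Whether `pipe.all` and `pipe.race` match them in every detail is an assumption, and `makeThenable` is a hypothetical helper.

```javascript
// A minimal thenable: any object with a then(onFulfilled, onRejected) method.
const makeThenable = (value, delayMs) => ({
  then(onFulfilled) {
    setTimeout(() => onFulfilled(value), delayMs);
  },
});

// all: waits for every entry and keeps the input order, not the finishing order.
const allResults = Promise.all([makeThenable('slow', 30), makeThenable('fast', 5)]);

// race: settles with whichever entry finishes first.
const raceResult = Promise.race([makeThenable('slow', 30), makeThenable('fast', 5)]);

allResults.then((results) => console.log(results)); // [ 'slow', 'fast' ]
raceResult.then((first) => console.log(first));     // fast
```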
pipe.from(<Array>) -> <pipe> creates, from an array of values, an iterable pipe on which .reduce(<fn>) can be used.
pipe.wrap(<NodeJS-style-callback>) -> <pipe> wraps a NodeJS style callback function (1st argument error, others results) into a pipe.
- Will resolve with the given arguments in an array (if more than one), with the result value otherwise.
- Will reject on error.
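The resolve and reject rules above can be sketched with a plain Promise. This illustrates the described behaviour and is not the library's code: `wrapNodeStyle`, `divide` and `divmod` are hypothetical names, and returning a function that produces a Promise (rather than a pipe) is an assumption.

```javascript
// Wraps a function whose last argument is a Node.js-style callback
// (error first, results after) so that it returns a Promise instead.
function wrapNodeStyle(fn) {
  return (...args) =>
    new Promise((resolve, reject) => {
      fn(...args, (error, ...results) => {
        if (error) return reject(error);
        // More than one result: pass them on as an array; otherwise the single value.
        resolve(results.length > 1 ? results : results[0]);
      });
    });
}

// Hypothetical callback-style functions, used only for the demonstration.
const divide = (a, b, callback) =>
  b === 0 ? callback(new Error('division by zero')) : callback(null, a / b);
const divmod = (a, b, callback) => callback(null, Math.floor(a / b), a % b);

const wrappedDivide = wrapNodeStyle(divide);
const wrappedDivmod = wrapNodeStyle(divmod);

wrappedDivide(6, 3).then((value) => console.log(value));          // 2
wrappedDivmod(7, 2).then((values) => console.log(values));        // [ 3, 1 ]
wrappedDivide(1, 0).catch((error) => console.log(error.message)); // division by zero
```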
pipe.buffer(<size>) -> <buffer> creates a buffer that keeps at most <size> emitted values until the consuming operation retrieves them. If the consumer is too slow and a <size> is given, values might be dropped. Without a <size> and with a slow consumer, the buffer might overflow and crash your application. It's advisable to structure your code so that a buffer is not needed.
<buffer>.emit(<value>) emits a value onto the buffer. The value is stored until a pipe consumer retrieves it or it gets pushed out of the buffer when the <size> is reached.
<buffer>.resolve(<value>) resolves the pipe underneath the buffer.
<buffer>.reject(<error>) rejects the pipe underneath the buffer.
<buffer>.pipe is the pipe underneath the buffer.
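The effect of a <size> limit on a slow consumer can be sketched with a small bounded queue. This is a stand-in, not the library's buffer: `BoundedBuffer`, `take` and `dropped` are hypothetical names, and evicting the oldest value first is an assumption based on the description of values being pushed out of the buffer.

```javascript
// Hypothetical bounded buffer: keeps at most `size` values, evicting the oldest.
class BoundedBuffer {
  constructor(size = Infinity) {
    this.size = size;    // without a size the buffer can grow without limit
    this.values = [];
    this.dropped = 0;    // number of values the consumer never saw
  }

  // Producer side: store a value, evicting the oldest one if the buffer is full.
  emit(value) {
    this.values.push(value);
    if (this.values.length > this.size) {
      this.values.shift();
      this.dropped += 1;
    }
  }

  // Consumer side: retrieve the oldest stored value (undefined if empty).
  take() {
    return this.values.shift();
  }
}

const buffer = new BoundedBuffer(2);
[1, 2, 3].forEach((value) => buffer.emit(value)); // the consumer has not run yet

console.log(buffer.dropped); // 1
console.log(buffer.take());  // 2
console.log(buffer.take());  // 3
console.log(buffer.take());  // undefined
```

The value `1` is lost because the consumer did not retrieve it before the third value arrived, which is the trade-off a <size> limit makes to keep memory bounded.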