1.0.9 • Public • Published


Merging promises with functional reactive map/reduce (+ debounce and throttle). Pipes run on backpressure and can be used in backends as well.

// ...
const readFile = pipe.wrap(fs.readFile)
const writeFile = pipe.wrap(fs.writeFile)
// ...
readFile('./package.json', 'utf8')
  .then(packString => JSON.parse(packString))
  .then(pack => pipe((resolve, reject) => {
    const keys = Object.keys(allScripts)
    return next => {
      if (keys.length) next(keys.splice(0, 1)[0])
      else resolve()
  .filter(lit => !lit.pack.scripts[lit.key])
  .map(lit => {
    lit.pack.scripts[lit.key] = lit.value
    return lit.pack
  .reduce((r, i) => i)
  .then(pack => JSON.stringify(pack, null, 2))
  .then(packString => writeFile('./package.json', packString))
  .then(() => { /* ... */ })

creating pipes with deferrer and generator

pipe(<deferrerFn>) -> <generatorFn> creates a pipe.

  • <deferrerFn> is a callback that will be called with 2 arguments: <resolveFn> and <rejectFn>.
  • Behaves like it's Promise counterpart.

<generatorFn> can be returned from the <deferrerFn>.

  • Will be called repeatedly with a <nextFn> until <deferrerFn> resolved or rejected.

  • Every <nextFn> must be called only once with a value each time <generatorFn> is called.

  • So that values are emitted as fast as subsequent consumption is performed.

  • Example of a generator emitting keys of an object literal as fast as subsequent consumers can process:

    pipe((resolve, reject) => {
      const keys = Object.keys(anObjectLiteral)
      return next => {
        if (keys.length) next(keys.splice(0, 1)[0])
        else resolve()


.map(<operate-fn>) -> <pipe> .filter(<operate-fn>) -> <pipe> .forEach(<operate-fn>) -> <pipe> .reduce(<operate-fn>) -> <pipe> behave like their array counterparts.

.reduce(<operate-fn>) -> <pipe> the reduce result is retrieved by chaining a then().

.then(<operate-fn>) -> <pipe> .catch(<fn>) behave like their Promise counterparts.

.debounce(<delay>) -> <pipe> continues the stream of operations only after a firing silence of the previous operation of at least <delay> milliseconds.

.throttle(<delay>) -> <pipe> limits the events coming from the previous operation to firing in the interval of the given <delay>.

constructor functions

pipe.resolve(<value>) -> <pipe> returns a pipe that will resolve with the given value.

pipe.reject(<error>) -> <pipe> returns a pipe that will reject with the given error.

pipe.all(<Array[Thenable]>) -> <pipe> resolves after all thenables (Promise-compatible asynchronous computations) in the given array did resolve.

  • Passes on an array of results.

pipe.race(<Array[Thenable]>) -> <pipe> resolves as soon as the first of all the thenables (Promise-compatible asynchronous computations) resolved.

  • Passes on the respective result.

pipe.from(<Array>) -> <pipe> creates an iterable pipe on which .map(<fn>) .filter(<fn>) .forEach(<fn>) .reduce(<fn>) can be used, from an array of values.

pipe.wrap(<NodeJS-style-callback>) -> <pipe> wraps a NodeJS style callback function (1st argument error, others results) into a pipe.

  • Will resolve with the given arguments in an array (if more than one), with the result value otherwise.
  • Will reject on error.

NOT RECOMMENDED: pipe.buffer(<size>) -> <buffer> creates a buffer that keeps maximum the <size> amount of emitted values before the consuming operation is retrieving them. If the consumer is too slow and a <size> is given, values might be omitted. Without a <size> given and a slow consumer the buffer might overflow and crash your application. It's advisable to structure your code so that a buffer is not needed.

  • <buffer>.emit(<value>) emits a value onto the buffer. The value is stored until a pipe consumer retrieves it or it gets pushed from the buffer by reaching the <size> limit.
  • <buffer>.resolve(<value>) resolves the pipe underneath the buffer.
  • <buffer>.reject(<error>) rejects the pipe underneath the buffer.
  • <buffer>.pipe the pipe underneath the buffer.

Package Sidebar


npm i teth-pipe

Weekly Downloads






Last publish


  • jaqmol