Network Performance Monitor

    pipeline-pipe

    0.3.0 • Public • Published

    Parallel transform and some utilities for Node Object Stream lovers

    Why

    Install

    npm install pipeline-pipe

    pipe(fn, opts)

    Example usage:

    // Example: scrape HTML pages and store their titles in a DB
    
    const { pipeline, Readable } = require('stream');
    const pipe = require('pipeline-pipe');
    
    pipeline(
        Readable.from([1, 2, 3]),
    
        // Request HTML asynchronously, up to 16 requests in parallel
        pipe(async postId => {
          const json = await getPost(postId);
          return json;
        }, 16),
    
        // Synchronous transformation, like Array.prototype.map
        pipe(json => parseHTML(json.postBody).document.title),
    
        // Synchronous filtering, like Array.prototype.filter
        pipe(title => title.includes('important') ? title : null),
    
        // Asynchronous, up to 4 in parallel
        pipe(async title => {
          const result = await storeInDB(title);
          console.info(result);
        }, 4),
    
        (err) => console.info('All done!')
    );
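
    The numeric arguments in the example above (16 and 4) cap how many calls to the async function are in flight at once. The sketch below illustrates that idea without streams. It is not pipeline-pipe's implementation; `mapLimit` and `slowDouble` are hypothetical names introduced only for illustration.

```javascript
// Sketch only: an ordered, bounded-concurrency map over an array.
// NOT pipeline-pipe's implementation; it only illustrates the idea
// of limiting parallel executions while preserving input order.
async function mapLimit(items, fn, maxParallel) {
  const results = new Array(items.length);
  let next = 0;

  // Each worker repeatedly claims the next unprocessed index.
  async function worker() {
    while (next < items.length) {
      const i = next++;
      results[i] = await fn(items[i]);
    }
  }

  const workerCount = Math.min(maxParallel, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Hypothetical async task that takes longer for smaller inputs,
// so completion order differs from input order.
const slowDouble = n =>
  new Promise(resolve => setTimeout(() => resolve(n * 2), (4 - n) * 10));

mapLimit([1, 2, 3], slowDouble, 2).then(console.log); // [ 2, 4, 6 ]
```

    Results are written by index, so the output keeps the input order even though the tasks finish out of order.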

    Types:

    import { Transform, TransformOptions } from 'stream';
    
    type ParallelTransformOptions =
      | number
      | TransformOptions & { maxParallel?: number, ordered?: boolean };
    
    export default function pipe(
        fn: (data: any) => Promise<any> | any,
        opts?: ParallelTransformOptions,
    ): Transform;

    Option property   Default value   Description
    maxParallel       10              Maximum number of parallel executions.
    ordered           true            Whether to preserve the order of streaming chunks.

    A number can be passed as opts: pipe(fn, 20) is the same as pipe(fn, { maxParallel: 20 }).
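
    One way to picture the numeric shorthand is as a small normalization step. The helper below is a sketch under assumptions: `normalizeOpts` is a hypothetical name, not part of pipeline-pipe's API, and the defaults are taken from the option table above.

```javascript
// Hypothetical helper, not part of pipeline-pipe's public API: it only
// illustrates how a numeric `opts` could expand to the object form.
const DEFAULTS = { maxParallel: 10, ordered: true }; // defaults from the option table

function normalizeOpts(opts) {
  if (typeof opts === 'number') {
    return { ...DEFAULTS, maxParallel: opts };
  }
  // Spreading undefined is a no-op, so a missing `opts` yields the defaults.
  return { ...DEFAULTS, ...opts };
}

console.log(normalizeOpts(20));                 // { maxParallel: 20, ordered: true }
console.log(normalizeOpts({ ordered: false })); // { maxParallel: 10, ordered: false }
console.log(normalizeOpts());                   // { maxParallel: 10, ordered: true }
```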

    Some utility functions

    pipeline(stream, stream, ...)

    A promisified version of require('stream').pipeline. Equivalent to:

    const { promisify } = require('util');
    const { pipeline: _pipeline } = require('stream');
    const pipeline = promisify(_pipeline);

    Example:

    const { pipeline, pipe } = require('pipeline-pipe');
    
    // Run inside an async function; `readable` is any Readable stream of strings.
    await pipeline(
        readable,
        pipe(chunk => chunk.replace('a', 'z')),
        pipe(chunk => storeInDB(chunk)),
    );
    console.log('All done!');
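
    For a version that runs with only Node's built-in modules, the sketch below builds the same promisified pipeline and uses a plain object-mode Transform in place of pipe(). The sample strings and the `seen` array are assumptions made for illustration; a Writable sink stands in for the DB write.

```javascript
const { promisify } = require('util');
const { pipeline: _pipeline, Readable, Transform, Writable } = require('stream');

const pipeline = promisify(_pipeline);

async function run() {
  const seen = [];
  await pipeline(
    Readable.from(['banana', 'avocado']),
    // Plain object-mode Transform standing in for pipe(chunk => ...).
    new Transform({
      objectMode: true,
      transform(chunk, _enc, callback) {
        callback(null, chunk.replace('a', 'z'));
      },
    }),
    // Sink that records each chunk; stands in for a DB write.
    new Writable({
      objectMode: true,
      write(chunk, _enc, callback) {
        seen.push(chunk);
        callback();
      },
    }),
  );
  return seen;
}

run().then(seen => console.log(seen)); // [ 'bznana', 'zvocado' ]
```

    Because the promise resolves only after the last stream finishes, `await pipeline(...)` can be followed directly by code that depends on all chunks having been processed.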

    concat(size)

    Concatenates sequential chunks into arrays of the specified size. This is useful when you want to post data in batches, as the Elasticsearch Bulk API does.

    Example:

    const { pipeline, Readable } = require('stream');
    const { concat, pipe } = require('pipeline-pipe');
    
    pipeline(
        Readable.from([1, 2, 3, 4, 5]),
        concat(2),
        pipe(console.log),  // [ 1, 2 ]
                            // [ 3, 4 ]
                            // [ 5 ]
        (err) => console.info('All done!'),
    );
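
    To make the batching behaviour concrete, here is a minimal sketch of the same idea written as a plain object-mode Transform. It is an illustration under assumptions, not the library's source; `concatSketch` is a hypothetical name.

```javascript
const { pipeline, Readable, Transform, Writable } = require('stream');

// Hypothetical re-creation of the batching idea; not the library's source.
function concatSketch(size) {
  let buffer = [];
  return new Transform({
    objectMode: true,
    transform(chunk, _enc, callback) {
      buffer.push(chunk);
      if (buffer.length >= size) {
        this.push(buffer); // emit a full batch
        buffer = [];
      }
      callback();
    },
    // Emit the final, possibly shorter, batch when the input ends.
    flush(callback) {
      if (buffer.length > 0) this.push(buffer);
      callback();
    },
  });
}

const batches = [];
const done = new Promise((resolve, reject) => {
  pipeline(
    Readable.from([1, 2, 3, 4, 5]),
    concatSketch(2),
    new Writable({
      objectMode: true,
      write(batch, _enc, callback) {
        batches.push(batch);
        callback();
      },
    }),
    err => (err ? reject(err) : resolve(batches)),
  );
});

done.then(console.log); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```

    The flush step is what delivers the trailing `[ 5 ]`; without it, a final partial batch would be lost.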

    split()

    Creates a Transform that splits each incoming Array chunk and passes its elements, one by one, to the subsequent streams.

    const { pipeline, Readable } = require('stream');
    const { split, pipe } = require('pipeline-pipe');
    
    pipeline(
        Readable.from([1, 2, 3]),
        pipe(page => getPostsByPage(page)),
        pipe(json => json.posts),               // Returns an array of posts
        split(),                                // Emits each post in the array as its own chunk
        pipe(post => storeInDB(post.title)),    // Now the argument is a post
        (err) => console.info('All done!')
    );
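
    The splitting step can be pictured as the inverse of concat. The sketch below shows the idea with a plain object-mode Transform; `splitSketch` is a hypothetical name, and the code assumes every incoming chunk is an array. It is not the library's implementation.

```javascript
const { pipeline, Readable, Transform, Writable } = require('stream');

// Hypothetical stand-in for split(); assumes every incoming chunk is an array.
function splitSketch() {
  return new Transform({
    objectMode: true,
    transform(chunk, _enc, callback) {
      // Re-emit each element as its own chunk for the next stream.
      for (const item of chunk) this.push(item);
      callback();
    },
  });
}

const items = [];
const done = new Promise((resolve, reject) => {
  pipeline(
    Readable.from([[1, 2], [3]]),
    splitSketch(),
    new Writable({
      objectMode: true,
      write(item, _enc, callback) {
        items.push(item);
        callback();
      },
    }),
    err => (err ? reject(err) : resolve(items)),
  );
});

done.then(console.log); // [ 1, 2, 3 ]
```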

    License

    MIT

    Collaborators

    • piglovesyou