@giancosta86/worker-transform

1.0.3 • Public • Published

worker-transform

Multithread mapping stream

GitHub CI npm version MIT License

Overview

worker-transform provides a WorkerTransform object-oriented stream that transparently runs a given operation on multiple items, concurrently, by internally managing a pool of worker threads.

Like any other standard Transform, this stream can be plugged into a pipeline, as well as manually controlled; for maximum performance, the item order is not guaranteed.
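
Because the item order is not guaranteed, code that consumes the output should not rely on arrival order. The following is only a sketch of one way to deal with that: `collectItems`, `createStandInTransform` and `example` are hypothetical names, and the stand-in is a plain in-process Transform used so that the snippet runs without the package; a WorkerTransform instance could be passed to `collectItems` in its place.

```typescript
import { Readable, Transform, Writable } from "node:stream";
import { pipeline } from "node:stream/promises";

// Hypothetical helper: pipes source -> transform -> in-memory sink
// and resolves with every item that reached the sink.
async function collectItems<T>(
  source: Readable,
  transform: Transform
): Promise<T[]> {
  const items: T[] = [];

  const sink = new Writable({
    objectMode: true,
    write(item: T, _encoding, callback) {
      items.push(item);
      callback();
    }
  });

  await pipeline(source, transform, sink);

  return items;
}

// Stand-in for a WorkerTransform instance (assumption: it only mimics
// an "add 200" operation, in-process and without worker threads).
function createStandInTransform(): Transform {
  return new Transform({
    objectMode: true,
    transform(item: number, _encoding, callback) {
      callback(null, item + 200);
    }
  });
}

// Sorting the collected items makes any later comparison
// independent of the order in which they arrived.
async function example(): Promise<number[]> {
  const items = await collectItems<number>(
    Readable.from([90, 95, 98]),
    createStandInTransform()
  );

  return items.sort((a, b) => a - b);
}
```

Collecting into an array only suits small streams; for large ones, an order-insensitive aggregation inside the sink is probably a better fit.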

Installation

npm install @giancosta86/worker-transform

or

yarn add @giancosta86/worker-transform

The public API resides entirely in the root package index, so you should import from the package root rather than from specific modules.

Usage

  1. First of all, you need an operation module: a module exporting just the operation - a function that will be applied to every item flowing through the stream.

    The operation is a function:

    • with an arbitrary name

    • accepting one parameter - of type ChunkInput<TInput> - where you need to replace TInput with the actual type of the items entering the stream

      ChunkInput<TInput> actually contains just two fields, provided by the underlying stream infrastructure:

      • value - the item entering the stream

      • encoding - the associated character encoding, which is meaningful only when the item is a string

    • returning one of these two types:

      • ChunkOutput<TOutput> - if the function is synchronous (without interruptions). You need to replace TOutput with the type of the items produced by the stream.

        The ChunkOutput<TOutput> type contains the value to be produced by the function as well as the related encoding - but the latter is optional, because it is meaningful only if the output items are strings

        The function must return a ChunkOutput<TOutput>, but TOutput can be a nullable type, such as number | null: when the value field of the returned ChunkOutput is null, the stream simply skips that item.

      • Promise<ChunkOutput<TOutput>> - if the function is asynchronous, i.e., if it awaits other operations or, more generally, if it is designed to return a Promise. As before, you need to replace TOutput with the type of the items produced by the stream, which can again be nullable

    • throwing errors when needed: both errors and rejected promises simply make the stream ignore the related input element

    An operation module with a synchronous operation could be:

    import { ChunkInput, ChunkOutput } from "@giancosta86/worker-transform";
    
    function add200({ value }: ChunkInput<number>): ChunkOutput<number> {
      return { value: value + 200 };
    }
    
    export = add200;

    On the other hand, an operation module with an asynchronous operation could be:

    import delay from "delay";
    import { ChunkInput, ChunkOutput } from "@giancosta86/worker-transform";
    
    async function add500({
      value
    }: ChunkInput<number>): Promise<ChunkOutput<number>> {
      await delay(5);
      await delay(2);
      await delay(6);
    
      return { value: value + 500 };
    }
    
    export = add500;
  2. Create an instance of WorkerTransform, passing at least the path to the operation module, in the form expected by resolve(), and optionally an object with additional options (see the description below)

    For example:

    import { join } from "node:path";
    import { WorkerTransform } from "@giancosta86/worker-transform";
    
    const modulePath = join(__dirname, "add200");
    
    const transform = new WorkerTransform(modulePath);
  3. Use it like any other standard stream - for example, in a pipeline:

    await pipeline(Readable.from([90, 95, 98]), transform, someWritableStream);
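
Since an operation (step 1) is just a function, its contract can be exercised directly, without any stream. The sketch below shows a nullable output and a thrown error. `halveEven` is a hypothetical operation, and the two local types are stand-ins that only mirror the fields described in step 1, with the encoding typed as a plain string by assumption. In a real operation module the types would be imported from the package and the file would end with `export = halveEven;`.

```typescript
// Local stand-ins mirroring the documented fields (assumption: the
// package's own types may be declared differently).
type ChunkInput<T> = { value: T; encoding: string };
type ChunkOutput<T> = { value: T; encoding?: string };

// Hypothetical operation: halves even numbers, skips odd ones
// and throws on anything that is not a finite number.
function halveEven({ value }: ChunkInput<number>): ChunkOutput<number | null> {
  if (!Number.isFinite(value)) {
    // Per step 1, a thrown error makes the stream ignore this input item
    throw new Error(`Not a finite number: ${value}`);
  }

  // Per step 1, a null value makes the stream skip the item
  return { value: value % 2 === 0 ? value / 2 : null };
}
```

Calling `halveEven({ value: 10, encoding: "utf8" })` yields a value of 5, while an odd input yields a null value that the stream would skip.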

Additional constructor options

The following values can be passed to the constructor as fields of an optional object, right after the operation module path:

  • agentCount: the number of worker threads in the pool. Default: the number of processors

  • logger: an object implementing the Logger interface exported by unified-logging. Default: no logger

  • highWaterMark: if present, passed to the base constructor

  • signal: if present, passed to the base constructor
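
As an illustration of the fields listed above, the sketch below builds an options object. `OptionsSketch` and `buildOptions` are hypothetical names, the interface is only a local mirror of the documented fields (the logger is omitted because its shape comes from unified-logging), and the chosen values, including leaving one processor free, are arbitrary assumptions rather than recommendations from the package.

```typescript
import { cpus } from "node:os";

// Hypothetical local mirror of the documented option fields;
// the package's real options type may be named and typed differently.
interface OptionsSketch {
  agentCount?: number;
  highWaterMark?: number;
  signal?: AbortSignal;
}

// Leaves one processor free for the main thread, while always
// keeping at least one worker in the pool.
function buildOptions(controller: AbortController): OptionsSketch {
  return {
    agentCount: Math.max(1, cpus().length - 1),
    highWaterMark: 64,
    signal: controller.signal
  };
}

const controller = new AbortController();
const options = buildOptions(controller);

// With the package installed, the object would be passed right after
// the operation module path:
//   const transform = new WorkerTransform(modulePath, options);
// Calling controller.abort() then aborts the signal handed to the stream.
```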

Further reference

For additional examples, please consult the test suites in the source code repository.
