worker-agent
Typesafe wrapper for worker threads
worker-agent is a TypeScript wrapper designed to simplify interactions with worker threads in NodeJS.
It implements a fairly common Erlang/Elixir pattern: the messaging protocol is hidden behind several abstraction layers, each with a different trade-off between simplicity and performance, while still ensuring strongly typed interfaces.
Installation
npm install @giancosta86/worker-agent
or
yarn add @giancosta86/worker-agent
The public API entirely resides in the root package index, so you shouldn't reference specific modules.
Usage
-
Create a module - an operation module, in worker-agent's parlance - that:
-
can import names from any other module or package
-
should only export your operation - that is, a single function; in TypeScript, the function must be exported via
export = myFunction
The operation function:
-
can have an arbitrary name
-
takes at most one parameter, of arbitrary name and type
-
can return an arbitrary type, including a Promise
-
can throw errors when needed: neither errors nor rejected promises can crash the underlying worker
For example, to declare a synchronous function:
function add40(value: number): number {
  return value + 40;
}

export = add40;
and to declare an asynchronous function:
import { someAsyncFunction } from "some-package";

async function specialSum(value: number): Promise<number> {
  const temp = await someAsyncFunction(value);
  return temp + value + 100;
}

export = specialSum;
-
Create a new instance of WorkerAgent<TInput, TOutput> or the more expressive PromiseAgent<TInput, TOutput>, passing the path to the operation module: this will start a new worker thread, driven by the agent. Furthermore:
-
TInput should be the type of the parameter expected by the operation
-
TOutput should be the operation's return type, or the type T wrapped by its Promise<T>
For example:
import { join } from "node:path";
import { PromiseAgent } from "@giancosta86/worker-agent";

const agent = new PromiseAgent<number, number>(join(__dirname, "my-sum"));
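The relationship between the operation's signature and the two type parameters can also be written down at the type level. The following is only a sketch: OperationInput and OperationOutput are hypothetical helper types introduced here, not something exported by worker-agent, and someAsyncFunction is replaced by an already-resolved promise so that the snippet stands on its own.

```typescript
// Hypothetical helper types (NOT exported by worker-agent): they derive
// TInput and TOutput from the signature of an operation function.
type Operation = (input: any) => unknown;

// TInput: the type of the operation's single parameter
type OperationInput<F extends Operation> = Parameters<F>[0];

// TOutput: the return type, or the type T wrapped by Promise<T>
type OperationOutput<F extends Operation> = Awaited<ReturnType<F>>;

// Synchronous operation, as in the example above
function add40(value: number): number {
  return value + 40;
}

// Asynchronous operation; Promise.resolve() stands in for someAsyncFunction()
async function specialSum(value: number): Promise<number> {
  const temp = await Promise.resolve(value);
  return temp + value + 100;
}

// Both operations map to <number, number>
const syncInput: OperationInput<typeof add40> = 2;
const syncOutput: OperationOutput<typeof add40> = add40(syncInput);
const asyncOutput: Promise<OperationOutput<typeof specialSum>> = specialSum(2);
```

In both cases the derived pair is number and number, which is why the agent above is declared as PromiseAgent<number, number>.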
Choosing the right agent type
The actual choice depends on a compromise between simplicity and performance:
-
PromiseAgent is particularly expressive, because of its Promise-based interface
-
WorkerAgent is hyper-minimalist, but it is also more complicated to use
Using PromiseAgent
PromiseAgent is straightforward to use. Just call:
-
its runOperation() method, to obtain a Promise that will either resolve or reject as soon as the worker thread has finished processing the given input
-
its requestExit() method, returning a Promise that will resolve to the worker's exit code.
Please note: don't forget to call requestExit() as soon as you have finished using the agent; furthermore, the warning about dangling asynchronous operations, discussed below, applies to this agent as well.
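One way to make sure requestExit() is never forgotten is a try/finally block. The sketch below is written against a small structural interface, PromiseAgentLike, which is a hypothetical name introduced here and only mirrors the two methods described above; it assumes that a real PromiseAgent instance is compatible with it. The runAll() helper is likewise hypothetical.

```typescript
// Structural view of the two PromiseAgent methods described above;
// the interface name is hypothetical, introduced only for this sketch.
interface PromiseAgentLike<TInput, TOutput> {
  runOperation(input: TInput): Promise<TOutput>;
  requestExit(): Promise<number>;
}

// Hypothetical helper: sends the inputs one at a time, then always
// requests the exit, even if one of the operations rejects.
async function runAll<TInput, TOutput>(
  agent: PromiseAgentLike<TInput, TOutput>,
  inputs: readonly TInput[]
): Promise<TOutput[]> {
  try {
    const outputs: TOutput[] = [];

    for (const input of inputs) {
      // Each operation is awaited before the next one is sent, so no
      // operation is still pending when the finally block runs.
      outputs.push(await agent.runOperation(input));
    }

    return outputs;
  } finally {
    await agent.requestExit();
  }
}
```

Under the compatibility assumption above, an agent created as new PromiseAgent<number, number>(...) could be passed as runAll(agent, [90, 92]). Awaiting the operations sequentially trades some throughput for the guarantee that nothing is left dangling when the exit is requested.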
Using WorkerAgent
WorkerAgent is the original agent implementation, and also the more sophisticated one. In particular, once you have an instance of the agent, you'll need to:
-
Subscribe to events; to register an event listener, you can call either .on(...) or .once(...), as usual in NodeJS. The available events are:
-
result: the most important event - the one actually returning output values and errors from the operation function called by the worker thread.
The expected listener is a standard error-first callback, that is, a (err: Error | null, output: TOutput | null) => void function. For example:
agent.on("result", (err, output) => {
  if (err) {
    // Process the error
    return;
  }

  // Process the output
});
Please note: the error passed to this callback, when non-null, has a peculiarity: its message is the serialization string, in the form ErrorClass("Message"), of the error that occurred within the worker thread
-
error: sent whenever a non-operational, more serious error occurs within the worker thread, for example because it couldn't find the operation module. It expects a (error: Error) => void callback.
Please note: errors thrown by the operation do not trigger error events; instead, they are passed to the error-first callback of the result event
-
exit: sent by the worker thread upon termination, even after an error event. It takes a (exitCode: number) => void callback
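Since the error received by the result listener carries the original error as a serialized string, client code may want to split it back into class name and message. The following is a minimal sketch: parseWorkerError is a hypothetical helper, not part of worker-agent, and it assumes that the message has exactly the form ErrorClass("Message"), with no particular escaping of inner quotes.

```typescript
// Hypothetical result type for the sketch below
interface ParsedWorkerError {
  className: string;
  message: string;
}

// Hypothetical helper, NOT part of worker-agent: splits a message of the
// form ErrorClass("Message") into its two components.
function parseWorkerError(err: Error): ParsedWorkerError | null {
  // Assumed format: an identifier, followed by the original message
  // in double quotes between parentheses.
  const match = /^([A-Za-z_$][\w$]*)\("([\s\S]*)"\)$/.exec(err.message);

  if (!match) {
    // The message does not follow the assumed format
    return null;
  }

  const [, className = "", message = ""] = match;
  return { className, message };
}
```

Within a result listener, parseWorkerError(err) could then be used to branch on the class name of the original error; a null result simply means that the message did not match the assumed format.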
-
Start sending input data by calling runOperation(input: TInput): every call will send a message to the worker thread queue, passing the given input, ready to be processed by the operation exported by the operation module. For example:
agent.runOperation(90);
It is a void method, because results (both output values and errors) will be returned later, via the result event. You can send multiple operation requests: they are enqueued by the worker thread, ready to be processed one at a time.
-
Finally, don't forget to call requestExit() to send an exit message to the worker thread's queue.
Please note: calling requestExit() enqueues a termination message that will be evaluated as soon as all the previously-enqueued synchronous operations have completed; however, for performance reasons, no check is performed on asynchronous operations, so they will probably remain unfulfilled! Consequently, it is up to you, in your client code, to ensure that all the async operations have settled before calling requestExit().
A possible solution to the above issue may consist in a counter that is incremented when calling runOperation() and decremented within the result event callback.
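The counter-based approach could look like the sketch below. It is not part of worker-agent: WorkerAgentLike is a hypothetical structural interface that only mirrors the members described above (it assumes a real WorkerAgent is compatible with it), and PendingOperationTracker is a hypothetical wrapper name.

```typescript
type ResultListener<TOutput> = (err: Error | null, output: TOutput | null) => void;

// Structural view of the WorkerAgent members used here (hypothetical name)
interface WorkerAgentLike<TInput, TOutput> {
  on(event: "result", listener: ResultListener<TOutput>): unknown;
  runOperation(input: TInput): void;
  requestExit(): void;
}

// Hypothetical wrapper: counts the operations still in flight and defers
// requestExit() until every one of them has produced a result event.
class PendingOperationTracker<TInput, TOutput> {
  private readonly agent: WorkerAgentLike<TInput, TOutput>;
  private pendingCount = 0;
  private exitWanted = false;

  constructor(agent: WorkerAgentLike<TInput, TOutput>) {
    this.agent = agent;

    // Every result event, output or error alike, settles one operation
    agent.on("result", () => {
      this.pendingCount--;
      this.exitIfIdle();
    });
  }

  runOperation(input: TInput): void {
    this.pendingCount++;
    this.agent.runOperation(input);
  }

  // Call this instead of requestExit(): the exit message is sent at once
  // if nothing is pending, otherwise after the last result event.
  requestExitWhenIdle(): void {
    this.exitWanted = true;
    this.exitIfIdle();
  }

  private exitIfIdle(): void {
    if (this.exitWanted && this.pendingCount === 0) {
      this.exitWanted = false;
      this.agent.requestExit();
    }
  }
}
```

Client code would still register its own result listener on the agent to process outputs and errors; the tracker only adds a second listener for bookkeeping.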
Further reference
For additional examples, please consult the test suites in the source code repository.