@sinclair/threadbox
TypeScript icon, indicating that this package has built-in type declarations

0.8.15 • Public • Published

ThreadBox

Recursive Worker Threads in NodeJS

npm version Build Status

Example

The following replicates the above worker graph. See here for a JavaScript version.

// app.ts

import { spawn, Main, Thread, channel, Sender, Receiver } from '@sinclair/threadbox'

@Thread() class WorkerC {
    run() {
        return Math.random()
    }
}
@Thread() class WorkerB {
    async run(sender: Sender) {
        const c_0 = spawn(WorkerC)
        const c_1 = spawn(WorkerC)
        const c_2 = spawn(WorkerC)
        const c_3 = spawn(WorkerC)
        const [a, b, c, d] = await Promise.all([
            c_0.run(),
            c_1.run(),
            c_2.run(),
            c_3.run(),
        ])
        await sender.send([a, b, c, d])
        await sender.end()
        await c_0.dispose()
        await c_1.dispose()
        await c_2.dispose()
        await c_3.dispose()
    }
}
@Thread() class WorkerA {
    async run(receiver: Receiver) {
        for await(const [a, b, c, d] of receiver) { }
    }
}
// start here ...
@Main() default class {
    async main() {
        const [sender, receiver] = channel()
        const a = spawn(WorkerA)
        const b = spawn(WorkerB)
        await Promise.all([
            a.run(receiver),
            b.run(sender) 
        ])
        await a.dispose()
        await b.dispose()
    }
}

Overview

ThreadBox is a threading library for JavaScript that is built on top of NodeJS worker_threads. It is written to allow for compute intensive JavaScript or WASM processes to be trivially executed in remote worker threads. ThreadBox approaches this by allowing any class decorated with Thread to be spawned and instanced in a remote worker thread. ThreadBox constructs an async interface to the remote worker class, allowing the host thread to invoke operations on the remote worker through simple async method calls.

ThreadBox uses a recursive pattern to spawn worker threads. ThreadBox will recursively call into the applications entry module (typically app.js) and instance a requested Thread class when spawning a new worker. Because each new worker is spawned from the same entry module as the application, class, function and const definitions defined by the application are also available to each subsequent thread. This approach allows for ergonomic same file threading common to other languages.

ThreadBox was built as a research project and is primarily geared towards TypeScript development. It does however provide a non-decorator based fallback API for JavaScript users. This library is offered as is to anyone who may find it of use.

Built with Node 12.16.1 LTS and TypeScript 4.1.2.

Licence MIT

Install

$ npm install @sinclair/threadbox --save

Contents

Setup

ThreadBox primarily uses decorators to denote Main, Thread and Marshal class types. It also implements [Symbol.asyncIterator] for channels. TypeScript users should configure their environment for the following.

// tsconfig.json
{
   "compilerOptions": {
      "experimentalDecorators": true,
      "downlevelIteration": true,
      ...
   }
}

Main

The Main decorator registers a class as the programs entry point. The classes main(...) function will be automatically called by ThreadBox when the program is run.

import { Main } from '@sinclair/threadbox'

@Main() class Program {
    main() {
        console.log('Hello World')
    }
}

// JavaScript users can use __Main(Program) if
// decorators are not available.

Thread

The Thread decorator registers a class as threadable which allows it to be spawned in a worker thread. When spawned, the host thread will be able to execute any function available on the class. The class may additionally implement an optional dispose() function that will be invoked when the host thread terminates the worker.

import { Thread } from '@sinclair/threadbox'

@Thread() class Worker {
    add(a: number, b: number) {
        return a + b
    }
    dispose() { 
        console.log('disposed!')
    }
}
@Main() default class {
    async main() {
        const worker = spawn(Worker)
        const result = await worker.add(10, 20)
        await worker.dispose()
    }
}
// JavaScript users can use __Thread(Worker) if
// decorators are not available.

All JavaScript classes can be decorated with @Thread().

Spawn

The spawn(...) function will spawn a threadable class and return a handle to caller allowing it to call methods on the remote class instance. The spawn(...) function may also accept the classes constructor arguments.

import { spawn, Main, Worker } from '@sinclair/threadbox'

@Thread() class Worker {
    constructor(private message: string) {
        console.log('worker: constructor', message)
    }
    method() {
        console.log('worker: method')
    }
    dispose() {
        console.log('worker: dispose')
    }
}
@Main() default class {
    async main() {
        const worker = spawn(Worker, 'hello world')
        await worker.method()
        await worker.dispose() // important!
    }
}

The return type of the spawn() function is a ThreadInterface object. This ThreadInterface is a promisfied version of the class interface. The ThreadInterface also provides an additional dispose() function that is available irrespective of if the class has provided an implementation. Calling dispose() on the ThreadInterface will result in the worker being terminated.

Channel

ThreadBox provides a channel API that is built upon the MessageChannel API. ThreadBox channels implement a synchronization protocol that enables a Sender to optionally await for messages to be received by a Receiver. ThreadBox channels are loosely modelled on Rust mpsc channels.

import { channel } from '@sinclair/threadbox'

const [sender, receiver] = channel()

The channel Sender and Receiver types can be used to stream sequences of values between cooperating threads. The Sender will async buffer values if the caller does not await on send(). The Receiver type implements [Symbol.asyncIterator] so can be used with for-await-of.

Example 1

The following creates a channel inside the Main thread and sends the Sender to the worker thread. The worker thread will emit values to the Sender which are iterated on within the Main thread.

import { spawn, Main, Worker, channel, Sender, Receiver } from '@sinclair/threadbox'

@Thread() class Worker {
    async execute(sender: Sender<number>) {
        await sender.send(1)
        await sender.send(2)
        await sender.send(3)
        await sender.end() // EOF
    }
}

@Main() default class {
    main() {
        const worker = spawn(Worker)
        const [sender, receiver] = channel<number>()

        worker.execute(sender) // move to worker
        
        for await(const value of receiver) {
            console.log('recv', value)
        }
        
        await worker.dispose()
    }
}

Example 2

The following creates a channel inside the worker thread and returns a Receiver on its stream() function. The Main thread then spawns the worker thread and calls stream() and awaits for the Receiver. It then iterates on the Receiver. The into() function is a util function that allows one to move into an async context.

import { spawn, into, Main, Worker, channel, Sender, Receiver } from '@sinclair/threadbox'

@Thread() class Worker {
    stream(): Receiver<number> {
        const [sender, receiver] = channel<number>()
        into(async() => {
            await sender.send(1)
            await sender.send(2)
            await sender.send(3)
            await sender.end()
        })
        return receiver // move to host
    }
}

@Main() default class {
    main() {
        const worker = spawn(Worker)
        const stream = await worker.stream()

        for await(const value of stream) {
            console.log('recv', value)
        }
        await worker.dispose()
    }
}

Select

Using channels, its possible to receive on multiple receivers using the select() function. The following demonstrates its use.

import { channel, select } from '@sinclair/threadbox'

// setup some channels...
const [sender0, receiver0] = channel<number>()
const [sender1, receiver1] = channel<string>()
const [sender2, receiver2] = channel<boolean>()

// send some values...
sender0.send(42)
sender1.send('hello')
sender2.send(true)

// receive from any...
for await(const value of select([receiver0, receiver1, receiver2])) {
    // value is number | string | boolean
}

Marshal

The Marshal decorator registers a class as marshalled. This enables instances of the class to be sent and reconstructed across thread boundaries. ThreadBox will automatically marshal all classes marked with Marshal across Thread function calls, as well as across a channel Sender.

This functionality allows class instances to be transferred to remote threads for remote invocation.

import { spawn, Main, Thread, Marshal } from '@sinclair/threadbox'

// Instances of this class can be sent between threads.
@Marshal() class Transferrable {
    method() {
        console.log('Hello World')
    }
}
@Thread() class Worker {
    execute(instance: Transferrable) {
        instance.method() // callable
    }
}
@Main() default class {
    async main() {
        const worker = spawn(Worker)
        const instance = new Transferrable()
        await worker.execute(instance)
        await worker.dispose()
    }
}

// JavaScript users can use __Marshal(Foo) if
// decorators are not available.

Note: There is a serialization cost to marshaling. For performance, only Marshal when you need to move logic in and out of threads.

Mutex

ThreadBox provides a Mutex primitive that is built upon JavaScript Atomics. It is used to enter into critical sections of code.

import { Mutex } from '@sinclair/threadbox'

const mutex = new Mutex()

const lock = mutex.lock()

// critical section

lock.dispose()

The example below spawns 4 instances of the Worker class. A Mutex instance is passed into each worker where by the worker takes a MutexLock on the execute() method. The worker thread holds onto their respective lock for 1 second before releasing. Only 1 of the 4 workers will execute the critical section (below) at one time. The timeout is used to demonstrate the locking behavior.

import { spawn, Main, Thread, Mutex } from '@sinclair/threadbox'

@Thread() class Worker {
    execute(mutex: Mutex) {
        const lock = mutex.lock()
        //
        // critical section !!
        //
        setTimeout(() => lock.dispose(), 1000)
    }
}
@Main() default class {
    async main() {
        const worker_0 = spawn(Worker)
        const worker_1 = spawn(Worker)
        const worker_2 = spawn(Worker)
        const worker_3 = spawn(Worker)

        const mutex  = new Mutex()
        await Promise.all([
            worker_0.execute(mutex),
            worker_1.execute(mutex),
            worker_2.execute(mutex),
            worker_3.execute(mutex)
        ]) // .. 4 seconds approx

        await worker_0.dispose()
        await worker_1.dispose()
        await worker_2.dispose()
        await worker_3.dispose()
    }
}

SharedArrayBuffer

The following demonstrates using SharedArrayBuffer to parallelize operations performed across a shared Float32Array. The shared buffer is sent to 4 workers with an index to store the result.

import { spawn, Main, Worker } from '@sinclair/threadbox'

@Thread() class ComputeForIndex {
    execute(buffer: Float32Array, index: number) {
        // sleep 5 seconds
        const started = Date.now()
        while((Date.now() - started) < 5000) {}
        buffer[index] = Math.random()
    }
}

@Main() default class {
    async main() {
        // 4 x 32bit floats
        const shared = new SharedArrayBuffer(4 * Float32Array.BYTES_PER_ELEMENT)
        const buffer = new Float32Array(shared)

        // spin up 4 workers
        const c_0 = spawn(ComputeForIndex)
        const c_1 = spawn(ComputeForIndex)
        const c_2 = spawn(ComputeForIndex)
        const c_3 = spawn(ComputeForIndex)

        // process in parallel
        await Promise.all([
            c_0.execute(buffer, 0),
            c_1.execute(buffer, 1),
            c_2.execute(buffer, 2),
            c_3.execute(buffer, 3)
        ])

        // clean up
        await c_0.dispose()
        await c_1.dispose()
        await c_2.dispose()
        await c_3.dispose()

        // result
        console.log('result', buffer)
    }
}

Package Sidebar

Install

npm i @sinclair/threadbox

Weekly Downloads

11

Version

0.8.15

License

MIT

Unpacked Size

119 kB

Total Files

47

Last publish

Collaborators

  • sinclair