@devteks/node-atomics
TypeScript icon, indicating that this package has built-in type declarations

0.0.4 • Public • Published

@devteks/node-atomics

Thread-safe synchronization tools for Node.js that can be shared across worker threads

Classes included

  • AtomicInt8
  • AtomicInt16
  • AtomicInt32
  • AtomicUint8
  • AtomicUint16
  • AtomicUint32
  • AtomicBigInt64
  • AtomicBigUint64
  • AtomicBool
  • Mutex
  • Semaphore
  • WaitGroup
  • NotifyWait
  • NotifyDone

How to use

npm install @devteks/node-atomics --save

Import:

const { AtomicInt32, Mutex } = require('@devteks/node-atomics');
// OR
import { AtomicInt32, Mutex } from '@devteks/node-atomics';

Usage

All types have two methods:

  • .synchronize(): for synchronous (non-async) callbacks
  • .asynchronize(): for async callbacks

Using Atomic Integers

const { isMainThread, workerData, threadId, Worker } = require('worker_threads');
const { AtomicInt32 } = require('@devteks/node-atomics');

if (isMainThread) {
  // Main thread: create the shared counter and hand its buffer to five workers.
  const counter = new AtomicInt32();
  for (let n = 0; n < 5; n++) {
    new Worker(__filename, { workerData: { buffer: counter.buffer } });
  }
  // Print the final value when the process is about to exit.
  process.on('exit', () => {
    console.log(counter.value);
  });
} else {
  // Worker thread: attach to the counter through the buffer received in workerData.
  const counter = AtomicInt32.from(workerData.buffer);
  console.log(`Thread ${threadId} started`, counter.value);
  const total = counter.synchronize((shared) => {
    let remaining = 1000;
    while (remaining > 0) {
      shared.value++;
      remaining -= 1;
    }
    return shared.value;
  });
  console.log(`Thread ${threadId} ended`, total);
}

Using Mutex with .synchronize()

const { isMainThread, workerData, threadId, Worker } = require('worker_threads');
const { Mutex } = require('@devteks/node-atomics');

if (isMainThread) {
  // Main thread: one mutex and one 4-byte shared counter, both handed to five workers.
  const mutex = new Mutex();
  const shared = new SharedArrayBuffer(4);
  const counter = new Int32Array(shared);
  for (let n = 0; n < 5; n++) {
    new Worker(__filename, { workerData: { buffer: shared, mutex: mutex.buffer } });
  }
  // Print the final counter value when the process is about to exit.
  process.on('exit', () => {
    console.log(counter[0]);
  });
} else {
  // Worker thread: rebuild the mutex and the counter view from the received buffers.
  const mutex = Mutex.from(workerData.mutex);
  const counter = new Int32Array(workerData.buffer);
  console.log(`Thread ${threadId} started`, counter[0]);
  // or use mutex.lock() and mutex.unlock()
  const total = mutex.synchronize(() => {
    let remaining = 1000;
    while (remaining > 0) {
      counter[0]++;
      remaining -= 1;
    }
    return counter[0];
  });
  console.log(`Thread ${threadId} ended`, total);
}

Using Mutex with .asynchronize()

const { isMainThread, workerData, threadId, Worker } = require('worker_threads');
const { setTimeout } = require('timers/promises');
const { Mutex } = require('@devteks/node-atomics');

/**
 * Main-thread entry point: starts five workers that share one mutex and one
 * Int32 counter, waits for all of them to exit, then prints the counter and
 * the elapsed time.
 */
async function mainThread() {
  console.time('main');
  const mutex = new Mutex();
  const arr = new Int32Array(new SharedArrayBuffer(4));

  // Resolves once the worker exits; rejects if the worker emits 'error'.
  const runTask = (id) =>
    new Promise((resolve, reject) => {
      const payload = { data: arr.buffer, mutex: mutex.buffer, id };
      const worker = new Worker(__filename, { workerData: payload });
      worker.on('error', reject);
      worker.on('exit', () => {
        console.log(`Thread ${id} exited`);
        resolve();
      });
    });

  const tasks = Array.from({ length: 5 }, (_, id) => runTask(id));
  await Promise.all(tasks);

  console.log('RESULT:', arr[0]);
  console.timeEnd('main');
}

/**
 * Worker entry point: adds 1000 to the shared counter under the shared mutex
 * and logs the counter value observed at the end of the critical section.
 */
async function workerThread() {
  const id = workerData.id;
  const mutex = Mutex.from(workerData.mutex);
  const arr = new Int32Array(workerData.data);

  // Form used by this example: the awaited result of asynchronize() is the
  // value returned by the async callback.
  const run = async () => {
    const v = await mutex.asynchronize(async () => {
      await setTimeout(100);
      for (let i = 0; i < 1000; i++) {
        arr[0]++;
      }
      return arr[0];
    });
    console.log(`Thread ${id} ended`, v);
  };

  // Manual alternative with explicit lock()/unlock(); shown for comparison and
  // not invoked below. The finally block guarantees the mutex is released even
  // if the critical section throws, so other threads are not left waiting.
  const run2 = async () => {
    mutex.lock();
    let v;
    try {
      await setTimeout(100);
      for (let i = 0; i < 1000; i++) {
        arr[0]++;
      }
      v = arr[0];
    } finally {
      mutex.unlock();
    }
    console.log(`Thread ${id} ended`, v);
  };

  await run();
}

// The same file serves as both the main script and the worker script:
// pick the entry point that matches the current thread and start it.
const entryPoint = isMainThread ? mainThread : workerThread;
entryPoint();

Using WaitGroup

in main.js

const { join } = require('path');
const { Worker } = require('worker_threads');
const { WaitGroup, Mutex } = require('@devteks/node-atomics');

/**
 * Starts `size` workers that share a mutex, a wait group and a 4-byte counter,
 * then waits on the wait group and prints the counter.
 */
async function main() {
  const size = 5;
  const sab = new SharedArrayBuffer(4);
  const count = new Int32Array(sab);
  const mutex = new Mutex();
  const waitGroup = new WaitGroup(size);
  const workerPath = join(__dirname, './worker.js');

  // Every worker receives the same three shared buffers.
  const workers = Array.from({ length: size }, () => new Worker(workerPath, {
    stdout: true,
    workerData: {
      waitGroupBuffer: waitGroup.buffer,
      mutexBuffer: mutex.buffer,
      sab: sab,
    },
  }));

  // wait() is not awaited and the counter is read right after it, so it
  // presumably blocks this thread until every worker has called done().
  waitGroup.wait();
  console.log('Result:', count[0]);
}

main().then(() => console.log('Done'));

in worker.js

const { workerData, threadId } = require("worker_threads");
const delay = require('timers/promises').setTimeout;
const { WaitGroup, Mutex } = require("../../");

/**
 * Worker entry point: adds 1000 to the shared counter while holding the shared
 * mutex, waits one second, then marks this worker as finished on the wait group.
 */
async function main() {
  const waitGroup = WaitGroup.from(workerData.waitGroupBuffer);
  const mutex = Mutex.from(workerData.mutexBuffer);
  const count = new Int32Array(workerData.sab);

  mutex.lock();
  try {
    for (let i = 0; i < 1000; i++) {
      count[0]++;
    }
  } finally {
    // Always release the mutex, even if the critical section throws, so the
    // other workers are not left waiting on it.
    mutex.unlock();
  }

  await delay(1000);
  waitGroup.done();
}

main();

Using NotifyWait and NotifyDone

Use NotifyWait in the main thread and NotifyDone in the worker thread.

in main.js

const { join } = require('path');
const { Worker, MessageChannel, receiveMessageOnPort } = require('worker_threads');
const { NotifyWait } = require('../../');

// port2 is transferred to the worker; port1 stays here to receive its message.
const { port1, port2 } = new MessageChannel();
const notify = new NotifyWait();

const worker = new Worker(join(__dirname, '/worker.js'), {
  workerData: { port: port2, notify: notify.buffer },
  transferList: [port2],
});

worker.on('error', (error) => {
  console.error(error);
});

console.log('[MAIN] WAIT START');
notify.wait();
console.log('[MAIN] WAIT END');

// receiveMessageOnPort() returns undefined when no message is queued, so check
// explicitly instead of destructuring a possibly-undefined value.
const received = receiveMessageOnPort(port1);
if (received === undefined) {
  throw new Error('[MAIN] no message was received from the worker');
}
const { text } = received.message;
console.log('[MAIN] length:', text.length);

in worker.js

const { workerData } = require('worker_threads');
const delay = require('timers/promises').setTimeout;
const { NotifyDone } = require('../../');

/**
 * Simulates slow work.
 * @returns {Promise<string>} resolves with 'hello' after one second.
 */
async function getText() {
  // The promise-based setTimeout resolves with its second argument.
  return delay(1000, 'hello');
}

/**
 * Worker entry point: produces the text, posts it on the transferred port,
 * then signals the notifier.
 */
async function main() {
  const { port, notify: notifyBuffer } = workerData;
  const notify = new NotifyDone(notifyBuffer);

  const text = await getText();
  console.log('[WORKER] length:', text.length);
  await delay(1000);
  // Post before signalling: main.js reads the port synchronously right after
  // its notify.wait() call returns, so the message must already be queued.
  port.postMessage({ text });
  console.log('[WORKER] set');
  notify.done();
  await delay(1000);
  console.log('END WORKER');
}

main();

RESULT:

[MAIN] WAIT START
[MAIN] WAIT END
[MAIN] length: 5
[WORKER] length: 5
[WORKER] set
END WORKER

Clone the repository and run the examples in the examples directory.

Dependents (0)

Package Sidebar

Install

npm i @devteks/node-atomics

Weekly Downloads

1

Version

0.0.4

License

MIT

Unpacked Size

55.2 kB

Total Files

6

Last publish

Collaborators

  • samikhaled2010
  • mosa.muhana