Network Performance Monitor

    @devteks/node-atomics
    TypeScript icon, indicating that this package has built-in type declarations

    0.0.4 • Public • Published

    @devteks/node-atomics

    Node.js Thread safe tools synchronized across worker threads

    Classes included

    • AtomicInt8
    • AtomicInt16
    • AtomicInt32
    • AtomicUint8
    • AtomicUint16
    • AtomicUint32
    • AtomicBigInt64
    • AtomicBigUint64
    • AtomicBool
    • Mutex
    • Semaphore
    • WaitGroup
    • NotifyWait
    • NotifyDone

    how to use

    npm install @devteks/node-atomics --save

    Import:

    import:

    const { AtomicInt32, Mutex } = require('@devteks/node-atomics');
    // OR
    import { AtomicInt32, Mutex } from '@devteks/node-atomics';

    Usage

    All types has two methods

    • .synchronize(): for not async function
    • .asynchronize(): for async function

    Using Atomic Integers

    const { isMainThread, workerData, threadId, Worker } = require('worker_threads');
    const { AtomicInt32 } = require('@devteks/node-atomics');
    
    if (isMainThread) {
      const int = new AtomicInt32();
      Array(5).fill(null).map(() => new Worker(__filename, { workerData: { buffer: int.buffer } }));
      process.on('exit', () => {
        console.log(int.value);
      });
    } else {
      const int = AtomicInt32.from(workerData.buffer);
      console.log(`Thread ${threadId} started`, int.value);
      const v = int.synchronize(e => {
        for (let i = 0; i < 1000; i++) {
          e.value++;
        }
        return e.value;
      });
      console.log(`Thread ${threadId} ended`, v);
    }

    Using Mutex with .synchronize()

    const { isMainThread, workerData, threadId, Worker } = require('worker_threads');
    const { Mutex } = require('@devteks/node-atomics');
    
    if (isMainThread) {
      const mutex = new Mutex();
      const buffer = new SharedArrayBuffer(4);
      const arr = new Int32Array(buffer);
      Array(5).fill(null).map(() => new Worker(__filename, { workerData: { buffer: buffer, mutex: mutex.buffer } }));
      process.on('exit', () => {
        console.log(arr[0]);
      });
    } else {
      const mutex = Mutex.from(workerData.mutex);
      const arr = new Int32Array(workerData.buffer);
      console.log(`Thread ${threadId} started`, arr[0]);
    	// or use mutex.lock() and mutex.unlock()
      const v = mutex.synchronize(() => {
        for (let i = 0; i < 1000; i++) {
          arr[0]++;
        }
        return arr[0];
      });
      console.log(`Thread ${threadId} ended`, v);
    }

    Using Mutex with .asynchronize()

    const { isMainThread, workerData, threadId, Worker } = require('worker_threads');
    const { setTimeout } = require('timers/promises');
    const { Mutex } = require('@devteks/node-atomics');
    
    async function mainThread() {
      console.time('main');
      const mutex = new Mutex();
      const arr = new Int32Array(new SharedArrayBuffer(4));
    
      const runTask = (id) => {
        return new Promise((resolve, reject) => {
          const worker = new Worker(__filename, { workerData: { data: arr.buffer, mutex: mutex.buffer, id } });
          worker.on('exit', () => {
            console.log(`Thread ${id} exited`);
            resolve();
          });
          worker.on('error', err => reject(err));
        });
      }
      await Promise.all( Array(5).fill(null).map((_, id) => runTask(id)) );
    
      //await setTimeout(1000);
      console.log('RESULT:', arr[0]);
      console.timeEnd('main');
    }
    
    async function workerThread() {
      const id = workerData.id;
      const mutex = Mutex.from(workerData.mutex);
      const arr = new Int32Array(workerData.data);
      const run = async () => {
        const v = await mutex.asynchronize(async () => {
          await setTimeout(100);
          for (let i = 0; i < 1000; i++) {
            arr[0]++;
          }
          return arr[0];
        });
        console.log(`Thread ${id} ended`, v);
      };
      const run2 = async () => {
        mutex.lock();
        await setTimeout(100);
        for (let i = 0; i < 1000; i++) {
          arr[0]++;
        }
        const v = arr[0];
        mutex.unlock();
        console.log(`Thread ${id} ended`, v);
      };
    
      await run();
    }
    
    if (isMainThread) {
      mainThread();
    } else {
      workerThread();
    }

    Using WaitGroup

    in main.js

    const { join } = require('path');
    const { Worker } = require('worker_threads');
    const { WaitGroup, Mutex } = require('@devteks/node-atomics');
    
    async function main() {
    	const workers = [];
    	const sab = new SharedArrayBuffer(4);
    	const count = new Int32Array(sab);
    	const mutex = new Mutex();
    	const size = 5;
    	const waitGroup = new WaitGroup(size);
    
    	for (let i = 0; i < size; i++) {
    		const worker = new Worker(join(__dirname, './worker.js'), {
    			stdout: true,
    			workerData: {
    				waitGroupBuffer: waitGroup.buffer,
    				mutexBuffer: mutex.buffer,
    				sab: sab
    			}
    		});
    		workers.push(worker);
    	}
    
    	waitGroup.wait();
    	console.log('Result:', count[0]);
    }
    
    main().then(() => console.log('Done'));

    in worker.js

    const { workerData, threadId } = require("worker_threads");
    const delay = require('timers/promises').setTimeout;
    const { WaitGroup, Mutex } = require("../../");
    
    async function main() {
    	const waitGroup = WaitGroup.from(workerData.waitGroupBuffer);
    	const mutex = Mutex.from(workerData.mutexBuffer);
    	const count = new Int32Array(workerData.sab);
    
    	mutex.lock();
    	Array(1000).fill(null).forEach(() => count[0]++);
    	mutex.unlock();
    
    	await delay(1000);
    	waitGroup.done();
    }
    
    main();

    Using NotifyWait and NotifyDone

    use NotifyWait in main thread and use NotifyDone in worker thread

    in main.js

    const { join } = require('path');
    const { Worker, MessageChannel, receiveMessageOnPort } = require('worker_threads');
    const { NotifyWait } = require('../../');
    
    const { port1, port2 } = new MessageChannel();
    const notify = new NotifyWait();
    
    const worker = new Worker(join(__dirname, '/worker.js'), {
    	workerData: { port: port2, notify: notify.buffer },
    	transferList: [ port2 ]
    });
    
    worker.on('error', error => {
    	console.error(error);
    });
    
    console.log('[MAIN] WAIT START');
    notify.wait();
    console.log('[MAIN] WAIT END');
    const { text } = receiveMessageOnPort(port1)?.message;
    console.log('[MAIN] length:', text.length);

    in worker.js

    const { workerData } = require('worker_threads');
    const delay = require('timers/promises').setTimeout;
    const { NotifyDone } = require('../../');
    
    async function getText() {
    	await delay(1000,);
    	return 'hello';
    }
    
    async function main() {
    	const port = workerData.port;
    	const notify = new NotifyDone(workerData.notify);
    
      const text = await getText();
    	console.log('[WORKER] length:', text.length);
    	await delay(1000);
      port.postMessage({ text });
    	console.log('[WORKER] set');
    	notify.done();
    	await delay(1000);
    	console.log('END WORKER');
    }
    
    main();

    RESULT:

    [MAIN] WAIT START
    [MAIN] WAIT END
    [MAIN] length: 5
    [WORKER] length: 5
    [WORKER] set
    END WORKER
    

    clone the repository and run examples in the examples directory

    Install

    npm i @devteks/node-atomics

    DownloadsWeekly Downloads

    8

    Version

    0.0.4

    License

    MIT

    Unpacked Size

    55.2 kB

    Total Files

    6

    Last publish

    Collaborators

    • samikhaled2010
    • mosa.muhana