workerhelp

1.0.3 • Public • Published

WorkerRPC

基于Node.js worker_threads 的轻量级跨线程RPC通信模块,支持主线程与Worker线程之间的异步方法调用。

特性

🚀 双向异步方法调用

⚡ Promise原生支持

🔒 自动请求/响应匹配

🛡️ 线程安全与错误传递

🔄 生命周期自动管理

安装

npm install workerhelp
``
或直接引用文件:

```javascript
const { WorkerRPC, WorkerRPCHandler } = require('workerhelp');

快速开始

主线程(Main Thread)

// main.js
const { WorkerRPC } = require('workerhelp');

async function main() {
  // 创建RPC实例(指向Worker文件)
  const workerRPC = new WorkerRPC('./worker.js');

  try {
    // 调用远程方法(支持异步等待)
    const sum = await workerRPC.call('add', 2, 3);
    const result = await workerRPC.call('processData', { id: 1 });
    
    console.log('Sum:', sum);    // 输出: 5
    console.log('Result:', result);
  } finally {
    // 清理资源
    workerRPC.destroy();
  }
}

main();

Worker线程(Worker Thread)

// worker.js
const { WorkerRPCHandler } = require('workerhelp');

// 初始化RPC处理器
const handler = new WorkerRPCHandler();

// 注册同步方法
handler.register('add', (a, b) => a + b);

// 注册异步方法
handler.register('processData', async (data) => {
  await someAsyncOperation();
  return { ...data, status: 'processed' };
});

// 保持线程活跃
setInterval(() => {}, 1000);

API 文档

WorkerRPC 类(主线程)

方法 参数 返回值 说明
constructor workerPath: string - 创建 Worker 实例
call method: string, ...args: any[] Promise<any> 发起远程调用
destroy - void 终止 Worker 并清理资源

WorkerRPCHandler 类(Worker 线程)

方法 参数 返回值 说明
register name: string, fn: Function void 注册可调用方法
_handleRequest (内部使用) - 请求处理器

最佳实践

错误处理

// 主线程捕获错误
workerRPC.call('dangerousOp')
  .catch(err => console.error('操作失败:', err));

// Worker线程抛出错误
handler.register('fail', () => {
  throw new Error('示例错误');
});

超时控制

javascript 复制 // 扩展call方法(建议作为子类实现)

class TimeoutRPC extends WorkerRPC {
  async callWithTimeout(method, timeout, ...args) {
    return Promise.race([
      this.call(method, ...args),
      new Promise((_, reject) => 
        setTimeout(() => reject(new Error('Timeout')), timeout)
      )
    ]);
  }
}

注意事项

数据序列化

所有参数和返回值必须是可序列化的JSON对象

不支持传递函数/原型链等特殊对象

错误传递

Worker内的错误栈信息不会跨线程传递

建议在错误消息中包含关键调试信息

资源管理

确保在不再需要时调用destroy()

避免在Worker中创建无法自动回收的资源

扩展建议

双向调用 通过添加主线程方法注册功能,实现Worker→主线程的调用能力

流式处理

// 大数据处理示例
handler.register('processLargeData', async (chunks) => {
  const stream = createTransformStream();
  for await (const chunk of chunks) {
    await stream.write(processChunk(chunk));
  }
  return stream.end();
});

Node.js 多线程互斥锁(Mutex)实现

基于 worker_threadsSharedArrayBuffer + Atomics 的轻量级互斥锁,用于保护多线程环境下的临界区代码。


特性

  • 线程安全:通过原子操作实现锁的抢占和释放
  • 零依赖:仅依赖 Node.js 原生模块
  • 阻塞唤醒机制:使用 Atomics.wait 替代忙等待,减少 CPU 占用
  • 异常安全try...finally 确保锁必然释放

Mutex 快速开始

1. 主线程创建共享内存

// main.js
const { Worker } = require('worker_threads');

const sharedBuffer = new SharedArrayBuffer(4);
const initialView = new Int32Array(sharedBuffer);
initialView[0] = 0; // 初始化锁状态

// 启动 4 个 Worker
for (let i = 0; i < 4; i++) {
  new Worker('./worker.js', {
    workerData: { sharedBuffer }
  });
}

2. Worker 线程获取锁

// worker.js
const { workerData, parentPort } = require('worker_threads');
const Mutex = require('workerhelp').Mutex;

const mutex = new Mutex(workerData.sharedBuffer);

// 需要保护的函数
async function criticalSection() {
  try {
    mutex.lock();
    // 临界区代码(示例:修改共享资源)
    console.log('[+] 进入临界区');
    await performSafeOperation(); 
  } finally {
    mutex.unlock();
    console.log('[-] 离开临界区');
    parentPort.postMessage('done');
  }
}

criticalSection();

3 API 文档

Mutex 类

构造函数
new Mutex(sharedBuffer: SharedArrayBuffer)//sharedBuffer: 必须为至少 4 字节的 SharedArrayBuffer
方法列表

方法列表

方法 参数 返回值 说明
lock() void 阻塞直到成功获取锁
unlock() void 释放锁并唤醒等待线程

协议

MIT License

Package Sidebar

Install

npm i workerhelp

Weekly Downloads

9

Version

1.0.3

License

ISC

Unpacked Size

13.8 kB

Total Files

10

Last publish

Collaborators

  • cx2017