基于Node.js worker_threads 的轻量级跨线程RPC通信模块,支持主线程与Worker线程之间的异步方法调用。
🚀 双向异步方法调用
⚡ Promise原生支持
🔒 自动请求/响应匹配
🛡️ 线程安全与错误传递
🔄 生命周期自动管理
npm install workerhelp
``
或直接引用文件:
```javascript
const { WorkerRPC, WorkerRPCHandler } = require('workerhelp');
// main.js
const { WorkerRPC } = require('workerhelp');
async function main() {
// 创建RPC实例(指向Worker文件)
const workerRPC = new WorkerRPC('./worker.js');
try {
// 调用远程方法(支持异步等待)
const sum = await workerRPC.call('add', 2, 3);
const result = await workerRPC.call('processData', { id: 1 });
console.log('Sum:', sum); // 输出: 5
console.log('Result:', result);
} finally {
// 清理资源
workerRPC.destroy();
}
}
main();
// worker.js
const { WorkerRPCHandler } = require('workerhelp');
// 初始化RPC处理器
const handler = new WorkerRPCHandler();
// 注册同步方法
handler.register('add', (a, b) => a + b);
// 注册异步方法
handler.register('processData', async (data) => {
await someAsyncOperation();
return { ...data, status: 'processed' };
});
// 保持线程活跃
setInterval(() => {}, 1000);
方法 | 参数 | 返回值 | 说明 |
---|---|---|---|
constructor | workerPath: string |
- | 创建 Worker 实例 |
call | method: string, ...args: any[] |
Promise<any> |
发起远程调用 |
destroy | - | void |
终止 Worker 并清理资源 |
方法 | 参数 | 返回值 | 说明 |
---|---|---|---|
register | name: string, fn: Function |
void |
注册可调用方法 |
_handleRequest | (内部使用) | - | 请求处理器 |
错误处理
// 主线程捕获错误
workerRPC.call('dangerousOp')
.catch(err => console.error('操作失败:', err));
// Worker线程抛出错误
handler.register('fail', () => {
throw new Error('示例错误');
});
javascript 复制 // 扩展call方法(建议作为子类实现)
class TimeoutRPC extends WorkerRPC {
async callWithTimeout(method, timeout, ...args) {
return Promise.race([
this.call(method, ...args),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('Timeout')), timeout)
)
]);
}
}
所有参数和返回值必须是可序列化的JSON对象
不支持传递函数/原型链等特殊对象
Worker内的错误栈信息不会跨线程传递
建议在错误消息中包含关键调试信息
确保在不再需要时调用destroy()
避免在Worker中创建无法自动回收的资源
双向调用 通过添加主线程方法注册功能,实现Worker→主线程的调用能力
// 大数据处理示例
handler.register('processLargeData', async (chunks) => {
const stream = createTransformStream();
for await (const chunk of chunks) {
await stream.write(processChunk(chunk));
}
return stream.end();
});
基于 worker_threads
和 SharedArrayBuffer
+ Atomics
的轻量级互斥锁,用于保护多线程环境下的临界区代码。
- 线程安全:通过原子操作实现锁的抢占和释放
- 零依赖:仅依赖 Node.js 原生模块
-
阻塞唤醒机制:使用
Atomics.wait
替代忙等待,减少 CPU 占用 -
异常安全:
try...finally
确保锁必然释放
// main.js
const { Worker } = require('worker_threads');
const sharedBuffer = new SharedArrayBuffer(4);
const initialView = new Int32Array(sharedBuffer);
initialView[0] = 0; // 初始化锁状态
// 启动 4 个 Worker
for (let i = 0; i < 4; i++) {
new Worker('./worker.js', {
workerData: { sharedBuffer }
});
}
// worker.js
const { workerData, parentPort } = require('worker_threads');
const Mutex = require('workerhelp').Mutex;
const mutex = new Mutex(workerData.sharedBuffer);
// 需要保护的函数
async function criticalSection() {
try {
mutex.lock();
// 临界区代码(示例:修改共享资源)
console.log('[+] 进入临界区');
await performSafeOperation();
} finally {
mutex.unlock();
console.log('[-] 离开临界区');
parentPort.postMessage('done');
}
}
criticalSection();
new Mutex(sharedBuffer: SharedArrayBuffer)//sharedBuffer: 必须为至少 4 字节的 SharedArrayBuffer
方法 | 参数 | 返回值 | 说明 |
---|---|---|---|
lock() | 无 | void | 阻塞直到成功获取锁 |
unlock() | 无 | void | 释放锁并唤醒等待线程 |
MIT License