cluster-cmd
cluster-cmd is a small framework implementing the command/response paradigm for the Node.js Cluster module. Instead of concentrating inter-process comunication logic into the worker.on
and process.on
event handlers, you can call a function in another thread pretty much the same way you would call a local function.
Main features:
- transparent to the Node.js cluster module
- master-to-worker and worker-to-master commands
- basic master-worker synchronization
- timeout handling
How it works
Synchronous or asynchronous command handlers are registered using the on.sync
or on.async
functions, respectively.
These commands can then be remotely executed by the run
function.
Here is a 'Hello world' example (also available in the ./examples
folder):
const forkWait on ready run getWorkerByName myName = ; { // start thread 'worker' and wait until it is initialized } { // register 'helloWorld' command handler on // tell master that I'm ready to receive commands ;} if myName == 'master' else ;
See below for a more complex example demonstrating both master -> worker and worker -> master communication.
Exports
- fork
- forkWait
- on
- run
- ready
- failed
- cancel
- setOptions
- removeWorkerName
- getWorkerByName
- getWorkerNames
- myName
fork (workerName [,env])
workerName
: stringenv
: object
This function starts a new worker with the specified name. The worker name serves as a reference to the worker throughout the framework. getWorkerByName(workerName)
gets you the underlying cluster.worker object, getNames()
returns an array of all worker names.
env
is an object whose properties will be added to the worker's process.env
object. Default is { }. fork
always adds the property '_workerName'
(holding the worker's name) to the worker's environment.
fork
can produce the following errors (error.code):
- ERR_NO_WORKERNAME : missing worker name
- ERR_WORKERNAME_NOT_STRING : worker name is not a string
- ERR_WORKERNAME_EXISTS: a worker with the same name already exists
- ERR_WORKERNAME_MUST_NOT_BE_MASTER: 'master' is not allowed as worker name
forkWait(workerName [,env] [, timeoutMs] [, callback])
workerName
: stringtimeoutMs
: numberenv
: object
This function calls fork(workerName, env)
, then waits until the worker has called the ready
function. A timeout interval in milliseconds can be specified in timeoutMs
.
Example:
Promiseall // process.env.port will be available in all workers
forkWait
can produce the same errors as fork
plus:
- ERR_FORK_TOO_MANY_ARGS: too many arguments
- ERR_WORKER_TIMED_OUT: worker did not call
ready
withintimeoutMs
milliseconds - ERR_WORKER_FAILED: worker called
failed
on
You cannot callon
directly but have to use on.sync
or on.async
. If you call on
directly, you will get the ERR_ONSYNC_OR_ONASYNC error.
on.sync(command, handler)
command
: stringhandler
: function (arguments, command, workerName, timeoutMs)
This function registers a sync command handler, i.e. a function returning a value. It can be used both in master and worker mode. A special case is command == '*'
which registers a default command handler (used when no specific command handler exists).
If you mistakenly register an async function with on.sync
, you will get the ERR_NOT_SYNC_HANDLER error at the next run
.
on.sync
can produce the following errors (error.code):
- ERR_INVALID_COMMAND:
command
is undefined or not a string - ERR_INVALID_HANDLER:
command
is undefined or not a function
Example: registration of a specific command handler
on
Example: registration of the default handler ('*')
on
on.async(command, handler)
command
: stringhandler
: function (arguments, command, workerName, timeoutMs)
This function registers an async command handler, i.e. a function returning a Promise object. In all other respects it is the same as on.sync
.
If you mistakenly register an sync function with on.async
, you will get the ERR_NOT_ASYNC_HANDLER error at the next run
.
on.async
produces the same errors as on.sync
.
N.B.: The handler must return a Promise, callback is not supported!
Example:
var request = ; on
on.async
produces the same errors as on.sync
.
run(workerName, command [, args] [, timeoutMs] [, callback])
run(command [, args] [, timeoutMs] [, callback])
workerName
: string (only in master mode)command
: stringargs
: objecttimeoutMs
: numbercallback
: function(err, data)returns
: id (for the callback version, seecancel
) or Promise
This function executes a command registered in another thread. The first version is used in master mode, the second in worker mode.
workerName
is the name of the target worker (only in master mode). It is not possible for a worker to run commands in another worker. However, in master code it is possible to specify workerName == 'master'
. In this special case the master runs the command directly in its own thread, bypassing the inter-process communication mechanisms. This is very practical if master and workers are running the same code and you need to send the same command to all instances, like in the following example:
...let promises = ; for let workerName in 'master'...clc promises;let status = await Promiseallpromises;...
command
is the name of a previously registered command
args
is passed to the command handler and can be anything of type 'object' which can be serialized and de-serialized by JSON.
timeoutMs
specifies a timeout interval in millisecods. If it is omitted, the default timeout interval is used (default is currently 5 minutes). A value of 0 means that no timer will be set.
handler
: a command handler funtion of the type function(command, args, timeoutMs)
.
The callback version of run
returns an id which can be used to cancel the command (see cancel
for an example). In the promise version, this id can be got through the id
property of the returned promise (see cancel
for an example)again, .
run
can produce the following errors (error.code):
- ERR_TOO_MANY_ARGUMENTS: self-explanatory
- ERR_WORKERNAME_NOT_STRING: self-explanatory
- ERR_COMMAND_NOT_STRING: self-explanatory
- ERR_INVALID_COMMAND: command was not registered
- ERR_WORKERNAME_DOESNT_EXIST: worker name not found in worker list
- ERR_WORKER_DEAD_OR_DISCONNECTED: self-explanatory
- ERR_COMMAND_CANCELED: command was canceled using the
cancel
function - ERR_NOT_SYNC_HANDLER: command handler should be registered with
on.async
- ERR_NOT_ASYNC_HANDLER: command handler should be registered with
on.sync
- ERR_COMMAND_TIMED_OUT: command didn't execute within
timeoutMs
milliseconds - ERR_PENDING_OVERFLOW the default maximum of currently pending commands exceeded (default: 1000, see
setOptions
)
ready([data])
data
: anything JSON.stringifiableThis function is called by a worker after it is initialized and ready to receive commands from master.
data
is passed to the forkWait
function and can be anything which can be handled by process.run
(basically JSON.stringifiable)
ready
doesn't produce errors.
failed([err])
err
: Error object or anything JSON.stringifiableThis function is called by a worker when initialization failed and the worker cannot receive commands from master.
err
is optional and is passed back to forkWait
. When omitted, an ERR_WORKER_FAILED error is generated and passed to forkWait
.
After calling failed
the worker typically exits with cluster.worker.kill()
.
failed
doesn't produce errors.
cancel(id)
id
: stringThis function allow to cancel a pending command created by run
. The command is immediately terminated with an ERR_COMMAND_CANCELED error.
id
is the command id originally returned by run
. cancel
offers a more flexible way to cancel a command than the timeoutMs
argument. Example:
let id = ; // no timer......if bored ;
The same in promise style:
var id; // 0 means no timerid { // id passed to callback id = cmdId;}
The id
property of the returned promise expects a callback function which is called with the id string. It is important to place .id(...)
before .then
and .catch(...)
as the latter don't return the complete promise object.
cancel
doesn't produce errors.
setOptions(info)
info
: objectreturns
: objectThis function sets the global options.
info
is an object of type { name: value, name : value, ... }. Currently, the following options are available:
forkWaitTimeout
(default: 3000 - 5 min)runTimeout
(default: 300000 )maxPendingCommands
(default: 1000)
forkWaitTimeout
is used in forkWait
, runTimeout
in run
.
maxPendingCommands
designates the maximum number of pending commands, i.e. commands which have been started by run
but which have not yet returned a reply. If this number is exceeded, the next run
returns an ERR_PENDING_OVERFLOW error. This can be used to catch infinite recursion or synchronization errors.
setOptions
returns an object with the complete previous option values. Thus setOptions()
can be used to obtain the current options without changing them.
setOptions
can produce the following errors (error.code):
- ERR_INVALID_OPTION
removeWorkerName(workerName)
workerName
: stringreturns:
: booleanThis function removes workerName
from the list of active workers. It is the user's responsibility to terminate the worker in the proper way.
It returns true
if the entry existed, false
otherwise.
removeWOrkerName
doesn't produce errors.
getWorkerByName(workerName)
workerName
: stringreturns
: Cluster.worker object or undefined
This function returns the Cluster.worker
object referenced by workerName
, or undefined
when no such object exists.
getWorkerByName
doesn't produce errors.
getWorkerNames()
returns
: ArrayThis function returns the array of all workerNames created by fork
or forkWait
getWorkerNames
doesn't produce errors.
myName
This constant contains the current worker name (in worker code) or 'master' (in master mode).Tests
Mocha tests are available for master and worker code:
npm test test/master.js
and
npm test test/worker.js
Please run the above texts separately - npm test
won't work as Mocha has some subtle issues when running in multithreaded envornment (well, perhaps I could have figured it out but didn't have the time and motivation...)
Full working example
(also available in the ./examples folder)const forkWait on run ready getWorkerByName myName = ; // Register a sync command handler for both master and workeron; // Register an async command handler for both master and workeron if myName == 'master' // code executed by master // fork (start) a new worker thread and name it 'worker' // with a timeout interval of 5 seconds ... console; // ... and wait until it has sent a 'ready' message // If the worker does not signal 'ready' within 5 seconds // forkAndWait rejects with an 'ERR_WORKER_TIMED_OUT' error. else if myName == 'worker' // code executed by worker // register async command handler on // ready to receive commands console; ;