node package manager

Introducing npm Enterprise add-ons. Integrate third-party dev tools into npm…

node-taskman

Fast work queue based on redis.

node-taskman

node-taskman is a fast work queue based on redis.

Core features:

npm install node-taskman
var taskman = require('node-taskman');
 
// Process tasks. 
var worker = taskman.createWorker('email');
worker.process(function sendEmail(emails, done) {
  // send emails 
  done();
});
 
// Create tasks. 
var queue = taskman.createQueue('email');
queue.push({to: 'hello@world.com', body: 'Hello world!'});

Create a new worker to process tasks.

Arguments:

  {string} name Name of the task to process.
  {object} options Options.
  {number} options.batch Number of tasks popped in each tick (default 1).
  {string} options.name Name of the worker (default os.hostname()).
  {number} options.ping Internal ping interval in ms (default 1000).
  {number} options.sleep Sleep time between each tick in ms (default 0)   
  {object|function} options.redis Redis configuration.
  {string} options.type Type of worker, 'fifo' or 'lifo' (default 'fifo').
  {boolean} options.unique Unique queue or not (default false).
// Create a new worker that sleep 2s between each task. 
var worker = taskman.createWorker('email', {sleep: 2000});

Process tasks. The action has two arguments, first is the tasks, the number depends of the batch option. The second is the callback, if it's called with an error, a "job failure" event is emitted, else a "job complete" event is emitted.

worker.process(function (tasks, done) {
 // process tasks 
});

Update options of the worker.

Arguments:

  {object} options Options.
  {number} options.batch Number of tasks popped in each tick.
  {number} options.ping Internal ping interval in ms.
  {number} options.sleep Sleep time between each tick in ms.
  {object|function} options.redis Redis configuration.
  {string} options.type Type of worker, 'fifo' or 'lifo'.
  {function} [callback] Optional callback.
// Put worker in lifo mode. 
worker.set({type: 'lifo'});

Get informations about the worker.

worker.get(function (err, infos) {
  console.log(infos); // worker infos 
});

Gracefully stop the worker.

Arguments:

  {object} [options] Optional options.
  {boolean} options.queue Close the worker queue (default true).
  {function} [callback] Optional callback.
worker.close(function (err) {
  // worker closed 
});

Emitted when:

  • a callback is omitted and an error occur in a method
  • a redis "error" event is emitted
worker.on('error', function (error) {
  // ... 
});

Emitted when an error is returned by the job process.

worker.on('job failure', function (task, error) {
  // ... 
});

Emitted when a job is completed without error.

worker.on('job complete', function (task) {
  // ... 
});

Emitted when the worker status change.

worker.on('status change', function (status) {
  // ... 
});

Create a new queue to add task.

Arguments:

  {string} name Name of the task to process.
  {object} options Options.
  {object|function} options.redis Redis configuration.
  {boolean} options.unique Unique queue or not (default false).
// Create a new unique queue. 
var queue = taskman.createQueue('email', {unique: true});

Push a new task at the end of the queue.

queue.push({to: 'hello@world.com', body: 'Hello world!'});

Close the connection to the queue, the redis command used is QUIT, so it will wait that all redis commands are done before closing the connection. The queue will continue to persist so tasks will not be lost.

Arguments:

  {function} [callback] Optional callback.
queue.close(function (err) {
  // queue closed 
});

Emitted when:

  • a callback is omitted and an error occur in a method
  • a redis "error" event is emitted
queue.on('error', function (error) {
  // ... 
});

Emitted when a new task is created.

queue.on('created', function (task) {
  // ... 
});

Every tasks are added and removed using redis transactions. You can't be stuck in an unstable state.

Thanks to redis, it's possible to choose the level of persistence of your queue.

Workers configuration (batch, ping, sleep) is stored in a redis hash. If you change this redis hash, the changes will be active without the need to reload your workers.

The redis hash name is "worker:{queueName}:{workerName}".

You will find some informations like:

  • createdAt: creation date of the worker
  • pid: process identifier
  • batch: number of tasks processed at each tick
  • ping: ping time
  • sleep: sleep time
  • type: type of worker (fifo or lifo)
  • queue: name of the queue
  • taskCount: number of tasks already processed
  • status: status of the worker (waiting, working, stopped)

If you choose the unique mode, every tasks added in the queue will be present once at a time X. This feature is very interesting if you need to add task that consume a lot of resources. To known if the task is unique or not we compare all the payload of the task.

Using the configuration "batch" it's possible to specify the number of tasks processed at each tick. This feature is interesting if you need to batch some tasks in a single request (insert in database, indexing...).

MIT

Written and maintained by Greg Bergé and Justin Bourrousse.

An original idea by David Desbouis.

Built an used at Le Monde.