node-taskman

Fast work queue based on redis.

node-taskman

node-taskman is a fast work queue based on redis.

Core features:

  • atomicity
  • persistent queue
  • dynamic worker configuration
  • unique tasks
  • process several tasks at one time
npm install node-taskman
var taskman = require('node-taskman');
 
// Process tasks. 
var worker = taskman.createWorker('email');
worker.process(function sendEmail(emailsdone) {
  // send emails 
  done();
});
 
// Create tasks. 
var queue = taskman.createQueue('email');
queue.push({to: 'hello@world.com', body: 'Hello world!'});

Create a new worker to process tasks.

Arguments:

  {string} name Name of the task to process.
  {object} options Options.
  {number} options.batch Number of tasks popped in each tick (default 1).
  {string} options.name Name of the worker (default os.hostname()).
  {number} options.ping Internal ping interval in ms (default 1000).
  {number} options.sleep Sleep time between each tick in ms (default 0)   
  {object|function} options.redis Redis configuration.
  {string} options.type Type of worker, 'fifo' or 'lifo' (default 'fifo').
  {boolean} options.unique Unique queue or not (default false).
// Create a new worker that sleep 2s between each task. 
var worker = taskman.createWorker('email', {sleep: 2000});

Process tasks. The action has two arguments, first is the tasks, the number depends of the batch option. The second is the callback, if it's called with an error, a "job failure" event is emitted, else a "job complete" event is emitted.

worker.process(function (tasksdone) {
 // process tasks 
});

Update options of the worker.

Arguments:

  {object} options Options.
  {number} options.batch Number of tasks popped in each tick.
  {number} options.ping Internal ping interval in ms.
  {number} options.sleep Sleep time between each tick in ms.
  {object|function} options.redis Redis configuration.
  {string} options.type Type of worker, 'fifo' or 'lifo'.
  {function} [callback] Optional callback.
// Put worker in lifo mode. 
worker.set({type: 'lifo'});

Get informations about the worker.

worker.get(function (errinfos) {
  console.log(infos); // worker infos 
});

Gracefully stop the worker.

Arguments:

  {object} [options] Optional options.
  {boolean} options.queue Close the worker queue (default true).
  {function} [callback] Optional callback.
worker.close(function (err) {
  // worker closed 
});

Emitted when:

  • a callback is omitted and an error occur in a method
  • a redis "error" event is emitted
worker.on('error', function (error) {
  // ... 
});

Emitted when an error is returned by the job process.

worker.on('job failure', function (taskerror) {
  // ... 
});

Emitted when a job is completed without error.

worker.on('job complete', function (task) {
  // ... 
});

Emitted when the worker status change.

worker.on('status change', function (status) {
  // ... 
});

Create a new queue to add task.

Arguments:

  {string} name Name of the task to process.
  {object} options Options.
  {object|function} options.redis Redis configuration.
  {boolean} options.unique Unique queue or not (default false).
// Create a new unique queue. 
var queue = taskman.createQueue('email', {unique: true});

Push a new task at the end of the queue.

queue.push({to: 'hello@world.com', body: 'Hello world!'});

Arguments:

  {function} [callback] Optional callback.
queue.close(function (err) {
  // queue closed 
});

Emitted when:

  • a callback is omitted and an error occur in a method
  • a redis "error" event is emitted
queue.on('error', function (error) {
  // ... 
});

Emitted when a new task is created.

queue.on('created', function (task) {
  // ... 
});

MIT

Written and maintained by Greg Bergé and Justin Bourrousse.

An original idea by David Desbouis.

Built an used at Le Monde.