zugzug

Synopsis

ZugZug is an unfancy task queue built on Redis.

Stability: 2 - unstable. License: MIT.

Why?

Because Kue does too much and bare Redis is too painful.

ZugZug?

Zug-zug.

Install

Node.js

With NPM

npm install zugzug

From source

git clone https://github.com/pluma/zugzug.git
cd zugzug
npm install
make

Usage example

Producer

var zz = require('zugzug')(); // use local Redis with default settings 
var queue = zz.queue('example');
 
setInterval(function() {
    // create a new job 
    var job = queue.createJob();
    // throw some data in there 
    job.data = 'There is always more work to be done.';
    // save it to the database 
    job.save().catch(function(err) {
        console.log('Something is wrong with your database.');
        console.error(err);
    });
}, 2000); // repeat every 2 seconds 

Consumer (Worker)

var zz = require('zugzug')();
var queue = zz.queue('example');
 
doForever(performJob);
 
function doForever(work) {
    return queue.next() // take a job from the queue 
        .then(work) // process the job 
        .then(function() {
            return doForever(work); // rinse, repeat 
        });
}
 
function performJob(job) {
    try {
        doSomeWork(job); // do something with the job 
    } catch(err) {
        // job failed. oh no! 
        console.log('job', job.id, 'failed!');
        console.error(err);
        return job.fail(err.message);
        // but the job can still be re-enqueued later 
    }
    // job is done! yay! 
    console.log('job', job.id, 'completed!');
    return job.complete();
}
 
function doSomeWork(job) {
    if (Math.random() > 0.5) {
        // randomly throw an error here because why not 
        throw new Error("That's a nice program you have there...");
    }
    // otherwise does stuff with the job's data 
    console.log(job.data);
}

API

All asynchronous methods in ZugZug return bluebird promises and accept node-style callbacks.
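
As a rough illustration of what this dual calling convention means for callers, the sketch below uses a hypothetical stand-in function, `fetchAnswer`, written with native promises. It is not part of ZugZug and does not use bluebird; it only shows that the same call can be consumed either way.

```javascript
// Hypothetical stand-in for any asynchronous ZugZug method.
// It returns a promise and also invokes an optional node-style callback.
function fetchAnswer(value, callback) {
    var promise = new Promise(function(resolve, reject) {
        if (typeof value !== 'number') {
            reject(new TypeError('value must be a number'));
        } else {
            resolve(value * 2);
        }
    });
    if (typeof callback === 'function') {
        // Node-style: error first, result second
        promise.then(
            function(result) { callback(null, result); },
            function(err) { callback(err); }
        );
    }
    return promise;
}

// Promise style
fetchAnswer(21).then(function(result) {
    console.log('promise result:', result);
});

// Callback style
fetchAnswer(21, function(err, result) {
    if (err) return console.error(err);
    console.log('callback result:', result);
});
```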

ZugZug

new ZugZug([options:Object]):ZugZug

Creates a new ZugZug instance that connects to a Redis server with the given options.

Use of the new keyword is optional.

In addition to the options accepted by redis, ZugZug recognizes port and server and passes them on correctly.

If options.prefix is defined, all Redis keys will use that prefix. Defaults to zugzug:.

zugzug.useDatabase(db:Number, [callback:Function]):Promise(self)

Tells the underlying Redis connection to use the database with the given number. Resolves to the ZugZug instance itself on success or is rejected with the error returned by redis for the underlying SELECT command.

zugzug.queue([name:String]):Queue

Returns a Queue instance representing the queue with the given name. Multiple invocations with the same name will return the same instance.

If name is not provided it is set to "default".
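
The caching behaviour described above can be pictured with a small sketch. `QueueRegistry` is a hypothetical name used only for illustration; ZugZug's internal cache may well be organised differently.

```javascript
// Hypothetical illustration of name-keyed caching; not ZugZug's actual code.
function QueueRegistry() {
    this._queues = new Map();
}

QueueRegistry.prototype.queue = function(name) {
    if (name === undefined) name = 'default';
    if (!this._queues.has(name)) {
        // A plain object stands in for a real Queue instance
        this._queues.set(name, {name: name});
    }
    return this._queues.get(name);
};

var registry = new QueueRegistry();
console.log(registry.queue('mail') === registry.queue('mail')); // true
console.log(registry.queue().name); // default
```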

zugzug.getJob(id:String, [callback:Function]):Promise(Job)

Retrieves the job with the given id from the database. Resolves to a new Job instance representing the job on success or is rejected with the error returned by redis for the underlying HGETALL command.

If the job does not exist, resolves to null instead.

zugzug.getJobLog(id:String, [callback:Function]):Promise(Object[])

Retrieves the log entries associated with the job with the given id from the database. Resolves to an array of log entry objects on success or is rejected with the error returned by redis for the underlying LRANGE command.

If the job does not exist, resolves to an empty array instead.

zugzug.moveJob(id:String, toQueue:String, [callback:Function]):Promise(Boolean)

Moves the job with the given id to the queue toQueue. Resolves to true on success or is rejected with the error returned by redis for the underlying commands.

If the job does not exist, resolves to false instead.

zugzug.resetJob(id:String, [callback:Function]):Promise(Boolean)

Re-enqueues the job with the given id. Resolves to true on success or is rejected with the error returned by redis for the underlying commands.

The job will change its state to pending regardless of its previous state.

If the job does not exist, resolves to false instead.

If you want to put a failed job back in the queue, this will let you do that.

If you want the failed job to be retried immediately, use zugzug.startJob instead.

zugzug.startJob(id:String, [callback:Function]):Promise(Job)

Retrieves the job with the given id from the database. Resolves to a new Job instance representing the job on success or is rejected with the error returned by redis for the underlying commands.

The returned job will change its state to progress regardless of its previous state.

If the job does not exist, resolves to null instead.

If you want to restart a failed job in a worker script, this will let you do that.

If you only want the failed job to be re-enqueued, use zugzug.resetJob instead.

zugzug.quit([callback:Function]):Promise(self)

Terminates the database connection cleanly. Resolves to the ZugZug instance itself or is rejected with the error returned by redis for the underlying QUIT command.

Note that the Redis connection used by a ZugZug instance is shared by all queues and jobs associated with it.

It is probably a bad idea to call this method if queue.next is still waiting for new jobs.

Queues

queue.createJob([data:*, [maxFailures:Number]]):Job

Returns a new Job instance bound to this queue.

See job.data.

See job.maxFailures.

queue.getInfo([callback:Function]):Promise(Object)

Collects statistics for the queue from the database. Resolves to an object containing the statistical information on success or is rejected with the error returned by redis for the underlying commands.

The result object contains a total property with the total number of jobs in the queue, plus one property per possible state holding the number of jobs currently in that state.

See job.state.
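
To make the shape of that result concrete, here is a sketch that builds a comparable object from a plain list of states. The `summarize` helper is hypothetical, and the property names are assumed to match the state names listed under job.state; ZugZug itself reads the counts from Redis.

```javascript
// Hypothetical helper: builds an object shaped like the queue.getInfo result
// from a plain array of job states.
var STATES = ['pending', 'progress', 'error', 'done'];

function summarize(states) {
    var info = {total: states.length};
    STATES.forEach(function(state) { info[state] = 0; });
    states.forEach(function(state) {
        if (STATES.indexOf(state) === -1) {
            throw new Error('unknown state: ' + state);
        }
        info[state] += 1;
    });
    return info;
}

console.log(summarize(['pending', 'pending', 'done', 'error']));
```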

queue.next([timeout:Number], [callback:Function]):Promise(Job)

Retrieves the oldest pending job from the queue. Resolves to a new Job instance representing the job on success or is rejected with the error returned by redis for the underlying commands.

The returned job will change its state from pending to progress.

If the queue is currently empty, it will wait until a new job is added to the queue.

If a timeout is provided, it will resolve to null if no job is found in the queue before the given number of seconds has elapsed.

If you want to use ZugZug in a worker script, this will likely be how you want to fetch new jobs from the queue.
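
A worker that should stop once the queue stays empty can rely on the null result of a timed-out call. The sketch below shows one way to write such a loop. `FakeQueue` is an in-memory stand-in so the example runs without Redis, and `drain` is a hypothetical helper; only the `next(timeout)` contract (a promise for a job, or null on timeout) is taken from the description above.

```javascript
// In-memory stand-in for a ZugZug queue, used only so this sketch runs
// without Redis. It resolves to null right away when empty instead of waiting.
function FakeQueue(jobs) {
    this.jobs = jobs.slice();
}
FakeQueue.prototype.next = function(timeout) {
    var job = this.jobs.shift();
    return Promise.resolve(job === undefined ? null : job);
};

// Processes jobs until queue.next(timeout) resolves to null, then stops.
// Resolves to the number of jobs handled.
function drain(queue, timeout, handle) {
    var handled = 0;
    function loop() {
        return queue.next(timeout).then(function(job) {
            if (job === null) return handled; // timed out: queue stayed empty
            return Promise.resolve(handle(job)).then(function() {
                handled += 1;
                return loop();
            });
        });
    }
    return loop();
}

var queue = new FakeQueue([{id: '1'}, {id: '2'}]);
drain(queue, 5, function(job) {
    console.log('handling job', job.id);
}).then(function(count) {
    console.log('queue idle after', count, 'jobs');
});
```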

queue.delete([callback:Function]):Promise(self)

Deletes the queue and all associated jobs from the database. Resolves to the Queue instance itself on success or is rejected with the error returned by redis for the underlying commands.

This will also remove the queue from the associated ZugZug instance's queue cache, so calling zugzug.queue(queue.name) will return a new Queue object.

Jobs

job.id:String (read-only)

The unique ID of the job which can be used to re-load the job with zugzug.getJob. This property is only defined if the job has been saved to the database.

job.data:*

The job's user-defined data. Will be serialized to JSON for storage in Redis, so you may want to avoid relying on non-serializable objects. Defaults to an empty object.
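
The following sketch shows what a JSON round trip does to a few common value types. It uses only standard JavaScript and assumes nothing about ZugZug beyond the JSON serialization mentioned above.

```javascript
// What survives a JSON round trip, which is what storing job.data amounts to.
var data = {
    when: new Date(0),          // becomes an ISO 8601 string
    callback: function() {},    // dropped
    missing: undefined,         // dropped
    count: 3                    // preserved
};

var stored = JSON.parse(JSON.stringify(data));

console.log(typeof stored.when);     // string
console.log('callback' in stored);   // false
console.log('missing' in stored);    // false
console.log(stored.count);           // 3
```

If a job needs a Date, storing a timestamp or ISO string and rebuilding the Date in the worker avoids surprises.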

job.queue:String (read-only)

The name of the queue this job is bound to. If you want to move a job to a different queue, use zugzug.moveJob(id, name).

job.state:String (read-only)

The current state of the job. This property is only defined if the job has been saved to the database.

pending

The job has been added to the queue and is waiting to be picked up by a worker.

progress

The job has been picked up by a worker and is currently being worked on.

error

The job has failed too many times.

See job.fail.

done

The job has been completed successfully.
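
As a reading aid, the state changes described throughout this document can be collected in one lookup table. The table and the `stateAfter` helper are hypothetical and do not reflect how ZugZug stores or enforces states internally.

```javascript
// Summary of the state changes described in this document, keyed by operation.
// This is a reading aid, not ZugZug's internal representation.
var TRANSITIONS = {
    save: 'pending',      // first save of a new job
    next: 'progress',     // queue.next picks up a pending job
    start: 'progress',    // job.start or zugzug.startJob
    reset: 'pending',     // zugzug.resetJob
    complete: 'done'      // job.complete
    // job.fail leads to 'pending' or 'error' depending on maxFailures
};

function stateAfter(operation) {
    if (!Object.prototype.hasOwnProperty.call(TRANSITIONS, operation)) {
        throw new Error('no fixed target state for: ' + operation);
    }
    return TRANSITIONS[operation];
}

console.log(stateAfter('next')); // progress
```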

job.progress:Number (read-only)

The progress of this job at the time of the most recent update. This value will be set to 0 whenever the job is (re-)started.

job.failures:Number (read-only)

The number of times that this job has failed.

See job.fail.

job.maxFailures:Number

The maximum number of times the job is allowed to fail before it will no longer be tried again. Defaults to 1. If set to 0, a failed job is always re-enqueued.

See job.fail.

job.created:Date (read-only)

The Date at which the job was first saved to the database. Will be set automatically on the first call to save or when the job is loaded from the database.

job.updated:Date (read-only)

The Date at which the job was last saved to the database. Will be updated automatically for every operation that modifies the job in the database (including job.log).

job.save([callback:Function]):Promise(self)

Saves the job to the database. Resolves to the Job instance itself on success or is rejected with the error returned by redis for the underlying commands.

If the job did not exist in the database before, it will be assigned a unique ID and its state will be set to pending.

job.update(progress:Number, [message:String], [callback:Function]):Promise(self)

Updates the job's progress to the given progress. Resolves to the Job instance itself on success or is rejected with the error returned by redis for the underlying commands.

The progress should be a value between 0.0 and 1.0.

If message is provided, a matching log entry will be created.
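
Workers often track progress as "items done out of a total". A small helper like the hypothetical `progressFraction` below converts that into the fraction described above and keeps it within bounds.

```javascript
// Hypothetical helper: turns "items done out of total" into a fraction
// between 0.0 and 1.0, clamping out-of-range input.
function progressFraction(done, total) {
    if (total <= 0) return 0;
    return Math.min(1, Math.max(0, done / total));
}

console.log(progressFraction(25, 100)); // 0.25
console.log(progressFraction(7, 5));    // 1
```

With a real job, the result would be passed along as job.update(progressFraction(done, total), 'some message').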

job.log(message:String, [details:*], [callback:Function]):Promise(self)

Logs the given message to the job's log with the current timestamp. Resolves to the Job instance itself on success or is rejected with the error returned by redis for the underlying commands.

If details is provided, it will be serialized to JSON and stored on the log message.

job.start([callback:Function]):Promise(self)

Sets the job's state to progress and creates a log entry. Resolves to the Job instance itself on success or is rejected with the error returned by redis for the underlying commands.

job.complete([callback:Function]):Promise(self)

Sets the job's state to done and creates a log entry indicating success. Resolves to the Job instance itself on success or is rejected with the error returned by redis for the underlying commands.

job.fail([details:*], [callback:Function]):Promise(self)

Increments the job's number of failures and creates a log entry indicating failure. Resolves to the Job instance itself on success or is rejected with the error returned by redis for the underlying commands.

If failures is lower than the job's maxFailures or maxFailures is set to 0, the job's state will be reset to pending and it will be re-enqueued automatically.

If maxFailures is not 0 and failures is greater than or equal to the job's maxFailures, the job's state will be set to error and it will not be re-enqueued.

If details is provided, it will be serialized to JSON and stored on the log message.
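
The retry rule can be expressed as a small decision function. This is a sketch of the rule as documented here, not ZugZug's implementation, and it assumes `failures` is the count after it has been incremented for the current failure.

```javascript
// Sketch of the documented retry rule; stateAfterFailure is a hypothetical name.
// `failures` is assumed to already include the failure being reported.
function stateAfterFailure(failures, maxFailures) {
    if (maxFailures === 0 || failures < maxFailures) {
        return 'pending'; // re-enqueued automatically
    }
    return 'error'; // not re-enqueued
}

console.log(stateAfterFailure(1, 1)); // error
console.log(stateAfterFailure(2, 3)); // pending
console.log(stateAfterFailure(9, 0)); // pending
```

Under this reading, a job with the default maxFailures of 1 moves to error on its first failure.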

job.delete([callback:Function]):Promise(self)

Deletes the job from the database. Resolves to the Job instance itself on success or is rejected with the error returned by redis for the underlying commands.

License

The MIT/Expat license. For more information, see http://pluma.mit-license.org/ or the accompanying LICENSE file.