Job Queue in LevelDB for Node.js

  • Define worker functions
  • Persist work units
  • Work units are retried when failed
  • Define maximum concurrency

Install:

$ npm install level-jobs --save

Create a levelup database and require level-jobs:

var levelup = require('levelup');
var db = levelup('./db');
var Jobs = require('level-jobs');

This function will take care of a work unit.

function worker(payload, cb) {
  // Do the work for this payload, then call cb(err) when finished.
}

This function receives two arguments: the payload of the work unit, and a callback function that must be called when the work is done.

This callback function accepts an error as the first argument. If an error is provided, the work unit is retried.
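
As a minimal sketch of this contract, the worker below reports an error for a malformed payload and calls back with no error otherwise. The payload shape (`{to: ...}`) is hypothetical and chosen only for illustration; the worker is called directly here, outside any queue, to show both outcomes.

```javascript
// Hypothetical worker: the payload shape ({to: ...}) is an assumption
// made for illustration, not something level-jobs requires.
function worker(payload, cb) {
  if (!payload || typeof payload.to !== 'string') {
    // Passing an error signals that the work unit failed,
    // which makes it a candidate for a retry.
    return cb(new Error('payload.to must be a string'));
  }
  // ... the real work would happen here ...
  // Calling back without an error signals that the work is done.
  cb();
}

// Call the worker by hand to show both outcomes.
var results = [];
worker({to: 'someone@example.com'}, function(err) {
  results.push(err ? 'failed' : 'done');
});
worker({}, function(err) {
  results.push(err ? 'failed' : 'done');
});
console.log(results); // [ 'done', 'failed' ]
```

Since a failed work unit is retried, an error returned for a payload that can never succeed will be reported again on each retry.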

var queue = Jobs(db, worker);

This database will be at the mercy and control of level-jobs. Don't use it for anything else!

(This database can be a root levelup database or a sublevel.)

You can define a maximum concurrency (the default is Infinity):

var maxConcurrency = 2;
var queue = Jobs(db, worker, maxConcurrency);
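
To illustrate what a concurrency cap means, here is a tiny in-memory runner. It is not how level-jobs is implemented; `runWithLimit` and the "parked" callbacks are hypothetical names used only for this sketch. The workers hold on to their callbacks so that they can be completed by hand.

```javascript
// Illustration only: a tiny in-memory runner, NOT level-jobs' implementation.
// It keeps at most `maxConcurrency` workers running at the same time.
function runWithLimit(payloads, worker, maxConcurrency) {
  var pending = payloads.slice();
  var running = 0;
  var peak = 0; // highest number of workers seen running at once

  function next() {
    while (running < maxConcurrency && pending.length > 0) {
      running++;
      peak = Math.max(peak, running);
      worker(pending.shift(), function() {
        running--;
        next(); // a slot was freed, so try to start another unit
      });
    }
  }

  next();
  return {peak: function() { return peak; }};
}

// Workers that park their callbacks so we can finish them by hand.
var parked = [];
var runner = runWithLimit([1, 2, 3, 4, 5], function(payload, cb) {
  parked.push(cb);
}, 2);

console.log(parked.length); // 2: only two workers were started
parked.shift()();           // finish one, which lets a third one start
console.log(parked.length); // 2: one finished, one started
```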

Alternatively, the third argument can be an options object with these defaults:

var options = {
  maxConcurrency: Infinity,
  maxRetries:     10,
  backoff: {
    randomisationFactor: 0,
    initialDelay: 10,
    maxDelay: 300
  }
};

var queue = Jobs(db, worker, options);
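
To get a feel for the backoff settings, the sketch below computes a sequence of retry delays. It assumes an exponential strategy (each delay doubles, is capped at `maxDelay`, and is stretched by up to `randomisationFactor`) and that the values are milliseconds; neither is stated above, so treat this as an illustration rather than the library's exact behaviour. `backoffDelays` is a hypothetical helper.

```javascript
// Assumption: exponential backoff where each delay doubles, is capped at
// maxDelay, and is stretched by a random factor of up to randomisationFactor.
// Units are assumed to be milliseconds.
function backoffDelays(backoff, retries) {
  var delays = [];
  var delay = backoff.initialDelay;
  for (var i = 0; i < retries; i++) {
    var capped = Math.min(delay, backoff.maxDelay);
    var jitter = 1 + Math.random() * backoff.randomisationFactor;
    delays.push(Math.round(capped * jitter));
    delay = delay * 2;
  }
  return delays;
}

// With the default values shown above and 10 retries:
var delays = backoffDelays(
  {randomisationFactor: 0, initialDelay: 10, maxDelay: 300},
  10
);
console.log(delays.join(', '));
// 10, 20, 40, 80, 160, 300, 300, 300, 300, 300
```

Under these assumptions, the delay reaches `maxDelay` on the sixth retry and stays there.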

Push work into the queue:

var payload = {what: 'ever'};
var jobId = queue.push(payload, function(err) {
  if (err) console.error('Error pushing work into the queue', err.stack);
});

Delete a pending job (this only works for jobs that haven't started yet!):

queue.del(jobId, function(err) {
  if (err) console.error('Error deleting job', err.stack);
});

Traverse the pending jobs (only jobs that haven't started yet are included):

var stream = queue.readStream();
stream.on('data', function(d) {
  var jobId = d.key;
  var work = d.value;
  console.log('pending job id: %s, work: %j', jobId, work);
});

A queue object emits the following events:

  • drain - when there are no more jobs pending. Also emitted on startup, after the backlog of work units has been consumed.
  • error - when something goes wrong.
  • retry - when a job is retried because something went wrong.
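
The events listed above could be observed with handlers like the ones in this sketch. A plain Node.js `EventEmitter` stands in for a real queue so the example runs on its own; `watchQueue` and `log` are hypothetical names, and the assumption that `error` handlers receive an `Error` object is not confirmed by the text above.

```javascript
var EventEmitter = require('events').EventEmitter;

// Attaches a handler for each of the three events.
// `log` is a hypothetical callback, used so the output can be captured.
function watchQueue(queue, log) {
  queue.on('drain', function() {
    log('queue drained');
  });
  queue.on('error', function(err) {
    // Assumption: the event carries an Error object.
    log('queue error: ' + err.message);
  });
  queue.on('retry', function() {
    log('job retried');
  });
}

// A plain EventEmitter stands in for a real queue in this sketch.
var fakeQueue = new EventEmitter();
var lines = [];
watchQueue(fakeQueue, function(line) { lines.push(line); });

fakeQueue.emit('retry');
fakeQueue.emit('error', new Error('disk full'));
fakeQueue.emit('drain');
console.log(lines.join('\n'));
```

With a real queue, the same `watchQueue(queue, console.log)` call would attach the handlers, assuming the queue exposes the usual `on` method.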

If you simply want a pure queue client that is only able to push jobs into the queue, you can use level-jobs/client like this:

var QueueClient = require('level-jobs/client');
var client = QueueClient(db);
client.push(work, function(err) {
  if (err) throw err;
});