node package manager
Share your code. npm Orgs help your team discover, share, and reuse code. Create a free org »



queue batch jobs and stream results to a blob store

build status


adding a job

First, we can add a job. Adding a job both creates the job and pushes it onto the pending queue:

var batchdb = require('batchdb');
var db = require('level')('/tmp/compute.db');
var compute = batchdb(db, { path: '/tmp/compute.blobs' });
compute.add().end('sleep 5; date');
compute.on('create', function (key) {
    console.log('created', key);
compute.on('push', function (key, created) {
    console.log('pushed', key, created);

Now we'll schedule a few jobs:

$ node add.js
created ce6cc4e7fcb4a902f7c836cb93a32ae02cb47679c86e3d99c811233bf6f258c0
pushed ce6cc4e7fcb4a902f7c836cb93a32ae02cb47679c86e3d99c811233bf6f258c0 1409895158130
$ node add.js
created ce6cc4e7fcb4a902f7c836cb93a32ae02cb47679c86e3d99c811233bf6f258c0
pushed ce6cc4e7fcb4a902f7c836cb93a32ae02cb47679c86e3d99c811233bf6f258c0 1409895160930

The job ID is the same in this case because the job content is identical, but the job was still scheduled to run twice.

listing pending jobs

Now that jobs have been added, they show up in the pending list:

var batchdb = require('batchdb');
var db = require('level')('/tmp/compute.db');
var compute = batchdb(db, { path: '/tmp/compute.blobs' });
compute.list('pending').on('data', console.log);
{ job: 'ce6cc4e7fcb4a902f7c836cb93a32ae02cb47679c86e3d99c811233bf6f258c0',
  created: 1409895158130,
  running: false }
{ job: 'ce6cc4e7fcb4a902f7c836cb93a32ae02cb47679c86e3d99c811233bf6f258c0',
  created: 1409895160930,
  running: false }

running jobs

Now we can tell the jobs to run by calling and providing a run function that will store the batch results:

var batchdb = require('batchdb');
var spawn = require('child_process').spawn;
var duplexer = require('duplexer');
var db = require('level')('/tmp/compute.db');
var compute = batchdb(db, { path: '/tmp/compute.blobs', run: run });
function run (key) {
    var ps = spawn('bash');
    return duplexer(ps.stdin, ps.stdout);
compute.on('result', function (key, id) {
    console.log('RESULT', key, id);
$ node run.js 
RESULT ce6cc4e7fcb4a902f7c836cb93a32ae02cb47679c86e3d99c811233bf6f258c0 0469e2796db49df219ca5d9380dddd5b48f9a7fe930945d1988aa966a62f1e0a
Thu Sep  4 22:36:12 PDT 2014
RESULT ce6cc4e7fcb4a902f7c836cb93a32ae02cb47679c86e3d99c811233bf6f258c0 4c2d44f035f6fa04a1246806fda7f881347b96b8ff821a0a5aacad08e904d06c
Thu Sep  4 22:36:17 PDT 2014


var batchdb = require('batchdb')

var compute = batchdb(db, opts)

Create a batchdb instance, compute from a leveldb handle db and some options:

var ws = compute.create(cb)

Create a new job from the payload written to the writable stream ws.

After payload has been saved and the job has been written to the database, cb(err, jobkey) fires with the jobkey id.

compute.push(jobkey, cb)

Push a job to the end of the pending queue by its key, jobkey.

var ws = compute.add(cb)

Create a job and push it to the pending queue in one step. This is similar to calling .create() followed by .push(), but more atomic.

Process the pending queue by most-recent first.

var rs = compute.list(type)

Return a readable object stream rs that contains rows for each type:

  • 'job'
  • 'pending'
  • 'result'
  • 'list'

example job object:

{ key: 'ce6cc4e7fcb4a902f7c836cb93a32ae02cb47679c86e3d99c811233bf6f258c0',
  running: [] }

example pending object:

{ job: 'ce6cc4e7fcb4a902f7c836cb93a32ae02cb47679c86e3d99c811233bf6f258c0',
  created: 1409896340875,
  running: false }

example result object:

{ key: 
   [ 'ce6cc4e7fcb4a902f7c836cb93a32ae02cb47679c86e3d99c811233bf6f258c0',
     1409895158130 ],
   { hash: '0469e2796db49df219ca5d9380dddd5b48f9a7fe930945d1988aa966a62f1e0a',
     start: 1409895367770,
     end: 1409895372831,
     created: 1409895158130 } }

example fail object:

{ key:
   [ 'fail',
     1410274490145 ],
  value: { message: 'yo' } }

var rs = compute.get(key)

Return a readable stream of the result or job hash key.

To fetch a result from the compute.list('result') stream records, use row.value.hash as the result id to call compute.getResult(id).


compute.on('create', function (jobkey) {})

When a new job is created, this event fires with the job key jobkey.

compute.on('push', function (jobkey, created) {})

When a job is pushed to the pending queue, this event fires with the job key jobkey and an epoch time created in milliseconds that uniquely identifies the instance of the job.

compute.on('result', function (jobkey, resultId, created) {})

When a job finishes, this event fires with the jobkey, the resultId, and the created time.


With npm do:

npm install batchdb