level-q

Priority queuing for leveldb/levelup

level-q

Priority queuing for leveldb/levelup

This module is installed via npm:

$ npm install level-q

By default adding items to a queue will be ordered first in, first out.

var queue = require('level-q'),
    level = require('level'),
    bytewise = require('bytewise');
 
// instantiate queue (adds a 'queue' property on the db) 
var q = queue(level('/db/path', { keyEncoding: bytewise, valueEncoding: 'json' }));
 
// data to put in queue 
var data = {
  id: 1,
  name: 'Eugene',
  value: 42
};
 
// add to queue 
q.queue.push(data);
 
// read a single item from the queue, then stop listening 
q.queue.read(function (errvalue) {
  // value should equal data 
});
 
// keep listening to the queue 
q.queue.listen(function (errvaluekeynext) {
  // do something with the value 
  console.log(value);
  // ok, done processing, get the next item from the queue 
  next();
});

By providing an order by function you can return a key that can be used to sort the queue. This example uses a quality of service (QOS) field so that the higher the QOS, the earlier it will get serviced.

var queue = require('level-q'),
    level = require('level'),
    bytewise = require('bytewise');
 
// order the queue by the QOS field 
var db = queue(level('/db/path', { keyEncoding: bytewise, valueEncoding: 'json' }),
  { order: orderByQos });
 
function orderByQos(data) {
  return data.QOS;
}
 
// data to put in queue 
var data = [
  // should be serviced last 
  {
    id: 1,
    QOS: 'C',
    name: 'Eugene',
    value: 42
  },
  // should be serviced first 
  {
    id: 2,
    QOS: 'A',
    name: 'Eugene',
    value: 42
  },
  // should be serviced second 
  {
    id: 3,
    QOS: 'B',
    name: 'Eugene',
    value: 42
  }
];
 
// add to queue 
var count = data.length;
data.forEach(function (item) {
  db.queue.push(item, function (err) {
    if (err) throw err;
    --count || listen();
  });
});
 
function listen() {
  db.queue.listen(function (errvaluekeynext) {
    console.log(value);
    // should print the second item, then the third item, then the first 
    next();
  });
}

You can use the release option when instantiating your queue to control when an item is able to be returned to consumers of the queue.

For example, you may wish to create a job queue, and delay jobs until to a scheduled time in the future (eg. like a cron job);

var bytewise = require('bytewise'),
    level = require('level')
    range = require('range'),
    queue = require('level-q');
 
// create a priority queue based on time to next service 
var db = queue(
  level('/db/path', { keyEncoding: bytewise, valueEncoding: 'json' }),
  { order: orderByDeadline, release: releaseOnDeadline });
 
// sort queue by deadline 
function orderByDeadline(data) {
  return data.deadline;
}
 
// only allow an item to be read from queue if it's time is up 
function releaseOnDeadline(data) {
  return Date.now() >= data.deadline;
}
 
// add some work to the queue that will be scheduled 5 seconds in the future 
db.queue.push({
  deadline: Date.now() + 5000,
  state: 'square'
  n: 2,
  squareResult: 0,
  sinResult: 0
});
 
// The job will ONLY be available for processing once the 5 seconds has passed 
db.queue.listen(function (errvaluekeynext) {
  if (err) throw err;
  switch (value.state) {
    case 'square':
      // do the work 
      value.squareResult = value.* value.n;
 
      // schedule then next part of work 2 seconds in the future 
      value.state = 'sin';
      value.deadline = Date.now() + 2000;
      db.queue.push(value, next);
      break;
 
    case 'sin':
      // do the work 
      value.sinResult = Math.sin(value.n);
 
      console.log(value);
      // Should print out: 
      // { deadline: 1390487269236, 
      //    state: 'sin', 
      //    n: 2, 
      //    squareResult: 4, 
      //    sinResult: 0.9092974268256817 } 
      next();
      break;
  }
});