lean-mq
Lean and customizable in-memory message queue. The main motivation is to provide grouping-based message / job queue functionality.
In the future, the plan is to provide different persistence options.
Quick start
Simple queue
var leanmq = require('lean-mq');
var q = leanmq.Queue('test');
// function to handle tasks
q.handle(function(obj){
console.log('handle - obj = ', obj );
return Promise.resolve(true);
});
// add tasks
q.add( { key: "1", data:"abc" } );
//
// Prints: handle - obj = { key: "1", data:"abc" }
//
Chain (grouped queue)
var leanmq = require('lean-mq');
var opts = {window:5000};
// window -> time window (ms) to use for grouping, default is 5 seconds
var c = leanmq.Chain('test', opts);
// function to attach one or more workers
c.attach(function(obj){
console.log('worker 1 - obj = ', obj );
return Promise.resolve(true);
});
c.attach(function(obj){
console.log('worker 2 - obj = ', obj );
return Promise.resolve(true);
});
// add tasks
c.add( "a", { id: 1 } );
c.add( "b", { id: 2 } );
c.add( "a", { id: 3 } );
c.add( "b", { id: 4 } );
//
// Prints:
// worker 1 - obj = { key: "a", data:[{ id: 1 }, { id: 3 }] }
// worker 2 - obj = { key: "b", data:[{ id: 2 }, { id: 4 }] }
//
Chain [persistent]
var leanmq = require('lean-mq');
var opts = {window: 5000, storage: 'redis', connection: {host: "localhost" }};
var c = leanmq.Chain('test', opts);
// everything else is the same
// for all connection options, see:
// https://github.com/NodeRedis/node-redis#options-object-properties
/*
{
host: '127.0.0.1',
port: 6379,
db: 0,
options: {},
}
*/
API
Queue
constructor(name, opts)
Constructor for the simple queue object, opts are optional
// DEFAULT_OPTIONS
{
storage: 'memory', // memory, redis
connection: null,
attempts: 3,
retry_delay: 1000
}
.handle(function)
Sets a handler function. The function should accept an object as its argument and return a promise (resolved or rejected).
.add(msg)
Adds a task to the simple queue, which is then processed by the handler function
.on(eventName, function)
Listens to event emitter - new
Chain
constructor(name, opts)
Constructor for the chain (grouped queue) object, opts are optional
// DEFAULT_OPTIONS
{
window: 5000 // 5 seconds
}
.attach(function)
Sets a worker function. The function should accept an object as its argument and return a promise (resolved or rejected). Round-robin is used to balance the load.
.add(key, msg)
Adds a task to the chain, which is then processed by one of the worker functions after the time window. The key is used for grouping
.on(eventName, function)
Listens to event emitter - error
Broker
constructor(opts)
// DEFAULT_OPTIONS
{
port: 2525 // to accept Socket connection
}
Client (requires running broker)
constructor(opts)
// DEFAULT_OPTIONS
{
name: 'none',
url: 'localhost:2525' // broker address and port
}
.send / .connect / .subscribe / .close
Performance
lean-mq performance
comparison
name | add | feature 1 | feature 2 | feature 3 |
---|---|---|---|---|
lean-mq | 0 ms | + | + | - |
some other | 0 ms | + | - | + |
Other projects
- lean-cache - https://www.npmjs.com/package/lean-cache