Intro
Light weight in memory message broker. There are no stat logging for task. Tasks are run in the order they arrive/complete. There is still work that can be done at the moment it handles around 10k tasks a sec.
Features
- atomic task execution
- guarantee task being pooled
- sub/unsub event message system
- light weight in memory storage
- task chaining
- zero dependencies
How does it work?
- A worker connects to the broker.
- He tells the broker he can work.
- The server pushes a task to the worker it sends a acknowledge back.
- Runs the task and sends back the output if there's a next/chained task.
Install
npm install --save git+https://git@github.com/anzerr/atium.broker.git
Example of a task
const {Task, Server, Event} = require('atium.broker'),
Request = require('request.libary');
class TestTask extends Task {
constructor(c) {
super({
...c,
type: 'default'
});
this.store = {};
this.on('event:done', (msg) => { // sub to done event
this.store[msg.n] = false;
});
}
run(task) {
console.log('t', this.who, task);
this.event('done', {n: task.stuff}); // send event to everyone execpt me
return Promise.resolve();
}
}
(() => {
const config = {
socket: 'localhost:3001',
api: 'localhost:3002',
tasks: ['task_10001']
};
const send = (task) => {
return new Request(`http://${config.api}`).json(task).post('/add');
};
let t = null;
let s = new Server(config); // server
let e = new TestTask({ // client to receive events no tasks
socket: config.socket,
api: config.api,
tasks: []
});
let out = [];
for (let x = 0; x < 10; x++) {
e.store[x] = true;
e.store[x + 100] = true;
out.push({
tasks: [ // chain tasks
{
task: config.tasks[0], // run this task first
input: {
stuff: x
}
},
{
task: config.tasks[0], // run this task second
input: {
stuff: x + 100
}
}
]
});
}
const eventClient = new Event(config); // event client without tasks handling
eventClient.init().then(() => {
eventClient.subscribe('done');
eventClient.on('event:done', (msg) => {
console.log(msg);
});
});
e.on('connect', () => {
e.subscribe('done').then(() => {
console.log('subscribe to "done"');
t = new TestTask(config); // task client
Promise.all([
new Promise((resolve) => t.on('connect', () => resolve())), // wait for client to connect
send(out) // send out task creation
]).then(() => {
console.log('setup done');
setTimeout(() => { // dirty close
for (let i in e.store) {
if (e.store[i]) {
console.log('missing', i, 'did not recive event');
}
}
console.log('done task', JSON.stringify(e.store));
t.close();
e.close();
s.close();
}, 1000);
});
});
});
})();