Node.js Redis Message Queue
Example
// Producer example: pushes batches of messages onto the "test" queue.
var KongMQ = require('kong-mq');

// Create a KongMQ instance pointed at 127.0.0.1:6379 and open the "test" queue.
var MQ = KongMQ.create({ port: 6379, host: '127.0.0.1' });
var Q = MQ.queue({ name: "test" });

// Every 10 seconds, push 1000 messages, each carrying its loop index and the
// current timestamp.
setInterval(() => {
  for (let index = 0; index < 1000; index++) {
    Q.push({ index: index, date: Date.now() }, (err, res) => {
      // Report a failed push instead of silently logging an empty result.
      if (err) {
        console.error(err);
        return;
      }
      console.log(res);
    });
  }
}, 10000);
// Consumer example: listens for messages on the "test" queue.
var MQ = KongMQ.create({
  port: 6379,
  host: '127.0.0.1',
});

// NOTE(review): `auto: true` presumably enables automatic acknowledgement,
// since this handler never calls ack — confirm against the kong-mq docs.
var Q = MQ.queue({
  name: 'test',
  auto: true,
});

// Log the receive time, the current process id and the message as JSON.
Q.on('process', (payload) => {
  const receivedAt = Date.now();
  const pid = process.pid;
  const serialized = JSON.stringify(payload);
  console.log(receivedAt, pid, serialized);
});
// Subscriber example: receives messages published on "channel".
var MQ = KongMQ.create({
  port: 6379,
  host: '127.0.0.1',
});

// The callback passed to subscribe is empty in this example.
MQ.subscribe('channel', () => {});

// Print every incoming message together with the channel it arrived on.
MQ.on('message', (channelName, payload) => {
  console.log('Receive message %s from channel %s', payload, channelName);
});
// Publisher example: sends a single message to "channel".
var MQ = KongMQ.create({
  port: 6379,
  host: '127.0.0.1',
});

MQ.publish(
  'channel',
  'Hello world!'
);
// Consumer example with manual acknowledgement: the handler simulates five
// seconds of work and then acks the message itself.
var MQ = KongMQ.create({ port: 6379, host: '127.0.0.1' });

// NOTE(review): `limit: 1` presumably caps the number of messages handled at
// once — confirm against the kong-mq docs.
var Q = MQ.queue({ name: "test", auto: true, limit: 1 });

// The handler must be `async`: `await` inside a plain arrow function is a
// SyntaxError.
Q.on('process', async (message) => {
  console.log(Date.now(), process.pid, JSON.stringify(message));
  // Simulate slow processing by waiting five seconds before acknowledging.
  await new Promise(resolve => setTimeout(resolve, 5000));
  console.log( Date.now(),process.pid, "end wait 5s");
  Q.ack(message);
});
// Multi-queue consumer example: handles messages from several named queues
// with one handler.
var MQ = KongMQ.create({
  port: 6379,
  host: '127.0.0.1',
});

// NOTE(review): `limit: 1` presumably applies per queue — confirm against the
// kong-mq docs.
MQ.process(
  [
    'USDT-BTC',
    'GEEK-BTC',
    'XLM-BTC',
    'USDT-GEEK',
    'XRP-USDT',
    'XLM-USDT',
    'DASH-USDT',
  ],
  { limit: 1 }
);

// Log the timestamp, process id, originating queue name and message, then
// acknowledge the message on the queue it came from.
MQ.on('process', async (sourceQueue, payload) => {
  const timestamp = Date.now();
  const pid = process.pid;
  const queueName = sourceQueue.name;
  const serialized = JSON.stringify(payload);
  console.log(timestamp, pid, queueName, serialized);
  sourceQueue.ack(payload);
});