node package manager

carrotmq

carrotmq

a much easy way to use rabbitmq

中文文档

Build Status Version npm NPM Downloads Dependencies NPM

usage

const carrotmq = require('carrotmq');
//var rabbitmqSchema = require('rabbitmq-schema'); 
const rabbitmqSchema = carrotmq.schema;
 
//see https://www.npmjs.com/package/rabbitmq-schema 
const schema = new rabbitmqSchema({
    exchange: 'exchange0',
    type: 'topic',
    bindings: [{
      routingPattern: 'foo.bar.#',
      destination: {
        queue: 'fooQueue',
        messageSchema: {}
      }
    }]
});
const mq = new carrotmq('amqp://localhost', schema);
 
const publisher = new carrotmq('amqp://localhost'); //also can use without schema 
 
mq.queue('fooQueue', function (data){
    console.log(data);
    this.ack();
    //this.nack(); 
    //this.reject(); 
    //this.cancel(); cancel this consumer; 
    this.reply({date: new Date}); //reply to message.properties.relyTo 
    this.carrotmq //carrotmq instrance 
    this.channel  //current channel 
    return Promise.reject(); // or throw new Error('some thing happened') will execute `this.reject()` if this message hadn't been ack 
});
 
mq.sendToQueue('queue', {msg: 'message'});
mq.publish('exchange', 'foo.bar.key', {msg: 'hello world!'});

RPC

mq.rpc('queue', {data: new Date})
.then((reply)=>{
  reply.ack();
  console.log(reply.data); //some reply result 
});

RPC Over Exchange

 
//{ 
//    routingPattern: 'rpc.#', 
//   destination: { 
//      queue: 'rpcQueue', 
//      messageSchema: {} 
//    } 
//  } 
 
app.queue('rpcQueue', function (data) {
  this.reply(data);
  this.ack();
}, true);   /* true here for mark this queue is a rpc queue,
carrotmq will wrap real content with json {replyTo: 'queue', content: {buffer}}
for replyTo properties,because of rabbitMQ will ignore
message sent to exchange with vanilla replyTo ,
if server side doesn't using carrotmq ,just handle {replyTo: 'queue', content: {buffer}}*/
 
let time = new Date();
app.rpcExchange('exchange0', 'rpc.rpc', {time})
.then(function (reply){
  reply.ack();
  console.log(reply.data)//{time: time} 
})

events

ready

emit after connection established

mq.on('ready', function(){});

error

emit when something happened

mq.on('error', function (err){});

message

emit when message come

mq.on('message', function (data){
  data.channel; //channel object 
  data.queue   //queue name 
  data.message  //message object 
})

upgrade

V2 to V3

breaking change

  • mq.rpc() and mq.rpcExchange() method remove the 4th consumer argument.And using Promise

used to

  mq.rpc('someQueue', {data}, function(data) {
    const that = this;
    // or some data async logic 
    doSomeThingAsync(data)
    .then(() => that.ack())
    .catch(() => that.nack());
    return data;
  }).then((data) => console.log(data));

now can replaced by

    let reply = await mq.rpc('someQueue', {data});
    try {
      await doSomeThingAsync(reply.data);
      reply.ack();
    } catch (e) {
      reply.nack();
    }