amqp.channel
A simplified way to set up an AMQP connection/channel with amqplib. It's a function that takes an AMQP url as the first parameter and an optional second parameter that defines which methods and arguments should be called on the channel. The function returns a Promise that will resolve with a channel object once all the method invocations defined in the second parameter have been resolved. Please see amqplib's documentation for the channel API.
Simplified Configuration
amqplib syntax:
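    var amqplib = require('amqplib');

    amqplib.connect(url).then(function (connection) {
      return connection.createChannel();
    }).then(function (channel) {
      return Promise.all([
        channel.assertExchange('exchange', 'fanout', { durable: true }),
        channel.checkExchange('exchange'),
        channel.bindExchange('alt.exchange', 'exchange', ''),
        channel.unbindExchange('alt.exchange', 'exchange', ''),
        channel.deleteExchange('alt.exchange', { ifEmpty: true }),
        channel.assertQueue('first', { durable: true }),
        channel.assertQueue('second'),
        channel.checkQueue('first'),
        channel.bindQueue('first', 'exchange', ''),
        channel.unbindQueue('first', 'exchange', ''),
        channel.purgeQueue('first'),
        channel.deleteQueue('first', { ifEmpty: true }),
        channel.deleteQueue('second')
      ]).then(function () { return channel; });
    });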
amqp.channel syntax:
    require('amqp.channel')(url, {
      assertExchange : [['exchange', 'fanout', { durable: true }]],
      checkExchange  : [['exchange']],
      bindExchange   : [['alt.exchange', 'exchange', '']],
      unbindExchange : [['alt.exchange', 'exchange', '']],
      deleteExchange : [['alt.exchange', { ifEmpty: true }]],
      assertQueue    : [['first', { durable: true }], ['second']],
      checkQueue     : [['first']],
      bindQueue      : [['first', 'exchange', '']],
      unbindQueue    : [['first', 'exchange', '']],
      purgeQueue     : [['first']],
      deleteQueue    : [['first', { ifEmpty: true }], ['second']]
    });
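One way to picture the second parameter: each key names a channel method, and each inner array is the argument list for one call to that method. The sketch below is not amqp.channel's source. `applyChannelMethods` and the stub `fakeChannel` are hypothetical names used only to illustrate the mapping, and issuing the calls in key order is an assumption.

```javascript
// Hypothetical helper: invoke every configured method on a channel-like object.
// `methods` maps a method name to an array of argument lists.
function applyChannelMethods(channel, methods) {
  var calls = [];
  Object.keys(methods || {}).forEach(function (name) {
    methods[name].forEach(function (args) {
      // e.g. channel.assertQueue('first', { durable: true })
      calls.push(channel[name].apply(channel, args));
    });
  });
  // Resolve with the channel once every invocation has resolved.
  return Promise.all(calls).then(function () { return channel; });
}

// Stand-in for an amqplib channel: records calls instead of talking to a broker.
var recorded = [];
var fakeChannel = {
  assertQueue: function (queue, options) {
    recorded.push(['assertQueue', queue, options]);
    return Promise.resolve({ queue: queue });
  },
  bindQueue: function (queue, exchange, pattern) {
    recorded.push(['bindQueue', queue, exchange, pattern]);
    return Promise.resolve({});
  }
};

var ready = applyChannelMethods(fakeChannel, {
  assertQueue: [['first', { durable: true }], ['second']],
  bindQueue: [['first', 'exchange', '']]
});
```

Here `ready` resolves with `fakeChannel` after the three recorded calls have resolved, which mirrors the "resolve with a channel once all invocations have resolved" behaviour described above.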
Simplified Usage
The channel object resolved by the returned Promise will behave differently from a normal channel object returned by the amqplib library in a few (hopefully convenient) ways:
- The consume, publish, and sendToQueue channel methods have been changed to explicitly handle JSON.
- The publish and sendToQueue methods have been "promisified" in a way that still tells you whether or not the write buffer is full (and therefore, whether or not you should continue writing to it) by adding an additional ok boolean property to the promise.
- A channel consumer callback will no longer receive null when that consumer has been cancelled by RabbitMQ. Instead, the channel object will emit a 'cancelled' event with all the arguments passed to the channel.consume() call for the consumer that was cancelled.
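To make the first two points concrete, here is a minimal sketch of one way a boolean-returning publish could be wrapped so that it accepts a plain object and returns a promise carrying an ok flag. This is an illustration under assumptions, not the library's implementation: `wrapPublish` and `fakeRawChannel` are hypothetical, and the promise here resolves immediately rather than waiting for anything from the broker.

```javascript
// Hypothetical wrapper around a channel whose publish() returns a boolean,
// as amqplib's does (false means the write buffer is full).
function wrapPublish(rawChannel) {
  return function publish(exchange, routingKey, body, options) {
    // JS object -> JSON string -> Buffer
    var content = Buffer.from(JSON.stringify(body));
    var ok = rawChannel.publish(exchange, routingKey, content, options);
    var promise = Promise.resolve(body);
    promise.ok = ok; // expose the backpressure signal on the promise itself
    return promise;
  };
}

// Stand-in channel that reports a full buffer on the third write.
var written = [];
var fakeRawChannel = {
  publish: function (exchange, routingKey, content) {
    written.push(content.toString());
    return written.length < 3;
  }
};

var publish = wrapPublish(fakeRawChannel);
var first = publish('exchange', 'key', { n: 1 });
var second = publish('exchange', 'key', { n: 2 });
var third = publish('exchange', 'key', { n: 3 });
```

Attaching ok to the promise keeps the synchronous "should I keep writing?" answer available without giving up a thenable return value.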
Examples of Modified Usage:
Automatic translation of JS object to JSON string to Buffer for sending/publishing:
    channel.sendToQueue('queue', { foo: 'bar' });
    channel.publish('exchange', 'routing.key', { foo: 'bar' });
Promisification of sendToQueue and publish methods:
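    return channel.sendToQueue('queue', { foo: 'bar' }).then(function () {
      return channel.publish('exchange', 'routing.key', { foo: 'bar' });
    });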
Automatic translation of message Buffer to JSON string to JS object for consuming:
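    channel.sendToQueue('queue', { foo: 'bar' });
    channel.consume('queue', function (parsed, msg) {
      // parsed is the JS object decoded from msg.content
      channel.ack(msg);
    });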
Handling a consumer getting cancelled by Rabbit MQ:
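    channel.consume('queue', function onMessage(parsed, msg) { /* ... */ });
    channel.on('cancelled', function (queue, callback, options) {
      // Receives the arguments originally passed to channel.consume()
    });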
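The two consuming behaviours (JSON decoding and the 'cancelled' event) can be sketched together. The code below is a guess at the shape of such a wrapper, not amqp.channel's actual code: `wrapConsume` and `fakeRawConsume` are hypothetical, and the `(parsed, msg)` callback signature is an assumption. What it relies on from amqplib is only that a consumer callback is invoked with null when the broker cancels the consumer.

```javascript
var EventEmitter = require('events');

// Hypothetical wrapper: `rawConsume(queue, handler, options)` stands in for
// amqplib's channel.consume, which calls the handler with null on cancellation.
function wrapConsume(emitter, rawConsume) {
  return function consume(queue, callback, options) {
    return rawConsume(queue, function (msg) {
      if (msg === null) {
        // Consumer was cancelled by the broker: emit an event instead.
        emitter.emit('cancelled', queue, callback, options);
        return;
      }
      // Buffer -> JSON string -> JS object
      callback(JSON.parse(msg.content.toString()), msg);
    }, options);
  };
}

// Stand-in for a broker: remembers the handler so deliveries can be simulated.
var deliver;
function fakeRawConsume(queue, handler) {
  deliver = handler;
  return Promise.resolve({ consumerTag: 'fake-tag' });
}

var channel = new EventEmitter();
channel.consume = wrapConsume(channel, fakeRawConsume);

var received = [];
var cancelledArgs = null;
channel.on('cancelled', function (queue, callback, options) {
  cancelledArgs = [queue, options];
});
channel.consume('first', function (parsed) { received.push(parsed); }, { noAck: true });

deliver({ content: Buffer.from('{"baz":"qux"}') }); // a normal delivery
deliver(null);                                       // a broker-side cancel
```

Because the 'cancelled' listener receives the original consume arguments, it has everything it needs to re-register the same consumer if that is the desired recovery.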
The ok property on the promises returned by the sendToQueue and publish methods:
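    var sent = channel.sendToQueue('queue', { foo: 'bar' });
    if (sent.ok) {
      // continue sending
    } else {
      // maybe pause sending until unblocked?
      channel.once('drain', function () { /* resume sending */ });
    }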
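As a fuller illustration of using ok for backpressure, the sketch below keeps sending until a write reports a full buffer, then waits for the channel's 'drain' event (which amqplib channels emit once the buffer empties) before resuming. `sendAll` and the stub `fakeChannel` are hypothetical; the stub only imitates a sendToQueue that returns a promise with an ok property.

```javascript
var EventEmitter = require('events');

// Send items until the channel reports a full buffer, then wait for 'drain'.
// `channel.sendToQueue` is assumed to return a promise with an `ok` flag.
function sendAll(channel, queue, items, done) {
  var index = 0;
  (function next() {
    while (index < items.length) {
      var sent = channel.sendToQueue(queue, items[index++]);
      if (!sent.ok) {
        // Buffer is full: pause until the channel says it has drained.
        channel.once('drain', next);
        return;
      }
    }
    done();
  })();
}

// Stand-in channel: every second write reports a full buffer.
var fakeChannel = new EventEmitter();
var writes = [];
fakeChannel.sendToQueue = function (queue, body) {
  writes.push(body);
  var promise = Promise.resolve();
  promise.ok = writes.length % 2 !== 0;
  return promise;
};

var finished = false;
sendAll(fakeChannel, 'bar', [1, 2, 3], function () { finished = true; });
var pausedAfter = writes.length; // writes made before the first pause
fakeChannel.emit('drain');       // simulate the buffer emptying
```

The item that triggered the pause has already been handed to the channel, so the loop resumes with the next item rather than resending it.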
Real World Example
Say you wanted to listen to the 'foo' exchange and send a different message to the 'bar' queue every time the message's baz property contained the word 'qux'.
In your config.js:
    var env = process.env;
    var cfg = {
      exchange: env.EXCHANGE_TO_BIND_TO || 'foo',
      queue: {
        toSendTo: env.QUEUE_TO_SEND_TO || 'bar',
        toConsumeFrom: env.QUEUE_TO_CONSUME_FROM || 'baz'
      },
      amqpUrl: env.RABBIT_MQ_URL || 'amqp://test:test@192.168.2.2:5672'
    };

    cfg.channelMethodsToCall = {
      assertQueue: [ // Channel method to invoke
        // Array of channel method invocations
        // channel.assertQueue( cfg.queue.toConsumeFrom )
        [ cfg.queue.toConsumeFrom ],
        // channel.assertQueue( cfg.queue.toSendTo, { durable: true } );
        [ cfg.queue.toSendTo, { durable: true } ]
      ],
      assertExchange: [
        [ cfg.exchange, 'fanout' ] // channel.assertExchange(cfg.exchange, 'fanout')
      ],
      bindQueue: [
        [ cfg.queue.toConsumeFrom, cfg.exchange, '' ]
      ]
    };

    module.exports = cfg;
In your app.js:
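    var cfg = require('./config');
    var amqp = require('amqp.channel');

    module.exports = amqp(cfg.amqpUrl, cfg.channelMethodsToCall).then(consumeUpTo(1));

    function consumeUpTo(maxMessages) {
      return function (channel) {
        // Only process `maxMessages` at a time and don't consume another
        // message until we've either `ack` or `nack` the current one.
        return channel.prefetch(maxMessages).then(function () {
          return channel.consume(cfg.queue.toConsumeFrom, onMessage(channel));
        });
      };
    }

    function onMessage(channel) {
      return function (parsed, msg) {
        if (/qux/.test(parsed.baz)) {
          // Send a different message to the 'bar' queue
          channel.sendToQueue(cfg.queue.toSendTo, { matched: parsed.baz });
        }
        return channel.ack(msg);
      };
    }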