amqp-utils
Helper functions for use with amqplib. Provides a persistent connection to the AMQP server, an RPC layer, and an application event service.
Because it uses ES6 features, this module requires Node.js v8 or above.
Connection Manager Quickstart
npm install @johnmmackey/amqp-utils
const { ConnectionManager, ConnectionManagerClass } = require('@johnmmackey/amqp-utils');
// Establish a connection to server
// Server URI is taken from AMQP_URI environment variable.
// If not set, defaults to 'amqp://localhost'
ConnectionManager.connect();
// Alternately, a server can be specified:
ConnectionManager.connect('amqp://amqpserver.domain.com');
ConnectionManager is an instance of EventEmitter. Events:
- debug: informational
- ready: when connection is ready
- failed: if connection drops
Usage example:
ConnectionManager.on('debug', status => {
console.log('AMQP Connection debug:', status);
});
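The ready and failed events can be handled the same way. The following is a minimal sketch, not part of the module: it uses a plain EventEmitter as a stand-in for ConnectionManager so it runs without a broker, trackState is a hypothetical helper, and it assumes ready fires again after a reconnect.

```javascript
const { EventEmitter } = require('events');

// Stand-in for ConnectionManager so the sketch runs without a broker.
// With the real module, pass the ConnectionManager exported by
// '@johnmmackey/amqp-utils' instead.
const connection = new EventEmitter();

// Hypothetical helper: tracks the latest connection state from the events.
function trackState(emitter) {
  const state = { ready: false, failures: 0 };
  emitter.on('ready', () => { state.ready = true; });
  emitter.on('failed', () => { state.ready = false; state.failures += 1; });
  return state;
}

const state = trackState(connection);

// Simulate a connect / drop / reconnect cycle.
connection.emit('ready');
connection.emit('failed');
connection.emit('ready');
console.log(state); // ready is true again, one failure recorded
```

Application code could consult state.ready before publishing, rather than assuming the connection is up.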
Connection Manager Core Methods
A connection has the following core methods:
- addConfig(f), where f is a function that receives a channel and returns a promise. Example:
ConnectionManager.addConfig(async ch => {
await ch.assertExchange('sillyex', 'fanout');
await ch.assertQueue('sillyq');
await ch.bindQueue('sillyq', 'sillyex', '');
ch.consume('sillyq', msg => {
myConsumerFunction(msg);
}, { noAck: true });
});
- ConnectionManager.ack(msg) - acknowledge a message (used in consumer functions)
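The addConfig example above consumes with noAck: true, so no acknowledgement is needed there. When consuming with noAck: false, each message has to be acknowledged explicitly. The sketch below shows one way to do that only after the handler has succeeded. ackAfter is a hypothetical helper, the ack function is injected so the code runs without a broker, and a JSON payload is assumed.

```javascript
// Hypothetical helper: wraps a handler so the message is acknowledged
// only after the handler resolves. In real use, pass
// msg => ConnectionManager.ack(msg) as the `ack` argument.
function ackAfter(handler, ack) {
  return async msg => {
    if (msg === null) return; // amqplib passes null when the consumer is cancelled
    // Assumption: the message body is JSON.
    await handler(JSON.parse(msg.content.toString()));
    ack(msg);
  };
}

// Demonstration with a fake message and a fake ack function.
const acked = [];
const seen = [];
const consumer = ackAfter(
  async body => { seen.push(body.id); },
  msg => acked.push(msg)
);
const done = consumer({ content: Buffer.from(JSON.stringify({ id: 7 })) });
done.then(() => console.log('handled:', seen, 'acked:', acked.length));
```

Inside an addConfig function this might be wired up as ch.consume('sillyq', ackAfter(myHandler, msg => ConnectionManager.ack(msg)), { noAck: false }), where myHandler is your own async function. If the handler throws, the message is left unacknowledged.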
RPC
Support is provided for RPC operations using the RPC component.
Server side uses RPC.RPCServer to set up a destination and method:
new RPC.RPCServer('rpc_dest', async (method, params) => {
winston.debug('Got RPC request for some function. Method:', method, 'Parameters:', JSON.stringify(params));
return method(params);
})
.on('debug', msg => {
winston.debug('RPCServer debug:', msg);
});
As shown above, RPCServer is an instance of EventEmitter.
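If the method argument arrives as a string name (the client example passes 'somemethod'), the server callback needs to map that name to a function. The sketch below shows one possible dispatch table. The methods object and dispatch function are hypothetical, and how a thrown error is reported back to the caller is not described here.

```javascript
// Hypothetical method table: names map to async functions taking params.
const methods = {
  add: async ({ a, b }) => ({ sum: a + b }),
  echo: async params => ({ echoed: params }),
};

// Handler with the same (method, params) shape as the RPCServer callback.
async function dispatch(method, params) {
  // Only call names defined on the table itself, never inherited ones.
  if (!Object.prototype.hasOwnProperty.call(methods, method)) {
    throw new Error(`Unknown RPC method: ${method}`);
  }
  return methods[method](params);
}

const result = dispatch('add', { a: 2, b: 3 });
result.then(r => console.log('add ->', JSON.stringify(r)));
```

Under these assumptions, dispatch could be passed directly as the second argument: new RPC.RPCServer('rpc_dest', dispatch).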
Client side uses RPCQueueManager as follows:
var rpcQM = new RPC.RPCQueueManager();
rpcQM.on('debug', debug => {
winston.debug('RPCQM debug:', debug);
});
rpcQM.add('rpc_dest', 'somemethod', params)
.then(r => {
// response is an object
winston.debug('RPC result is', JSON.stringify(r));
})
.catch(err => {
winston.error('RPC failed:', err.message);
});
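Whether RPCQueueManager applies its own timeout is not stated here. If a caller wants an upper bound on how long it waits, the returned promise can be wrapped. This is a generic sketch, with withTimeout as a hypothetical helper and a delayed promise standing in for the RPC call.

```javascript
// Hypothetical helper: rejects if `promise` has not settled within `ms`.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  // Whichever settles first wins; the timer is cleared either way.
  return Promise.race([promise, timeout]).then(
    value => { clearTimeout(timer); return value; },
    err => { clearTimeout(timer); throw err; }
  );
}

// Stand-in for a call such as rpcQM.add('rpc_dest', 'somemethod', params).
const slowCall = new Promise(resolve => setTimeout(() => resolve({ ok: true }), 200));

const outcome = withTimeout(slowCall, 20).catch(err => err.message);
outcome.then(message => console.log(message));
```

Note that the wrapper only stops the caller from waiting. It does not cancel the request, so a late response may still arrive at the queue manager.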
AppEvents
Under review. Currently used by checkin controller in cgmem.