amqp-msrvlib
Wrapper for ampqlib providing wrappers useful for microservice frameworks:
- persistent connections
- RPC helper framework, providing queue management & JSON serialization
- convience methods for application event pub/sub
Due to the use of ES6 features, this module requires Node.js V8 or above.
Quickstart
npm install @johnmmackey/amqp-msrvlib
const amqpms = require('@johnmmackey/amqp-msrvlib');
// Establish a connection to server
// Server URI is taken from AMQP_URI environment variable.
// If not set, defaults to 'amqp://localhost'
var c = ampqms.amqpPersistentConnect();
// Alternately, a server can be specified:
var c = ampqms.amqpPersistentConnect('amqp://amqpserver.domain.com');
amqpPersistentConnect()
returns an instance of EventEmitter
. Events:
-
debug
: informational -
ready
: when connection is ready -
failed
: if connection drops
Usage example:
c.on('debug', status => {
console.log('AMPQ Connection debug:', status);
});
Core Methods
A connection has the following core methods:
- addConfig(f), where f is a function returning a promise. Example:
c.addConfig(async ch => {
await ch.assertExchange('sillyex', 'fanout');
await ch.assertQueue('sillyq');
await ch.bindQueue('sillyq', 'sillyex', '');
ch.consume('sillyq', msg => {
myConsumerFunction(msg);
}, { noAck: true });
});
- ack(msg) - acknowledge a message (used in consumer functions)
AppEvent Methods
- registerAppEventRxer
c.registerAppEventRxer(function (msg) {
winston.info('Got queue message'
winston.info(msg.properties.timestamp, msg.properties.appId, msg.properties.type);
winston.info('Msg headers:', JSON.stringify(msg.properties.headers));
winston.info('Msg content:', msg.content.toString());
var e = new Event({
source: msg.properties.appId,
created: msg.properties.timestamp,
objectType: msg.properties.headers.objectType,
operation: msg.properties.headers.operation,
eventData: msg.parsedContent
});
- publishAppEvent(objType, operation, msg)
RPCQ Class
This class is used to manage a queue towards an RPC server. Note that under the hood, json-rpc 2.0 is used, although this is transparent to the class consumer.
var rpcQ = new (ampqms.RPCQ)(connection, 5000); //connection and RPC ttl for this queue
// its an event emitter - only emits 'debug'
rpcQ.on('debug', msg => {
winston.debug('rpcQ:', msg);
})
To send an RPC request, use the add
method. Parameters are:
- the destination queue name (string)
- the method to invoke on the server (string)
- parameters (any type, optional, and can be an array)
rpcQ.add('server', 'test', {a: 2})
.then(r => {
// response is an object
winston.verbose('Result is', JSON.stringify(r));
})
.catch(err => {
winston.error('RPC failed:', err.message);
});
RPCServer Class
This class is used by the RPC Server.
var n = new amqpms.RPCServer(connection, server_queue_name, callback);
The callback function must be an async function. It is invoked with two parameters - the method and params from the RPC request above.
var c = amqpms.amqpPersistentConnect();
(new amqpms.RPCServer(c, 'serverq', async (method, params) => {
if (there is a problem)
return Promise.reject(new Error('Cant do that'));
// do some async work
return result;
}))
.on('debug', debug => {
//console.log('RPCServer Debug:', debug);
})