@johnmmackey/amqp-msrvlib

1.0.2 • Public • Published

amqp-msrvlib

Wrapper for ampqlib providing wrappers useful for microservice frameworks:

  • persistent connections
  • RPC helper framework, providing queue management & JSON serialization
  • convience methods for application event pub/sub

Due to the use of ES6 features, this module requires Node.js V8 or above.

Quickstart

npm install @johnmmackey/amqp-msrvlib
const amqpms = require('@johnmmackey/amqp-msrvlib');

// Establish a connection to server
// Server URI is taken from AMQP_URI environment variable.
// If not set, defaults to 'amqp://localhost'
var c = ampqms.amqpPersistentConnect();

// Alternately, a server can be specified:
var c = ampqms.amqpPersistentConnect('amqp://amqpserver.domain.com');

amqpPersistentConnect() returns an instance of EventEmitter. Events:

  • debug: informational
  • ready: when connection is ready
  • failed: if connection drops

Usage example:

    c.on('debug', status => {
        console.log('AMPQ Connection debug:', status);
    });

Core Methods

A connection has the following core methods:

  • addConfig(f), where f is a function returning a promise. Example:
    c.addConfig(async ch => {
        await ch.assertExchange('sillyex', 'fanout');
        await ch.assertQueue('sillyq');
        await ch.bindQueue('sillyq', 'sillyex', '');
        ch.consume('sillyq', msg => {
            myConsumerFunction(msg);
        }, { noAck: true });
    });
  • ack(msg) - acknowledge a message (used in consumer functions)

AppEvent Methods

  • registerAppEventRxer
c.registerAppEventRxer(function (msg) {
    winston.info('Got queue message'
    winston.info(msg.properties.timestamp, msg.properties.appId, msg.properties.type);
    winston.info('Msg headers:', JSON.stringify(msg.properties.headers));
    winston.info('Msg content:', msg.content.toString());

    var e = new Event({
        source: msg.properties.appId,
        created: msg.properties.timestamp,
        objectType: msg.properties.headers.objectType,
        operation: msg.properties.headers.operation,
        eventData: msg.parsedContent
    });
  • publishAppEvent(objType, operation, msg)

RPCQ Class

This class is used to manage a queue towards an RPC server. Note that under the hood, json-rpc 2.0 is used, although this is transparent to the class consumer.

var rpcQ = new (ampqms.RPCQ)(connection, 5000);     //connection and RPC ttl for this queue
// its an event emitter - only emits 'debug'
rpcQ.on('debug', msg => {
    winston.debug('rpcQ:', msg);
})

To send an RPC request, use the add method. Parameters are:

  • the destination queue name (string)
  • the method to invoke on the server (string)
  • parameters (any type, optional, and can be an array)
rpcQ.add('server', 'test', {a: 2})
        .then(r => {
            // response is an object
            winston.verbose('Result is', JSON.stringify(r));
        })
        .catch(err => {
            winston.error('RPC failed:', err.message);
        });

RPCServer Class

This class is used by the RPC Server.

var n = new amqpms.RPCServer(connection, server_queue_name, callback);

The callback function must be an async function. It is invoked with two parameters - the method and params from the RPC request above.

var c = amqpms.amqpPersistentConnect();
(new amqpms.RPCServer(c, 'serverq', async (method, params) => {
    if (there is a problem)
        return Promise.reject(new Error('Cant do that'));
    // do some async work
    return result;
}))
    .on('debug', debug => {
		//console.log('RPCServer Debug:', debug);
	})

Readme

Keywords

none

Package Sidebar

Install

npm i @johnmmackey/amqp-msrvlib

Weekly Downloads

1

Version

1.0.2

License

ISC

Unpacked Size

13.5 kB

Total Files

4

Last publish

Collaborators

  • johnmmackey