iamqp

2.2.1 • Public • Published

iamqp.js

A simple AMQP (Advanced Message Queuing Protocol) wrapper built on amqplib. It provides several AMQP flavours (wrappers):

  • basic producer with the class PlainProducer
  • basic consumer with the class PlainConsumer
  • fanout exchange producer with the class FanoutProducer
  • fanout exchange consumer with the class FanoutConsumer
  • RPC client with the class RPCClient
  • RPC server with the class RPCServer

Typical uses are (fast) communication between:

  • processes
  • instances in distributed systems
  • web and worker instances

Each flavour has two sides: a producer and a consumer. The producer (for example a worker) publishes data to the consumers. The RPC flavour is a special case, where a client/server relationship exists instead.

Prerequisites

A guide to installing RabbitMQ and some introduction:

Some AMQP service providers:

WoW

Install with:

npm install iamqp --save
yarn add iamqp

Common

All instances have:

  • openConnection() to establish the connection to the RabbitMQ instance
  • closeConnection() to close the established connection
  • isReady() to check whether the connection is ready
  • getEventer() to get the EventEmitter instance that the code uses to signal messages and information.

Several things can be listened to on the eventer that are common to all instances:

  • error - emitted when an error occurs. If you do not listen for it, the error bubbles up and crashes your process:
producer.getEventer().on('error', (err) => {
  console.log('ERROR ' + err.message);
});
  • connected - emitted when the instance connects to the server:
producer.getEventer().on('connected', () => {
  console.log('... connected');
});
  • closed - emitted when the connection closes:
producer.getEventer().on('closed', () => {
  console.log('... closed');
});

Individual instances also have their own events for handling data. Take care not to register too many listeners on a given event, as this can cause a memory leak.
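
One way to avoid piling up listeners is to attach each handler at most once and to remove it when it is no longer needed. The sketch below assumes the eventer behaves like a standard Node.js EventEmitter (which prints a warning once more than 10 listeners are registered for a single event by default). The helper onUnique is hypothetical and not part of iamqp, and a plain EventEmitter stands in for instance.getEventer().

```javascript
const { EventEmitter } = require('events');

// Stand-in for the object returned by instance.getEventer();
// assumed here to behave like a Node.js EventEmitter.
const eventer = new EventEmitter();

// Hypothetical helper: attach a handler only if it is not attached yet.
// Returns the number of listeners registered for the event afterwards.
function onUnique(emitter, eventName, handler) {
  if (!emitter.listeners(eventName).includes(handler)) {
    emitter.on(eventName, handler);
  }
  return emitter.listenerCount(eventName);
}

function logError(err) {
  console.log('ERROR ' + err.message);
}

// Calling this repeatedly (for example from setup code that runs
// more than once) still leaves a single listener in place.
onUnique(eventer, 'error', logError);
onUnique(eventer, 'error', logError);
const errorListenerCount = eventer.listenerCount('error');

// When a handler is no longer needed, remove it explicitly.
eventer.off('error', logError);
```

Using a named function instead of an inline arrow function is what makes both the duplicate check and the later removal possible.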

The connection must be established by calling openConnection(). If the connection later fails, an automatic reconnect is triggered. The reconnect is not triggered when you close the connection yourself with closeConnection().
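
Because openConnection() does not complete synchronously, calling code often needs to wait until the instance is ready. A minimal sketch of such a wait is shown below. It assumes that isReady() returns a truthy value once connected and that the eventer is a Node.js EventEmitter. The names waitForConnected and createFakeInstance are hypothetical; the fake instance only mimics the method names described above so that the sketch runs without a broker.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: resolve once the instance reports a ready
// connection, or reject after timeoutMs milliseconds.
function waitForConnected(instance, timeoutMs) {
  return new Promise((resolve, reject) => {
    if (instance.isReady()) {
      resolve(instance);
      return;
    }
    const eventer = instance.getEventer();
    const onConnected = () => {
      clearTimeout(timer);
      resolve(instance);
    };
    const timer = setTimeout(() => {
      eventer.off('connected', onConnected);
      reject(new Error('not connected after ' + timeoutMs + ' ms'));
    }, timeoutMs);
    eventer.once('connected', onConnected);
  });
}

// Stand-in with the same method names as an iamqp instance (assumption:
// used here only for illustration, it does not talk to RabbitMQ).
function createFakeInstance(connectDelayMs) {
  const eventer = new EventEmitter();
  let ready = false;
  return {
    getEventer: () => eventer,
    isReady: () => ready,
    openConnection: () => {
      setTimeout(() => {
        ready = true;
        eventer.emit('connected');
      }, connectDelayMs);
    }
  };
}

const fake = createFakeInstance(10);
fake.openConnection();
const connectedPromise = waitForConnected(fake, 1000);
connectedPromise.then(() => console.log('ready: ' + fake.isReady()));
```

With a real instance, the same helper would be called right after openConnection(), and publishing would start only after the promise resolves.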

Init/configuration

All instances require at least two parameters on initialization:

  • URI of the AMQP
  • name of the channel

The same URI and channel name must be set on both instances that communicate with each other:

  • PlainConsumer <-> PlainProducer
  • FanoutConsumer <-> FanoutProducer
  • RPCClient <-> RPCServer

let instance = new iamqp['...'](amqpUri, channelName);

All instances accept an optional third argument which configures several things:

  • printMessages - if true, messages are printed when logging is enabled. Defaults to true.
  • maxMessageLengthToPrint - if messages are printed, the maximum length in bytes of a message that will be printed. Defaults to 257.
  • queueArguments - queue settings; these apply to the pairs:
    • PlainConsumer <-> PlainProducer
    • RPCClient <-> RPCServer
  • connectionOptions - connection settings

QueueArguments

queueArguments configure the Queue over which the two instances communicate:

  • messageTtl - messages arriving in the queue expire after n milliseconds, where 0 <= n < 2^32
  • expires - the queue is destroyed after n milliseconds of disuse, where 0 < n < 2^32 and "use" means having consumers or being declared
  • maxLength - the maximum number of messages the queue will hold

By default, none of the queueArguments imposes a limit (each is treated as infinity).

When you set any of the queueArguments on one instance, set the same value on its sister instance. Both instances try to assert a Queue on the same channel, and if one has already asserted the Queue, the other fails if it provides different options.
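
A simple way to keep both sides in sync is to define the options once and share the same object between the producer and the consumer code. The sketch below also checks the ranges listed above before the options are used. The validator validateQueueArguments is hypothetical and not part of iamqp, and the requirement that values be integers is an assumption.

```javascript
// Upper bound (exclusive) for messageTtl and expires, as listed above.
const TWO_POW_32 = 2 ** 32;

// Hypothetical validator: returns a list of problems, empty if all is well.
// Assumption: all values are expected to be integers.
function validateQueueArguments(args) {
  const problems = [];
  const { messageTtl, expires, maxLength } = args;
  if (messageTtl !== undefined &&
      !(Number.isInteger(messageTtl) && messageTtl >= 0 && messageTtl < TWO_POW_32)) {
    problems.push('messageTtl must satisfy 0 <= n < 2^32');
  }
  if (expires !== undefined &&
      !(Number.isInteger(expires) && expires > 0 && expires < TWO_POW_32)) {
    problems.push('expires must satisfy 0 < n < 2^32');
  }
  if (maxLength !== undefined &&
      !(Number.isInteger(maxLength) && maxLength >= 0)) {
    problems.push('maxLength must be a non-negative integer');
  }
  return problems;
}

// One frozen object, meant to be required by both the producer and the
// consumer so that the two sides cannot drift apart.
const sharedOptions = Object.freeze({
  queueArguments: Object.freeze({
    maxLength: 100,
    messageTtl: 10000,
    expires: 60000
  })
});

const problems = validateQueueArguments(sharedOptions.queueArguments);
const badProblems = validateQueueArguments({ expires: 0 });
console.log(problems, badProblems);
```

In a real project, sharedOptions would live in its own module and be passed as the third constructor argument on both sides.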

Note on this

In general, if your Queues are persistent, you need to manually delete the Queue in question before changing any of its options, so that the code can re-assert it.

ConnectionOptions

Configuration of the connection:

  • heartbeat - period of the connection heartbeat, in seconds. Defaults to 60.

Configuration options

const iamqp = require('iamqp');
const instance = new iamqp['...']('...', '...', {
  printMessages: true,
  maxMessageLengthToPrint: 512,
  queueArguments: {
    maxLength: 100,     // max number of messages in the queue
    messageTtl: 10000,  // message TTL in milliseconds
    expires: 60000      // destroy the queue after this many milliseconds of disuse
  },
  connectionOptions: {
    heartbeat: 60
  }
});

Plain

The plain producer/consumer pair is for cases where one or multiple producers (on the same channel) publish to one consumer. An example is multiple web instances that periodically publish some data to a central aggregator.

For detailed documentation check the classes:

  • PlainConsumer
  • PlainProducer

Plain Consumer

const iamqp              = require('iamqp');
const amqpUri            = 'amqp://localhost';
const channel            = 'amqp-channel-b';
let plainConsumer        = new iamqp.PlainConsumer(amqpUri, channel);
 
// This needs to be done or the errors will bubble up.
plainConsumer.getEventer().on('error', (err) => {
    console.log('consumer error:' + err.message);
});
 
// For when the connection is established.
plainConsumer.getEventer().on('connected', () => {
    console.log('consumer connected');
});
 
// Get messages.
plainConsumer.getEventer().on('message', (message) => {
    console.log('consumer message:' + message);
});
 
// Open the connection - this takes some time and is not sync
plainConsumer.openConnection();
 
// ...
 
// Close the connection
if (plainConsumer.closeConnection()) {
    console.log('consumer connection closing ...');
}

Plain Producer

const iamqp              = require('iamqp');
const amqpUri            = 'amqp://localhost';
const channel            = 'amqp-channel-b';
let plainProducer        = new iamqp.PlainProducer(amqpUri, channel);
 
// This needs to be done or the errors will bubble up.
plainProducer.getEventer().on('error', (err) => {
    console.log('producer error:' + err.message);
});
 
// For when the connection is established.
plainProducer.getEventer().on('connected', () => {
    console.log('producer connected');
});
 
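// Open the connection - this takes some time and is not sync
plainProducer.openConnection();
 
// ... wait until the connection is ready (the 'connected' event
// has fired or isReady() returns true) before publishing.
 
// Publish a single message.
let isPublishing = plainProducer.publish({
    'a': 3
});
 
// ...
 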
// Close the connection
if (plainProducer.closeConnection()) {
    console.log('producer connection closing ...');
}

Fanout

Fanout is when each producer (there can be many on the same channel) publishes a message that gets "fanned out" to all the fanout consumers on the same channel. Every consumer on the channel gets a copy of the message.

Multiple producers thus publish their messages to multiple consumers. An example is a worker instance (or many) that publishes information to many web instances.

For detailed documentation check the classes:

  • FanoutConsumer
  • FanoutProducer

Fanout Consumer

const iamqp              = require('iamqp');
const amqpUri            = 'amqp://localhost';
const fanoutChannel      = 'fanout-channel-a';
let fanoutConsumer       = new iamqp.FanoutConsumer(amqpUri, fanoutChannel);
 
// This needs to be done or the errors will bubble up.
fanoutConsumer.getEventer().on('error', (err) => {
    console.log('fanout consumer error:' + err.message);
});
 
// For when the connection is established.
fanoutConsumer.getEventer().on('connected', () => {
    console.log('fanout consumer connected');
});
 
// Get messages as fanout by the producer.
fanoutConsumer.getEventer().on('message', (message) => {
    console.log('fanout consumer message:' + message);
});
 
// Open the connection - this takes some time and is not sync
fanoutConsumer.openConnection();
 
// ...
 
// Close the connection
if (fanoutConsumer.closeConnection()) {
    console.log('fanout consumer connection closing ...');
}

Fanout Producer

const iamqp              = require('iamqp');
const amqpUri            = 'amqp://localhost';
const fanoutChannel      = 'fanout-channel-a';
let fanoutProducer       = new iamqp.FanoutProducer(amqpUri, fanoutChannel);
 
// This needs to be done or the errors will bubble up.
fanoutProducer.getEventer().on('error', (err) => {
    console.log('fanout producer error:' + err.message);
});
 
// For when the connection is established.
fanoutProducer.getEventer().on('connected', () => {
    console.log('fanout producer connected');
});
 
// Open the connection - this takes some time and is not sync
fanoutProducer.openConnection();
 
// ...
 
// Fanout a single message.
let isFanout = fanoutProducer.fanout({
    'a': 3
});
 
// ...
 
// Close the connection
if (fanoutProducer.closeConnection()) {
    console.log('fanout producer connection closing ...');
}

RPC

A Remote Procedure Call (RPC) is communication of the type "ask -> reply". The RPC client calls the RPC server, which (after some time) replies to the message. Each call (on each client) produces a unique ID string, called the correlation ID, which the user also receives with the reply in order to tell which reply belongs to which call.

Multiple RPC clients can thus perform multiple calls to the server. An example is a server that queries a DB on behalf of the clients. Take care not to overload the server instance.

For example, create an RPC client:

const iamqp              = require('iamqp');
const amqpUri            = 'amqp://localhost';
const rpcChannel         = 'rpc-channel-a';
let rpcClient            = new iamqp.RPCClient(amqpUri, rpcChannel);

... setup listeners:

rpcClient.getEventer().on('error', (err) => {
    console.log('RPC client error:' + err.message);
});
 
rpcClient.getEventer().on('connected', () => {
    console.log('RPC client connected');
});
 
rpcClient.getEventer().on('reply', (reply, correlationId) => {
    console.log('RPC client got a reply (' + correlationId + '): ' + reply);
});
 
rpcClient.openConnection();

Each reply carries the correlation ID, the same one that is returned when making the call. Each call produces its own unique correlation ID.

let correlationIdA = rpcClient.callRemote({'a': 5});
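
The correlation ID makes it possible to match each reply to its call. One possible pattern, sketched below, keeps a map of pending calls keyed by correlation ID and resolves a promise when the matching 'reply' event arrives. The wrapper createCaller and the stand-in createFakeRpcClient are hypothetical and not part of iamqp. The fake client only mimics callRemote() and the 'reply' event so that the sketch runs without a broker, and it answers like a server that increments the field a. The sketch has no timeout, so a call whose reply never arrives stays pending.

```javascript
const { EventEmitter } = require('events');

// Hypothetical wrapper: combines callRemote() and the 'reply' event
// into a function that returns a promise per call.
function createCaller(rpcClient) {
  const pending = new Map(); // correlation ID -> resolve function

  rpcClient.getEventer().on('reply', (reply, correlationId) => {
    const resolve = pending.get(correlationId);
    if (resolve) {
      pending.delete(correlationId);
      resolve(reply);
    }
  });

  return function call(payload) {
    return new Promise((resolve) => {
      const correlationId = rpcClient.callRemote(payload);
      pending.set(correlationId, resolve);
    });
  };
}

// Stand-in for an RPC client (assumption: illustration only). Replies
// arrive asynchronously, as they would over the network.
function createFakeRpcClient() {
  const eventer = new EventEmitter();
  let nextId = 0;
  return {
    getEventer: () => eventer,
    callRemote: (payload) => {
      nextId += 1;
      const correlationId = 'fake-' + nextId;
      setImmediate(() => {
        eventer.emit('reply', { a: payload.a + 1 }, correlationId);
      });
      return correlationId;
    }
  };
}

const call = createCaller(createFakeRpcClient());
const replies = Promise.all([call({ a: 5 }), call({ a: 10 })]);
replies.then((results) => console.log(results));
```

A single 'reply' listener serves all calls, which also avoids adding one listener per call.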

On the RPC server side we then have:

const iamqp              = require('iamqp');
const amqpUri            = 'amqp://localhost';
const rpcChannel         = 'rpc-channel-a';
let rpcServer            = new iamqp.RPCServer(amqpUri, rpcChannel);

... setup listeners:

rpcServer.getEventer().on('error', (err) => {
    console.log('RPC server error:' + err.message);
});
 
rpcServer.getEventer().on('connected', () => {
    console.log('RPC server connected');
});
 
rpcServer.getEventer().on('call', (callData, pCorrelationId) => {
    console.log('RPC server call (' + pCorrelationId + '): ' + callData);
    callData.a += 1;
    let isServerResponding = rpcServer.respond(callData, pCorrelationId);
});
 
rpcServer.openConnection();

For detailed documentation check the classes:

  • RPCClient
  • RPCServer

Logging

The module has the capability to log to:

  • the terminal
  • the Linux system log

Terminal

Terminal logging is enabled with an environment variable:

DBTRC_TERM_F=OK

Syslog

Logging to the Linux system log is enabled with an environment variable:

DBTRC_SYSLOG_F=OK

For this to work, open /etc/rsyslog.conf and enable UDP reception on port 514. The relevant lines are usually present but commented out and only need uncommenting:

# provides UDP syslog reception
module(load="imudp")
input(type="imudp" port="514")

... and restart the service with /etc/init.d/rsyslog restart. The logs should appear in /var/log/syslog. This was tested on Ubuntu 16.04.

Documentation

JSDoc generated documentation is in artifacts/docs/index.html.

Source

The repository is located on bitbucket.

Test

Snyk test badge included; report available on snyk

Integration tests:

yarn run test
DBTRC_TERM_F=OK yarn run test

Lint testing via:

yarn run test:lint

License

The MIT License (MIT) (look for the LICENSE file in the root of the module).
