nsi-queues

Node.js Services Integration - Queues helpers

nsi-queues

Node.js Services Integration - Queues helpers

This project will provide an uniform and easy way to send and receive messages in node.js using queues. It should be compatible with all possible major protocols and brokers.

The interface is based on asynchronous functions with callback passing for responses, acknowledgement, etc. This allows a smooth integration with many node.js libraries and particularily with async.

npm install nsi-queues

Initialize a connection then forward messages from a queue to another. Acknowledge reception of a message only when it is successfully transmitted.

var nsiQueues = require('nsi-queues');
 
nsiQueues('amqp', {}, function(errqueuesHelper){
    queuesHelper.from('my-receiving-queue', function(bodyheadersackCallback) {
        queuesHelper.to('my-destination-queue', body, headers, ackCallback);
    })
});

The body of the message can be a string or an object. If it is an object the 'content-type' header will be set to 'application/json'.

Initialize a queues helper for amqp message passing. The second parameter is the options object for a node-amqp connection.

var nsiQueues = require('nsi-queues');
 
nsiQueues('amqp', {}, function(errqueuesHelper){
    // queues helper is ready ! 
    // active amqp connection can be accessed here: 
    console.log(queuesHelper.connection)
});

Initialize a queues helper with a node-amqp connection.

var amqp = require('amqp');
var nsiQueues = require('nsi-queues');
 
var amqpConnection = amqp.createConnection();
amqpConnection.on('ready', function(){
    nsiQueues('amqp', amqpConnectionProducer, function(errqueuesHelper){
        // connection is active and queues helper is ready ! 
    });
});

Initialize a queues helper for stomp message passing. The second parameter is the options object for a stomp-js client.

var nsiQueues = require('nsi-queues');
 
nsiQueues('stomp', {}, function(errqueuesHelper){
    // queues helper is ready ! 
    // active stomp client can be accessed here: 
    console.log(queuesHelper.client)
});

Initialize a queues helper with a stomp-js client.

var stomp = require('stomp');
var nsiQueues = require('nsi-queues');
 
var stompClient = new stomp.Stomp({
    port: 61613,
    host: 'localhost',
    debug: false,
    login: 'guest',
    passcode: 'guest',
});
stompClient.connect();
stompClient.on('connected', function(){
    nsiQueues('stomp', stompClient, function(errqueuesHelper){
        // connection is active and queues helper is ready ! 
    });
});

Send messages to a queue, without expecting a response, using to().

  • The headers parameter can be omitted.
  • The callback is executed when the broker acknowledges reception of the message.
  • The message and headers parameters of the callback are the same as the original parameters as no response is expected when using to().
queuesHelper.to('my-queue', 'my message', {header1: 'header1'}, function(errbodyheaders) {
    if (err) console.log('Message sending failed.');
    else console.log('Message was sent and acknowledged !');
});

Send messages to a queue, and expect a response, using inOut().

  • The headers parameter can be omitted.
  • The callback is executed when the response is received or if an error occured when sending the message.
queuesHelper.inOut('my-queue', 'my message', {header1: 'header1'}, function(errbodyheaders) {
    if (err) console.log('Message sending failed.');
    else console.log('Response received: ' + body);
});

Expect messages from a queue and acknowledge reception to the broker using from().

  • The acknowledgement callback takes an optional error parameter.
  • from() takes a second optional parameter: a callback function that will be executed once subscription to the broker is effective.
queuesHelper.from('my-queue', function(errbodyheadersackCallback) {
    // do something with message 
    ackCallback(); // acknowlege reception to the broker 
});

Expect messages from a queue and send responses using from().

  • The responseCallback takes an optional fourth parameter: a callback that is executed when the broker acknowledges reception of the response message.
queuesHelper.from('my-queue', function(bodyheadersresponseCallback) {
    // do something with message and prepare response 
    responseCallback(null, responseBody, responseHeaders);
});

Use async for advanced control flow.

This lame example sends a message on 3 different queues and waits for all 3 acknowledgements to run its callback.

function myRoute(bodyheaderscallback) {
    async.parallel([
        function(callback){
            queuesHelper.to('queue1', body, headers, callback);
        },
        function(callback){
            queuesHelper.to('queue2', body, headers, callback);
        },
        function(callback){
            queuesHelper.to('queue3', body, headers, callback);
        }
    ], function (err) {
       callback(err, body, headers);
    }); 
}

AMQP helper tested with:

STOMP helper tested with:

The test suite covers both AMQP and STOMP helpers. Therefore it requires a broker compatible with AMQP<1 and STOMP. For example install and run rabbitmq with stomp plugin.

mocha -t 20000 -R spec

The project is integrated with travis-ci which only supports a bare installation of rabbitmq. So the default test command only runs the tests for AMQP helper.

npm test