amqp-connection-manager-rpc

1.2.1 • Public • Published

NPM Package Build Status Coverage Status Greenkeeper badge semantic-release

Dependency Status devDependency Status peerDependency Status

Extend amqp-connection-manager connection management for amqplib to support Remote procedure call (RPC).

amqp-connection-manager-rpc

Features

  • Time to live for RPC requests.
  • Exceptions transmitted from RPC server to RPC client.
  • Simple async function API design

Installation

npm install --save amqplib amqp-connection-manager amqp-connection-manager-rpc

Basics

The basic idea described at rabbitmq. To manage responses from an RPC server, node-cache is used.

Here's the RPC client example:

var amqp = require('amqp-connection-manager-rpc');
 
// Create a new connection manager
var connection = amqp.connect(['amqp://localhost'], {json: true});
 
// Setup a channel for RPC requests.
const ttl = 60; // Time to live for RPC request (seconds). 0 - infinite
var channelWrapper = connection.createRPCClient('RPC-QUEUE-test', ttl);
 
// Send some request to RPC server and receive reply. Exception can occupied!
let req = { a: 1, b: 2}; //request data
try{
    let prc_reply = await channelWrapper.sendRPC(req);
    console.log("RPC reply: ", prc_reply);
} catch (err) {
    console.log("RPC error: ", err);
}
 

Here's the RPC server example:

var amqp = require('amqp-connection-manager-rpc');
 
// Create a new connection manager
var connection = amqp.connect(['amqp://localhost'], {json: true});
 
// Set up a channel for RPC requests.
var channelWrapper = connection.createRPCServer('RPC-QUEUE-test', doRpcJob);
 
//do RPC job
async function doRpcJob(msgJson, msg) {
    if (!msgJson.b) throw new Error('B is not set'); //Exceptions allowed! Will be send to RPC client.
    let reply = {
        a: msgJson.a ? msgJson.a + 1 : null
    }
    return reply;
}
 

See a complete example in the examples folder.

API

See amqp-connection-manager API.

AmqpConnectionManager#createRPCClient(queue_name[, ttl [, setup]])

Create a new RPC client ChannelWrapper.

  • queue_name - Name of queue for RPC request.
  • ttl - time to live for RPC request (seconds). To infinite set to 0. If not defined used 0.
  • setup - async function(channel) for setup queue and exchange. Must return RPC queue. Default: async function (channel) => { return await channel.assertQueue('', { exclusive: true }) };

Returns ChannelWrapper

AmqpConnectionManager#createRPCServer(queue_name, callback[, options] )

Create a new RPC server ChannelWrapper.

  • queue_name - Name of queue for RPC request.
  • callback - A callback function, which returns a Promise. This should return RPC server json reply. Callback function has two argument: json message from RPC client, full message from RPC client.

Options:

  • options.sendErrorStack - if true errors stack will be send to client. Default - false.
  • options.setup - async function(channel) for setup channel, exchange. Must return RPC queue name. Default: async function (channel) => { channel.prefetch(1); await channel.assertQueue(queue_name, { durable: false }); return queue_name; };

Returns ChannelWrapper

ChannelWrapper#sendRPC(msg [,ttl [, exchangeName [, routingKey]]])

Send RPC request to RPC server. Call it on client only.

  • msg - request Object to RPC server.
  • ttl - time to live for RPC request (seconds). To infinite set to 0. If not defined used value from createRPCClient().
  • exchangeName - name of exchange for RPC request.
  • routingKey - routing key for RPC request.

Returns Object with RPC job reply or Exception

Fork it!

Pull requests, issues, and feedback are welcome.

Package Sidebar

Install

npm i amqp-connection-manager-rpc

Weekly Downloads

43

Version

1.2.1

License

MIT

Unpacked Size

36.1 kB

Total Files

19

Last publish

Collaborators

  • abernov