@sinet/lapin

4.7.10 • Public • Published

Lapin wrapper for RabbitMQ

Currently this project is using Rabbus and Wascally. This project is aiming to support several producer / consumer patterns. The following are is a list of the planned patterns, and the checked ones are currently implemented:

  • [X] Send / Receive
  • [X] Publish / Subscribe
  • [X] Request / Response

The JSend specification is required to determine if an error has occurred in a response.

Installation and Usage

As lapin uses wascally you need to install it along with lapin:

npm install wascally
npm install lapin

Require lapin and wascally:

var rabbit = require( 'wascally' );
var lapin  = require( 'lapin' )( rabbit );

// or

var options = {
	'logger' : logger,
	'rabbit' : wascally
};

var lapin = require( 'lapin' )( options )

The following are simple usage examples:

Send / Receive

Sender Options

exchange, messageType, routingKey, autoDelete

Please refer to Rabbus options' info

Sender

options = 'v1.logs.log';
// or
options = {
    'messageType' : 'v1.logs.log',
    'exchange'    : 'logs'
}
lapin.send( options , message, function ( error, response ) {

	// handling the response is optional
	if ( !error ) {
		console.log( response );
	}

} );

Or use the promise style send

lapin.sendPromise( 'v1.logs.log', message )
	.then( function ( response ) {
		// Return for chain then and handle response
		console.log( response );

	} )
	.catch( function ( error ) {
		// Handler error
	} );

Receiver Options

queue, exchange, messageType, autoDelete, limit, noBatch

Receiver

options = 'v1.logs.log';
// or
options = {
    'messageType' : 'v1.logs.log',
    'exchange'    : logs
}

lapin.receive( options, function ( message, done ) {

	someDatabaseQuery( message, function ( err, body ) {

		if ( err ) {
			throw err;
		}

		done();

	} );

} );

Publish / Subscribe

Publisher Options

exchange, messageType, autoDelete

Publisher

options = 'v1.users.login';
// or
options = {
    'messageType' : 'v1.users.login',
    'exchange'    : 'users' // recommended not to prefix or suffix `exchange` lapin will do it for us
}
lapin.publish( options, message, function ( error, response ) {

		// handling the response is optional
	if ( !error ) {
		console.log( response );
	}

} );

Subscriber Options

queue, exchange, messageType, autoDelete, limit, noBatch

Subscriber

options = 'v1.users.login';
// or
options = {
    'messageType' : 'v1.users.login',
    'queue'       : 'users' // recommended not to put `queue` suffix or prefix, lapin will do it for you
    'exchange'    : 'users'
}
lapin.subscribe( options, function ( message, done ) {

	someDatabaseQuery( message, function ( err, body ) {

		if ( err ) {
			throw err;
		}

		done();

	} );

} );

Request / Response

Request Options

exchange, messageType, autoDelete, routingKey, forceAck

Requester

options = 'v1.users.findAll'
// or
options = {
    'messageType' : 'v1.users.findAll',
    'exchange'    : 'users'
}
lapin.request( options, message, function ( error, data ) {

	if ( error ) {
		return reply( error ).code( 500 );
	}

	return reply( data.data );
} );

Or use the promise style request

lapin.requestPromise( 'v1.users.findAll', message )
	.then( function ( data ) {
		// Handle data
		return reply( data.data );

	} )
	.catch( function ( error ) {
		// Handle error
	} );

Responder Options

exchange, queue, autoDelete, routingKey, limit, noBatch

Responder

options = 'v1.users.findAll';
// or
options = {
    'messageType' : 'v1.users.findAll',
    'limit'       : 1
}
lapin.respond( options, function ( message, respond ) {

	if ( message.invalid ) {
			return respond.fail( 'Invalid data' );
	}

	someDatabaseQuery().then( function ( result ) {

		// JSend success with data
		respond.success( result );

	} ).catch( function handleError ( error ) {

		// JSend error
		respond.error( 'Failed query', error, 500 );
		// or -- code is optional
		respond.error( 'Failed query', error );
		// or -- data is optional
		respond.error( 'Failed query' );

	} );

} );

Please refer to JSEND for standard reply attributes

Response with Validation using Joi

// Responder
lapin.respond( {
    'messageType' : 'v1.users.findAll',
    'validate'    : Joi.object().keys( {
  		'username'     : Joi.string().alphanum().min( 3 ).max( 30 ).required(),
  		'password'     : Joi.string().regex( /[a-zA-Z0-9]{3,30}/ ),
  		'access_token' : [ Joi.string(), Joi.number() ],
  		'birthyear'    : Joi.number().integer().min( 1900 ).max( 2013 ),
  		'email'        : Joi.string().email()
  	} ).with( 'username', 'birthyear' ).without( 'password', 'access_token' ),

    'validateOptions' : {} // <optional> see https://github.com/hapijs/joi for validation options

} , function ( message, respond ) {
    // consumer process
} );

If validation fails, lapin will bypass respond callback and response a fail status as seen below:

    respond( {
        'status' : 'fail',
        'data'   : <Validation error message>
    } );

Please refer to Joi Validation for validation examples, structure and validation options

To Consider

Make sure to use the same messageType, routingKey and exchange options. Whenever a String option is supplied instead of the Object option, lapin will automatically create the ff:

  • exchange and messageType ( Producer )
  • exchange, messageType and queue ( Consumer )

Contributing

All pull requests must follow coding conventions and standards.

Additional Information

RPC over RabbitMQ

In general, doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response the client needs to send a 'callback' queue address with the request.

RabbitMQ RPC

  • When the client starts up, it creates an exclusive callback queue.
  • For an RPC request, the Client sends a message with two required properties: reply_to, which is set to the callback queue and correlation_id, which is set to a unique value for every request.
  • The request is sent to an rpc_queue queue.
  • The RPC worker (aka: server) is waiting for requests on that queue. When a message appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
  • The client waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request it returns the response to the application.

Standards/Conventions

  • messageType: <version>.<resource>.<action>

  • exchange: <pattern>.<resource>-exchange

  • queue: <pattern>.-queue

Where

Patterns:

  • req-res

  • pub-sub

  • send-rec

Version:

  • v1

  • v2

  • and so on.

Readme

Keywords

none

Package Sidebar

Install

npm i @sinet/lapin

Weekly Downloads

0

Version

4.7.10

License

MIT

Last publish

Collaborators

  • nakautot
  • faith28
  • jefectba
  • darbiol