http-rabbitmq

1.0.12 • Public • Published

Hapi Rabbitmq Library

An awesome RabbitMQ Library for the Hapi ecosystem to work seamlessly with rabbit queues as if they were regular HTTP calls.

Lead Maintainer: webnator

Introduction

The idea behind this library is to provide a simple and seamless interface to integrate a HapiJS API project with RabbitMQ based queues.

You won't need to change much in your current API implementation to make it work as an event listener from the Queue, and you can also have dual HTTP/Queue endpoints.

This library also works very well with Maestro, a very cool workflow manager that you should also check out now that you're here.

Getting started

Install with

npm i http-rabbitmq

Publish only initialization

For the simplest implementation you can call the init method with your RabbitMQ connection parameters and this will initialize the connection and return the instance.

const RabbitQueue = require('http-rabbitmq');

// Connection settings for the RabbitMQ broker
const queueConfig = {
    host: 'localhost',
    port: '5672',
    user: 'rabb1tAdm1n',
    pass: '123',
    vhost: '/',
    exchange: 'rabbit_exchange',
    errorQueue: 'error',
    errorTopic: 'error.test',
    maxRetries: 3,
    reconnectionTime: 2000,
    delayed: false
};

// init is awaited, so call it from inside an async function
const myConnection = await RabbitQueue.init({ queueConfig });
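For orientation, the connection fields above map naturally onto a standard AMQP URI. The sketch below shows how such a URI could be assembled from queueConfig. `buildAmqpUrl` is a hypothetical helper written for illustration; it is not part of http-rabbitmq, and the library may build its connection differently.

```javascript
// Hypothetical helper (not part of http-rabbitmq): assembles a standard
// AMQP URI from the connection fields used in queueConfig.
function buildAmqpUrl({ host, port, user, pass, vhost }) {
  // Credentials and vhost are percent-encoded, so the default vhost '/'
  // becomes '%2F' in the URI.
  const credentials = `${encodeURIComponent(user)}:${encodeURIComponent(pass)}`;
  return `amqp://${credentials}@${host}:${port}/${encodeURIComponent(vhost)}`;
}

const url = buildAmqpUrl({
  host: 'localhost',
  port: '5672',
  user: 'rabb1tAdm1n',
  pass: '123',
  vhost: '/'
});
console.log(url); // amqp://rabb1tAdm1n:123@localhost:5672/%2F
```

Printing the URI this way can help when checking the same credentials against other AMQP tooling.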

Initialization registering handlers

In order to consume the queue in real time, we can set handlers that listen to a specific topic and act on the messages published to it.

To configure these listeners we need to create a routes file first:

//queueRoutes-1.js

'use strict';

const testController = require('./controllers/testController');

module.exports = function(server) {
  server.route({
    topic: 'mytopic.test',
    handler: testController.handleTopic
  });
};

We can have as many routes files as we want, split by module or functionality. Once they are created, we just need to group all of them into a single file:

// queueRouter.js
'use strict';

module.exports = function(queueRouter) {
  require('./queueRoutes-1')(queueRouter);
  require('./queueRoutes-2')(queueRouter);
  require('./queueRoutes-3')(queueRouter);
};
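Because a routes file is just a function that receives an object with a route method, it can be exercised without a broker. The sketch below uses a hypothetical stand-in, `createFakeRouter`, that only records what gets registered. The inline route functions mirror the files above, the topic `mytopic.other` is made up for illustration, and none of this is part of the library's API.

```javascript
// Hypothetical stand-in for the object the library passes to a routes file.
// It only records each { topic, handler } pair instead of binding a queue.
function createFakeRouter() {
  const routes = [];
  return {
    routes,
    route(definition) {
      routes.push(definition);
    }
  };
}

// Inline equivalents of two routes files (the second topic is illustrative).
const routesOne = (server) => {
  server.route({ topic: 'mytopic.test', handler: () => 'handled test' });
};
const routesTwo = (server) => {
  server.route({ topic: 'mytopic.other', handler: () => 'handled other' });
};

// Equivalent of queueRouter.js: hands the same router to every routes file.
const queueRouter = (router) => {
  routesOne(router);
  routesTwo(router);
};

const fakeRouter = createFakeRouter();
queueRouter(fakeRouter);
const registeredTopics = fakeRouter.routes.map((r) => r.topic);
console.log(registeredTopics);
```

A check like this can catch a routes file that was never added to the grouping file.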

Then, instead of initializing as we showed before, we pass the routes file as a parameter to the init function.

const queueRouter = require('./queueRouter');
const myConnection = await RabbitQueue.init({ queueConfig, routes: queueRouter });

That's it!


After initialization, we can start working with the methods in the queue instance.

Consuming the queue

If we take a look at the example above, we can see that we set up a handler for the topic mytopic.test and declared testController.handleTopic as its handler.

So, inside our testController file, we need a function handleTopic with more or less the following structure:

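// Receives the queue message as an HTTP-like request and replies with a status code
function handleTopic(request, reply) {
    return reply(request.payload).code(200);
}

module.exports = { handleTopic };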

Ring a bell?
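Since the handler only depends on request and reply, it can be unit tested with a small stub. The stub below is an assumption about the minimum the handler needs (a callable reply that returns an object with a code method). The real reply object supplied by the library may offer more.

```javascript
// Handler under test, same shape as in the example above.
function handleTopic(request, reply) {
  return reply(request.payload).code(200);
}

// Hypothetical stub: captures the body and status code passed by a handler.
function createReplyStub() {
  const captured = { body: undefined, statusCode: undefined };
  const reply = (body) => {
    captured.body = body;
    return {
      code(statusCode) {
        captured.statusCode = statusCode;
        return captured;
      }
    };
  };
  return { reply, captured };
}

const { reply, captured } = createReplyStub();
handleTopic({ payload: { orderId: 42 } }, reply);
console.log(captured);
```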

Queue responses work in the following way:

  • 2XX responses: Acks the message in the queue
  • 4XX responses: Acks the message in the queue, and also publishes it to the error queue and topic defined when the library was initialized
  • 5XX responses: Acks the message in the queue, and re-queues it so it can be consumed again. It retries at most the number of times set in the maxRetries property of the configuration
  • Any other: Acks the message in the queue, and also publishes it to the error queue and topic defined when the library was initialized
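The rules above can be read as a small decision function. The following is a sketch of that mapping, not the library's code: `resolveQueueAction` and its return shape are hypothetical, and what happens once the retries are used up is an assumption, since the list does not say.

```javascript
// Illustrative mapping from a handler's status code to the queue action
// described above. `attempt` is the number of retries already made.
function resolveQueueAction(statusCode, attempt, maxRetries) {
  if (statusCode >= 200 && statusCode < 300) {
    return { ack: true, requeue: false, publishToError: false };
  }
  if (statusCode >= 500 && statusCode < 600) {
    // Assumption: once maxRetries attempts are used up, the message is
    // simply no longer re-queued.
    return { ack: true, requeue: attempt < maxRetries, publishToError: false };
  }
  // 4XX and any other status code go to the error queue and topic.
  return { ack: true, requeue: false, publishToError: true };
}

console.log(resolveQueueAction(200, 0, 3));
console.log(resolveQueueAction(404, 0, 3));
console.log(resolveQueueAction(503, 1, 3));
console.log(resolveQueueAction(503, 3, 3));
```

Note that every branch acks the message. The status code only decides whether the message is published again, and where.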

Publishing to the queue

The queue instance exposes the following functions, three for publishing plus queueManager:

  • publish({key, msg})
  • publishHTTP(key, { payload, headers, query, params, traceId })
  • publishDelayedHTTP(key, { payload, headers, query, params, traceId }, delay)
  • queueManager()

publishHTTP is built on top of publish: it wraps and stringifies the received parameters and then calls the publish function.

.publishHTTP(String key, { Object payload, Object headers, Object query, Object params, String traceId })

Publishes a stringified HTTP request into the topic.

Parameters:

  • key: The routing key (topic) where the message will be published
  • payload: The HTTP payload in JSON format
  • headers: The HTTP headers in JSON format
  • query: The HTTP query in JSON format
  • params: The HTTP params in JSON format

Returns:

  • A promise
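To make the "wrap and stringify" step concrete, here is a minimal sketch of what such an envelope could look like. `buildHttpMessage` is hypothetical and the field layout is an assumption. The actual message format used by publishHTTP is not documented here.

```javascript
// Hypothetical envelope builder: bundles the parts of an HTTP-like request
// and stringifies them. The layout is an assumption, not the library's
// actual message format.
function buildHttpMessage({ payload = {}, headers = {}, query = {}, params = {}, traceId } = {}) {
  return JSON.stringify({ payload, headers, query, params, traceId });
}

const msg = buildHttpMessage({
  payload: { name: 'test' },
  headers: { 'content-type': 'application/json' },
  traceId: 'trace-1'
});

// A consumer would parse the string back into request-like parts.
const parsed = JSON.parse(msg);
console.log(parsed.payload.name, parsed.traceId);

// With a real connection, the equivalent call would be along the lines of:
//   await myConnection.publishHTTP('mytopic.test', { payload, headers, traceId });
```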

.publishDelayedHTTP(String key, { Object payload, Object headers, Object query, Object params, String traceId }, Int delay)

Same as publishHTTP, but for RabbitMQ queues that implement the delayed plugin.

If your rabbit doesn't have this plugin installed, please don't use this function. It will fail!

Parameters:

  • key: The routing key (topic) where the message will be published
  • payload: The HTTP payload in JSON format
  • headers: The HTTP headers in JSON format
  • query: The HTTP query in JSON format
  • params: The HTTP params in JSON format
  • delay: The delay period in milliseconds

Returns:

  • A promise
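Since calling this function without the plugin fails, a guard in application code can make the failure explicit. The sketch below is hypothetical: `buildDelayedPublishOptions` is not part of the library. The `x-delay` header is the one read by the RabbitMQ delayed message exchange plugin, but whether http-rabbitmq sets it this way internally is an assumption.

```javascript
// Hypothetical guard and option builder for delayed publishing.
// 'x-delay' is the header read by the RabbitMQ delayed message exchange
// plugin; how http-rabbitmq uses it internally is an assumption here.
function buildDelayedPublishOptions(queueConfig, delay) {
  if (!queueConfig.delayed) {
    throw new Error('Delayed publishing requires the delayed plugin (delayed: true)');
  }
  if (!Number.isInteger(delay) || delay < 0) {
    throw new TypeError('delay must be a non-negative integer (milliseconds)');
  }
  return { headers: { 'x-delay': delay } };
}

const options = buildDelayedPublishOptions({ delayed: true }, 5000);
console.log(options);
```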

.publish({String key, String message})

Publishes a string into the topic.

Parameters:

  • key: The routing key (topic) where the message will be published
  • message: The message that we want to publish into the queue

Returns:

  • A promise

.queueManager()

Returns the instance of the queue manager, giving access to its inner functions.

Configuration

.init({ Object queueConfig[, Object extraOptions] [, Function routes]})

The library initialization method. It accepts the following parameters:

  • queueConfig: The queue configuration. It accepts the following parameters:
    • host: The host to connect to (required)
    • port: The port to connect to (required)
    • user: The user to authenticate in the queue (required)
    • pass: The password to authenticate in the queue (required)
    • vhost: The RabbitMQ vhost, usually '/' (required)
    • exchange: The RabbitMQ exchange (required)
    • errorQueue: The queue where errors for unprocessed incoming calls are published, usually 'test-error' (required)
    • errorTopic: The topic (routing key) where errors are published, usually error.MICROSERVICE_NAME (required)
    • prefetch: Defaults to 1
    • delaySeconds: Defaults to 3000
    • reconnectionTime: Defaults to 2000
    • maxRetries: Defaults to 5
    • timeBetweenRetries: Defaults to 3000
  • extraOptions: Object that allows setting some extra options for the queue configuration. All of its parameters are optional:
    • exchangeOptions: The settings used to assert the RabbitMQ exchange (more options are available)
      • durable: Defaults to true
    • queueOptions: The settings used to assert the RabbitMQ queue (more options are available)
      • noAck: Defaults to false
    • publisherOptions: The settings used to publish into the RabbitMQ queue (more options are available)
      • persistent: Defaults to true
    • retryOptions:
      • retries: The maximum number of times the retry policy will try to send a message to the queue if there's an error. Defaults to 10
      • time: The interval, in milliseconds, that the retry policy waits before trying to re-send a message to the queue if there's an error. Defaults to 1000
    • delayed: Whether the delayed plugin is enabled in RabbitMQ
  • routes: The router file as described above
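The required fields and defaults in the list above can be checked before calling init. The helper below is a hypothetical sketch, not part of http-rabbitmq. It only mirrors the documented queueConfig requirements and defaults, and the library may apply them differently.

```javascript
// Required fields and defaults, taken from the queueConfig list above.
const REQUIRED_FIELDS = [
  'host', 'port', 'user', 'pass', 'vhost', 'exchange', 'errorQueue', 'errorTopic'
];
const DEFAULTS = {
  prefetch: 1,
  delaySeconds: 3000,
  reconnectionTime: 2000,
  maxRetries: 5,
  timeBetweenRetries: 3000
};

// Hypothetical helper: fails fast on missing required fields and fills in
// the documented defaults for anything left unset.
function normalizeQueueConfig(queueConfig) {
  const missing = REQUIRED_FIELDS.filter((field) => queueConfig[field] === undefined);
  if (missing.length > 0) {
    throw new Error(`Missing required queueConfig fields: ${missing.join(', ')}`);
  }
  return { ...DEFAULTS, ...queueConfig };
}

const normalized = normalizeQueueConfig({
  host: 'localhost',
  port: '5672',
  user: 'rabb1tAdm1n',
  pass: '123',
  vhost: '/',
  exchange: 'rabbit_exchange',
  errorQueue: 'error',
  errorTopic: 'error.test',
  maxRetries: 3
});
console.log(normalized.maxRetries, normalized.prefetch);
```

Values passed explicitly win over the defaults, so maxRetries stays at 3 here while prefetch falls back to 1.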

License

MIT
