amqplib-binary-retry
TypeScript icon, indicating that this package has built-in type declarations

1.0.2 • Public • Published

amqplib-binary-retry

Retry failed attempts to consume a message, with increasing delays between each attempt. For the delay use binary delay message arhitecture.

The problem with a simple wait exchange and Queue Pattern.

The common pattern for the delay is that you set up a wait exchange and queue where you send messages for retries. You set a message TTL and set the dead letter exchange of your wait queue to your principal exchange. So if you set the TTL on your message to 5 minutes, then the message sits in the wait queue for 5 minutes then gets dead lettered back to your applications exchange to be consumed again.

The problem is because messages are only removed from the head of the queue. You cannot use a single wait queue for any back-off strategy. If you have a message with a TTL of 10 minutes at the head of the queue and a message with a TTL of 1 minute behind it, the second message will wait for ten minutes.

How this lib solves that

Create multiple wait queues and set a message TTL on the queues themselves. Bind those queues to the cooresponding exchanges on the each layer.

By default we create 16 delay layers. Each layer delays the message for the 2^<level> seconds. Then we set custom headers on the each retryed message to know in which layers the messages should be delayed. Last delay layer forwards the message to the 'ready' queue. Then that message is republished with original headers to the original consumer queue.

Instalation

npm install amqplib-binary-retry

Example

(async () => {
    const amqplib = require('amqplib');
    const { retryer } = require('amqplib-binary-retry');
 
    const CONSUMER_QUEUE = "example-service";
 
    const connection = await amqplib.connect('amqp://localhost:5672');
    const channel = await connection.createChannel();
 
    // Create the client queue
    await channel.assertQueue(CONSUMER_QUEUE, { durable: false, autoDelete: true });
 
    // Define a message handler
    const handler = function (msg) {
        // no need to 'ack' or 'nack' messages
        // messages that generate an exception (or a rejected Promise) will be retried
        console.log(msg);
    };
 
    // Define the optional delayFunction
    // const delayFunction = function (attempt) {
    //     // After three retries fail the message
    //     if (attempt > 3) {
    //         return -1;
    //     }
    //
    //     // Delay for 5 seconds
    //     return 5000;
    // };
 
    // Use retryer as a consumer
    // For more configuration options check the 'Options' section
    channel.consume(CONSUMER_QUEUE, retryer({
        channel,
        consumerQueue: CONSUMER_QUEUE,
        handler,
        // delayFunction,
    }));
})();

Options

Parameter Name Required Default Description
channel X Amqplib channel
consumerQueue X Name of the queue that holds the amqp messages that need to be processed.
handler X Callback to be invoked with each message.
failureQueue <consumerQueue>.failure Name of the queue that holds the amqp messages that could not be processed in spite of the retries.
delayFunction Math.pow(2, <# of attempts>) Delay in milliseconds between retries. The function accepts the number of retry attempts.
exchangePrefixName delay_exchange_<level> Name of the exchanges used on the delay levels.
queuePrefixName delay_queue_<level> Name of the queues used on the delay levels.
delayLevels 16 Number of delay layers used. Max avaliable seconds for the delay is 2^<levels>

Testing

Setup the required services.

docker-compose up

Transpile the typescript source.

npm run build

Run tests.

npm run test

License

MIT

Package Sidebar

Install

npm i amqplib-binary-retry

Weekly Downloads

111

Version

1.0.2

License

MIT

Unpacked Size

43.6 kB

Total Files

26

Last publish

Collaborators

  • cernic