@flamescape/simplemq

2.1.6 • Public • Published

Description

A library built on the back of amqplib, adding some powerful opinionated features.

Features:

  • Reliable (self-repairing) Connections
  • RPC Client/Server implementation
  • Pub/Sub implementation
  • Automatic JSON (de-)serialisation

Todo:

  • Write API docs

Usage

Publish / Subscribe

const simplemq = require('@flamescape/simplemq');
const url = 'amqp://user:pass@127.0.0.1/';

async function main() {
    const mq = simplemq({url});

    const consumer = await mq.consume('testQueue', {prefetch:1}, msg => {
        console.log(msg.json); // {hello: 'world'}
        msg.ack();
    });

    await mq.sendToQueue('testQueue', {hello:'world'});
}
main();

RPC Server / Client

class Adder {
    sum(a, b) { return a + b; }
}
const simplemq = require('@flamescape/simplemq');
const url = 'amqp://user:pass@127.0.0.1/';

async function main() {
    const mq = simplemq({url});

    const server = mq.rpcServer('adder', new Adder());
    const client = mq.rpcClient();

    // either:
    const result1 = await client.call('adder', 'sum', [1, 2]);
    console.log(result1); // 3

    // - or -
    const rpcAdder = client.bind('adder');
    const result2 = await rpcAdder.sum(4, 5);
    console.log(result2); // 9

    // cleanup...
    client.close();
    server.close();
}
main();

Recovery

simplemq will automatically reconnect to the amqp server after connection loss. Unfortunately, some things can go wrong when this happens.

For example, create and consume a queue with an auto-expiration of 30 seconds. A 30 second network interruption would destroy your queue. After the connection is restored, simplemq will attempt to restore the consumer on the queue, but this will throw an error and kill the channel. You can listen for these errors on the consumer's error event.

const mq = simplemq({url});

await mq.assertQueue('testQueue', {expires: 1000})
const consumer = mq.consume('testQueue', msg => {
    msg.ack();
});
consumer.on('error', err => {
    console.error(err);
});

If this is likely to be a common problem in your project, simplemq provides an easy way to automatically assert, bind & consume an anonymous queue in a single call. When your connection is restored after a network interruption, simplemq will recreate this state and hopefully things will keep on moving.

const mq = simplemq({url});

const consumer = mq.consume({
    exchange: 'myExchange'
}, msg => {
    // Magic! a volatile anonymous queue is bound to the myExchange exchange.
    //
    // If the queue expires during a network interruption, all will be recreated
    // when the connection is restored and this callback will continue to work.
    //
    // Keep in mind, messages added to your old auto-expiring queue will be lost
    // - but you would have had that problem anyway.
    msg.ack();
});

Streams

You can create publisher and consumer streams...

mq.createConsumerStream({
    queueName, // name of queue to consume
    assertions, // (optional) ChannelAssertions object
    options, // (optional) consumer options
    signal, // (optional) AbortSignal
    channelName, // (optional) a name to identify the channel this consumer will run on (if not specified, will use the default channel)
    recoveryRetries: 2, // (optional) how many times to re-try recovery upon service failure
    concurrency: 1 // (optional) a.k.a. "prefetch" - the number of unacknowledged messages the consumer will process at once
});

mq.createPublisherStream({
    assertions, // (optional) ChannelAssertions object
    signal, // (optional) AbortSignal
    channelName, // (optional) a name to identify the channel this consumer will run on (if not specified, will use the default channel)
    recoveryRetries: 2, // (optional) how many times to re-try recovery upon service failure
    highWaterMark: 16, // (optional) the number of messages this stream will buffer while waiting for amqplib's `publish()` buffer to drain
    reassertOnReturn: false // (optional) causes assertions to be automatically re-performed if messages are not routed to any queue (also adds `mandatory` flag to every published message)
});

Cleaning up

After you're finished using a consumer, call .cancel() on it to stop consuming and prevent auto-recovery.

You can close down RPC servers & clients with the .close() method. This will stop them from listening on their call & reply queues respectively.

simplemq will automatically discard any unused channels and/or connections after they are no longer in use.

Other considerations

  • It's tempting to set recoveryRetries to a high number (or Infinity), but this could be a mistake. If a queue/exchange assertion conflicts with an existing one, this will count as a soft fail and will retry indefinitely, but will likely never succeed.
  • channelNames segregate activities into different channels. If an activity (such as assertion) throws an error, this usually topples to channel too. If you only use a single channel for all activities, then one single error can topple everything. On the other hand, creating a channel for each activity and having high channel churn is considered bad practice. Use sparingly.
  • Providing assertions to a stream/consumer/publisher will ensure that these are carried out every time a channel or connection fails. Assertions will also occur if a queue is deleted while being consumed.

API

// TODO: write API docs

Readme

Keywords

none

Package Sidebar

Install

npm i @flamescape/simplemq

Weekly Downloads

2

Version

2.1.6

License

ISC

Unpacked Size

48.7 kB

Total Files

10

Last publish

Collaborators

  • flamescape