graphql-amqp-subscriptions
This package implements the PubSubEngine interface from the graphql-subscriptions package. It allows you to connect your subscriptions manager to an AMQP PubSub mechanism.
This package is influenced by graphql-redis-subscriptions and graphql-rabbitmq-subscriptions.
Installation
npm
npm i @salo/graphql-amqp-subscriptions
yarn
yarn add @salo/graphql-amqp-subscriptions
Usage
Basic
import { AMQPPubSub } from '@salo/graphql-amqp-subscriptions';
import amqp from 'amqplib';
amqp.connect('amqp://guest:guest@localhost:5672?heartbeat=30')
  .then(conn => {
    const pubsub = new AMQPPubSub({
      connection: conn
      /* exchange: 'graphql_subscriptions' */
    });
    // Use the pubsub instance from here on
  })
  .catch(err => {
    console.error(err);
  });
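Once the instance exists, it is used like any other PubSubEngine: publish to send a payload and asyncIterator to subscribe. The following is a minimal sketch rather than part of this package. The trigger name COMMENT_ADDED, the addComment mutation, the commentAdded field and the createResolvers helper are all hypothetical, and the resolver map shape assumes the common graphql-js/Apollo convention.

```javascript
// Hypothetical trigger name, used only for illustration.
const COMMENT_ADDED = 'comment.added';

// Builds a resolver map around any PubSubEngine-compatible instance,
// for example the AMQPPubSub created above.
function createResolvers(pubsub) {
  return {
    Mutation: {
      // Hypothetical mutation: publishes the new comment to subscribers.
      addComment: async (_parent, { text }) => {
        const comment = { text };
        await pubsub.publish(COMMENT_ADDED, { commentAdded: comment });
        return comment;
      }
    },
    Subscription: {
      commentAdded: {
        // Hands the subscription server an async iterator for the trigger.
        subscribe: () => pubsub.asyncIterator(COMMENT_ADDED)
      }
    }
  };
}
```

Passing the pubsub instance in as an argument, instead of importing it at module level, keeps the resolvers independent of when the AMQP connection becomes available.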
Example
import { AMQPPubSub } from '@salo/graphql-amqp-subscriptions';
import amqp from 'amqplib';
// Messages published before the connection is ready are buffered here
const queue = [];
// Placeholder used until the real AMQPPubSub instance replaces it
// eslint-disable-next-line import/no-mutable-exports
export let pubsub = {
  publish: (key, content) => {
    queue.push({ key, content });
    console.log('🐰 RabbitMQ connection not ready yet');
  },
  asyncIterator: () => console.log('🐰 RabbitMQ connection not ready yet')
};
// user, password, host and port are assumed to be defined elsewhere (e.g. in your configuration)
amqp
  .connect(`amqp://${ user }:${ password }@${ host }:${ port }?heartbeat=30`)
  .then((connection) => {
    pubsub = new AMQPPubSub({
      connection,
      exchange: 'main',
      exchangeType: 'direct',
      queueName: 'queuey mcqueueface',
      queueOptions: {
        durable: true,
        passive: false,
        exclusive: false,
        autoDelete: false
      }
    });
    console.log('🐰 RabbitMQ ready to hop');
    // Go through items queued while waiting
    queue.forEach(({ key, content }) => {
      pubsub.publish(key, content);
    });
    if (queue.length) {
      console.log(`🐰 Processed ${ queue.length } queued messages`);
      queue.length = 0;
    }
  })
  .catch((error) => {
    console.error('🐰 Error connecting to RabbitMQ:', error);
  });
Benefits
- Reusing existing amqplib Connection.
- Reusing channels (one for subscriptions, one for publishing).
- Performance and resource-usage benefits on AMQP (RabbitMQ) because of the connection and channel reuse above (more info).
- Using Topic Exchange, e.g. you publish to agreements.eu.berlin.headstore and subscribe to agreements.eu.# (more info).
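To illustrate why a subscription to agreements.eu.# receives messages published to agreements.eu.berlin.headstore, here is a small sketch of AMQP topic matching, where words are separated by dots, * stands for exactly one word and # for zero or more words. The matching itself is done by the broker, not by this package. The topicMatches helper is hypothetical and only models the rule.

```javascript
// Returns true when an AMQP topic binding pattern matches a routing key.
// Words are separated by dots; "*" stands for exactly one word and
// "#" for zero or more words.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (i, j) => {
    // Pattern exhausted: it is a match only if the key is exhausted too.
    if (i === p.length) return j === k.length;
    if (p[i] === '#') {
      // "#" may absorb zero words, or consume one word and try again.
      return match(i + 1, j) || (j < k.length && match(i, j + 1));
    }
    if (j === k.length) return false;
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}

console.log(topicMatches('agreements.eu.#', 'agreements.eu.berlin.headstore')); // true
console.log(topicMatches('agreements.eu.*', 'agreements.eu.berlin.headstore')); // false
console.log(topicMatches('agreements.eu.#', 'agreements.us.boston'));           // false
```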
What the fork?
On top of the original repo this fork adds:
- More lenient engine definition in package.json (usable with Node >= 10).
- Additional configuration that you can pass to a queue and exchange.
- Upstream bugfixes.
Debug
This package uses Debug.
To show the logs, run your app with the environment variable DEBUG=AMQPPubSub.