rabbitmq-queue-stream
Tests
$ make test
Usage
$ npm i rabbitmq-queue-stream
var RabbitMQStream = ;var stream = ; var options = connection: url: "amqp://user:password@rabbitmq.com" nodeAmqp: reconnect: false // defaults to true, see https://github.com/postwait/node-amqp#connection-options-and-url (search for reconnect) queue: name: "myQueue" subscribe: /* Any option accepted by https://github.com/postwait/node-amqp#queuesubscribeoptions-listener */ connection: /* Any option accepted by https://github.com/postwait/node-amqp#connectionqueuename-options-opencallback */ ; /* * Initialize two consumer channels to our queue.*/RabbitMQStream;
There also a helper method that helps with integration test
var RabbitMQStream = ;var Transform = Transform;var myTransformStream = objectMode: true;myTransformStream { console; this; ;};var streamifiedQueues = RabbitMQStream;/* * streamifiedQueues.channels will contain one channel with a * streamable .source and .sink. */ var channel = streamifiedQueueschannels; channelsource ; //channel .sink emits 'requeued', 'rejected', and 'acknowledged' events channelsink;
Emitted Events
AMQPStreams
- ready - AMQP client connected or reconnected
- error - Emitted if connection to broker dies
RabbitMQStream;
.source
- parseError - Emitted when a message specifies contentType: application/json but is malformed JSON.
myQueueStreamsource;
.sink
- acknowledged - Emitted everytime a message is acknowledged
- rejected - Emitted when a message is rejected
- requeued - Emitted when a message is requeued
var totalAcked = 0;myQueueStreamsource;
- formatError - Sink received a job that does not have the necessary information to be deleted from the queue. Most likely emitted when objects not originating from .source are written to sink.
myQueueStreamsink;
TODO
- Add a jsonMode to make automatic parsing of source data optional