oomq

3.0.0 • Public • Published

Object-Oriented AMQP Client

oomq mimics the API of EventEmitter and hides the fact that events are handled through an AMQP server. So instead of writing:

var q = 'tasks';
 
var open = require('amqplib').connect('amqp://localhost');
 
// Publisher
open.then((conn) => {
  return conn.createChannel();
}).then((ch) => {
  return ch.assertQueue(q).then((ok) => {
    const payload = {
      some: 'sent value',
    };
    return ch.sendToQueue(q, Buffer.from(JSON.stringify(payload)));
  });
}).catch(console.warn);
 
// Consumer
open.then((conn) => {
  return conn.createChannel();
}).then(function(ch) {
  return ch.assertQueue(q).then(function(ok) {
    return ch.consume(q, (msg) => {
      if (msg !== null) {
        const payload = JSON.parse(msg.content);
        console.log(payload.some);
        ch.ack(msg);
      }
    });
  });
}).catch(console.warn);

one would write:

const q = 'tasks';
 
const {
  EventEmitter,
  events,
} = require('oomq');
const emitter = new EventEmitter('amqp://localhost');
 
// Publisher
const event = events.JSONEvent.create({
  some: 'sent value',
});
 
emitter.emit(event)
  .catch(console.warn);
 
// Consumer
emitter.on(events.JSONEvent, q, (content) => {
  console.log('got value:', content.some);
  process.exit(0);
}).catch(console.warn);

It aims to avoid common pitfalls of building an event-driven architecture by:

  • enabling common event types and constructors (the static create function) to be defined for them;
  • forcing clients to document which projects can emit which events;
  • simplifying routing logic by requiring the client to specify only the event type and queue name;
  • enabling validation and serialization to be defined for each event type.

It doesn't aim to:

  • define event types out of the box, but provides mechanisms for that;
  • enforce a serialization format, but allows implementing any format as a meta Event type to extend.

Install

npm install oomq --save

API

EventEmitter

Interfaces
interface OnOptions {
  /**
   * AMQP prefetch parameter.
   * @type {number} 
   */
  prefetch?: number;
  /**
   * AMQP autoDelete queue parameter.
   * @type {boolean} 
   */
  autoDelete?: boolean;
  /**
   * Use passed queue name exactly instead
   * of using it in conjunction with the event type.
   * @type {boolean} 
   */
  forceExactQueueName?: boolean;
  /**
   * Don't bind exchange specified in the event type.
   * Option is useful together with `forceExactQueueName` if binding
   * exchange is not required.
   * @type {boolean} 
   */
  noBind?: boolean;
};
 
interface EmitOptions {
  /**
   * Queue name. When set, sendToQueue is used instead of publishing to an exchange.
   * @type {string}
   */
  queue?: string;
}
 
type HandlerFunction = (content?: any, message?) => Promise<any>;

new EventEmitter(url: string, options: OnOptions = {})
  • url: AMQP server connection string.
  • options: default options for on calls.

EventEmitter#on(Event: T, queue: string, handler: HandlerFunction, options: OnOptions = {}): Promise
  • Event: Event type to listen to.
  • queue: Queue identifier. Combined with the stringified name of the event type, it forms the actual queue name on the AMQP server.
  • handler: Handler function, which is expected to return a promise. Errors from the handler are thrown. To have an event retried on failure, catch the error inside the handler and resolve with the emitter.REQUEUE_EVENT symbol instead.

EventEmitter#emit(event: BaseEvent, options: EmitOptions = {}): Promise
  • event: Instance of the event to be emitted.
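
The retry convention described for handler can be wrapped in a small helper. The following is a minimal sketch, not part of oomq: withRequeue and shouldRetry are hypothetical names, and the marker is passed in as a parameter so that the helper assumes nothing about the library beyond the documented emitter.REQUEUE_EVENT symbol.

```javascript
// Hypothetical helper (not part of oomq): wraps a handler so that selected
// failures resolve with a requeue marker instead of rejecting.
function withRequeue(handler, requeueMarker, shouldRetry = () => true) {
  return async (content, message) => {
    try {
      return await handler(content, message);
    } catch (err) {
      // Retryable failure: resolve with the marker so the event is requeued.
      if (shouldRetry(err)) {
        return requeueMarker;
      }
      // Anything else is rethrown and surfaces as a handler error.
      throw err;
    }
  };
}

// Assumed usage, based on the signatures documented above:
//
//   emitter.on(events.JSONEvent, 'tasks', withRequeue(
//     async (content) => saveTask(content),   // saveTask is hypothetical
//     emitter.REQUEUE_EVENT,
//     (err) => err.code === 'ECONNRESET'      // retry only transient errors
//   ));
```

Retrying only errors known to be transient avoids endlessly requeueing an event that can never succeed.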

Usage

There are several usage examples in the examples folder. In its simplest form:

const {
  EventEmitter,
} = require('oomq');
const emitter = new EventEmitter('amqp://localhost');
 
// emitter.emit( ... );
 
emitter.close(); // Will close the channel and connection used by the emitter

After the connection is closed with emitter.close(), the client is no longer usable: publishing events will throw and consumers will be stopped.
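
Because a closed emitter cannot be reused, it can help to tie its lifetime to one unit of work. A minimal sketch, assuming only that the emitter exposes close() as shown above; withEmitter is a hypothetical helper, and await is used so the sketch works whether or not close() returns a promise:

```javascript
// Hypothetical helper (not part of oomq): runs `work` with a fresh emitter
// and always closes it afterwards, even if `work` throws or rejects.
async function withEmitter(createEmitter, work) {
  const emitter = createEmitter();
  try {
    return await work(emitter);
  } finally {
    await emitter.close();
  }
}

// Assumed usage:
//
//   const { EventEmitter } = require('oomq');
//   withEmitter(
//     () => new EventEmitter('amqp://localhost'),
//     (emitter) => emitter.emit(someEvent)   // someEvent is hypothetical
//   ).catch(console.warn);
```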

Common Setup

oomq doesn't define any event types for your system, but provides mechanisms for doing so. It enforces neither the serialization format (protobuf, JSON) nor the event structure. That makes getting off the ground with oomq a bit more work. Generally you'd define all event types either:

  1. in the project that both produces and consumes those events, or
  2. in a separate package, if producers and consumers are implemented in different projects, so that both ends can depend on the same event type definitions.

All events have to inherit from Event.

Example of defining an event type that handles serialization: JSONEVENT.ts. Example of defining an event type that handles validation and event structure: Product event.
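
To illustrate the layering described above (a meta event type for serialization, concrete event types for structure and validation), here is a self-contained sketch. It does not use oomq's real base class: StandInEvent merely stands in for Event, and serialize, deserialize, validate, and ProductCreated are hypothetical names chosen for illustration. Only the static create constructor is taken from the text above; the linked examples show the actual interface.

```javascript
// Stand-in for the library's Event base class; the real interface may differ.
class StandInEvent {
  constructor(content) {
    this.content = content;
  }
}

// Meta event type: owns serialization, knows nothing about structure.
class JSONEvent extends StandInEvent {
  // Static constructor: builds an instance of the calling class and validates it.
  static create(content) {
    const event = new this(content);
    event.validate();
    return event;
  }

  // Subclasses override this to enforce structure; the default accepts anything.
  validate() {}

  serialize() {
    return Buffer.from(JSON.stringify(this.content));
  }

  static deserialize(buffer) {
    return this.create(JSON.parse(buffer.toString()));
  }
}

// Concrete event type: owns structure and validation.
class ProductCreated extends JSONEvent {
  validate() {
    const { id, name } = this.content || {};
    if (typeof id !== 'number' || typeof name !== 'string') {
      throw new TypeError('ProductCreated requires a numeric id and a string name');
    }
  }
}

// Round trip: create, serialize to a Buffer, and rebuild a validated event.
const event = ProductCreated.create({ id: 1, name: 'Kettle' });
const roundTripped = ProductCreated.deserialize(event.serialize());
console.log(roundTripped.content.name);
```

Keeping serialization in a shared parent means a new event type only has to describe its own structure.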

License

MIT

Unpacked Size

306 kB

Total Files

15

Collaborators

  • r56