c9ms
0.0.1 • Public • Published

c9m

Consumerism aims to provide lightweight abstractions over the most commonly used message brokers in order to enable quick prototyping.

Abstracting away base connection handling comes at the cost of some control, but it makes it possible to swap out messaging drivers with minimal to no changes.

Compromises have to be made in order to provide a common interface for message consumption and production, but the aim is to keep each broker's own terminology intact.

Installing

npm install --save c9m

Usage

Given a naive WorkerBehaviour extension:

import { WorkerBehaviour } from 'c9m';

class MyWorker extends WorkerBehaviour {
  public async onMessage(message: Buffer, handlers): Promise<void> {
    // `logger` is assumed to be a logger instance available in scope
    logger.info(`MyWorker Received message: ${message.toString()}`);

    // processing

    await handlers?.ack();
  }
}

After running npm install --save mqtt, the worker can be initialised to work with MQTT, e.g.:

const worker = new MyWorker({
  name: 'my-mqtt-consumer',
  protocol: 'mqtt',
  logger,
  host: 'localhost',
  port: 1883,
  username: 'user',
  password: 'pwd',
  topic: 'my-topic',
});

await worker.initialise();

Or, after running npm install --save kafkajs, it can be initialised to work with Kafka, e.g.:

const worker = new MyWorker({
  name: 'my-kafka-consumer',
  protocol: 'kafka',
  logger,
  host: 'localhost',
  port: 9093,
  username: 'user_kafka',
  password: 'secret',
  topic: 'my-topic',
});

await worker.initialise();

Or, after running npm install --save amqplib, it can be initialised to work with AMQP, e.g.:

const worker = new MyWorker({
  name: 'my-rabbit-consumer',
  protocol: 'amqp',
  logger,
  host: 'localhost',
  port: 5672,
  username: 'user',
  password: 'password',
  topic: 'my-queue',
});

await worker.initialise();

Note: the message handling logic never changes between brokers, only the initialisation config does.
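Since only the config differs between brokers, one way to keep the choice in a single place is a small config factory. The following is a minimal sketch, not part of c9m: `Protocol`, `BrokerConfig`, `DEFAULT_PORTS` and `buildConfig` are hypothetical names, the field names are copied from the examples above, and the default credentials are placeholders.

```typescript
// Hypothetical helper, not part of c9m: builds broker settings by protocol name.
type Protocol = 'mqtt' | 'kafka' | 'amqp';

interface BrokerConfig {
  name: string;
  protocol: Protocol;
  host: string;
  port: number;
  username: string;
  password: string;
  topic: string;
}

// Ports mirror the examples above; adjust them for your environment.
const DEFAULT_PORTS: Record<Protocol, number> = {
  mqtt: 1883,
  kafka: 9093,
  amqp: 5672,
};

function buildConfig(
  protocol: Protocol,
  overrides: Partial<BrokerConfig> = {},
): BrokerConfig {
  return {
    name: `my-${protocol}-consumer`,
    protocol,
    host: 'localhost',
    port: DEFAULT_PORTS[protocol],
    username: 'user',     // placeholder credentials
    password: 'password', // placeholder credentials
    topic: 'my-topic',
    ...overrides,         // caller-supplied values win over the defaults
  };
}
```

The result could then be spread into the worker options together with a logger, e.g. new MyWorker({ ...buildConfig('kafka'), logger }), so that switching brokers is a one-word change.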

A producer can be initialised in the same way:

import { Producer } from 'c9m';

MQTT e.g.

const producer = new Producer({
  name: 'my-mqtt-producer',
  protocol: 'mqtt',
  logger,
  host: 'localhost',
  port: 1883,
  username: 'user',
  password: 'pwd',
  topic: 'my-topic',
});

await producer.initialise();

await producer.publish('my-topic', Buffer.from('Hello, World 1010102!'));

Kafka e.g.

const producer = new Producer({
  name: 'my-kafka-producer',
  protocol: 'kafka',
  logger,
  host: 'localhost',
  port: 9093,
  username: 'user_kafka',
  password: 'secret',
  topic: 'my-topic',
});

await producer.initialise();

await producer.publish('my-topic', Buffer.from('Hello, World 1010102!'));

AMQP e.g.

const producer = new Producer({
  name: 'my-amqp-producer',
  protocol: 'amqp',
  logger,
  host: 'localhost',
  port: 5672,
  username: 'user',
  password: 'password',
  topic: 'my-queue',
});

await producer.initialise();

await producer.publish('my-queue', Buffer.from('Hello, World 1010102!'));
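The examples above publish and receive raw Buffer payloads. For structured messages, a pair of helpers can convert between objects and Buffers. This is a minimal sketch assuming JSON encoded as UTF-8; `encodeJson`, `decodeJson` and `OrderCreated` are hypothetical names and not part of c9m.

```typescript
// Hypothetical helpers, not part of c9m: JSON <-> Buffer conversion.

// Serialises a JSON-compatible value into a UTF-8 encoded Buffer.
function encodeJson(value: unknown): Buffer {
  return Buffer.from(JSON.stringify(value), 'utf8');
}

// Parses a UTF-8 encoded Buffer back into a value.
// The type parameter is a compile-time cast only; nothing is validated at runtime.
function decodeJson<T>(message: Buffer): T {
  return JSON.parse(message.toString('utf8')) as T;
}

// Illustrative message shape.
interface OrderCreated {
  orderId: string;
  quantity: number;
}

const payload = encodeJson({ orderId: 'A-1', quantity: 2 });
const decoded = decodeJson<OrderCreated>(payload);
```

On the producing side the Buffer returned by encodeJson can be passed as the second argument to producer.publish, and on the consuming side decodeJson can be applied to the message received in onMessage.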

TODOs

A lot needs to be going on here :)

License

This library is licensed under the Apache 2.0 License.

Collaborators

  • razmat145