RxMsg

A powerful and simple universal messaging abstraction

This library makes it easy to send messages across a distributed system in a network-transparent way, via various brokers, using RxJS streams.

RxMsg uses a versatile middleware pattern to create messaging endpoints that are extremely flexible.

Sending a message

const { createProducer } = require('rxmsg');
const { createAmqpConnector } = require('rxmsg/amqp');
const { amqpConfig } = require('./amqpConfig');
 
const middleware = createAmqpConnector(amqpConfig).sender();
const producer = createProducer(middleware);
 
// RxJS observer
producer.next({ body: 'Hello World!', to: 'hello' });

Receiving a message

const { createConsumer } = require('rxmsg');
const { createAmqpConnector } = require('rxmsg/amqp');
const { amqpConfig } = require('./amqpConfig');
 
const middleware = createAmqpConnector(amqpConfig).receiver({ noAck: true, queue: 'hello' });
const consumer = createConsumer(middleware);
 
// RxJS observable
consumer.subscribe(msg => {
  console.log(`Received: "${msg.body}"`);
});

Configure your broker

module.exports.amqpConfig = {
  declarations: {
    // List the queues, exchanges etc. you want to use here.
    queues: [
      {
        durable: false,
        name: 'hello'
      }
    ]
  },
  uri: 'amqp://user:password@somerabbitserver.io/user'
};
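
To keep credentials out of source control, the same config shape could be built from the environment. This is a minimal sketch: buildAmqpConfig, the AMQP_URI variable and the localhost fallback are assumptions made for illustration and are not part of rxmsg.

```javascript
// Hypothetical helper: builds the config object shown above from an
// environment-like object. AMQP_URI and the fallback URI are assumptions.
function buildAmqpConfig(env = process.env) {
  return {
    declarations: {
      // Same queue declaration as in the example config.
      queues: [{ durable: false, name: 'hello' }]
    },
    uri: env.AMQP_URI || 'amqp://localhost'
  };
}

const amqpConfig = buildAmqpConfig();
console.log(amqpConfig.declarations.queues[0].name); // hello
```

In amqpConfig.js you would then export the result with module.exports.amqpConfig, as in the example above.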

Using Middleware

The endpoint creators each accept a list of middleware as arguments. When the producer sends a message it passes top down through the list of middleware.

Producer middleware

Messages flow through the middleware from top to bottom, starting in this case from a producer.next(msg) call.

const producer = createProducer(
  transformMessageSomehow,      // Step 1 - Do some transformation
  broadCastsMessagesSomewhere   // Step 2 - The last middleware must do the broadcasting
);

Consumer middleware

Again, messages flow from top to bottom. Here they arrive from an external broker via the top middleware.

const consumer = createConsumer(
  receivesMessagesFromSomewhere, // Step 1 - The first middleware must emit the message.
  logOrTransformMessage,         // Step 2 - Perhaps send the message to a logger.
  doSomeMoreTransformation       // Step 3 - Run another transform on the message before subscription.
);
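
The top-to-bottom ordering can be illustrated without a broker. The sketch below uses plain arrays as a stand-in for RxJS streams, and applyMiddleware, addGreeting and shout are hypothetical names, not part of rxmsg. It only shows that each middleware receives the output of the one listed before it.

```javascript
// Hypothetical helper (not part of rxmsg): applies middleware top to bottom,
// feeding the output of each one into the next.
function applyMiddleware(source, ...middleware) {
  return middleware.reduce((stream, mw) => mw(stream), source);
}

// Plain arrays stand in for RxJS streams in this sketch.
const addGreeting = stream =>
  stream.map(msg => ({ ...msg, body: `Hello ${msg.body}` }));
const shout = stream =>
  stream.map(msg => ({ ...msg, body: msg.body.toUpperCase() }));

const result = applyMiddleware([{ to: 'hello', body: 'world' }], addGreeting, shout);
console.log(result[0].body); // HELLO WORLD
```

Swapping the two middleware changes the result, which is why the position of each middleware in the argument list matters.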

Creating your own Middleware

Middleware is simple: each one is just a function that decorates an RxJS stream. Here is the signature:

type Middleware<T> = (stream: Observable<T>) => Observable<T>;

Here is an example:

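const { tap } = require('rxjs/operators');

// Logs each message body as it passes through, leaving the stream unchanged.
function logger(stream) {
  return stream.pipe(
    tap(msg => console.log(`Stream logged: ${msg.body}`))
  );
}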

You use a middleware by passing it as one of the arguments to createProducer() or createConsumer().

const consumer = createConsumer(amqpReceiver, logger);

Manipulating messages

Note that because consumer is simply an RxJS observable, you can filter it, throttle it, or apply any other operator to it:

const { filter } = require('rxjs/operators');

const sub = consumer
  .pipe(filter(msg => msg.body.toLowerCase().includes('world')))
  .subscribe(msg => {
    console.log(`Received: ${msg.body}`);
  });

Installation

You can install with yarn or npm:

yarn add rxmsg
npm install rxmsg --save

Getting Started Examples

You can check out the getting started examples here:

  1. RabbitMQ
  2. Kafka (coming soon)
  3. Node Processes (coming soon)
  4. Web Workers (coming soon)
  5. Socket.io (coming soon)

RabbitMQ Examples as tests

For usage and examples, please look at the basic tests here:

  1. Hello World
  2. Work Queues
  3. PubSub
  4. Routing
  5. Topics

Usage with TypeScript

Messages

Generic message objects look like this:

// Generic message
export interface IMessage {
  body: any;
  to: any;
  correlationId?: string;
  replyTo?: string;
}

You send a message by passing it to the next() method of a producer.

producer.next({
  body: 'Hi there!',
  to: 'some-queue'
});
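
The optional correlationId and replyTo fields suggest a request/reply pattern. The sketch below shows one way a reply message might be built from a request. It assumes that replyTo names the queue to answer on and that correlationId ties a reply to its request, as in typical AMQP RPC setups. createReply is a hypothetical helper, and rxmsg itself may treat these fields differently.

```javascript
// Hypothetical helper: builds a reply addressed to the sender of `request`.
// Assumes replyTo is the queue to answer on and correlationId links the
// reply back to the request.
function createReply(request, body) {
  if (!request.replyTo) {
    throw new Error('Cannot reply: request has no replyTo');
  }
  return {
    body,
    to: request.replyTo,
    correlationId: request.correlationId
  };
}

const request = {
  body: 'ping',
  to: 'rpc-queue',
  replyTo: 'reply-queue',
  correlationId: 'abc-123'
};
const reply = createReply(request, 'pong');
console.log(reply.to, reply.correlationId); // reply-queue abc-123
```

The resulting object has the same shape as IMessage, so it could be passed straight to producer.next().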

Project Principles:

  • Declarative over imperative.
  • Functions over classes.
  • Simplicity over complexity.
  • Immutable over mutable.
  • Flexible and composable over fixed hierarchy.
  • Pure over impure.
  • Minimalistic sensible defaults over boilerplate.
  • Idiomatic APIs over reinventing the wheel.

Environments

  • The basic framework should work in all V8 environments.
  • Middleware is environment specific. For example, rxmsg/amqp requires Node, and rxmsg/socketio-browser (coming soon) requires a browser environment (window, document, etc.).

Broker Support

We currently support, or plan to support, the following brokers:

  • AMQP / RabbitMQ
  • Kafka (coming soon)
  • Node Processes (coming soon)
  • Web Workers (coming soon)
  • Socket.io (coming soon)

Is there a message broker you would like to see on this list? Want to get a specific integration sooner?

Create an issue or talk to me about sponsoring this project.

Architectural Roadmap

  • Refactor to lerna

RxJS References

Docs

Videos

NOTE: Using version 6

rxmsg uses RxJS v6, so operators must be applied through pipe():

import { filter } from 'rxjs/operators';
 
// ...
 
consumer.pipe(filter(forUserEvents(userId))).subscribe(
  msg => {
    dealWithMessage(msg.body);
  },
  () => {}
);
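
forUserEvents and dealWithMessage are not defined in the snippet above. As an illustration only, a predicate factory such as forUserEvents might look like the sketch below, which assumes that each message body is an object with a userId field. Array.prototype.filter stands in for the RxJS filter operator so the example runs without a broker.

```javascript
// Hypothetical predicate factory: returns a function suitable for filter().
// Assumes each message body is an object carrying a `userId` field.
function forUserEvents(userId) {
  return msg => Boolean(msg.body) && msg.body.userId === userId;
}

const messages = [
  { to: 'events', body: { userId: 1, type: 'login' } },
  { to: 'events', body: { userId: 2, type: 'logout' } }
];

// Array.prototype.filter stands in for the RxJS filter operator here.
const mine = messages.filter(forUserEvents(1));
console.log(mine.length); // 1
```

Because the predicate is an ordinary function, it can be unit tested on plain objects before it is used in a stream.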

Other References

Version

4.1.2

License

MIT

Collaborators

  • ryardley