RxMsg
A powerfull and simple universal messaging abstraction
This library makes it easy to send messages in a distributed network transparent way via various brokers using RxJS streams.
RxMsg uses a versatile middleware pattern to create messaging endpoints that are extremely flexible.
Sending a message
;;; ;; // RxJS observerproducer.next;
Receiving a message
;;; ;; // RxJS observableconsumer.subscribe;
Configure your broker
module.exports.amqpConfig =;
Using Middleware
The endpoint creators each accept a list of middleware as arguments. When the producer sends a message it passes top down through the list of middleware.
Producer middleware
Messages come into the system top to bottom. In this case from a producer.next(msg)
call.
;
Consumer middleware
Again, messages come into the system top to bottom. Here this would be from an external broker via the top middleware.
;
Creating your own Middleware
Middleware is simple as they are only functions designed to decorate RxJS streams. Here is their signature:
;
Here is an example:
You might use a middleware by passing it as one of the arguments to the createProducer()
or createConsumer()
functions.
;
Manipulating messages
Note that because consumer is simply an RxJS observable you can apply filtering and throttling or do whatever you want to it
.pipefiltermsg.body.toLowerCase.includes'world' .subscribe;
Installation
You can install over npm.
yarn add rxmsg
npm install rxmsg --save
Getting Started Examples
You can checkout the getting started example here:
- RabbitMQ
- Kafka (coming soon)
- Node Processes (coming soon)
- Web Workers (coming soon)
- Socket.io (coming soon)
RabbitMQ Examples as tests
For usage and examples please look at the basic tests thrown together here
Usage with Typescript
Messages
Generic message objects look like this:
// Generic message
You might use a message by sending it to the next()
method of a producer.
producer.next;
Project Principles:
- Declarative over imperative.
- Functions over classes.
- Simplicity over complexity.
- Immutable over mutable.
- Flexible and composable over fixed heirarchy.
- Pure over impure.
- Minmalistic sensible defaults over boilerplate.
- Idiomatic API's over reinventing the wheel.
Environments
- Basic framework should work in all V8 environments. eg.
- Middleware is environment specific. Eg.
rxmsg/amqp
requires node.rxmsg/socketio-browser
(coming soon) requires a browser environment eg.window
,document
etc.
Broker Support
Currently we support the following brokers:
- AMQP / RabbitMQ
- Kafka
- Node Processes
- Web Workers
- Socket.io
Is there a message broker you would like to see on this list? Want to get a specific integration sooner?
Create an issue or talk to me about sponsoring this project.
Architectural Roadmap
- Refactor to lerna
RxJS References
Docs
Videos
NOTE: Using version 6
rxmsg
uses RxJS v6.0 so you need to pipe all your operators:
; // ... consumer.pipefilterforUserEventsuserId.subscribe ,;
Other References
- https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html
- https://www.rabbitmq.com/tutorials/tutorial-two-javascript.html
- https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html
- https://www.rabbitmq.com/tutorials/tutorial-four-javascript.html
- https://www.rabbitmq.com/tutorials/tutorial-five-javascript.html
- https://aws.amazon.com/blogs/compute/building-scalable-applications-and-microservices-adding-messaging-to-your-toolbox/