@concorde2k/bus.mq

1.1.26 • Public • Published

Enterprise Service Bus

The enterprise bus is a consumer-agnostic bus that allows consumers of any type to pass messages down the pipe and, with the correct configuration, get a response as well. This package exposes classes and methods for interacting with the bus.

Installation

npm install @concorde2k/bus.mq

Configuration

When you load this library, you must provide a URL for both Rabbit and Kue. These can be specified in a configuration file or via the environment. The environment is the preferred method because it makes inclusion in a container more portable.

MQ_URL (env)/qUrl (file) - the URL of the message queue (Rabbit). It takes the form MQ_URL=amqp://concorde:1835MarketStreet@192.168.56.101

JQ_URL (env)/jUrl (file) - the URL of the job queue (Kue). It takes the form JQ_URL=redis://192.168.56.101:6379
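The lookup described above can be sketched as follows. This is a minimal illustration, not the library's own loader: `resolveBusUrls` is a hypothetical helper, the rule that the environment wins over the file is an assumption, and the job queue variable is read as JQ_URL, following the example value shown above.

```javascript
// Hypothetical helper, not part of @concorde2k/bus.mq.
// Assumption: an environment value overrides the configuration file value.
function resolveBusUrls(env, fileConfig = {}) {
  const qUrl = env.MQ_URL || fileConfig.qUrl; // message queue (Rabbit)
  const jUrl = env.JQ_URL || fileConfig.jUrl; // job queue (Kue)
  if (!qUrl || !jUrl) {
    throw new Error("Both a message queue URL and a job queue URL are required");
  }
  return { qUrl, jUrl };
}

// Fall back to placeholder file values when the environment is not set.
const urls = resolveBusUrls(process.env, {
  qUrl: "amqp://localhost",
  jUrl: "redis://localhost:6379",
});
console.log(urls);
```

Failing fast when either URL is missing surfaces a misconfigured container at startup rather than at the first message.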

Technical Docs

A description of the internal API for contributors can be found on Bitbucket or locally.

Message Types

The bus can handle five types of messages, depending on what your needs are: Pub/Sub, Task, Router, Direct, and RPC. These message types are referred to as "exchanges" in line with RabbitMQ terminology.

Pub/Sub

A publisher will send a message to all of the workers on a given exchange. Unlike the other exchange types, this one does not discriminate who gets the message. All listeners will get the message at the same time. This pattern is described in the Rabbit docs. The sender is the [[Publisher]] and the listener is [[Subscriber]].

Sender Example

const { Publisher } = require( "@concorde2k/bus.mq" );
const publisher = new Publisher( { exchange: "Lotus" } );

publisher.connect().then( () => {
    publisher.write( "JimmyClark" );
}).catch((e)=>{
    console.error("An error occurred", e);
});

Listener Example

const { Subscriber } = require( "@concorde2k/bus.mq" );
const worker = new Subscriber( { exchange: "Lotus" } );

worker.connect().then( () => {
    worker.on(Subscriber.onWorkItem, (message)=>{
        console.log(message);
    });
}).catch((e)=>{
    console.error("An error occurred", e);
});

Task Messages

A Task models the competing consumers pattern. Messages are routed to the listeners round-robin, which provides a simple kind of load balancing. See the Rabbit docs for more information. The sender is the [[Task]] and the listener is a [[TaskWorker]].

Sender Example

const { Task } = require( "@concorde2k/bus.mq" );
const tasks = new Task( { exchange: "Mclaren" } );

tasks.connect().then( () => {
    tasks.write( "AyrtonSenna" );
}).catch((e)=>{
    console.error("An error occurred", e);
});

Listener Example

const { TaskWorker } = require( "@concorde2k/bus.mq" );
const worker = new TaskWorker( { exchange: "Mclaren" } );

worker.connect().then( () => {
    worker.on(TaskWorker.onWorkItem, (message)=>{
        console.log(message);
    });
}).catch((e)=>{
    console.error("An error occurred", e);
});
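To make the difference between Pub/Sub and Task delivery concrete, here is a small in-memory sketch. It does not touch RabbitMQ or this library; `fanOut` and `roundRobin` are hypothetical names that only model which listener would see which message.

```javascript
// In-memory illustration only: no broker and no library calls.

// Pub/Sub: every listener receives every message.
function fanOut(messages, listenerCount) {
  const inboxes = Array.from({ length: listenerCount }, () => []);
  for (const message of messages) {
    for (const inbox of inboxes) inbox.push(message);
  }
  return inboxes;
}

// Task: each message goes to exactly one worker, taken in turn.
function roundRobin(messages, workerCount) {
  const inboxes = Array.from({ length: workerCount }, () => []);
  messages.forEach((message, i) => inboxes[i % workerCount].push(message));
  return inboxes;
}

const drivers = ["Senna", "Prost", "Berger"];
console.log(fanOut(drivers, 2));     // [["Senna","Prost","Berger"], ["Senna","Prost","Berger"]]
console.log(roundRobin(drivers, 2)); // [["Senna","Berger"], ["Prost"]]
```

With two listeners, Pub/Sub delivers three messages to each, while Task splits the same three messages between the two workers.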

Router Messages

A route allows a sender to send messages to a subset of listeners. The listener ([[RouteWorker]]) declares a key pattern, and wildcards in that pattern filter which messages it receives. If no wildcard is used, a route behaves the same as a Direct message. Keys are dot-separated words; in RabbitMQ topic matching, a star (*) stands for exactly one word and a hash (#) stands for zero or more words. For example, a receiver that declares a key of doc.hollywood.* will receive messages sent with doc.hollywood.sucks or doc.hollywood.great, but not avengers.end.game; to also receive the bare key doc.hollywood, it would declare doc.hollywood.# instead. See the Rabbit docs for more information. The sender is [[Route]] and the listener is [[RouteWorker]].

Sender Example

const { Route } = require( "@concorde2k/bus.mq" );
const route = new Route( { exchange: "Ferarri", key: "Drivers.Past" } );
const route2 = new Route( { exchange: "Ferarri", key: "Drivers.Present" } );

route.connect().then( () => {
    route.write( "JodyScheckter" );
}).catch((e)=>{
    console.error("An error occurred", e);
});

route2.connect().then( () => {
    route2.write( "SebVettel" );
}).catch((e)=>{
    console.error("An error occurred", e);
});

Listener Example

const { RouteWorker } = require( "@concorde2k/bus.mq" );
const worker = new RouteWorker( { exchange: "Ferarri", key: "Drivers.*" } );

worker.connect().then( () => {
    worker.on(RouteWorker.onWorkItem, (message, key)=>{
        if (key === "Drivers.Past") {
            console.log("Past driver:", message);
        } else if (key === "Drivers.Present") {
            console.log("Present driver:", message);
        } else {
            console.warn("Unknown key");
        }
    });
}).catch((e)=>{
    console.error("An error occurred", e);
});
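The wildcard rules for route keys can be checked without a broker. The sketch below implements standard RabbitMQ topic matching, where * stands for exactly one word and # for zero or more words. `topicMatches` is a hypothetical helper, not part of this package, and it assumes the bus passes keys to a RabbitMQ topic exchange unchanged.

```javascript
// Hypothetical helper: mirrors RabbitMQ topic-exchange matching rules.
function topicMatches(pattern, key) {
  const p = pattern.split(".");
  const k = key === "" ? [] : key.split(".");

  // match(i, j): does pattern[i..] match key[j..]?
  function match(i, j) {
    if (i === p.length) return j === k.length;
    if (p[i] === "#") {
      // '#' may absorb zero or more words
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false;
    // '*' matches any single word; otherwise the words must be equal
    if (p[i] === "*" || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches("Drivers.*", "Drivers.Past"));           // true
console.log(topicMatches("Drivers.*", "Drivers.Past.Champions")); // false
console.log(topicMatches("Drivers.#", "Drivers.Past.Champions")); // true
```

A check like this is handy in unit tests to confirm that a listener's key pattern covers every key the senders use.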

Direct Messages

A Direct exchange lets listeners selectively receive messages based on a key. This is very similar to a Route/RouteWorker but differs in that the key does not accept wildcards: the listener's key must match the sender's key exactly. This pattern is described in the Rabbit docs. The sender for a Direct message is [[Direct]] and the listener is [[DirectWorker]].

Sender Example

const { Direct } = require( "@concorde2k/bus.mq" );
const direct = new Direct( { exchange: "March", key: "Drivers" } );

direct.connect().then( () => {
    direct.write( "JackieStewart" );
}).catch((e)=>{
    console.error("An error occurred", e);
});

Listener Example

const { DirectWorker } = require( "@concorde2k/bus.mq" );
const worker = new DirectWorker( { exchange: "March", key: "Drivers" } );

worker.connect().then( () => {
    worker.on(DirectWorker.onWorkItem, (message)=>{
        console.log(message);
    });
}).catch((e)=>{
    console.error("An error occurred", e);
});

RPC Messages

An RPC feels like a synchronous call, and it kind of is, but it is non-blocking. You make a request to the exchange and a response is delivered by Promise. Unlike the other message types, it is not driven by Rabbit but by Kue, which is a job queue, whereas Rabbit is a message queue. It is the only message type that returns a response. The sender is [[RPC]] and the listener is [[RPCWorker]].

Sender Example

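const { RPC } = require( "@concorde2k/bus.mq" );
const rpc = new RPC( { exchange: "UnitTestExchange5" } );

rpc.connect().then( async () => {
    // publish() resolves with the worker's response
    const res = await rpc.publish( "I am a squirrel" );
    console.info("answer:", res);
}).catch((e)=>{
    console.error("An error occurred", e);
});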

Listener Example

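const { RPCWorker } = require( "@concorde2k/bus.mq" );
const worker = new RPCWorker( { exchange: "UnitTestExchange5" } );

worker.connect().then( () => {
    worker.listen( ( job, cts, done ) => {
        // The second argument to done() is returned to the caller
        done( null, `${job.data} I am a prairie dog` );
    } );
}).catch((e)=>{
    console.error("An error occurred", e);
});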
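Because the RPC response arrives through a Promise, a caller may want to bound how long it waits when no worker is listening. The sketch below is one generic way to do that. `withTimeout` is a hypothetical helper, not part of this package, and it only stops waiting; it does not cancel the underlying job.

```javascript
// Hypothetical helper: rejects if the wrapped promise does not settle in time.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`RPC timed out after ${ms} ms`)),
      ms
    );
  });
  // Whichever settles first wins; always clear the timer afterwards.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Stand-in for a slow worker; with the bus this would be rpc.publish( ... ).
const slowAnswer = new Promise((resolve) => setTimeout(() => resolve("late"), 200));

withTimeout(slowAnswer, 20)
  .then((res) => console.info("answer:", res))
  .catch((e) => console.error("An error occurred:", e.message));
```

With the sender above, the call would read `await withTimeout( rpc.publish( "I am a squirrel" ), 5000 )`, with the timeout value chosen to suit the workload.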

License

UNLICENSED

Collaborators

  • kevin.concorde
  • nick.concorde2k
  • terry.concorde