@labs42/messaging
TypeScript icon, indicating that this package has built-in type declarations

1.0.4 • Public • Published

@labs42/messaging

A powerful framework for asynchronous messaging.

npm Build Status Coverage Status npm

// HelloMessage.ts
export class HelloMessage{
  constructor(public text: string){ }
}
// HelloHandler.ts
export class HelloHandler {
  handle(msg: HelloMessage){
    return `Hello ${msg.text}`;
  }
}
// setup.ts
export const sender = eventSender('demo');
export const receiver = eventReceiver('demo');

const config = Configurator.receiver()
  .message(HelloMessage)
  .handleWith(() => new HelloHandler);
// example.ts
import { sender } from './sender'

const msg = new HelloMassage('World');
sender.submit(msg).then(result => console.log(result));

// prints: `Hello World`

Installation

  $ npm install @labs42/messaging --save

Features

  • Decouple large systems into small, reusable code blocks
  • Configuration using decorators
  • Configuration using fluent API
  • Support for custom communication protocols
  • Interceptors: @Retry, @Timeout, @Return, @Throw, @Delay
  • Support for custom message interception

Philosophy

@labs42/messaging is a simple framework that helps build systems composed of small, independent, reusable and testable code blocks.
Imagine that any code that is executed by the application is a response to some request. Be that request a method call, or a command to change something, or a request to query some data, or a transaction, etc. they all contain only information about how to execute a specific operation. We call this kind of requests Message.
A Message is used only to transport information about how they should be handled. Messages don't have behavior. For the application to be able to react to messages, we introduce another kind of actor - Handler.
A Handler is able to react to a specific message type.
Putting it all together, we decompose the application into messages and handlers, that have a one-to-one relationship.
The above Message-Handler pattern is implemented by using a:

  • Sender that serves as a dispatcher to submit messages and wait for results;
  • Receiver that serves as an observer to listen to messages, handle them with an appropriate handler and propagate back the result (if any);

The Sender and Receiver communication rely on abstractions, so that custom protocols like HTTP, IPC etc. can be configured.

The framework allows to intercept the communication process between a sender and a receiver. An interceptor provides full control on how a message is dispatched and received. Interceptors can be easily attached/detached from messages and handlers using decorators.
For example you could define a timeout for a message execution:

@Timeout(1000)
class MyMessage { ... }

or just mock a result for a specific message using the return decorator, while the handler hasn't been implemented yet.

@Return(new User('John', 'Doe'))
class MyHandler {
  handler(msg: GetUserMessage){ throw new Error('Not implemented.') }
}

See interception below for more information about built-in interceptors and implementing a custom interceptor.

Setup

To enable an application to send and respond to messages, an instance of Sender and Receiver has to be created. The sender's instance must be exported in order to be referenced in any place where a message has to be submitted.

Sender

For an application to be able to send messages, an instance of Sender has to be created.

import { Sender, MessageSender, EventDispatcher } from '@labs42/messaging';

const dispatcher = new EventDispatcher('unique-channel-name');
export const sender: Sender = new MessageSender(dispatcher);

This creates and exports a new instance of message sender, using a EventDispatcher. The dispatcher's constructor accepts a parameter, which is the name of event channel.
The EventDispatcher can be used to dispatch messages only within the same application.

A helper method eventSender is available that is equivalent with above setup:

import { eventSender, Sender } from '@labs42/messaging';
export const sender: Sender = eventSender('unique-channel-name')

A sender can be configured with global or message specific interceptors. See configuration for more details.

Receiver

For an application to be able to receive messages, an instance of Receiver has to be created.

import { Receiver, MessageReceiver, EventObserver } from '@labs42/messaging'

const observer = new EventObserver('unique-channel-name');
export const receiver:Receiver = new MessageReceiver(observer);
receiver.configure(...);

This creates and exports a new instance of message receiver, using the EventObserver. The observer's constructor accepts a parameter, which is the name of event channel. The EventObserver can be used to listen to messages only dispatched within the same application. By configuring the same channel name in the dispatcher and observer, the sender and receiver are connected and ready to communicate.

A helper method eventReceiver is available that is equivalent with above setup:

import { eventReceiver, Receiver } from '@labs42/messaging'

export const receiver = eventReceiver('unique-channel-name');
receiver.configure(...);

Although in above examples the receiver is exported, this is not needed unless it is referenced in other places to be configured. The only requirement is that a receiver instance is created and alive for the whole application lifetime.

Receivers once created, don't know how to handle and respond to incoming messages. In order to instruct the receiver how to handle specific message, it has to be configured. See configuration for more details.

Configuration

Both, sender and receiver can be configured using configurators.

import { Configurator, eventSender, eventReceiver } from '@labs42/messaging';

const sender = eventSender('messaging');
const senderConfig = Configurator.sender();
// setup senderConfig ...
sender.configure(senderConfig);

const receiver = eventReceiver('messaging');
const receiverConfig = Configurator.receiver();
// setup receiverConfig...
receiver.configure(receiverConfig);

Multiple configurators can be applied.

const receiver = eventReceiver('messaging');
const securityConfig = Configurator.receiver();
// setup securityConfig
receiver.configure(securityConfig);

const usersConfig = Configurator.receiver();
// setup usersConfig
receiver.configure(usersConfig)

Sender configuration

To create a new sender configurator:

import { Configurator } from '@labs42/messaging'

// create a new sender fluent configurator
const config = Configurator.sender();

.namespace() defines a default namespace for all configured messages within the current configurator:

const config = Configurator.sender();
config.namespace('my-application')
  .message(Msg1)
  .message(Msg2, { namespace: 'ns2' })
  .message(Msg3, { type: 'message-3', namespace: 'ns3' })
  .message(Msg4, { type: 'message-4' });

// Following types will be defined in config:
// Msg1 -> 'my-application.Msg1'
// Msg2 -> 'ns2.Msg2'
// Msg3 -> 'ns3.message-3'
// Msg4 -> 'my-application.Msg4'

.interceptAll() configures an interceptor to be applied for all messages defined within the current configurator:

const config = Configurator.sender();

// Apply the Timeout interceptor (2 seconds)
config.namespace('my-application')
  .interceptAll(Timeout(2000));
  .message(Msg1)
  .message(Msg2);

.message() configures a message type and allows chaining to configure message specific interceptors:

const config = Configurator.sender();

config
  .message(Msg1) // 'Msg1'
  .message(Msg2, { type: 'message-2'}) // 'message-2'
  .message(Msg3, { namespace: 'ns'}) // 'ns.Msg3'
  .message(Msg4, { type: 'message-4', namespace: 'ns'}); // 'ns.message-4'

If a message has the @Message decorator applied, then it's used as a default:

@Message({ type: 'hello', namespace: 'ns' })
class HelloMessage {}

const config = Configurator.sender();
config.message(Hello);

is equivalent to:

class HelloMessage {}

const config = Configurator.sender();
config.message(Hello, { type: 'hello', namespace: 'ns' });

.message().intercept() configures an interceptor for a specific message type:

const config = Configurator.sender();

// configures retry and timeout interceptors for `Login` message
config
  .message(Login)
  .intercept(Retry(5))
  .intercept(Timeout(3));

@Message() decorator can be applied to a message type to provide custom type metadata:

@Message({ type: 'hello', namespace: 'ns' })
class HelloMessage {}

// `HelloMessage` type full name is: 'ns.hello'

If type parameter is not provided, then the class name is used as a default

Receiver configurator

To create a new receiver configurator:

import { Configurator } from '@labs42/messaging'

// create a new receiver fluent configurator
const config = Configurator.receiver();

.namespace() defines a default namespace for all configured messages within the current configurator:

const config = Configurator.receiver();
config.namespace('my-application')
  .message(Msg1)
  .message(Msg2, { namespace: 'ns2' })
  .message(Msg3, { type: 'message-3', namespace: 'ns3' })
  .message(Msg4, { type: 'message-4' });

// Following types will be defined in config:
// Msg1 -> 'my-application.Msg1'
// Msg2 -> 'ns2.Msg2'
// Msg3 -> 'ns3.message-3'
// Msg4 -> 'my-application.Msg4'

.interceptAll() configures an interceptor to be applied for all messages defined within the current configurator:

const config = Configurator.receiver();

// Apply the Timeout interceptor (2 seconds)
config.namespace('my-application')
  .interceptAll(Timeout(2000));
  .message(Msg1)
  .message(Msg2);

.message() configures a message type and allows chaining to configure message specific interceptors and/or a message handler :

const config = Configurator.receiver();

config
  .message(Msg1) // 'Msg1'
  .message(Msg2, { type: 'message-2'}) // 'message-2'
  .message(Msg3, { namespace: 'ns'}) // 'ns.Msg3'
  .message(Msg4, { type: 'message-4', namespace: 'ns'}); // 'ns.message-4'

If a message has the @Message decorator applied, then it's used as a default:

@Message({ type: 'hello', namespace: 'ns' })
class HelloMessage {}

const config = Configurator.receiver();
config.message(Hello);

is equivalent to:

class HelloMessage {}

const config = Configurator.receiver();
config.message(Hello, { type: 'hello', namespace: 'ns' });

.message().handleWith() configures a handler for a specific message type. To configure a singleton handler instance:

const config = Configurator.receiver();
config.message(Hello).handleWith(new HelloHandler());

To configure a handler factory method:

const config = Configurator.receiver();
config.message(Hello).handleWith(HelloHandler, () => new HelloHandler());

.message().intercept() configures an interceptor for a specific message type. Available only after .message chaining:

const config = Configurator.receiver();

// configures retry and timeout interceptors for `Login` message
config
  .message(Login)
  .intercept(Retry(5))
  .intercept(Timeout(3));

.handler() configures a handler which is decorated using the @HandlerFor decorator.

class Hello { }

@HandlerFor(Hello)
class HelloHandler { ... }

const config = Configurator.receiver();

// to configure a singleton handler 
config.handler(Login, new LoginHandler());

// to configure a factory method
config.handler(Login, () => new LoginHandler());

Interception

@Return

@Return interceptor allows to configure a result for a message type.
If the interceptor is applied on sender's side, then the result is returned without dispatching the message to the receiver.

@Return(true)
class LoginMessage { ... }

If the interceptor is applied on receiver's side, then the result is returned without executing the handler.

@Return(true)
class LoginHandler implements Handler { ... }

@Throw

@Throw interceptor allows to configure an error to be thrown whenever a message is submitted.
If the interceptor is applied on sender's side, then the error is thrown without dispatching the message to the receiver.

@Throw(new Error('No handler is implemented yet.'))
class LoginMessage { ... }

If the interceptor is applied on receiver's side, then the error is thrown without executing the handler.

@Throw('Not implemented.')
class LoginHandler implements Handler { ... }

@Timeout

@Timeout interceptor allows to configure a timeout interval for a message execution.
If the interceptor is applied on sender's side, then the timeout interval includes also the time to dispatch the message.

@Timeout(2000)
class LoginMessage { ... }

If the interceptor is applied on receiver's side, then the timeout interval includes includes the time to execute the message by the handler.

@Timeout(2000)
class LoginHandler implements Handler { ... }

@Delay

@Delay interceptor allows to configure a custom delay for a message execution.
If the interceptor is applied on sender's side, then the delay is applied before dispatching the message.

@Delay(2000) // 2 seconds
class LoginMessage { ... }

If the interceptor is applied on receiver's side, then the delay is applied before executing the message by the handler.

@Delay(2000)
class LoginHandler implements Handler { ... }

@Retry

@Retry interceptor allows to configure the number of retry attempts for a failing message.

@Retry() // defaults to retry once in case of a failure
class LoginMessage { ... }

@Retry(3) // defaults to retry three times in case of failures
class LoginMessage { ... }

If a message fails after the configured number of attempts, then last error is propagated.
Additionally, a retry can be configured for specific errors:

// Configures a retry only for errors with message: 'Service unavailable.'
@Retry(2, error => error.message === 'Service unavailable.')
class LoginMessage { ... }

The interceptor can also be applied on a message handler.

Implementing a custom interceptor

An interceptor can be created either to intercept messages on the sender's side, receiver's side, or both.

To create an interceptor to be used on the sender's side:

class MyInterceptor implements SenderInterceptor {
  submit(request: Request, next: RequestResolver): Promise<Response> {
    // clears the id property of a message before submitting
    request.data.id = null;

    // chain the request to next interceptor
    return next(request);
  }
}

To create an interceptor to be used on the receiver's side:

class MyInterceptor implements ReceiverInterceptor {
  handle(request: Request, next: RequestResolver): Promise<Response> {
    // request.data contains the original message
    // ...

    // chain the request to next interceptor
    return next(request);
  }
}

To define a decorator for a custom interceptor:

import { Decorator } from '@labs42/messaging'
const MyDecorator = () => Decorator(() => new MyInterceptor());

@MyDecorator()
class MyMessage { }

Running tests

$ npm install
$ npm test

Running examples

$ ts-node examples/basic
$ ts-node examples/message-decoration
$ ts-node examples/fluent-configuration
$ ts-node examples/decorators-mock-results

Roadmap

Implement HTTP protocol: @labs42/messaging-http

Implement IPC protocol: @labs42/messaging-ipc

Implement microservices infrastructure: @labs42/messaging-micro

License

MIT

Copyright © 2018 Labs42

Package Sidebar

Install

npm i @labs42/messaging

Weekly Downloads

0

Version

1.0.4

License

MIT

Unpacked Size

169 kB

Total Files

107

Last publish

Collaborators

  • dimadeveatii
  • labs42io
  • rvmoldova