    @sap/xb-msg

    0.9.12 • Public • Published

    Message Streams

    Provides a client for stream-based messaging.

    Prerequisites

    Make sure to have a message broker available, e.g. RabbitMQ installed locally, having the plugins for AMQP 1.0, MQTT 3.1.1 and WebSocket binding enabled.

    Install

    See also: https://www.npmjs.com/package/@sap/xb-msg

    To add it to your project run:

    npm i @sap/xb-msg

    To generate complete API documentation run inside the library package folder

    npm run doc

    Overview

    A messaging application shall focus on its business logic. Bindings and protocol-specific settings shall move to configuration, without negative impact on performance. The package provides a solution for that, wrapping protocol-specific client implementations from separate packages.

    Library                  Protocol
    @sap/xb-msg-amqp-v091    AMQP v0.9.1
    @sap/xb-msg-amqp-v100    AMQP v1.0
    @sap/xb-msg-mqtt-v311    MQTT v3.1.1

    Direct usage of the listed libraries provides full access to protocol-specific features. With @sap/xb-msg-amqp-v091 the application may control channels, queues and exchanges. With @sap/xb-msg-amqp-v100 the application may, for example, define sessions, attach links and control link credit dynamically. With @sap/xb-msg-mqtt-v311 last-will messages can be used.

    However, many messaging applications will have less specific requirements. It may come down to:

    • Produce messages with a writable stream
    • Consume messages from a readable stream
    • Use messages with a binary payload
    • Receive messages at least once and acknowledge them after successful processing
    • Stay flexible with regard to the messaging protocol in use
    • But still accept the need to maintain settings on the broker side, e.g. to create queues or assign topic bindings to them

    With these requirements @sap/xb-msg in combination with package @sap/xb-msg-env can be a good choice. For example to consume messages:

    const msg = require('@sap/xb-msg');
    const env = require('@sap/xb-msg-env');
     
    const options = env.msgClientOptions('my-service-instance', ['my-inp'], []);
    const client = new msg.Client(options);
     
    client.istream('my-inp')
        .on('error', (error) => {
            console.log(error);
        })
        .on('data', (message) => {
            console.log(`message: ${message.payload.toString()}`);
            message.done();
        })
    ;
     
    client.connect();

    @sap/xb-msg-env reads cf/xs environment variables to build up the client options (configuration). Switching the protocol, changing the binding to queues or topics or changing the quality of service becomes possible without changing the program code.

    A message producer is implemented in a similar way:

    const msg = require('@sap/xb-msg');
    const env = require('@sap/xb-msg-env');
     
    const options = env.msgClientOptions('my-service-instance', [], ['my-out']);
    const client = new msg.Client(options);
     
    client.ostream('my-out')
        .on('ready', () => {
            send();
        })
        .on('drain', () => {
            send();
        })
        .on('error', (error) => {
            console.log(error);
        })
    ;
     
    client.connect();
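
    The producer above calls a function send() that is not shown. A minimal sketch of such a function follows, assuming the application holds a list of payloads to send. The Writable used here is a hypothetical stand-in for client.ostream('my-out') so that the sketch runs without a broker; createSender and the sample payloads are illustrative names, not part of the library.

```javascript
const { Writable } = require('stream');

// Hypothetical stand-in for client.ostream('my-out'): an object-mode Writable
// with a small buffer so that backpressure is actually triggered.
const sent = [];
const ostream = new Writable({
    objectMode: true,
    highWaterMark: 2,
    write(message, _encoding, callback) {
        sent.push(message.payload.toString());
        setImmediate(callback); // simulate an asynchronous transfer
    }
});

// Returns a send function that writes pending payloads until write()
// reports a full buffer. It is meant to be called again on 'drain'.
function createSender(stream, payloads) {
    let next = 0;
    return function send() {
        while (next < payloads.length) {
            const message = { payload: Buffer.from(payloads[next++]) };
            if (!stream.write(message)) {
                return; // buffer is full, continue when 'drain' fires
            }
        }
    };
}

const send = createSender(ostream, ['a', 'b', 'c', 'd', 'e']);
ostream.on('drain', send);
send();
```

    With the real client, the same send function would be registered for the 'ready' and 'drain' events as in the producer example.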

    Finally, by using its own Transform stream (here, a class Converter), the application can also rely on fully automated flow control:

    const msg = require('@sap/xb-msg');
    const env = require('@sap/xb-msg-env');
     
    const options = env.msgClientOptions('my-service-instance', ['my-inp'], ['my-out']);
    const client = new msg.Client(options);
     
    client.istream('my-inp')
        .pipe(new Converter())
        .pipe(client.ostream('my-out'))
    ;
     
    client.connect();
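
    The class Converter is left to the application. One possible shape is sketched below as an object-mode Transform that rewrites the payload and passes the message on. Forwarding done and failed from the incoming to the outgoing message is an assumption about how acknowledgement might be propagated; the uppercase conversion, the source stream and the sink are hypothetical stand-ins so that the sketch runs without a broker.

```javascript
const { Readable, Transform, Writable } = require('stream');

// Object-mode Transform that converts each message payload to upper case.
class Converter extends Transform {
    constructor() {
        super({ objectMode: true });
    }

    _transform(message, _encoding, callback) {
        try {
            const text = message.payload.toString().toUpperCase();
            callback(null, {
                payload: Buffer.from(text),
                // Assumption: settle the incoming message once the
                // outgoing message has been settled.
                done: message.done,
                failed: message.failed
            });
        } catch (error) {
            callback(error);
        }
    }
}

// Hypothetical stand-ins for client.istream('my-inp') and client.ostream('my-out').
const received = [];
const source = Readable.from([
    { payload: Buffer.from('hello') },
    { payload: Buffer.from('world') }
]);
const sink = new Writable({
    objectMode: true,
    write(message, _encoding, callback) {
        received.push(message.payload.toString());
        callback();
    }
});

source.pipe(new Converter()).pipe(sink);
```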

    Client Options

    The client employs protocol-specific libraries that require protocol-specific settings, at least for stream definitions. For example, MQTT will accept a property qos whereas AMQP 1.0 accepts sndSettleMode and rcvSettleMode. However, these differences are restricted to the configuration. At runtime the application will always see a unified client behavior.

    Connections

    Based on client options one or more connections can be created, each for one of the supported protocols and each providing multiple incoming or outgoing streams. As soon as a client instance is connected the application can produce and/or consume messages.

    Connections via 'net' and 'tls' are supported for all protocols. A WebSocket binding is specified, and therefore implemented, for MQTT 3.1.1 and AMQP 1.0 only. Use the options attribute 'wss' or 'ws'.

    WebSocket connections can also be established using OAuth 2.0, for example when connecting a local application to SAP cloud. Relevant grant flows are: ClientCredentialsFlow and ResourceOwnerPasswordCredentialsFlow.

    If multiple settings are provided, they are preferred in the following order: 'tls', then 'net', then 'wss' and finally 'ws'.
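
    The preference order can be illustrated with a small helper. This is only a sketch of the stated rule, not the library's implementation; the function name selectTransport and the shape of the sample option values are hypothetical.

```javascript
// Preference order as stated above: 'tls', then 'net', then 'wss', then 'ws'.
const PREFERENCE = ['tls', 'net', 'wss', 'ws'];

// Returns the name of the preferred transport setting that is present,
// or undefined if none of them is provided.
function selectTransport(options) {
    return PREFERENCE.find(
        (key) => options[key] !== undefined && options[key] !== null
    );
}

// Hypothetical option values, for illustration only.
const chosen = selectTransport({
    ws: { host: 'localhost' },
    net: { host: 'localhost' }
});
console.log(chosen);
```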

    Message Streams

    Writable and Readable streams are provided to handle outgoing and incoming messages. These streams always run in object mode.

    Here, a single message is represented as a plain object with the following attributes:

    • source: defined by the incoming stream,
    • target: defined optionally by the application, similar to the source, accepted by the outgoing stream,
    • payload: message payload,
    • done(): a callback function to confirm final message processing,
    • failed(error): a callback function to indicate processing failure.

    A receiving application is expected to call either done or failed for each message, exactly once (possibly asynchronously) and independent of the quality of service in use. A sending application can define the callbacks to get notified about the message state.

    const message = {
        payload : Buffer.from('test'),
        done : () => this._onSendDone(message),
        failed : (error) => this._onSendFailed(error, message)
    };
    const noPause = stream.write(message);

    In any case the application is expected to implement the flow control of writable streams correctly.
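
    On the receiving side, the rule that either done or failed is called exactly once per message can be enforced with a small guard. The following is a sketch under that assumption; settleOnce and consume are hypothetical helper names, and the demo message is a stand-in for one delivered by an incoming stream.

```javascript
// Wraps a message so that done/failed are forwarded at most once,
// even if the processing code signals completion more than once.
function settleOnce(message) {
    let settled = false;
    return {
        done() {
            if (!settled) {
                settled = true;
                message.done();
            }
        },
        failed(error) {
            if (!settled) {
                settled = true;
                message.failed(error);
            }
        }
    };
}

// Runs a (possibly asynchronous) handler and settles the message afterwards:
// done on success, failed if the handler throws or rejects.
function consume(message, handler) {
    const settle = settleOnce(message);
    return Promise.resolve()
        .then(() => handler(message))
        .then(settle.done, settle.failed);
}

// Hypothetical stand-in for a message delivered by an incoming stream.
const calls = [];
const demo = {
    payload: Buffer.from('test'),
    done: () => calls.push('done'),
    failed: (error) => calls.push(`failed: ${error.message}`)
};
const finished = consume(demo, (message) => message.payload.toString());
```

    In a 'data' handler this could be used as consume(message, processMessage), where processMessage is the application's own function.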

    Message Payload

    The application may provide the message payload for an outgoing message as follows:

    • a Buffer object,
    • an Array of Buffer objects,
    • a Payload object or a plain object with same properties as Payload.

    Common properties of a Payload object (specific protocol clients add more data):

    • chunks: an Array of Buffer objects,
    • type: an optional string providing the content type (not supported with MQTT)

    Once the payload has been handed to a sender, the application must not modify it anymore.

    Incoming messages will always provide a Payload object, just for application convenience.
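
    The accepted forms for an outgoing payload can be pictured as being normalized to the common properties chunks and type. The helper below is a sketch of that idea only; toPayload is a hypothetical name, and the library may handle these inputs differently internally.

```javascript
// Normalizes the accepted outgoing payload forms into a plain object
// with the common Payload properties: chunks and (optionally) type.
function toPayload(input) {
    if (Buffer.isBuffer(input)) {
        return { chunks: [input] };
    }
    if (Array.isArray(input)) {
        if (!input.every((chunk) => Buffer.isBuffer(chunk))) {
            throw new TypeError('payload array must contain only Buffer objects');
        }
        return { chunks: input };
    }
    if (input !== null && typeof input === 'object' && Array.isArray(input.chunks)) {
        return { chunks: input.chunks, type: input.type };
    }
    throw new TypeError('unsupported payload');
}

const fromBuffer = toPayload(Buffer.from('test'));
const fromArray = toPayload([Buffer.from('te'), Buffer.from('st')]);
const fromObject = toPayload({
    chunks: [Buffer.from('{}')],
    type: 'application/json'
});
```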

    Examples

    In folder examples there are test programs, ready to run if a broker can be reached locally at the protocol-specific default port. Folder examples/cfg provides sample configurations.


    License

    SEE LICENSE IN developer-license-3.1.txt
