amqp-extension
2.0.0 • Public • Published

AMQP Extension 🏰

amqp-extension is a library built on top of amqplib. It defines a message format for queue messages exchanged through a message broker between multiple standalone services. All utility functions support multiple registered connections.

Installation

npm install amqp-extension --save

Usage

Publish to Queue

The publish function sends a message with minimal setup. Existing options can be extended or overwritten per call.

import {
    publish,
    setConfig
} from "amqp-extension";

// This will set the default connection :)
setConfig({
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

(async () => {
    await publish({
        content: {
            type: 'resourceCreated',
            name: 'foo'
        }
    });
})();

Consume Queue

To consume a queue, use the consume function. Its first argument is a configuration object. Its second argument is an object that maps a message type to an async handler function for that type.

import {
    consume,
    ConsumeMessage,
    ConsumeOptions,
    setConfig
} from "amqp-extension";

// This will set the default connection :)
setConfig({
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

const options: ConsumeOptions = {
    exchange: {
        routingKey: '<routing-key>'
    }
};

(async () => {
    await consume(options, {
        resourceCreated: async (message: ConsumeMessage) => {
            const content = message.content.toString('utf-8');
            const payload = JSON.parse(content);
            console.log(payload);
            // { type: 'resourceCreated', name: 'foo' }
        }
    });
})();
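
The handler lookup described above can be pictured roughly as follows. This is a minimal sketch, not the library's implementation: `MessageLike`, `Handlers`, `resolveHandler`, and `dispatch` are hypothetical names, and the sketch assumes the message type is carried in a `type` field of the JSON-encoded content, as in the example payload.

```typescript
// Hypothetical stand-in for the part of a consumed message used here.
interface MessageLike {
    content: { toString(encoding?: string): string };
}

type Handler = (message: MessageLike) => Promise<void>;
type Handlers = Record<string, Handler>;

// Assumption: the message type is the `type` field of the JSON-encoded content.
function resolveHandler(message: MessageLike, handlers: Handlers): Handler | undefined {
    let payload: unknown;
    try {
        payload = JSON.parse(message.content.toString('utf-8'));
    } catch {
        return undefined; // not valid JSON, so no type can be determined
    }

    if (typeof payload !== 'object' || payload === null) {
        return undefined;
    }

    const type = (payload as { type?: unknown }).type;
    if (typeof type !== 'string') {
        return undefined;
    }

    // Only own properties count, so keys like "toString" never match by accident.
    return Object.prototype.hasOwnProperty.call(handlers, type) ? handlers[type] : undefined;
}

// Runs the matching handler, if any, and reports whether one was found.
async function dispatch(message: MessageLike, handlers: Handlers): Promise<boolean> {
    const handler = resolveHandler(message, handlers);
    if (!handler) {
        return false;
    }
    await handler(message);
    return true;
}
```

Messages whose type has no registered handler are simply skipped in this sketch. Whether the library acknowledges, rejects, or ignores such messages is not covered here.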

Multiple Connections

To define multiple concurrent connections just specify a different alias in the configuration object, so it does not get overwritten.

import {
    consume,
    publish,
    setConfig
} from "amqp-extension";

// This will set the default connection :)
setConfig({
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

setConfig({
    alias: 'foo',
    connection: 'amqp://<user>:<password>@<host>',
    exchange: {
        name: '<name>',
        type: 'topic'
    }
});

(async () => {
    await consume(
        {
            routingKey: '<routing-key>',
            alias: 'foo' // <--- use another connection :)
        },
        {
            // ... handlers
        }
    );

    await publish({
        routingKey: '<routing-key>',
        alias: 'foo', // <--- use another connection :)
        content: {
            foo: 'bar'
        }
    });
})();
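
To illustrate why a distinct alias keeps configs from overwriting each other, here is a minimal sketch of an alias-keyed registry. It is an illustration under stated assumptions, not the library's code: `ConfigRegistry`, `SketchConfig`, and the default alias name `'default'` are all hypothetical.

```typescript
// Simplified, hypothetical config shape for illustration only.
interface SketchConfig {
    alias: string;
    connection: string;
}

const DEFAULT_ALIAS = 'default'; // assumed name; the library's default alias may differ

class ConfigRegistry {
    private readonly configs = new Map<string, SketchConfig>();

    // Stores the config under its alias, falling back to the default alias.
    set(input: { alias?: string; connection: string }): SketchConfig {
        const config: SketchConfig = {
            alias: input.alias ?? DEFAULT_ALIAS,
            connection: input.connection,
        };
        this.configs.set(config.alias, config);
        return config;
    }

    // Looks up a config by alias; throws if nothing was registered under it.
    get(alias: string = DEFAULT_ALIAS): SketchConfig {
        const config = this.configs.get(alias);
        if (!config) {
            throw new Error(`No config registered for alias "${alias}".`);
        }
        return config;
    }
}
```

Registering two configs without an alias would store both under the same key, so the second would replace the first. Giving the second one an alias such as `'foo'` keeps both available.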

Functions

setConfig

function setConfig(key?: string | ConfigInput, value?: ConfigInput): Config

Register a connection as default alias or specify an <alias> as config property.

Example

Simple

Define the default connection config.

import {setConfig, useConnection} from "amqp-extension";

(async () => {
    setConfig({
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    const connection = await useConnection();
})();

Define a non-default connection.

import { setConfig, useConnection } from "amqp-extension";

(async () => {
    setConfig({
        alias: '<alias>',
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    setConfig('foo', {
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    const connection = await useConnection('<alias>');
    const fooConnection = await useConnection('foo');
})();

Type parameters

Name Description

Parameters

| Name | Type | Description |
| --- | --- | --- |
| key | string or ConfigInput | Config object or alias of config. |
| value | ConfigInput | Config object. |

Returns

Config

The function returns the Config object.
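
The two call forms shown in the examples (an object with an `alias` property, or an alias string followed by an object) can be reduced to one shape. The following is a hedged sketch of such a normalization step. `SketchInput` and `normalizeConfigArgs` are hypothetical names, and two behaviors are assumptions rather than documented facts: a string key takes precedence over `alias` in the object, and `'default'` is used when no alias is given.

```typescript
// Simplified, hypothetical input shape; the real ConfigInput has more fields.
interface SketchInput {
    alias?: string;
    connection: string;
}

// Normalizes both call forms to a single { alias, connection } pair.
function normalizeConfigArgs(
    key: string | SketchInput,
    value?: SketchInput,
): { alias: string; connection: string } {
    if (typeof key === 'string') {
        if (!value) {
            throw new Error('A config object is required when the first argument is an alias.');
        }
        // Assumption: the explicit string key wins over value.alias.
        return { alias: key, connection: value.connection };
    }
    // Assumption: 'default' stands in for the library's default alias.
    return { alias: key.alias ?? 'default', connection: key.connection };
}
```

Under these assumptions, `normalizeConfigArgs('foo', { connection })` and `normalizeConfigArgs({ alias: 'foo', connection })` produce the same result.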

useConnection

function useConnection(key?: string | ConfigInput): Promise<Connection>

Pass a config object to register a connection, either under the default alias or under the alias given as a config property. Once a connection is registered, retrieve it by calling the function without arguments (default alias) or with the alias name.

Example

Simple

Receive underlying driver connection.

import {useConnection} from "amqp-extension";

(async () => {
    const connection = await useConnection();
})();

Advanced

Use a non-default connection.

import {setConfig, useConnection} from "amqp-extension";

(async () => {
    setConfig({
        alias: '<alias>',
        connection: 'amqp://<user>:<password>@<host>',
        exchange: {
            name: '<name>',
            type: 'topic'
        }
    });

    const connection = await useConnection('<alias>');
})();

Type parameters

Name Description

Parameters

| Name | Type | Description |
| --- | --- | --- |
| key | string or ConfigInput | Config object or alias of config. |

Returns

Promise<Connection>

The function returns the Connection object of amqplib.
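
Retrieving an already registered connection by alias suggests that connections are reused between calls. One way to get that behavior is a per-alias promise cache, sketched below. This is an assumption about a possible design, not a description of the library's internals: `createConnectionCache`, the `Connect` factory type, and the `'default'` alias are hypothetical. With amqplib, the factory would typically wrap its connect function.

```typescript
// Hypothetical factory type: opens a connection for the given alias.
type Connect<C> = (alias: string) => Promise<C>;

function createConnectionCache<C>(connect: Connect<C>): (alias?: string) => Promise<C> {
    const pending = new Map<string, Promise<C>>();

    return (alias = 'default') => {
        let connection = pending.get(alias);
        if (!connection) {
            // Cache the promise itself so concurrent callers share one connection attempt.
            connection = connect(alias);
            pending.set(alias, connection);
        }
        return connection;
    };
}
```

A rejected promise stays in the cache in this sketch, so a production version would probably evict failed attempts to allow a retry.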

publish

function publish(message: Message, options?: PublishOptions): Promise<void>

Send the constructed queue message to the message broker. The second parameter selects a registered config by its alias, or supplies the full config object.

Example

Simple

import {
    publish
} from "amqp-extension";

(async () => {
    await publish({
        content: {
            type: 'resourceCreated'
        }
    });
})();

Type parameters

Name Description

Parameters

| Name | Type | Description |
| --- | --- | --- |
| message | Message | Constructed message object. |
| options | PublishOptions | Publish options. |

Returns

Promise<void>

The function does not return a value.

consume

function consume(options: ConsumeOptions, cb: ConsumeHandlers): Promise<void>

Consume messages from a queue. The first parameter is the consume options object. The second parameter is an object that maps a message type to an async handler function for that type.

Example

Simple

import {
    consume,
    ConsumeOptions,
    ConsumeMessage,
} from "amqp-extension";

const options: ConsumeOptions = {
    routingKey: '<routing-key>'
};

(async () => {
    await consume(options, {
        '<type>': async (message: ConsumeMessage) => {
            // do some async action :)
        }
    });
})();

Type parameters

Name Description

Parameters

| Name | Type | Description |
| --- | --- | --- |
| options | ConsumeOptions | Consume options. |
| handlers | ConsumeHandlers | Handlers object. |

Returns

Promise<void>

The function does not return a value.

Types

Config Types

import { Options } from 'amqplib';
import { ExchangeOptions } from '../exchange';
import { ConsumeOptions, PublishOptions } from '../type';

export type Config = {
    alias: string,
    connection: Options.Connect | string,
    exchange: ExchangeOptions,
    publish: PublishOptions,
    consume: ConsumeOptions
};

export type ConfigInput = Partial<Exclude<Config, 'connection'>> &
    Pick<Config, 'connection'>;
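
To show what `ConfigInput` accepts, here is a small sketch with simplified stand-in types. Note that `Exclude` applied to an object type and a string literal leaves the object type unchanged, so `connection` is required only through the `Pick` part of the intersection. The reduced `ExchangeOptions` and `Config` shapes, the `buildConfig` helper, and its default values are assumptions made for illustration and are not taken from the library.

```typescript
// Simplified stand-ins for the library's option types (assumptions for this sketch).
type ExchangeOptions = { name: string; type: string };
type Config = {
    alias: string;
    connection: string;
    exchange: ExchangeOptions;
};

// Same construction as the definition above: everything optional except `connection`.
type ConfigInput = Partial<Exclude<Config, 'connection'>> & Pick<Config, 'connection'>;

// Fills in placeholder defaults for the optional fields.
// The default alias and exchange used here are arbitrary choices for the sketch.
function buildConfig(input: ConfigInput): Config {
    return {
        alias: input.alias ?? 'default',
        connection: input.connection,
        exchange: input.exchange ?? { name: 'amq.topic', type: 'topic' },
    };
}
```

With these types, `buildConfig({ connection: 'amqp://localhost' })` type-checks, while omitting `connection` is a compile-time error.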

License

Made with 💚

Published under MIT License.
