nestx-amqp
    Version 2.0.3 (public package)

    Provides an AMQP connection as a NestJS module. Internally it uses amqp-connection-manager.


    Features

    • Provides an AmqpModule that creates an AmqpConnectionManager asynchronously
    • Provides a globally injectable AMQP connection manager
    • Provides method decorators such as @PublishQueue and @SubscribeQueue for simple usage

    Installation

    yarn add nestx-amqp

    Examples

    Register Module Async

    import { Module } from '@nestjs/common'
    import { AmqpModule } from 'nestx-amqp'
    
    @Module({
      imports: [
        AmqpModule.forRootAsync({
          useFactory: () => ({
            urls: ['amqp://guest:guest@localhost:5672?heartbeat=60'],
          }),
        }),
      ],
      controllers: [],
      providers: [],
    })
    export class AppModule {}
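
The connection URL returned by useFactory above is hard-coded. As a minimal sketch, the URL could instead be assembled from configuration values. `AmqpUrlParts` and `buildAmqpUrl` are hypothetical helpers for illustration only and are not part of nestx-amqp.

```typescript
// Hypothetical helper (not part of nestx-amqp): assemble an AMQP URL from its parts.
interface AmqpUrlParts {
  user: string
  password: string
  host: string
  port: number
  heartbeat: number // heartbeat interval in seconds
}

function buildAmqpUrl(parts: AmqpUrlParts): string {
  // Percent-encode credentials so characters such as '@' or ':' cannot break the URL.
  const user = encodeURIComponent(parts.user)
  const password = encodeURIComponent(parts.password)
  return `amqp://${user}:${password}@${parts.host}:${parts.port}?heartbeat=${parts.heartbeat}`
}

const url = buildAmqpUrl({
  user: 'guest',
  password: 'guest',
  host: 'localhost',
  port: 5672,
  heartbeat: 60,
})
console.log(url)
```

The resulting string can then be returned in the `urls` array from useFactory.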

    Inject AmqpConnectionManager

    Use the AMQP_CONNECTION symbol for injection.

    Below is a sample abstract producer.

    import { Inject, OnModuleInit } from '@nestjs/common'
    import { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager'
    import { Options } from 'amqplib'
    import { AMQP_CONNECTION } from 'nestx-amqp'
    
    export abstract class SimpleAbstractProducer implements OnModuleInit {
      channel: ChannelWrapper;
    
      abstract getQueue(): string;
      abstract getQueueOptions(): Options.AssertQueue;
    
      constructor(
        @Inject(AMQP_CONNECTION)
        readonly connectionManager: AmqpConnectionManager
      ) {}
    
      async onModuleInit() {
        // Assert the subclass's queue whenever the channel (re)connects
        this.channel = this.connectionManager.createChannel({
          json: true,
          setup: (channel) => channel.assertQueue(this.getQueue(), this.getQueueOptions()),
        })
        await this.channel.waitForConnect();
      }
    
      async send(message, options?: Options.Publish) {
        await this.channel.sendToQueue(this.getQueue(), message, options);
      }
    }

    Advanced Usage with Decorators

    Currently, only direct queue publishing and subscribing are supported.

    Interface Queue

    export interface Queue {
      name: string;
      options?: Options.AssertQueue;
    }
    
    export interface RetryOptions {
      maxAttempts: number;
    }
    
    export interface BaseConsumeOptions {
      prefetch: number;
      exceptionQueue?: string;
    }
    
    export type PublishQueueOptions = Options.Publish;
    export type ConsumeQueueOptions = BaseConsumeOptions & Partial<RetryOptions> & Options.Consume;
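
The decorators described below accept either a queue name or a Queue object. A minimal sketch of how the two forms might be normalized into one shape follows. `toQueue` is a hypothetical helper, and `QueueOptions` is a local stand-in for amqplib's Options.AssertQueue so that the snippet runs on its own.

```typescript
// Local stand-in for amqplib's Options.AssertQueue (assumption: only `durable` is shown).
interface QueueOptions {
  durable?: boolean
}

interface Queue {
  name: string
  options?: QueueOptions
}

// Hypothetical helper: accept a plain name or a full Queue definition.
function toQueue(nameOrQueue: string | Queue): Queue {
  return typeof nameOrQueue === 'string' ? { name: nameOrQueue } : nameOrQueue
}

const fromName = toQueue('TEST.QUEUE')
const fromObject = toQueue({ name: 'TEST.QUEUE', options: { durable: true } })
console.log(fromName.name, fromObject.options)
```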

    @PublishQueue()

    Provides a MethodDecorator for easily publishing a message to a queue.

    Options:

    @PublishQueue(queue: string | Queue, options?: amqplib.Options.Publish)
    yourPublishQueueMethod(content:any, options?: amqplib.Options.Publish){}
    

    Example:

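    (You must register and enable AmqpModule.)

    import { Injectable } from '@nestjs/common'
    import { PublishQueue } from 'nestx-amqp'
    
    // Decorator arguments are evaluated when the class is defined,
    // so the queue name must be a module-level constant, not a class property.
    const queue = 'TEST.QUEUE';
    
    @Injectable()
    class TestMessageService {
      @PublishQueue(queue)
      async testPublishQueue(content) {
        console.log(`call test publish queue with ${JSON.stringify(content)}`);
      }
    }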
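
As a rough illustration of what a publishing method decorator does, the sketch below wraps a method so that its first argument is also pushed to a queue. It is not the nestx-amqp implementation: `publishToQueue`, `sendToQueue`, and the in-memory `published` map are hypothetical stand-ins, and the order (call the method first, then publish) is an assumption. The decorator is applied by hand so the example runs without decorator compiler flags.

```typescript
// In-memory stand-in for a broker: queue name -> messages published so far.
const published = new Map<string, unknown[]>()

function sendToQueue(queue: string, content: unknown): void {
  const messages = published.get(queue) ?? []
  messages.push(content)
  published.set(queue, messages)
}

// Hypothetical decorator factory (assumption, not the library's code).
function publishToQueue(queue: string) {
  return (
    _target: object,
    _key: string | symbol,
    descriptor: PropertyDescriptor
  ): PropertyDescriptor => {
    const original = descriptor.value as (...args: unknown[]) => unknown
    descriptor.value = function (this: unknown, content: unknown, ...rest: unknown[]) {
      // Run the user's method first, then publish the same content.
      const result = original.apply(this, [content, ...rest])
      sendToQueue(queue, content)
      return result
    }
    return descriptor
  }
}

class Greeter {
  calls: unknown[] = []

  greet(content: unknown): void {
    this.calls.push(content)
  }
}

// Manual application, equivalent in effect to writing a decorator above `greet`.
const descriptor = Object.getOwnPropertyDescriptor(Greeter.prototype, 'greet')!
Object.defineProperty(
  Greeter.prototype,
  'greet',
  publishToQueue('TEST.QUEUE')(Greeter.prototype, 'greet', descriptor)
)

const greeter = new Greeter()
greeter.greet({ hello: 'world' })
```

Because the factory call `publishToQueue('TEST.QUEUE')` runs when the class is defined, its argument has to be in scope at that point, which is why a queue name is typically kept in a module-level constant.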

    @SubscribeQueue()

    Provides a MethodDecorator for easily consuming messages, with support for simple requeue logic.

    Options:

    @SubscribeQueue(nameOrQueue: string | Queue, options?: ConsumeQueueOptions)
    yourSubscribeQueueMethod(content){}
    

    ConsumeQueueOptions:

    export interface RetryOptions {
      maxAttempts: number;
    }
    
    export interface BaseConsumeOptions {
      prefetch: number;
      exceptionQueue?: string;
    }
    
    export type ConsumeQueueOptions = BaseConsumeOptions & Partial<RetryOptions> & Options.Consume;

    Example:

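    (You must register and enable AmqpModule.)

    import { Injectable } from '@nestjs/common'
    import { SubscribeQueue } from 'nestx-amqp'
    
    // Decorator arguments are evaluated when the class is defined,
    // so the queue name must be a module-level constant, not a class property.
    const queue = 'TEST.QUEUE';
    
    @Injectable()
    class TestMessageService {
      @SubscribeQueue(queue)
      async testSubscribeQueue(content) {
        // do your business handling code
        // save db? send email?
        console.log(`handling content ${JSON.stringify(content)}`);
      }
    }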
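
The README does not spell out how maxAttempts and exceptionQueue interact when a handler throws. One plausible reading, shown here purely as a sketch, is: requeue until the attempt limit is reached, then forward the message to the exception queue if one is configured. `FailureAction` and `decideOnFailure` are hypothetical names, and the default of a single attempt is an assumption, not documented library behavior.

```typescript
interface RetryOptions {
  maxAttempts: number
}

interface BaseConsumeOptions {
  prefetch: number
  exceptionQueue?: string
}

type ConsumeQueueOptions = BaseConsumeOptions & Partial<RetryOptions>

// Hypothetical result type describing what to do with a message whose handler failed.
type FailureAction =
  | { kind: 'requeue'; attempt: number }
  | { kind: 'forward'; queue: string }
  | { kind: 'discard' }

// `attempt` is the 1-based number of the delivery that just failed.
function decideOnFailure(attempt: number, options: ConsumeQueueOptions): FailureAction {
  // Assumption: without maxAttempts, a message is tried once and never requeued.
  const maxAttempts = options.maxAttempts ?? 1
  if (attempt < maxAttempts) {
    return { kind: 'requeue', attempt: attempt + 1 }
  }
  if (options.exceptionQueue) {
    return { kind: 'forward', queue: options.exceptionQueue }
  }
  return { kind: 'discard' }
}

const retry = decideOnFailure(1, { prefetch: 1, maxAttempts: 3 })
const giveUp = decideOnFailure(3, { prefetch: 1, maxAttempts: 3, exceptionQueue: 'TEST.QUEUE.ERROR' })
console.log(retry, giveUp)
```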

    Interface Exchange

    import { Options } from 'amqplib'
    
    /**
     * @desc simply wrap amqp exchange definitions as interface
     * */
    export interface Exchange {
      name: string
      type: string | 'direct' | 'fanout' | 'topic' | 'headers'
      options?: Options.AssertExchange
    }
    
    /**
     * @desc wrap amqp.Channel.publish(exchange: string, routingKey: string, content, options?: Publish): boolean
     *       as interface
     * */
    export interface PublishExchangeOptions {
      routingKey: string
      options?: Options.Publish
    }

    @PublishExchange()

    Not Stable

    Provides a MethodDecorator for easily publishing a message to an exchange.

    Options:

    @PublishExchange(exchange: string | Exchange, options?: PublishExchangeOptions)
    yourPublishExchangeMethod(content:any, options?: PublishExchangeOptions){}
    

    Example:

    No example of stable usage is available yet; refer to the unit test cases (or submit a PR).


    @UseAmqpConnection(name?:string)

    Provides a MethodDecorator for specifying which connection (named when you register AmqpModule) to use with @PublishQueue() and @SubscribeQueue().

    Recommended if you are developing an npm package that uses a specific named connection.

    Example:

    // Module-level constant: decorator arguments cannot read class properties.
    const queue = 'LOGGER.QUEUE'
    
    @Injectable()
    class AmqpLoggerService {
      @UseAmqpConnection('logger')
      @PublishQueue(queue)
      async logSideEffect(content) {
        // nothing to do here: content is sent to LOGGER.QUEUE over the connection named `logger`
      }
    }

    For more details, refer to the unit test cases.

    Change Log

    See CHANGELOG.md


    LICENSE

    Released under MIT License.

    Collaborators

    • aquariuslt