nestx-amqp

2.0.0 • Public • Published

Provides an AMQP connection as a NestJS module. Uses amqp-connection-manager internally.


Features

  • Provides an AmqpModule that creates an AmqpConnectionManager asynchronously
  • Provides a globally injectable AMQP connection manager
  • Provides method decorators such as @PublishQueue and @SubscribeQueue for simple usage

Installation

yarn add nestx-amqp

Examples

Register Module Async

import { Module } from '@nestjs/common'
import { AmqpModule } from 'nestx-amqp'
 
@Module({
  imports: [
    AmqpModule.forRootAsync({
      useFactory: () => ({
        urls: ['amqp://guest:guest@localhost:5672?heartbeat=60'],
      }),
    }),
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}
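
The connection URL above packs credentials, host, port, and heartbeat into one string. As a minimal sketch, assuming you keep those values in separate settings, a small helper could assemble it. `buildAmqpUrl` and `AmqpUrlParts` are hypothetical names used for illustration and are not part of nestx-amqp.

```typescript
// Hypothetical helper, not part of nestx-amqp: builds an AMQP URL from parts.
interface AmqpUrlParts {
  user: string;
  password: string;
  host: string;
  port?: number; // defaults to 5672, the standard AMQP port
  heartbeat?: number; // heartbeat interval in seconds
}

function buildAmqpUrl(parts: AmqpUrlParts): string {
  const { user, password, host, port = 5672, heartbeat } = parts;
  // Percent-encode credentials so characters like '@' or ':' do not break the URL.
  const credentials = `${encodeURIComponent(user)}:${encodeURIComponent(password)}`;
  const query = heartbeat === undefined ? '' : `?heartbeat=${heartbeat}`;
  return `amqp://${credentials}@${host}:${port}${query}`;
}

// Reproduces the URL used in the registration example.
const url = buildAmqpUrl({ user: 'guest', password: 'guest', host: 'localhost', heartbeat: 60 });
console.log(url);
```

The resulting string can be returned from `useFactory` as `urls: [url]`.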

Inject AmqpConnectionManager

Use the AMQP_CONNECTION symbol for injection.

The following is a sample abstract producer:

import { Inject, OnModuleInit } from '@nestjs/common'
import { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager'
import { Options } from 'amqplib'
import { AMQP_CONNECTION } from 'nestx-amqp'
 
export abstract class SimpleAbstractProducer implements OnModuleInit {
  channel: ChannelWrapper;
 
  abstract getQueue(): string;
  abstract getQueueOptions(): Options.AssertQueue;
 
  constructor(
    @Inject(AMQP_CONNECTION)
    readonly connectionManager: AmqpConnectionManager
  ) {}
 
  async onModuleInit() {
    this.channel = this.connectionManager.createChannel({
      json: true,
      // assert the queue each time the channel is (re)created
      setup: (channel) => channel.assertQueue(this.getQueue(), this.getQueueOptions()),
    })
    await this.channel.waitForConnect();
  }
 
  async send(message: any, options?: Options.Publish) {
    await this.channel.sendToQueue(this.getQueue(), message, options);
  }
}
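
The producer creates its channel with `json: true`, which tells amqp-connection-manager to treat messages as plain JSON objects and encode them before sending. The sketch below shows roughly what that encoding amounts to. It is an illustration only, not the library's code, and `encodeJsonMessage` and `decodeJsonMessage` are hypothetical names.

```typescript
// Illustration of the JSON encoding implied by `json: true` (not library code).
function encodeJsonMessage(message: unknown): Buffer {
  // AMQP message bodies are raw bytes, so the object is serialized first.
  return Buffer.from(JSON.stringify(message), 'utf8');
}

function decodeJsonMessage<T>(body: Buffer): T {
  // A consumer reverses the step to get the original object back.
  return JSON.parse(body.toString('utf8')) as T;
}

const body = encodeJsonMessage({ orderId: 42, status: 'created' });
const decoded = decodeJsonMessage<{ orderId: number; status: string }>(body);
console.log(body.toString('utf8'), decoded.orderId);
```

This is why `send()` can be called with a plain object instead of a Buffer.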

Advanced Usage with Decorators

Currently, only direct queue publishing and subscribing are supported.

Interface Queue

export interface Queue {
  name: string;
  options?: Options.AssertQueue;
}
 
export interface RetryOptions {
  maxAttempts: number;
}
 
export interface BaseConsumeOptions {
  prefetch: number;
  exceptionQueue?: string;
}
 
export type PublishQueueOptions = Options.Publish;
export type ConsumeQueueOptions = BaseConsumeOptions & Partial<RetryOptions> & Options.Consume;
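
The queue decorators accept either a queue name or a Queue object. As a minimal sketch of how the two forms relate, the helper below converts both into the object form. `normalizeQueue` and `QueueLike` are hypothetical names, and `QueueLike` is a loosened local stand-in for the Queue interface so that the sketch does not depend on amqplib types.

```typescript
// Local stand-in for the Queue interface above (assumption: options loosened
// to a plain record instead of amqplib's Options.AssertQueue).
interface QueueLike {
  name: string;
  options?: { durable?: boolean; [key: string]: unknown };
}

// Hypothetical helper: accept either form and always return an object.
function normalizeQueue(nameOrQueue: string | QueueLike): QueueLike {
  return typeof nameOrQueue === 'string' ? { name: nameOrQueue } : nameOrQueue;
}

const byName = normalizeQueue('TEST.QUEUE');
const byObject = normalizeQueue({ name: 'TEST.QUEUE', options: { durable: true } });
console.log(byName.name, byObject.options?.durable);
```

Passing a string is the short form; the object form is only needed when queue assertion options such as `durable` matter.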

@PublishQueue()

Provides a MethodDecorator for easily publishing messages to a queue.

Options:

@PublishQueue(queue: string | Queue, options?: amqplib.Options.Publish)
yourPublishQueueMethod(content:any, options?: amqplib.Options.Publish){}

Example:

(You must register and enable AmqpModule)

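import { Injectable } from '@nestjs/common'
import { PublishQueue } from 'nestx-amqp'
 
// declared at module level so the decorator can reference it
const queue = 'TEST.QUEUE';
 
@Injectable()
class TestMessageService {
  @PublishQueue(queue)
  async testPublishQueue(content) {
    console.log(`call test publish queue with ${JSON.stringify(content)}`);
  }
}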

@SubscribeQueue()

Provides a MethodDecorator for easily consuming messages, with support for simple requeue logic.

Options:

@SubscribeQueue(nameOrQueue: string | Queue, options?: ConsumeQueueOptions)
yourSubscribeQueueMethod(content){}

ConsumeQueueOptions:

export interface RetryOptions {
  maxAttempts: number;
}
 
export interface BaseConsumeOptions {
  prefetch: number;
  exceptionQueue?: string;
}
 
export type ConsumeQueueOptions = BaseConsumeOptions & Partial<RetryOptions> & Options.Consume;

Example:

You must register and enable AmqpModule

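import { Injectable } from '@nestjs/common'
import { SubscribeQueue } from 'nestx-amqp'
 
// declared at module level so the decorator can reference it
const queue = 'TEST.QUEUE';
 
@Injectable()
class TestMessageService {
  @SubscribeQueue(queue)
  async testSubscribeQueue(content) {
    // do your business handling code
    // save db? send email?
    console.log(`handling content ${JSON.stringify(content)}`);
  }
}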
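
The `maxAttempts` and `exceptionQueue` options suggest a retry-then-divert flow. The sketch below shows one way such a flow could work, under the assumption that a failing handler is retried up to `maxAttempts` times and the message is then forwarded to `exceptionQueue` if one is set. It is not the library's implementation; `handleWithRetry`, `ConsumeOptionsSketch`, and the `forward` callback are hypothetical.

```typescript
// Local copy of the option shape described above (illustration only).
interface ConsumeOptionsSketch {
  prefetch: number;
  exceptionQueue?: string;
  maxAttempts?: number;
}

type Handler<T> = (content: T) => Promise<void>;
// Hypothetical callback standing in for "publish this content to another queue".
type Forward<T> = (queue: string, content: T) => Promise<void>;

async function handleWithRetry<T>(
  content: T,
  handler: Handler<T>,
  options: ConsumeOptionsSketch,
  forward: Forward<T>,
): Promise<'handled' | 'forwarded' | 'dropped'> {
  const maxAttempts = options.maxAttempts ?? 1; // assumption: one attempt by default
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await handler(content);
      return 'handled';
    } catch {
      // Ignore the failure and retry until the attempts run out.
    }
  }
  if (options.exceptionQueue) {
    // All attempts failed: divert the message instead of losing it.
    await forward(options.exceptionQueue, content);
    return 'forwarded';
  }
  return 'dropped';
}
```

Diverting failed messages to a separate queue keeps a message that always fails from blocking the main queue while still leaving it available for inspection.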

Interface Exchange

import { Options } from 'amqplib'
 
/**
 * @desc simply wrap amqp exchange definitions as interface
 * */
export interface Exchange {
  name: string
  type: string | 'direct' | 'fanout' | 'topic' | 'headers'
  options?: Options.AssertExchange
}
 
/**
 * @desc wrap amqp.Channel.publish(exchange: string, routingKey: string, content, options?: Publish): boolean
 *       as interface
 * */
export interface PublishExchangeOptions {
  routingKey: string
  options?: Options.Publish
}

@PublishExchange()

Not Stable

Provides a MethodDecorator for easily publishing messages to an exchange.

Options:

@PublishExchange(exchange: string | Exchange, options?: PublishExchangeOptions)
yourPublishExchangeMethod(content:any, options?: PublishExchangeOptions){}

Example:

There is no example of stable usage yet; refer to the unit test cases (or submit a PR).


@UseAmqpConnection(name?:string)

Provides a MethodDecorator for specifying which named connection (set when you register AmqpModule) to use with @PublishQueue() and @SubscribeQueue().

Recommended if you are developing an npm package that uses a specific named connection.

Example:

// declared at module level so the decorator can reference it
const queue = 'LOGGER.QUEUE'
 
@Injectable()
class AmqpLoggerService {
  @UseAmqpConnection('logger')
  @PublishQueue(queue)
  async logSideEffect(content) {
    // nothing to do here: the content is sent to LOGGER.QUEUE automatically over the `logger` connection
  }
}

For more details, refer to the unit test cases.

Change Log

See CHANGELOG.md


LICENSE

Released under MIT License.
