nestx-amqp

1.3.2 • Public • Published

Provides an AMQP connection as a NestJS module. Internally it uses amqp-connection-manager.


Features

  • Provides an AMQPModule that creates an AmqpConnectionManager asynchronously
  • Provides a globally injectable AMQP connection manager
  • Provides method decorators such as @PublishQueue and @SubscribeQueue for simple usage

Installation

yarn add nestx-amqp

Examples

Register Module Async

import { Module } from '@nestjs/common'
import { AMQPModule } from 'nestx-amqp'
 
@Module({
  imports: [
    AMQPModule.forRootAsync({
      useFactory: () => ({
        urls: ['amqp://devuser:devuser@localhost:5672?heartbeat=60'],
      }),
    }),
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}

Inject AMQPConnectionManager

Use the AMQP_CONNECTION symbol for injection.

Below is a sample abstract producer:

import { Inject, OnModuleInit } from '@nestjs/common'
import { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager'
import { Options } from 'amqplib'
import { AMQP_CONNECTION } from 'nestx-amqp'
 
export abstract class SimpleAbstractProducer implements OnModuleInit {
  channel: ChannelWrapper
 
  abstract getQueue(): string
  abstract getQueueOptions(): Options.AssertQueue
 
  constructor(
    @Inject(AMQP_CONNECTION)
    readonly connectionManager: AmqpConnectionManager
  ) {}
 
  async onModuleInit() {
    this.channel = this.connectionManager.createChannel({
      json: true,
      // Assert the queue declared by the concrete subclass on every (re)connect
      setup: (channel) => channel.assertQueue(this.getQueue(), this.getQueueOptions()),
    })
    await this.channel.waitForConnect()
  }
 
  async send(message, options?: Options.Publish) {
    await this.channel.sendToQueue(this.getQueue(), message, options)
  }
}

Advanced Usage with Decorators

Currently, only direct queue publish and subscribe are supported.

Interface Queue

export interface Queue {
  name: string
  options?: Options.AssertQueue
}
 
export interface RetryOptions {
  maxAttempts: number
}
 
export interface BaseConsumeOptions {
  prefetch: number
  exceptionQueue?: string
}
 
export type PublishQueueOptions = Options.Publish
export type ConsumeQueueOptions = BaseConsumeOptions & Partial<RetryOptions> & Options.Consume
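The queue decorators accept either a plain queue name or a Queue object. As a rough illustration of what that union implies, here is a minimal sketch of normalizing both forms into one shape. The helper `normalizeQueue` is hypothetical and not part of nestx-amqp, and `options` is simplified to a plain record so the snippet runs without amqplib installed.

```typescript
// Local copy of the Queue shape above; `options` is simplified (assumption)
// so that this sketch has no dependency on amqplib.
interface Queue {
  name: string
  options?: Record<string, unknown>
}

// Hypothetical helper: accept a queue name or a Queue object and always
// return a Queue object.
function normalizeQueue(nameOrQueue: string | Queue): Queue {
  return typeof nameOrQueue === 'string' ? { name: nameOrQueue } : nameOrQueue
}

const byName = normalizeQueue('TEST.QUEUE')
const byObject = normalizeQueue({ name: 'TEST.QUEUE', options: { durable: true } })
console.log(byName.name === byObject.name)
```

Passing a Queue object is the form to reach for when the queue needs assert options such as `durable`; a bare string carries only the name.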

@PublishQueue()

Provides a MethodDecorator for easily publishing messages to a queue.

Options:

@PublishQueue(queue: string | Queue, options?: amqplib.Options.Publish)
yourPublishQueueMethod(content:any, options?: amqplib.Options.Publish){}

Example:

(You must register and enable AMQPModule)

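import { Injectable } from '@nestjs/common'
import { PublishQueue } from 'nestx-amqp'
 
// Decorator arguments are evaluated when the class is defined, so the queue
// name is a module-level constant rather than an instance property.
const queue = 'TEST.QUEUE'
 
@Injectable()
class TestMessageService {
  @PublishQueue(queue)
  async testPublishQueue(content) {
    console.log(`call test publish queue with ${JSON.stringify(content)}`)
  }
}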

@SubscribeQueue()

Provides a MethodDecorator for easily consuming messages, with support for simple requeue logic.

Options:

@SubscribeQueue(nameOrQueue: string | Queue, options?: ConsumeQueueOptions)
yourSubscribeQueueMethod(content){}

ConsumeQueueOptions:

export interface RetryOptions {
  maxAttempts: number
}
 
export interface BaseConsumeOptions {
  prefetch: number
  exceptionQueue?: string
}
 
export type ConsumeQueueOptions = BaseConsumeOptions & Partial<RetryOptions>
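To make the roles of `maxAttempts` and `exceptionQueue` more concrete, the following is a minimal in-memory sketch of one plausible retry policy. It is not the library's implementation: the function `processWithRetry`, the `Outcome` type, the default of a single attempt, and the synchronous handler are all assumptions made for illustration, and the real behavior should be checked against the unit tests.

```typescript
// Local stand-ins for the option shapes shown above.
interface RetryOptions {
  maxAttempts: number
}
interface BaseConsumeOptions {
  prefetch: number
  exceptionQueue?: string
}
type ConsumeOptionsSketch = BaseConsumeOptions & Partial<RetryOptions>

// Hypothetical result type describing what happened to one message.
type Outcome =
  | { status: 'acked'; attempts: number }
  | { status: 'exception'; attempts: number; queue: string }
  | { status: 'dropped'; attempts: number }

// Hypothetical policy: call the handler up to maxAttempts times (assumed
// default: 1). If every attempt throws, report the exception queue when one
// is configured, otherwise report the message as dropped.
function processWithRetry<T>(
  content: T,
  handler: (content: T) => void,
  options: ConsumeOptionsSketch
): Outcome {
  const maxAttempts = Math.max(1, options.maxAttempts ?? 1)
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(content)
      return { status: 'acked', attempts: attempt }
    } catch {
      // Handler threw: loop again until the attempts are exhausted.
    }
  }
  if (options.exceptionQueue) {
    return { status: 'exception', attempts: maxAttempts, queue: options.exceptionQueue }
  }
  return { status: 'dropped', attempts: maxAttempts }
}

const result = processWithRetry(
  { orderId: 1 },
  () => {
    throw new Error('handler failed')
  },
  { prefetch: 1, maxAttempts: 3, exceptionQueue: 'TEST.QUEUE.EXCEPTION' }
)
console.log(result.status, result.attempts)
```

The sketch only models the decision logic; in a real consumer the handler would typically be asynchronous and the outcomes would map to acknowledging, requeueing, or republishing the message on the channel.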

Example:

You must register and enable AMQPModule

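import { Injectable } from '@nestjs/common'
import { SubscribeQueue } from 'nestx-amqp'
 
// Module-level constant so the decorator can reference it at class definition time
const queue = 'TEST.QUEUE'
 
@Injectable()
class TestMessageService {
  @SubscribeQueue(queue)
  async testSubscribeQueue(content) {
    // do your business handling code
    // save db? send email?
    console.log(`handling content ${JSON.stringify(content)}`)
  }
}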

Interface Exchange

import { Options } from 'amqplib'
 
/**
 * @desc simply wrap amqp exchange definitions as interface
 * */
export interface Exchange {
  name: string
  type: string | 'direct' | 'fanout' | 'topic' | 'headers'
  options?: Options.AssertExchange
}
 
/**
 * @desc wrap amqp.Channel.publish(exchange: string, routingKey: string, content, options?: Publish): boolean
 *       as interface
 * */
export interface PublishExchangeOptions {
  routingKey: string
  options?: Options.Publish
}
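As a hedged illustration of how these two interfaces line up with the `Channel.publish(exchange, routingKey, content, options)` signature quoted in the comment above, here is a small self-contained sketch. The exchange name, routing key, and the helper `toPublishArgs` are hypothetical, and the `options` fields are simplified to plain records so the snippet does not require amqplib.

```typescript
// Local copies of the interfaces above with simplified `options` (assumption).
interface Exchange {
  name: string
  type: string | 'direct' | 'fanout' | 'topic' | 'headers'
  options?: Record<string, unknown>
}
interface PublishExchangeOptions {
  routingKey: string
  options?: Record<string, unknown>
}

// Hypothetical example values.
const ordersExchange: Exchange = {
  name: 'ORDERS.EXCHANGE',
  type: 'topic',
  options: { durable: true },
}
const orderCreated: PublishExchangeOptions = {
  routingKey: 'orders.created',
  options: { persistent: true },
}

// Hypothetical helper: arrange the pieces in the same order as
// Channel.publish(exchange, routingKey, content, options).
function toPublishArgs(
  exchange: string | Exchange,
  content: unknown,
  publish: PublishExchangeOptions
): [string, string, string, Record<string, unknown> | undefined] {
  const exchangeName = typeof exchange === 'string' ? exchange : exchange.name
  return [exchangeName, publish.routingKey, JSON.stringify(content), publish.options]
}

console.log(toPublishArgs(ordersExchange, { id: 42 }, orderCreated))
```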

@PublishExchange()

Not Stable

Provides a MethodDecorator for easily publishing messages to an exchange.

Options:

@PublishExchange(exchange: string | Exchange, options?: PublishExchangeOptions)
yourPublishExchangeMethod(content:any, options?: PublishExchangeOptions){}

Example:

There is no example for stable usage yet; refer to the unit test cases (or submit a PR).


@UseAMQPConnection(name?:string)

Provides a MethodDecorator for specifying which named connection (as registered with AMQPModule) @PublishQueue() and @SubscribeQueue() should use.

Recommended if you are developing an npm package that uses a specific named connection.

Example:

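import { Injectable } from '@nestjs/common'
import { PublishQueue, UseAMQPConnection } from 'nestx-amqp'
 
// Module-level constant so the decorator can reference it at class definition time
const queue = 'LOGGER.QUEUE'
 
@Injectable()
class AMQPLoggerService {
  @UseAMQPConnection('logger')
  @PublishQueue(queue)
  async logSideEffect(content) {
    // nothing to do here: the content is sent to LOGGER.QUEUE over the connection named `logger`
  }
}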

For more details, refer to the unit test cases.

Change Log

See CHANGELOG.md


LICENSE

Released under MIT License.
