Nascent Personality Manifestation

    nestjs-nsq-transporter

    1.4.2 • Public

    Nestjs Transporter For NSQ

    Overview

    This library provides a NestJS transporter that uses NSQ as its message broker. A transporter can be thought of as an event-driven framework:

    • As a message producer, I can send messages to different NSQ topics by emitting events named after those topics.
    • As a message consumer, I can receive messages by subscribing to those emitted events.
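
    The producer/consumer idea above can be sketched without NSQ at all. The snippet below is only an in-memory analogy: the class InMemoryTopicBus and its methods are hypothetical names invented for this illustration and are not part of nestjs-nsq-transporter.

```typescript
// Hypothetical in-memory analogy of topic-based event dispatching.
// This is NOT the library's implementation and does not talk to NSQ.
type Handler = (payload: unknown) => void;

class InMemoryTopicBus {
  private readonly handlers = new Map<string, Handler[]>();

  // Consumer side: register a handler for one topic.
  subscribe(topic: string, handler: Handler): void {
    const list = this.handlers.get(topic) ?? [];
    list.push(handler);
    this.handlers.set(topic, list);
  }

  // Producer side: deliver the payload to every handler of that topic.
  emit(topic: string, payload: unknown): void {
    for (const handler of this.handlers.get(topic) ?? []) {
      handler(payload);
    }
  }
}

const bus = new InMemoryTopicBus();
const received: unknown[] = [];

bus.subscribe('topic1', (payload) => received.push(payload));
bus.emit('topic1', { foo: 1 }); // delivered to the 'topic1' handler
bus.emit('topic2', { ignored: true }); // no subscriber, silently dropped

console.log(received.length); // prints 1
```

    With the real transporter, NSQ sits between the two sides, so producer and consumer can live in different processes.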

    For more details on the overall architecture, please refer to this article.

    Restriction

    This library does not implement the request-response communication style; it currently supports only the event-dispatching style.

    Produce a message

    Add the nsq client provider in your module's definition:

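    import { Module } from '@nestjs/common';
    import { ClientProxy } from '@nestjs/microservices';
    import { ClientNsq, NsqOptions } from 'nestjs-nsq-transporter';
    
    // Example options; see "Definition of NsqOptions" for the available fields.
    const options: NsqOptions = {
      lookupdHTTPAddresses: ['http://localhost:4161'],
    };
    
    @Module({
      providers: [
        {
          provide: 'NSQ_CLIENT',
          useFactory: (): ClientProxy => new ClientNsq(options),
        },
        SomeService, // the service defined in the next snippet
      ],
    })
    export class SomeModule {}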

    Then you can inject the ClientNsq instance and use it to emit the message event:

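    import { Inject, Injectable } from '@nestjs/common';
    import { ClientNsq } from 'nestjs-nsq-transporter';
    
    @Injectable()
    export class SomeService {
      constructor(@Inject('NSQ_CLIENT') private readonly nsqProducer: ClientNsq) {}
    
      // Emits `msg` as an event on the given NSQ topic.
      sendMessage(topic: string, msg: any) {
        return this.nsqProducer.emit(topic, msg);
      }
    }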

    Note: You cannot pass arbitrary key/value pairs as the second argument (msg) of the emit function, because two keys are reserved: meta and options. meta is used by the default OutboundEventSerializer to attach extra information; options sets the options for producing the NSQ message. See the sample below.

        this.nsqProducer.emit(topic, {
          foo: 1,
          bar: 2,
          meta: { component: 'XYZ' },
          options: { retry: { retries: 3 } },
        });
        // The `emit` above sends an NSQ message with the following payload, using a retry strategy of 3 retries:
        // { data: { foo: 1, bar: 2 }, meta: { component: 'XYZ' } }
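
    To make the two reserved keys concrete, here is a minimal sketch of how such a message could be separated into the payload that is sent and the producing options. The function splitEmitMessage and its types are hypothetical and exist only for this illustration; the library's own OutboundEventSerializer may work differently.

```typescript
// Hypothetical illustration only: NOT the library's OutboundEventSerializer.
type EmitMessage = Record<string, unknown> & {
  meta?: Record<string, unknown>;
  options?: Record<string, unknown>;
};

interface SplitResult {
  payload: { data: Record<string, unknown>; meta?: Record<string, unknown> };
  options?: Record<string, unknown>;
}

function splitEmitMessage(msg: EmitMessage): SplitResult {
  // Pull out the two reserved keys; every other key becomes part of `data`.
  const { meta, options, ...data } = msg;
  const payload: SplitResult['payload'] =
    meta === undefined ? { data } : { data, meta };
  return options === undefined ? { payload } : { payload, options };
}

const result = splitEmitMessage({
  foo: 1,
  bar: 2,
  meta: { component: 'XYZ' },
  options: { retry: { retries: 3 } },
});

console.log(JSON.stringify(result.payload));
// prints {"data":{"foo":1,"bar":2},"meta":{"component":"XYZ"}}
```

    The practical consequence is that a business field named meta or options would not end up inside data, so such fields need a different name or an extra level of nesting.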

    Receive a message

    Connect to the nsq microservice in your app and specify the nsq options as follows:

    import { MicroserviceOptions } from '@nestjs/microservices';
    import { ServerNsq } from 'nestjs-nsq-transporter';
    
    app.connectMicroservice<MicroserviceOptions>({
      strategy: new ServerNsq(options),
    });

    Then use @EventPattern to mark a function as the handler for messages matching a specific event pattern:

    import { Controller } from '@nestjs/common';
    import { EventPattern, Ctx, Payload } from '@nestjs/microservices';
    // Assumption: NsqContext is exported by this package.
    import { NsqContext } from 'nestjs-nsq-transporter';
    
    @Controller()
    export class AppController {
    
      @EventPattern({
        topic: 'topic1',
        channel: 'channel1',
        options: { // optional
          maxAttempts: 3
        }
      })
      messageHandlerForTopic1(@Payload() payload: any, @Ctx() context: NsqContext) {
        // Handle messages
        // Note: if you throw an error from this handler, prefer `RpcException` so that nsq-transporter logs it explicitly.
      }
    }

    Definition of NsqOptions

    The options passed to either ServerNsq or ClientNsq are of type NsqOptions. The available fields are as follows:

    | Field Name | Is Required | Type | Description | Example |
    | --- | --- | --- | --- | --- |
    | lookupdHTTPAddresses | No | string[] | HTTP address list for nsq lookupds | ['http://localhost:4161'] |
    | strategy | No | 'round_robin' or 'fan_out' | Message sending strategy | round_robin |
    | discardHandler | No | (arg: any) => void | Handler function to process when a message is discarded | (arg: any) => console.log(arg) |
    | maxInFlight | No | number | The maximum number of messages to process at once | 1 |
    | requeueDelay | No | number | The default amount of time (milliseconds) a requeued message should be delayed by before being dispatched by nsqd | 90000 |
    | lookupdPollInterval | No | number | The frequency in seconds for querying lookupd instances | 60 |
    | maxAttempts | No | number | The number of times a given message will be attempted (given to the MESSAGE handler) before it is handed to the DISCARD handler and then automatically finished | 3 |
    | serializer | No | Serializer in @nestjs/microservices | An instance of the Serializer class, which provides a serialize method to serialize the outbound message | serialize(value: any) => value |
    | deserializer | No | ConsumerDeserializer in @nestjs/microservices | An instance of the ConsumerDeserializer class, which provides a deserialize method to deserialize the inbound message | deserialize(value: any) => value |
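
    As an illustration of how these fields fit together, the following sketch builds an options object from the example values listed for each field. The interface NsqOptionsSketch is a hand-written mirror of a subset of the fields (serializer and deserializer are left out) and is an assumption made for this example; in real code you would import NsqOptions from nestjs-nsq-transporter instead.

```typescript
// Hand-written mirror of a subset of the documented fields.
// Assumption: the real NsqOptions type exported by the library may differ in detail.
interface NsqOptionsSketch {
  lookupdHTTPAddresses?: string[];
  strategy?: 'round_robin' | 'fan_out';
  discardHandler?: (arg: any) => void;
  maxInFlight?: number;
  requeueDelay?: number; // milliseconds
  lookupdPollInterval?: number; // seconds
  maxAttempts?: number;
}

// Collects discarded messages so they can be inspected later.
const discarded: unknown[] = [];

const options: NsqOptionsSketch = {
  lookupdHTTPAddresses: ['http://localhost:4161'],
  strategy: 'round_robin',
  discardHandler: (arg) => {
    discarded.push(arg);
  },
  maxInFlight: 1,
  requeueDelay: 90000,
  lookupdPollInterval: 60,
  maxAttempts: 3,
};

console.log(Object.keys(options).length); // prints 7
```

    Note the mixed units: requeueDelay is given in milliseconds while lookupdPollInterval is given in seconds.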

    Install

    npm i nestjs-nsq-transporter

    Version

    1.4.2

    License

    MIT

    Unpacked Size

    37.3 kB

    Total Files

    33

    Collaborators

    • xavierchow
    • eronekogin