Net Possibility Multiplier

    @byndyusoft/nest-kafka
    TypeScript icon, indicating that this package has built-in type declarations

    1.2.1 • Public • Published

    nest-kafka

    npm@latest test code style: prettier semantic-release

    Kafka for NestJS

    Features

    • Multiple connections
    • Consumer and Producer with Schema Registry support (using kafkajs and @kafkajs/confluent-schema-registry under the hood)
    • Integration with nest-template
    • Consumer
      • Subscribe topic is not static, you can pick it from config
      • Process message in async context with Tracing and Logging
      • String, JSON and Schema Registry decoders for key and value, headers decoder with array support
      • Dead letter queue pattern support with smart retry mechanism
      • Support custom decoders and error handling patterns

    Requirements

    • Node.js v14 LTS or later
    • npm or yarn

    Install

    npm install @byndyusoft/nest-kafka @byndyusoft/class-validator-extended @byndyusoft/nest-opentracing @kafkajs/confluent-schema-registry @nestjs/common @nestjs/microservices class-transformer class-validator kafkajs nestjs-pino rxjs

    or

    yarn add @byndyusoft/nest-kafka @byndyusoft/class-validator-extended @byndyusoft/nest-opentracing @kafkajs/confluent-schema-registry @nestjs/common @nestjs/microservices class-transformer class-validator kafkajs nestjs-pino rxjs

    Usage

    Init

    1. Create KafkaConfigDto
    import {
      KafkaClusterConfigDto,
      KafkaConsumerConfigDto,
      KafkaProducerConfigDto,
      KafkaSchemaRegistryArgsConfigDto,
    } from "@byndyusoft/nest-kafka";
    import { Type } from "class-transformer";
    import { IsDefined, IsString, ValidateNested } from "class-validator";
    
    export class KafkaConfigDto {
      @Type(() => KafkaClusterConfigDto)
      @IsDefined()
      @ValidateNested()
      public readonly cluster!: KafkaClusterConfigDto;
    
      @Type(() => KafkaConsumerConfigDto)
      @IsDefined()
      @ValidateNested()
      public readonly consumer!: KafkaConsumerConfigDto;
    
      @Type(() => KafkaProducerConfigDto)
      @IsDefined()
      @ValidateNested()
      public readonly producer!: KafkaProducerConfigDto;
    
      @Type(() => KafkaSchemaRegistryArgsConfigDto)
      @IsDefined()
      @ValidateNested()
      public readonly schemaRegistry!: KafkaSchemaRegistryArgsConfigDto;
    
      @IsString()
      public readonly topic!: string;
    
      @IsString()
      public readonly errorTopic!: string;
    }
    2. Add KafkaConfigDto into ConfigDto
    import { Type } from "class-transformer";
    import { IsDefined, ValidateNested } from "class-validator";
    
    import { KafkaConfigDto } from "./kafkaConfigDto";
    
    export class ConfigDto {
      /// ...
    
      @Type(() => KafkaConfigDto)
      @IsDefined()
      @ValidateNested()
      public readonly kafka!: KafkaConfigDto;
    
      /// ...
    }
    3. Add env variables mapping
    import { Module } from "@nestjs/common";
    
    import { ConfigDto } from "./dtos";
    
    @Module({})
    export class ConfigModule {
      // ...
    
      private static __loadConfig(): ConfigDto {
        const plainConfig: ConfigDto = {
          // ...
          kafka: {
            cluster: {
              brokers: process.env.KAFKA_BROKERS as string,
              saslMechanism: process.env.KAFKA_SASL_MECHANISM,
              username: process.env.KAFKA_USERNAME,
              password: process.env.KAFKA_PASSWORD,
              ssl: process.env.KAFKA_SSL,
              ca: process.env.KAFKA_CA,
            },
            consumer: {
              groupId: process.env.KAFKA_CONSUMER_GROUP_ID as string,
              allowAutoTopicCreation:
                process.env.KAFKA_CONSUMER_ALLOW_AUTO_TOPIC_CREATION ?? true,
            },
            producer: {
              allowAutoTopicCreation:
                process.env.KAFKA_PRODUCER_ALLOW_AUTO_TOPIC_CREATION ?? true,
            },
            schemaRegistry: {
              host: process.env.KAFKA_SCHEMA_REGISTRY_HOST as string,
              username: process.env.KAFKA_SCHEMA_REGISTRY_USERNAME,
              password: process.env.KAFKA_SCHEMA_REGISTRY_PASSWORD,
            },
            topic: process.env.KAFKA_TOPIC as string,
            errorTopic: process.env.KAFKA_ERROR_TOPIC as string,
          },
          // ...
        };
    
        // ...
      }
    }
    4. Import KafkaModule
    import {
      KafkaClusterConfigDto,
      KafkaConsumerConfigDto,
      KafkaModule,
      KafkaProducerConfigDto,
      KafkaSchemaRegistryArgsConfigDto,
    } from "@byndyusoft/nest-kafka";
    
    import { ConfigDto } from "./config";
    
    @Module({
      imports: [
        // Extra modules
        // ...
        KafkaModule.registerAsync({
          inject: [ConfigDto],
          useFactory: (config: ConfigDto) => ({
            connections: [
              {
                cluster: KafkaClusterConfigDto.toRawConfig(config.kafka.cluster),
                consumer: KafkaConsumerConfigDto.toRawConfig(config.kafka.consumer),
                producer: KafkaProducerConfigDto.toRawConfig(config.kafka.producer),
                schemaRegistry: {
                  args: KafkaSchemaRegistryArgsConfigDto.toRawConfig(
                    config.kafka.schemaRegistry,
                  ),
                },
              },
            ],
            topicPickerArgs: [config],
          }),
        }),
        // ...
      ],
    })
    export class InfrastructureModule {
      // ...
    }
    4.1. You can describe multiple connections (farther use connectionName parameter in some functions to specify your connection)
    import {
      KafkaClusterConfigDto,
      KafkaConsumerConfigDto,
      KafkaModule,
      KafkaProducerConfigDto,
      KafkaSchemaRegistryArgsConfigDto,
    } from "@byndyusoft/nest-kafka";
    
    import { ConfigDto } from "./config";
    
    @Module({
      imports: [
        // Extra modules
        // ...
        KafkaModule.registerAsync({
          inject: [ConfigDto],
          useFactory: (config: ConfigDto) => ({
            connections: [
              {
                name: "connection1",
                cluster: KafkaClusterConfigDto.toRawConfig(config.kafka1.cluster),
                consumer: KafkaConsumerConfigDto.toRawConfig(
                  config.kafka1.consumer,
                ),
                producer: KafkaProducerConfigDto.toRawConfig(
                  config.kafka1.producer,
                ),
                schemaRegistry: {
                  args: KafkaSchemaRegistryArgsConfigDto.toRawConfig(
                    config.kafka1.schemaRegistry,
                  ),
                },
              },
              {
                name: "connection2",
                cluster: KafkaClusterConfigDto.toRawConfig(config.kafka2.cluster),
                consumer: KafkaConsumerConfigDto.toRawConfig(
                  config.kafka2.consumer,
                ),
                producer: KafkaProducerConfigDto.toRawConfig(
                  config.kafka2.producer,
                ),
                schemaRegistry: {
                  args: KafkaSchemaRegistryArgsConfigDto.toRawConfig(
                    config.kafka2.schemaRegistry,
                  ),
                },
              },
            ],
            topicPickerArgs: [config],
          }),
        }),
        // ...
      ],
    })
    export class InfrastructureModule {
      // ...
    }
    4.2. If you want, you can not create consumer, producer or schemaRegistry
    import {
      KafkaClusterConfigDto,
      KafkaConsumerConfigDto,
      KafkaModule,
      KafkaProducerConfigDto,
      KafkaSchemaRegistryArgsConfigDto,
    } from "@byndyusoft/nest-kafka";
    
    import { ConfigDto } from "./config";
    
    @Module({
      imports: [
        // Extra modules
        // ...
        KafkaModule.registerAsync({
          inject: [ConfigDto],
          useFactory: (config: ConfigDto) => ({
            connections: [
              {
                cluster: KafkaClusterConfigDto.toRawConfig(config.kafka.cluster),
                consumer: KafkaConsumerConfigDto.toRawConfig(config.kafka.consumer),
              },
            ],
            topicPickerArgs: [config],
          }),
        }),
        // ...
      ],
    })
    export class InfrastructureModule {
      // ...
    }
    5. Connect microservice to start consuming messages
    import { KafkaConsumer } from "@byndyusoft/nest-kafka";
    import { MicroserviceOptions } from "@nestjs/microservices";
    
    async function bootstrap(): Promise<void> {
      // ...
    
      app.connectMicroservice<MicroserviceOptions>({
        strategy: app.get(KafkaConsumer),
      });
    
      await app.startAllMicroservices();
    
      // ...
    }
    
    // ...

    Consuming Messages

    1. Create controller and use KafkaConsumerEventPattern to describe consumer
    import {
      IKafkaConsumerPayload,
      KafkaConsumerEventPattern,
    } from "@byndyusoft/nest-kafka";
    import { Controller } from "@nestjs/common";
    import { Payload } from "@nestjs/microservices";
    
    import { ConfigDto } from "~/src";
    
    @Controller()
    export class UsersConsumer {
      @KafkaConsumerEventPattern({
        topicPicker: (config: ConfigDto) => config.kafka.topic,
        fromBeginning: true,
      })
      public async onMessage(
        @Payload() payload: IKafkaConsumerPayload,
      ): Promise<void> {
        // ...
      }
    }
    2. Decode payload
    import {
      IKafkaConsumerPayload,
      KafkaConsumerEventPattern,
      KafkaConsumerPayloadDecoder,
    } from "@byndyusoft/nest-kafka";
    import { Controller, UseInterceptors } from "@nestjs/common";
    import { Payload } from "@nestjs/microservices";
    
    import { ConfigDto } from "~/src";
    import { UserDto } from "ᐸDtosᐳ";
    
    @Controller()
    export class UsersConsumer {
      @KafkaConsumerEventPattern({
        topicPicker: (config: ConfigDto) => config.kafka.topic,
        fromBeginning: true,
      })
      @UseInterceptors(
        new KafkaConsumerPayloadDecoder({
          key: "string",
          value: "json",
          headers: "string",
        }),
      )
      public async onMessage(
        @Payload() payload: IKafkaConsumerPayload<string, UserDto>,
      ): Promise<void> {
        // ...
      }
    }
    2.1. You can use param decorators to get key, value or headers
    import {
      IKafkaConsumerPayloadHeaders,
      KafkaConsumerEventPattern,
      KafkaConsumerPayloadDecoder,
      KafkaHeaders,
      KafkaKey,
      KafkaValue,
    } from "@byndyusoft/nest-kafka";
    import { Controller, UseInterceptors } from "@nestjs/common";
    
    import { ConfigDto } from "~/src";
    import { UserDto } from "ᐸDtosᐳ";
    
    @Controller()
    export class UsersConsumer {
      @KafkaConsumerEventPattern({
        topicPicker: (config: ConfigDto) => config.kafka.topic,
        fromBeginning: true,
      })
      @UseInterceptors(
        new KafkaConsumerPayloadDecoder({
          key: "string",
          value: "json",
          headers: "string",
        }),
      )
      public async onMessage(
        @KafkaKey() key: string,
        @KafkaValue() value: UserDto,
        @KafkaHeaders() headers: IKafkaConsumerPayloadHeaders,
      ): Promise<void> {
        // ...
      }
    }
    3. Always use some exception filter for correct error handling
    import {
      KafkaConsumerBaseExceptionFilter,
      KafkaConsumerEventPattern,
    } from "@byndyusoft/nest-kafka";
    import { Controller, UseFilters } from "@nestjs/common";
    
    import { ConfigDto } from "~/src";
    
    @Controller()
    export class UsersConsumer {
      @KafkaConsumerEventPattern({
        topicPicker: (config: ConfigDto) => config.kafka.topic,
        fromBeginning: true,
      })
      @UseFilters(/* ... */)
      public async onMessage(): Promise<void> {
        throw new Error("some error");
      }
    }
    3.1. Use KafkaConsumerBaseExceptionFilter if you prefer Stop on error pattern
    import {
      KafkaConsumerBaseExceptionFilter,
      KafkaConsumerEventPattern,
    } from "@byndyusoft/nest-kafka";
    import { Controller, UseFilters } from "@nestjs/common";
    
    import { ConfigDto } from "~/src";
    
    @Controller()
    export class UsersConsumer {
      @KafkaConsumerEventPattern({
        topicPicker: (config: ConfigDto) => config.kafka.topic,
        fromBeginning: true,
      })
      @UseFilters(new KafkaConsumerBaseExceptionFilter())
      public async onMessage(): Promise<void> {
        throw new Error("some error");
      }
    }
    3.2. Use KafkaConsumerErrorTopicExceptionFilter if you prefer Dead letter queue pattern
    import {
      KafkaConsumerErrorTopicExceptionFilter,
      KafkaConsumerEventPattern,
    } from "@byndyusoft/nest-kafka";
    import { Controller, UseFilters } from "@nestjs/common";
    
    import { ConfigDto } from "~/src";
    
    @Controller()
    export class UsersConsumer {
      @KafkaConsumerEventPattern({
        topicPicker: (config: ConfigDto) => config.kafka.topic,
        fromBeginning: true,
      })
      @UseFilters(
        new KafkaConsumerErrorTopicExceptionFilter({
          topicPicker: (config: ConfigDto) => config.kafka.errorTopic,
        }),
      )
      public async onMessage(): Promise<void> {
        throw new Error("some error");
      }
    }

    Producing Messages

    1. Inject KafkaProducer
    import { InjectKafkaProducer, KafkaProducer } from "@byndyusoft/nest-kafka";
    import { Injectable } from "@nestjs/common";
    
    @Injectable()
    export class UsersService {
      public constructor(
        @InjectKafkaProducer()
        private readonly __kafkaProducer: KafkaProducer,
      ) {}
    }

    Schema Registry

    1. Inject KafkaSchemaRegistry
    import {
      InjectKafkaSchemaRegistry,
      KafkaSchemaRegistry,
    } from "@byndyusoft/nest-kafka";
    import { Injectable } from "@nestjs/common";
    
    @Injectable()
    export class UsersService {
      public constructor(
        @InjectKafkaSchemaRegistry()
        private readonly __kafkaSchemaRegistry: KafkaSchemaRegistry,
      ) {}
    }

    Maintainers

    License

    This repository is released under version 2.0 of the Apache License.

    Keywords

    none

    Install

    npm i @byndyusoft/nest-kafka

    DownloadsWeekly Downloads

    60

    Version

    1.2.1

    License

    Apache-2.0

    Unpacked Size

    290 kB

    Total Files

    334

    Last publish

    Collaborators

    • alexanderbyndyu
    • sadcitizen
    • dmitriy.litichevskiy
    • razonrus