enn-egg-kafka

2.0.11 • Public • Published

Dependencies

Supported egg versions

enn-egg-kafka version    egg 1.x
1.x                      😁
0.x

Required plugins

Enabling the plugin

// config/plugin.js
exports.kafka = {
  enable: true,
  package: 'enn-egg-kafka',
};

Use cases

  • An Egg.js plugin that wraps the kafkajs module

Configuration

  • Configure the plugin in config/config.default.js
  config.kafka = {
    client: {
      clientId: 'broker-data-sync',
      brokers: ['10.0.3.42:9092'],
      consumer: {
        groupId: 'group-test',
      },
      producer: {
        allowAutoTopicCreation: true,
      },
    },
    afterReady: false,
  };

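afterReady is a field added by this plugin. When set to true, the plugin starts only after the server port has finished starting up. For all other configuration fields, see https://kafka.js.org/docs/getting-started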

Usage

/*
 * Produce a message
 */
  await app.kafka.producerSend('topic-enn-rd', [{ key: 'key2111', value: 'hey hey!1111' }]);

/*
 * From the kafkajs documentation: the consumer group uses the latest
 * committed offset when starting to fetch messages. If the offset is invalid
 * or not defined, fromBeginning defines the behavior of the consumer group.
 * This can be configured when subscribing to a topic. When fromBeginning is
 * true, the group uses the earliest offset; when false, the latest offset.
 * The kafkajs default is false.
 */

/*
 * Consume messages one at a time. With this plugin, fromBeginning defaults
 * to true; consumption starts from the committed offset.
 */
  await app.kafka.consumerInit('test_topic');
  await app.kafka.consumer.run({
    eachMessage: async ({
      topic,
      partition,
      message,
      heartbeat
    }) => {
      const data = {
        partition,
        topic,
        // key and value may be null (keyless messages, tombstones)
        key: message.key ? message.key.toString() : null,
        value: message.value ? message.value.toString() : null,
        headers: message.headers,
        timestamp: message.timestamp,
        offset: message.offset,
      };
      if (message.extra) {
        data.extra = message.extra.toString();
      }
      // Log the decoded record rather than the raw Buffer-based message
      console.log(data);
    }
  });
  
/*
 * Consume messages in batches
 */
  await app.kafka.consumerInit('test_topic');
  await app.kafka.consumer.run({
    eachBatchAutoResolve: true,
    eachBatch: async ({
      batch,
      resolveOffset,
      heartbeat,
      commitOffsetsIfNecessary,
      uncommittedOffsets,
      isRunning,
      isStale,
    }) => {
      const datas = [];
      for (const message of batch.messages) {
        const data = {
          topic: batch.topic,
          partition: batch.partition,
          highWatermark: batch.highWatermark,
          message: {
            offset: message.offset,
            // key and value may be null (keyless messages, tombstones)
            key: message.key ? message.key.toString() : null,
            value: message.value ? message.value.toString() : null,
            headers: message.headers,
          }
        };
        if (message.extra) {
          data.extra = message.extra.toString();
        }
        datas.push(data);
        // Mark this message as processed, then keep the session alive
        resolveOffset(message.offset);
        await heartbeat();
      }
      console.log(datas);
    },
  });
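
The batch handler above can also be written so that it stops early when the consumer is shutting down or the batch has gone stale, which is the pattern the kafkajs documentation suggests when offsets are resolved manually. The following is a minimal sketch, not the plugin's own code: decodeMessage, makeEachBatch and handleRecord are hypothetical names introduced for illustration, and only the eachBatch payload fields already shown above (batch, resolveOffset, heartbeat, isRunning, isStale) are used.

```javascript
// Convert a kafkajs message into a plain object.
// key and value are Buffers and may be null, so guard before toString().
const decodeMessage = (topic, partition, message) => ({
  topic,
  partition,
  offset: message.offset,
  key: message.key ? message.key.toString() : null,
  value: message.value ? message.value.toString() : null,
  headers: message.headers,
});

// handleRecord is a hypothetical application callback (assumption).
// Returns a function suitable for the eachBatch option of consumer.run().
const makeEachBatch = handleRecord => async ({
  batch,
  resolveOffset,
  heartbeat,
  isRunning,
  isStale,
}) => {
  for (const message of batch.messages) {
    // Stop if the consumer is stopping or the batch is no longer valid.
    if (!isRunning() || isStale()) break;
    await handleRecord(decodeMessage(batch.topic, batch.partition, message));
    // Only offsets of fully handled messages are marked as processed.
    resolveOffset(message.offset);
    await heartbeat();
  }
};
```

Assuming app.kafka.consumer is the underlying kafkajs consumer, as the examples above suggest, it could be wired up as `app.kafka.consumer.run({ eachBatchAutoResolve: false, eachBatch: makeEachBatch(async record => console.log(record)) })`. Setting eachBatchAutoResolve to false means a message skipped by the early exit is not marked as processed.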

License

MIT

Install

npm i enn-egg-kafka

Collaborators

  • mhzever