enn-egg-kafka
依赖说明
依赖的 egg 版本
enn-egg-kafka 版本 | egg 1.x |
---|---|
1.x | |
0.x |
依赖的插件
开启插件
// config/plugin.js
exports.kafka = {
enable: true,
package: 'enn-egg-kafka',
};
使用场景
- 对kafkajs模块封装的eggjs插件
配置
- 配置
config/config.default.js
config.kafka = {
client: {
clientId: 'broker-data-sync',
brokers: ['10.0.3.42:9092'],
consumer: {
groupId: 'group-test',
},
producer: {
allowAutoTopicCreation: true,
},
},
afterReady: false,
};
其中afterReady为新增字段,设置为true后,等待端口启动完成,再启动插件,其他配置字段详见 https://kafka.js.org/docs/getting-started
使用
/*
* 生产消息
*/
await app.kafka.producerSend('topic-enn-rd', [{ key: 'key2111', value: 'hey hey!1111' }])
/*
committed offset when starting to fetch messages. If the offset is invalid or not defined, fromBeginning defines the behavior of the consumer group. This can be configured when subscribing to a topic.
//When fromBeginning is true, the group will use the earliest offset. If set to false, it will use the latest offset. The default is false.
/*
* 消费消息-单条 fromBeginning默认为true,从commit后的offset开始消费
*/
await app.kafka.consumerInit('test_topic');
app.kafka.consumer.run({
eachMessage: async ({
topic,
partition,
message,
heartbeat
}) => {
const data = {
partition,
topic,
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
timestamp: message.timestamp,
offset: message.offset,
};
if (message.extra) {
data.extra = message.extra.toString();
}
console.log(message)
}
});
/*
* 消费消息-批量
*/
await app.kafka.consumerInit('test_topic');
app.kafka.consumer.run({
eachBatchAutoResolve: true,
eachBatch: async ({
batch,
resolveOffset,
heartbeat,
commitOffsetsIfNecessary,
uncommittedOffsets,
isRunning,
isStale,
}) => {
const datas = [];
for (let message of batch.messages) {
console.log(message)
const data = {
topic: batch.topic,
partition: batch.partition,
highWatermark: batch.highWatermark,
message: {
offset: message.offset,
key: message.key.toString(),
value: message.value.toString(),
headers: message.headers,
}
};
if (message.extra) {
data.extra = message.extra.toString();
}
resolveOffset(message.offset);
await heartbeat();
}
},
})