tick-core-kafka: Kafka client
下载
npm install @tick-core/kafka --save
npm install kafka-node --save
使用
import { BaseKafkaClient, KafkaConsumerInter, KafkaProducerInter, Config } from '@tick-core/kafka'
const conf: Config = {
test: {
clientOption: {
kafkaHost: 'host:port'
}
}
}
const baseKafkaClient: BaseKafkaClient = new BaseKafkaClient(conf)
const kafkaConsumer: KafkaConsumerInter = await baseKafkaClient.getConsumerClient({
key: 'test',
groupId: 'test',
topic: 'ceshi',
partitions: 0,
})
kafkaConsumer.initMessage({
messageCall: (message, originalMessage) => {
console.log(message)
console.log(originalMessage)
},
errorCall: error => {
console.log(error)
},
offsetOutOfRangeErrorCall: error => {
console.log(error)
}
})
const kafkaProducer: KafkaProducerInter = await baseKafkaClient.getProducerClient('test')
kafkaProducer.send({
topic: 'ceshi',
messages: {
ss: 'sssss'
},
partition: 0
})
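OR, together with `@tick-core/zconf` (note that `kafkaHost` is given as a path here instead of `host:port`):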
import { BaseKafkaClient, KafkaConsumerInter, KafkaProducerInter, Config } from '@tick-core/kafka'
import {
Zconf,
} from '@tick-core/zconf'
const zconf = new Zconf({
zookeeperConfs: [{
host: '127.0.0.1',
port: 2181,
}]
})
const conf: Config = {
test: {
clientOption: {
kafkaHost: '/xxxx/xxxxxx'
}
}
}
const baseKafkaClient: BaseKafkaClient = new BaseKafkaClient(conf)
const kafkaConsumer: KafkaConsumerInter = await baseKafkaClient.getConsumerClient({
key: 'test',
groupId: 'test',
topic: 'ceshi',
partitions: 0,
})
kafkaConsumer.initMessage({
messageCall: (message, originalMessage) => {
console.log(message)
console.log(originalMessage)
},
errorCall: error => {
console.log(error)
},
offsetOutOfRangeErrorCall: error => {
console.log(error)
}
})
const kafkaProducer: KafkaProducerInter = await baseKafkaClient.getProducerClient('test')
kafkaProducer.send({
topic: 'ceshi',
messages: {
ss: 'sssss'
},
partition: 0
})