基于 kafka-node 的一层封装。
npm i @blued-core-oversea/kafka-client
import { KafkaClient } from '../index'
const kafka = {
oversea: '/blued/backend/ukafka/oversea_app/bootstrap',
}
const kafkaClient = new KafkaClient(kafka)
export const liveLogKafkaClient = () => kafkaClient.getClient('oversea')
const testData = { timestamp: 1629460128, data: { uid: 90000024, extra: { event: 15, targetUid: 90000024, liveId: 0, taskLevel: '4', taskId: '8' } }, app: 2 }
const testData2 = { timestamp: 1629460144, data: { uid: 90000024, extra: { event: 15, targetUid: 90000024, liveId: 0, taskLevel: '4', taskId: '8' } }, app: 2 }
const testData3 = { timestamp: 1629460155, data: { uid: 90000024, extra: { event: 15, targetUid: 90000024, liveId: 0, taskLevel: '4', taskId: '8' } }, app: 2 }
const testData4 = { timestamp: 1629460166, data: { uid: 90000024, extra: { event: 15, targetUid: 90000024, liveId: 0, taskLevel: '4', taskId: '8' } }, app: 2 }
const testData5 = { timestamp: 1629460177, data: { uid: 90000024, extra: { event: 15, targetUid: 90000024, liveId: 0, taskLevel: '4', taskId: '8' } }, app: 2 }
function getPartition(size: number) {
const data = Math.floor(Math.random() * size)
return data >= 1 ? data : 1
}
export const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms))
const main = async () => {
// 自封装调用
liveLogKafkaClient().send(
'report-log',
JSON.stringify(testData5),
{
key: '',
partition: getPartition(3),
attributes: 0,
}
)
console.log('--------------before ready')
await sleep(10000)
console.log('---------------start')
// 自封装调用
liveLogKafkaClient().send(
'report-log',
JSON.stringify(testData),
{
key: '',
partition: getPartition(3),
attributes: 0,
}
)
// 自封装调用「async模式」
await liveLogKafkaClient().sendAsync(
'report-log',
JSON.stringify(testData3),
{
key: '',
partition: getPartition(3),
attributes: 0,
}
)
// 原生调用
liveLogKafkaClient().primarySend([{
topic: 'report-log',
key: '',
partition: getPartition(3),
attributes: 0,
messages: JSON.stringify(testData2),
}])
// 原生调用「async模式」
await liveLogKafkaClient().primarySendAsync([{
topic: 'report-log',
key: '',
partition: getPartition(3),
attributes: 0,
messages: JSON.stringify(testData4),
}])
console.log('---------------over')
await sleep(10000)
}
main().then(() => {
console.log('success')
}).catch(error => {
console.log(error)
})