pandakafka

0.0.3 • Public • Published

node-rdkafka的高级封装,在消息队列的基础上现实了类RPC远程调用.

特性

  • 消息队列
  • 远程调用
  • 推送回调.
  • 通用组件.
  • 拓展服务.
  • 中间件.

关于

该系统构建的目的在于解决Jessehealth公司的内部高并发集群架构问题.

用法

快速入门

const pandakafka = require("pandakafka")
 
// 消费
const consumer = new pandakafka({
  type: "consumer",
  configure: {
    name: "test",
    host: "localhost:9092",
    topic: "test"
  },
  callback: function (msg, callback) {
    console.log(msg.data.toString()) // "我的名字?"
    callback("panda")
  }
})
 
// 生产
const producer = new pandakafka({
  type: "producer",
  configure: {
    name: "test",
    host: "localhost:9092",
    topic: "test"
  }
})
 
// 生产测试消息并等待回调
producer.request("我的名字?", function (cb) {
  console.log(cb.data.toString()) // "panda"
})

配置

type: "consumer" // 模式 (consumer or producer)
configure: 
  name: "test" // 组ID
  host: "localhost:9092" // 服务器
  topic: "test" // 主题
  timeout: // 默认值为下面的示例
    loop: 600000 // 消息回调清扫的周期
    max: 1200000 // 消息回调的保留时间
callback: function // 处理需要回调的消息

异步处理请求并返回回调

// 消费
const consumer = new pandakafka({
  type: "consumer",
  configure: {
    name: "test",
    host: "localhost:9092",
    topic: "test"
  },
  callback: async function (msg, callback) {
    console.log(msg.data.toString()) // "我的名字?"
    return "panda"
  }
})

直接推送消息不要求回调

producer.send("这条消息不要求回调")

消息类型(send and request) 这里只举例send

producer.send("这条消息不要求回调")
producer.send({ text: "这条消息不要求回调" })
producer.send(Buffer.from("这条消息不要求回调"))

全局的API

.use(function) // 中间件
.data(callback function[massage]) // 接收直接推送的消息
.ready // true, false 获取是否已连接并准备完成
.undefinedBind(callback function[massage]) // 接收到的没有找到回调绑定函数的消息

许可

MIT

Copyright (c) 2018 Mr.Panda.

Package Sidebar

Install

npm i pandakafka

Weekly Downloads

0

Version

0.0.3

License

MIT

Unpacked Size

13.3 kB

Total Files

4

Last publish

Collaborators

  • xivistudios