febs-message
TypeScript icon, indicating that this package has built-in type declarations

3.0.0 • Public • Published

message 库封装了消息队列, 将复杂的调用封装成

  • rpc: 有返回值的双向通信, 只有一个接收者能接收到消息
  • subscribe: 无返回值的订阅发布消息, 所有订阅者都能接收到消息.

目前底层使用 rabbitmq, 系统初始化完成,如果中途发生断线将会自动重连

Example

//
// producer
//
var mq = require('febs-message');
 
/**
@desc: 初始化消息队列.
*/
mq.init(function(msg, filename, line){
  console.error(msg);
});
 
/**
@desc: 初始化rpc通信.
*/
mq.rpc.init({
  url:        'amqp://xxx',
  heartbeat:  10,      // in seconds.
  reconnect:  10000,   // 连接失败后多长时间重连.
  rpcTimout:  5000,    // rpc等待返回消息的超时.
  persistent: false,        // 是否持久消息.
  registerPublisher: true,
  registerSubscriber: false,
  // devSingle: ['live'],
},
null  // 不接收消息.
).then(()=>{
  console.log("Connect Success");
 
  // 发送消息.
  mq.rpc.request({handler:"handle1", recvSys:'live'}, {message:'hello', data:1})
    .then(ret=>{
      console.log('return message: ');
      console.log(ret);
    });
});
 
//
// customer
//
var mq = require('febs-message');
 
 
/**
@desc: 消息处理方法
*/
 
async function handle1(msg) {
  console.log('handle1:');
  console.log(msg);
 
  return {err:msg.data};  // return to sender.
}
 
async function handle2(msg) {
  console.log('handle2:');
  console.log(msg);
 
  return {err:msg.data};
}
 
 
/**
@desc: 初始化消息队列.
*/
mq.init(function(msg, filename, line){
  console.error(msg);
});
 
/**
@desc: 初始化rpc通信.
*/
mq.rpc.init({
  url:        'amqp://xxx',
  heartbeat:  10,      // in seconds.
  reconnect:  10000,   // 连接失败后多长时间重连.
  rpcTimout:  5000,    // rpc等待返回消息的超时.
  persistent: false,        // 是否持久消息.
  registerSubscriber: true,
  // devSingle: ['live'],
},
'live',  // 接收发送给live系统的消息.
'live2'  // 接收发送给live系统的消息.
).then(()=>{
 
  console.log('Connect success');
 
  // 绑定消息处理方法.
  mq.rpc.bind('handle1', handle1);
  mq.rpc.bind('handle2', handle2);
});
 

Error

系统错误使用 mq.Error 实例抛出异常.

全局配置

/**
@desc: 初始化系统.
@param errorLogCB: 错误log回调. function(msg, filename, line)
*/
function init(errorLogCB)
  • 全局初始化后, 对需要用到的rpc, subscribe类型的消息队列再进行各自的初始化.
  • 在开发模式下, 可以进行本机调试而不影响服务器上的服务; 例如: 存在一个名为 sysMain 的系统, 开发者使用 sysMain_xxx 系统名称 进行注册, 在api层进行url的处理, 如, /?sys=xxx, 将消息发送给 sysMain_xxx 系统从而实现本机调试;

rpc

RPC 消息是远程调用消息, 只有一个消费者能接收到消息, 并且有返回值返回给生产者.

/**
@desc:  连接消息队列.
*         - 使用direct模式,只有一个阅者都能接收到消息.
@param opt: { // 全局唯一配置, 只有第一次调用有效.
              url:        'amqp://xxxxx',
              heartbeat:  10,      // in seconds.
                reconnect:  10000,   // 连接失败后多长时间重连.
                persistent: false,        // 是否持久消息.
                registerPublisher: false,   // 是否注册发布者.
                registerSubscriber: false,  // 是否注册订阅者.
                errHandleCB: function(e, handleName, recvData:string):data; // 消息处理的错误处理函数. 返回对象将反馈给发送方.
                beforeHandleCB: function(requestData:any):any; // 消息处理前的回调. 返回null则正常处理, 否则将返回数据返回给rpc.
                beforeRequestCB: function(requestData:any):boolean;  // 消息发送前的统一处理, 返回false则不进行发送.
                beforeResponseCB: function(requestData:any, responseData:any);  // 消息返回前的统一处理.          
                beforeReturnCB: function(recvData:any); // 通过request接口发送消息后, 消息已经通过网络接收到, 方法返回前的处理.
                devSingle:   false,         // 在单包模式下开发, 此模式下不使用真实的消息队列, 而在接口一致的情况下使用本地缓存进行开发.
              }
@param recvFromSys: 接收哪些系统的消息,使用系统的名称 sys.main, ...
*                     如果不指定, 则不接收消息.
@return: throw in err.
*/
function init(opt:object, ...recvFromSys:string[])
 
/**
@desc: 注册消息处理器. 如果在处理消息的过程中发生了异常, 则会调用errHandleCB.
@param handleName: 处理器名称.
@param handle: async function(jsonData):data; 返回data给sender.
@return
*/
function bind(handleName:string, handle:func)
 
/**
@desc: 发送消息.
@param receiver: 
    {
      handler: '', // 消息处理器.
      recvSys: '', // 接收的系统.
    }
@param data: (json). 需要发送的数据.
@return: Promise.
            - resolve(msg)
            - catch('timeout')
*/
async function request(receiver, data)

subscribe

sbscribe 消息是订阅消息, 所有订阅者都能接收到消息, 无返回值返回给生产者.

/**
@desc:  连接消息队列.
*         - 使用subscribe模式,所有的订阅者都能接收到消息.
@param opt: { // 全局唯一配置, 只有第一次调用有效.
              url:        'amqp://xxxxx',
                heartbeat:  10,      // in seconds.
                reconnect:  10000,   // 连接失败后多长时间重连.
                persistent: false,        // 是否持久消息.
                registerPublisher: false,   // 是否注册发布者.
                registerSubscriber: false,  // 是否注册订阅者.
                errHandleCB: function(e, handleName, recvData:string):void; // 消息处理的错误处理函数. 返回true则会从队列中移除消息.
                beforeHandleCB: function(requestData:any); // 消息处理前的回调.
                beforeRequestCB: function(requestData:any):boolean;  // 消息发送前的统一处理, 返回false则不进行发送.
                devSingle:   false,         // 在单包模式下开发, 此模式下不使用真实的消息队列, 而在接口一致的情况下使用本地缓存进行开发.
              }
@param recvFromSys: 接收哪些系统的消息,使用系统的名称 sys.main, ...
*                     如果不指定, 则不接收消息.
@return: Promise
*/
function init(opt:object, ...recvFromSys:string[])
 
/**
@desc: 注册消息处理器.
@param handleName: 处理器名称.
@param handle: function(jsonData)
@return
*/
function bind(handleName:string, handle:func)
 
/**
@desc: 发布消息.
@param receiver: 
    {
      handler: '', // 消息处理器.
      recvSys: '', // 接收的系统.
    }
@param data: (json). 需要发送的数据.
@return: boolean.
*/
async function publish(receiver, data)

devSingle example

单包开发模式下, 可以将多个系统统一个入口启动 (单实例), 方便调试.

rpc client.

File: client.js

 
var mq = require('febs-message');
 
/**
@desc: 初始化消息队列.
*/
mq.init(function(msg, filename, line){ // log处理方法
  console.log(msg);
});
 
/**
@desc: 初始化rpc通信.
*/
mq.rpc.init({
  url:        'amqp://xxxx',
  heartbeat:  10,      // in seconds.
  reconnect:  10000,   // 连接失败后多长时间重连.
  rpcTimout:  5000,    // rpc等待返回消息的超时.
  persistent: false,        // 是否持久消息.
  registerPublisher: true,
  registerSubscriber: false,
  devSingle: ['sysname'],
},
null  // 不接收消息.
).then(()=>{
  console.log("OK");
 
  // 发送消息.
  mq.rpc.request({handler:"handle1", recvSys:'sysname'}, {message:'hello', data:1})
    .then(ret=>{
      console.log('return message: ');
      console.log(ret);
    });
});

rpc server.

File: server.js

 
var mq = require('febs-message');
 
 
/**
@desc: 消息处理方法
*/
 
async function handle1(msg) {
  console.log('handle1:');
  console.log(msg);
 
  return {err:msg.data};  // return to sender.
}
 
async function handle2(msg) {
  console.log('handle2:');
  console.log(msg);
 
  return {err:msg.data};
}
 
 
/**
@desc: 初始化消息队列.
*/
mq.init(function(msg, filename, line){
  console.error(msg);
});
 
/**
@desc: 初始化rpc通信.
*/
mq.rpc.init({
      url:        'amqp://xxxx',
      heartbeat:  10,      // in seconds.
      reconnect:  10000,   // 连接失败后多长时间重连.
      rpcTimout:  5000,    // rpc等待返回消息的超时.
      persistent: false,        // 是否持久消息.
      registerSubscriber: true,
      devSingle: ['sysname'],
    }, 'sysname'  // 接收发送给live系统的消息.
  )
  .then(()=>{
    console.log('ok');
 
    // 绑定消息处理方法.
    mq.rpc.bind('handle1', handle1);
    mq.rpc.bind('handle2', handle2);
 
    require('./client');
  });
 

仅启动 server.js 进程.

Readme

Keywords

none

Package Sidebar

Install

npm i febs-message

Weekly Downloads

0

Version

3.0.0

License

MIT

Unpacked Size

58.7 kB

Total Files

15

Last publish

Collaborators

  • brainpoint