message 库封装了消息队列, 将复杂的调用封装成
rpc
: 有返回值的双向通信, 只有一个接收者能接收到消息subscribe
: 无返回值的订阅发布消息, 所有订阅者都能接收到消息.
目前底层使用 rabbitmq
, 系统初始化完成,如果中途发生断线将会自动重连
Example
//// producer//var mq = ; /*** @desc: 初始化消息队列.*/mq; /*** @desc: 初始化rpc通信.*/mqrpc;
//// customer//var mq = ; /*** @desc: 消息处理方法*/ { console; console; return err:msgdata; // return to sender.} { console; console; return err:msgdata;} /*** @desc: 初始化消息队列.*/mq; /*** @desc: 初始化rpc通信.*/mqrpc;
Error
系统错误使用 mq.Error
实例抛出异常.
全局配置
/*** @desc: 初始化系统.* @param errorLogCB: 错误log回调. function(msg, filename, line)*/
- 全局初始化后, 对需要用到的rpc, subscribe类型的消息队列再进行各自的初始化.
- 在开发模式下, 可以进行本机调试而不影响服务器上的服务; 例如: 存在一个名为
sysMain
的系统, 开发者使用sysMain_xxx
系统名称 进行注册, 在api层进行url的处理, 如,/?sys=xxx
, 将消息发送给sysMain_xxx
系统从而实现本机调试;
rpc
RPC 消息是远程调用消息, 只有一个消费者能接收到消息, 并且有返回值返回给生产者.
/*** @desc: 连接消息队列.* - 使用direct模式,只有一个阅者都能接收到消息.* @param opt: { // 全局唯一配置, 只有第一次调用有效. url: 'amqp://xxxxx', heartbeat: 10, // in seconds. reconnect: 10000, // 连接失败后多长时间重连. persistent: false, // 是否持久消息. registerPublisher: false, // 是否注册发布者. registerSubscriber: false, // 是否注册订阅者. errHandleCB: function(e, handleName, recvData:string):data; // 消息处理的错误处理函数. 返回对象将反馈给发送方. beforeHandleCB: function(requestData:any):any; // 消息处理前的回调. 返回null则正常处理, 否则将返回数据返回给rpc. beforeRequestCB: function(requestData:any):boolean; // 消息发送前的统一处理, 返回false则不进行发送. beforeResponseCB: function(requestData:any, responseData:any); // 消息返回前的统一处理. beforeReturnCB: function(recvData:any); // 通过request接口发送消息后, 消息已经通过网络接收到, 方法返回前的处理. devSingle: false, // 在单包模式下开发, 此模式下不使用真实的消息队列, 而在接口一致的情况下使用本地缓存进行开发. }* @param recvFromSys: 接收哪些系统的消息,使用系统的名称 sys.main, ...* 如果不指定, 则不接收消息.* @return: throw in err.*/ /*** @desc: 注册消息处理器. 如果在处理消息的过程中发生了异常, 则会调用errHandleCB.* @param handleName: 处理器名称.* @param handle: async function(jsonData):data; 返回data给sender.* @return: */function bind(handleName:string, handle:func) /*** @desc: 发送消息.* @param receiver: { handler: '', // 消息处理器. recvSys: '', // 接收的系统. }* @param data: (json). 需要发送的数据.* @return: Promise. - resolve(msg) - catch('timeout')*/async function request(receiver, data)
subscribe
sbscribe 消息是订阅消息, 所有订阅者都能接收到消息, 无返回值返回给生产者.
/*** @desc: 连接消息队列.* - 使用subscribe模式,所有的订阅者都能接收到消息.* @param opt: { // 全局唯一配置, 只有第一次调用有效. url: 'amqp://xxxxx', heartbeat: 10, // in seconds. reconnect: 10000, // 连接失败后多长时间重连. persistent: false, // 是否持久消息. registerPublisher: false, // 是否注册发布者. registerSubscriber: false, // 是否注册订阅者. errHandleCB: function(e, handleName, recvData:string):void; // 消息处理的错误处理函数. 返回true则会从队列中移除消息. beforeHandleCB: function(requestData:any); // 消息处理前的回调. beforeRequestCB: function(requestData:any):boolean; // 消息发送前的统一处理, 返回false则不进行发送. devSingle: false, // 在单包模式下开发, 此模式下不使用真实的消息队列, 而在接口一致的情况下使用本地缓存进行开发. }* @param recvFromSys: 接收哪些系统的消息,使用系统的名称 sys.main, ...* 如果不指定, 则不接收消息.* @return: Promise*/ /*** @desc: 注册消息处理器.* @param handleName: 处理器名称.* @param handle: function(jsonData)* @return: */function bind(handleName:string, handle:func) /*** @desc: 发布消息.* @param receiver: { handler: '', // 消息处理器. recvSys: '', // 接收的系统. }* @param data: (json). 需要发送的数据.* @return: boolean.*/async function publish(receiver, data)
devSingle example
单包开发模式下, 可以将多个系统统一个入口启动 (单实例), 方便调试.
rpc client.
File: client.js
var mq = ; /*** @desc: 初始化消息队列.*/mq; /*** @desc: 初始化rpc通信.*/mqrpc;
rpc server.
File: server.js
var mq = ; /*** @desc: 消息处理方法*/ { console; console; return err:msgdata; // return to sender.} { console; console; return err:msgdata;} /*** @desc: 初始化消息队列.*/mq; /*** @desc: 初始化rpc通信.*/mqrpc ;
仅启动 server.js
进程.