@fastcar/rpc
TypeScript icon, indicating that this package has built-in type declarations

0.0.16 • Public • Published

@fastcar/core框架下整合的rpc框架

快速安装

npm install @fastcar/rpc

框架说明

  • 参考koa框架,结合koa-compose的思想,制定了一套rpc基本的通信规则
  • 顶层主要是RpcServer和RpcClient两个类的封装,其中RpcSevrer整合成了一个组件,直接在入口处配置即可
  • 消息通信采用长连接,一般按规定抽象成SocketServer和SocketClient,再由底层进行实现

框架类型支持

  • 从第三方依赖库上来说支持socket.io,mqtt,ws和grpc
  • 从长连接通讯协议来说支持ws,wss,mqtt,mqtts,grpc等

第三方依赖包安装

  • 使用socket.io时,安装 socket.io 和 socket.io-client

  • 使用mqtt时,安装 aedes 和 mqtt,如果要支持ws协议还需要装websocket-stream

  • 使用ws时,安装 ws

  • 使用grpc时,安装@grpc/proto-loader和@grpc/grpc-js

消息通信配置说明

//服务端配置
type RpcConfig = {
 list: SocketServerConfig[];
 retry: {
  retryCount: number; //错误重试次数 默认三次
  retryInterval: number; //重试间隔 默认一秒
  maxMsgNum: number; //最大并发数
  timeout: number; //超时时间
 };
};

//客户端配置
type RpcClientConfig = {
 retryCount: number; //错误重试次数 默认三次
 retryInterval: number; //重试间隔 默认一秒
 maxMsgNum: number; //最大并发数
 timeout: number; //超时时间
} & SocketClientConfig;
//服务端连接配置
 type SocketServerConfig = {
 id: string; //编号名称
 type: SocketEnum; //具体为哪一种型号的连接器
 server: ServerConfig; //这边和@fastcar/server的配置一致
 extra?: any; //第三方拓展配置 用于灵活的调用第三方
 serviceType: string; //服务器用途类型 用于表名是何种服务器
 encode?: EncodeMsg; //编码解码
 decode?: DecodeMsg;
 secure?: SecureClientOptions;
} & { [key: string]: any };

//长连接配置
 type SocketClientConfig = {
 url: string; //连接地址
 type: SocketEnum; //具体为哪一种型号的连接器
 extra?: any; //第三方拓展参数
 encode?: EncodeMsg; //解码器
 decode?: DecodeMsg;
 disconnectInterval?: number;
 secure?: SecureClientOptions;
} & { [key: string]: any };
application:
    name: "simple"
settings:
    rpc:
        list:
            - {
                  id: "rpc-server-1",
                  type: "socket.io",
                  server: { port: 1235 },
                  extra: {},
                  serviceType: "rpc",
              }
            - {
                  id: "rpc-server-2",
                  type: "mqtt",
                  server: { port: 1236, protocol: "net" },
                  extra: {},
                  serviceType: "rpc", #如果是ws则协议连接为http
              }
            - {
                  id: "rpc-server-3",
                  type: "socket.io",
                  server: { port: 1237 },
                  extra: {},
                  serviceType: "rpc",
                  secure: { username: "user", password: "123456" }, #连接前进行校验
              }
            - {
                  id: "rpc-server-4",
                  type: "ws",
                  server: { port: 1238 },
                  serviceType: "rpc",
                  secure: { username: "user", password: "123456" }, #连接前进行校验
              }
            - {
                  id: "rpc-server-5",
                  type: "mqtt",
                  server:
                      {
                          port: 1239,
                          protocol: "https",
                          ssl:
                              {
                                  key: "./ssl/localhost-key.pem",
                                  cert: "./ssl/localhost-cert.crt",
                              },
                      },
                  serviceType: "rpc",
              }
            - {
              id: "rpc-server-6",
              type: grpc,
              server: {
                port: 1240,
                ssl: {
                  ca: "./cert/ca.crt",
                  key: "./cert/server.key",
                  cert: "./cert/server.crt",
                },
              },
              serviceType: "rpc",
              codeProtocol: "protobuf",
              extra: {
                checkClientCertificate: true,
              },
            }

消息使用示例说明

//客户端测试
class NotifyHandle implements RpcAsyncService {
 async handleMsg(url: string, data: Object): Promise<void | Object> {
  console.log("收到服务端消息", url, data);
  return {
   url,
   data: "来自客户端的消息---",
  };
 }
}

@Application
@BasePath(__dirname)
@BaseFilePath(__filename)
@EnableRPC
class APP {
 app!: FastCarApplication;
}
const appInstance = new APP();

describe("rpc交互测试", () => {
 it("服务端和客户端交互", async () => {
  let client1 = new RpcClient(
   {
    // url: "ws://localhost:1235",
    // type: SocketEnum.SocketIO,
    // url: "mqtt://localhost:1236",
    // type: SocketEnum.MQTT,
    url: "ws://localhost:1238",
    type: SocketEnum.WS,
    secure: { username: "user", password: "123456" },
   },
   new NotifyHandle()
  );
  await client1.start();
  let result = await client1.request("/hello");
  console.log("普通调用", result);
  let result2 = await client1.request("/head/hello");
  console.log("追加了head的url", result2);
  let sessionId = client1.getSessionId();
  let server: RpcServer = appInstance.app.getComponentByTarget(RpcServer);
  let result3 = await server.request(sessionId, "/server/test", "发送至客户端");
  console.log("服务端收到客户端应答", result3);
  let result4 = await client1.request("/asynchello");
  console.log("普通调用", result4);
 });

 it("客户端主动断开连接", async () => {
  let client2 = new RpcClient(
   {
    // url: "ws://localhost:1235",
    // type: SocketEnum.SocketIO,
    url: "mqtt://localhost:1236",
    type: SocketEnum.MQTT,
   },
   new NotifyHandle()
  );
  await client2.start();
  client2.stop("客户端主动关闭");
 });
 it("服务端主动断开客户端连接", async () => {
  let client3 = new RpcClient(
   {
    // url: "ws://localhost:1235",
    // type: SocketEnum.SocketIO,
    url: "mqtt://localhost:1236",
    type: SocketEnum.MQTT,
   },
   new NotifyHandle()
  );
  await client3.start();
  let sessionId = client3.getSessionId();
  let server: RpcServer = appInstance.app.getComponentByTarget(RpcServer);
  server.kickSessionId(sessionId, "服务端强制客户端下线");
 });


 it("服务端断线重连", async () => {
  let client4 = new RpcClient(
   {
    // url: "ws://localhost:1235",
    // type: SocketEnum.SocketIO,
    url: "mqtt://localhost:1236",
    type: SocketEnum.MQTT,
    retryCount: 3, //错误重试次数 默认三次
    retryInterval: 100, //重试间隔 默认一秒
    maxMsgNum: 10000, //最大消息并发数
    timeout: 3000,
    disconnectInterval: 1000,
   },
   new NotifyHandle()
  );
  await client4.start();
  let server: RpcServer = appInstance.app.getComponentByTarget(RpcServer);
  await server.stop();
  setTimeout(() => {
   server.start();
  }, 2000);
  // setTimeout(async () => {
  //  try {
  //   let result = await client4.request("/hello");
  //   console.log(result);
  //  } catch (e) {
  //   console.log(e);
  //  }
  // }, 2000);
 });

 it("连接认证测试", async () => {
  let client1 = new RpcClient(
   {
    url: "ws://localhost:1237",
    type: SocketEnum.SocketIO,
    retryCount: 3, //错误重试次数 默认三次
    retryInterval: 100, //重试间隔 默认一秒
    maxMsgNum: 10000, //最大消息并发数
    timeout: 3000,
    disconnectInterval: 1000,
   },
   new NotifyHandle()
  );
  let connect1 = await client1.start();
  console.assert(connect1 == true);
  if (!connect1) {
   client1.close();
  }
  let client2 = new RpcClient(
   {
    url: "ws://localhost:1237",
    type: SocketEnum.SocketIO,
    retryCount: 3, //错误重试次数 默认三次
    retryInterval: 100, //重试间隔 默认一秒
    maxMsgNum: 10000, //最大消息并发数
    timeout: 3000,
    disconnectInterval: 1000,
    secure: {
     username: "user",
     password: "123456",
    },
   },
   new NotifyHandle()
  );
  let connect2 = await client2.start();
  console.assert(connect2 == true);
 });

 it("ssl 测试", async () => {
  let client1 = new RpcClient(
   {
    url: "wss://localhost:1239",
    type: SocketEnum.MQTT,
    extra: {
     rejectUnauthorized: false, //当没有证书时这边设置为忽略
    },
   },
   new NotifyHandle()
  );
  await client1.start();
  let result = await client1.request("/hello");
  console.log("普通调用", result);
 });
});
 //优雅的调用请求
  let result = await ClientRequestStatic<HelloPBRequest, HelloPBReply>({
   url: "/pbhello",
   data: {
    message: "来自客户端的pb调用",
   },
   client: client1,
  });

常用功能集成说明

  • 会话的连接告知默认路由 /connect

  • 会话的离线默认路由 /disconnect

//示例
@Controller
export default class HelloController {
 @Autowired
 private rpcServer!: RpcServer;

 @RPCMethod()
 connect(session: ClientSession) {
  console.log("connect-----", session.sessionId);
  // //也可以这这里做一些权限校验的工作 如果不对则直接ko
  // this.rpcServer.kickSessionId(session.sessionId, "强制下线");
  return {
   code: 200,
   data: "socket is connect",
  };
 }

 @RPCMethod()
 disconnect({ session, reason }: DisconnectType) {
  console.log("disconnect-----", session.sessionId, reason);
  return {
   code: 200,
  };
 }
}
  • 连接前进行验证
//连接前的验证语法 实现接口
@RPCAuth
class Auth implements RpcAuthService {
 async auth(username: string, password: string, config: SocketServerConfig): Promise<boolean> {
  return config.secure?.username == username && config.secure.password == password;
 }
}
  • 消息重试次数和超时交由客户端进行管理,只需在配置中声明即可

  • 业务逻辑为统一的session 具有唯一的sessionId

//客户端会话值
type ClientSession = {
 sessionId: string;
 serverId: string;
 connectedTime: number; //连接的开始时间
 settings: Map<string | symbol, any>; //自定义设置项
};
  • 提供给客户端路由
@Controller
export default class HelloController {
  @RPCMethod() 
  hello() {
    return {
    code: 200,
    data: "我是一个快乐的rpc",
    };
  }

  @RPCMethod()
  async asynchello() { // 访问路径 /asynchello
    return new Promise((resolve) => {
    setTimeout(() => {
      resolve({
      code: 200,
      data: "这是一个异步rpc",
      });
    }, 200);
    });
  }
}

@Controller
@RPC("/head")
export default class HeadController {
  @RPCMethod()
  hello() {
    return {
    code: 200,
    data: "追加了头的url",
    };
  }
}
  • 编码默认协议支持json和protobuff。grpc仅支持pb协议
 it("protobuff格式传输示例", async () => {
  let client1 = new RpcClient(
   {
    url: "local.dev.com:1240",
    type: SocketEnum.Grpc,
    codeProtocol: CodeProtocolEnum.PROTOBUF,
    ssl: {
     ca: path.join(__dirname, "../resource/cert/ca.crt"),
     key: path.join(__dirname, "../resource/cert/client.key"),
     cert: path.join(__dirname, "../resource/cert/client.crt"),
    },
    extra: {
     options: {
      "grpc.ssl_target_name_override": "example",
      "grpc.default_authority": "example",
     },
    },
   },
   new NotifyHandle(),
   {
    retryCount: 0,
   }
  );

  client1.addProtoBuf({
   root: {
    protoPath: path.join(__dirname, "../../demo.proto"), //这边为绝对路径依赖
    service: "HelloPBController", //服务
   },
   // list: [
   //  {
   //   method: "pbhello",
   //   url: "/pbhello",
   //  },
   // ],
  });
  await client1.start();
  //优雅的调用请求
  let result = await ClientRequestStatic<HelloPBRequest, HelloPBReply>({
   url: "/pbhello",
   data: {
    message: "来自客户端的pb调用",
   },
   client: client1,
  });
  console.log("返回数据", result.data);
 });

注解说明

  • EnableRPC 开启rpc服务

  • RPC 路由头新增

  • RPCMethod 绑定路由方法

  • RPCMiddleware 增加链路的中间件

  • RPCAuth 强化初始连接是进行用户名和密码的拓展校验

  • RPCError 封装原有的响应和捕捉错误

更多用法

参考项目git地址 @fastcar/rpc/test 下的example内

项目开源地址

Package Sidebar

Install

npm i @fastcar/rpc

Weekly Downloads

2

Version

0.0.16

License

MIT

Unpacked Size

255 kB

Total Files

125

Last publish

Collaborators

  • fastcar