使用说明
redis包其实对stream已经进行了封装,只是能参考的文献太少,并且传参结构不明确,经过我一天多的试错,把传参及调用都封装了一下,提交上npm供大家使用
安装
npm install redisnpm install redis5-rstream
更新日志
V1.1.0
- xack封装为promise,思考再三此处回调无太大意义(但回调方法依然保留),外层可直接使用await获取结果,具体可见用例
- 增加xreadGroup、xread回调函数的同步阻塞,解决streams时效性内容的处理顺序隐患
- 建议每个xreadGroup监听一个流或有时效性内容的多个streams,这样既能提高并发性能,又能保证数据按正确顺序获取并处理,监听多个streams启用多个数据库连接分别监听,近期会放出demo到github上
V1.0.9
- 解决了新建消费组历史数据无法获取bug,新建分组时不执行回调函数
使用
stream.write
- 输入数据(支持链式操作)
const redis = ;const streams = ;//数据库配置文件const config = ;client = redis;ifconfigredisauth client;//输入数据: client为数据库对象,login为streams命名var stream = client 'login';//结尾使用.quit()中断操作stream;
stream.xreadGroup()
- xreadGroup监听消费组数据(可监听多个streams,自动对streams创建消费组,可对空streams创建消费组)
const redis = ;const streams = ;//数据库配置文件const config = ;client = redis;ifconfigredisauth client; //阻断监听消费组数据(循环监听,获取1条数据后执行业务代码,自动再次监听): var group_name = 'g';var consumer_name = 'c';var stream = client 'common,login';stream;
stream.xack()
- 确认消费组消费(xreadGroup获取streams消息后,必须要执行该命令)
//第四个callback回调为可选参数stream;V110版本以后可以使用await获取结果var ret = await stream;
stream.xread
- xread阻塞监听数据(可监听多个streams)
const redis = ;const streams = ;//数据库配置文件const config = ;client = redis;ifconfigredisauth client;var stream = client 'common,login';stream
其他
- 支持streams.del()操作
- 预计2020年1月23日提供程序DEMO