redis5-stream

1.1.0 • Public • Published

使用说明

redis包其实对stream已经进行了封装,只是能参考的文献太少,并且传参结构不明确,经过我一天多的试错,把传参及调用都封装了一下,提交上npm供大家使用

安装

npm install redis
npm install redis5-rstream

更新日志

V1.1.0

  • xack封装为promise,思考再三此处回调无太大意义(但回调方法依然保留),外层可直接使用await获取结果,具体可见用例
  • 增加xreadGroup、xread回调函数的同步阻塞,解决streams时效性内容的处理顺序隐患
  • 建议每个xreadGroup监听一个流或有时效性内容的多个streams,这样既能提高并发性能,又能保证数据按正确顺序获取并处理,监听多个streams启用多个数据库连接分别监听,近期会放出demo到github上

V1.0.9

  • 解决了新建消费组历史数据无法获取bug,新建分组时不执行回调函数

使用

stream.write

  • 输入数据(支持链式操作)
const redis = require('redis');
const streams = require('redis5-stream');
//数据库配置文件
const config = require('./config/config');
client = redis.createClient(config.port,config.host);
if(config.redis.auth) client.auth(config.redis.auth);
//输入数据: client为数据库对象,login为streams命名
var stream = new streams(client, 'login');
//结尾使用.quit()中断操作
stream.write({test: '23'}).write({test: '1231223'}).quit();
 

stream.xreadGroup()

  • xreadGroup监听消费组数据(可监听多个streams,自动对streams创建消费组,可对空streams创建消费组)
const redis = require('redis');
const streams = require('redis5-stream');
//数据库配置文件
const config = require('./config/config');
client = redis.createClient(config.port,config.host);
if(config.redis.auth) client.auth(config.redis.auth);
 
//阻断监听消费组数据(循环监听,获取1条数据后执行业务代码,自动再次监听):
 
var group_name = 'g';
var consumer_name = 'c';
var stream = new streams(client, 'common,login');
stream.xreadGroup(group_name, consumer_name, (res) => {
    for (let val of res) {
        var streamName = val[0];
        var id = val[1][0][0];
        var valule = val[1][0][1];
        console.log('监听数据并执行业务代码',valule);
         //xack消费确认
        var ret = await stream.xack(streamName, group_name, id);
    }
    let id = str[0];
    let mystream = res[0][0];
    
   
});

stream.xack()

  • 确认消费组消费(xreadGroup获取streams消息后,必须要执行该命令)
//第四个callback回调为可选参数
stream.xack(mystream, group_name, id,(res) => {
    //res返回值为1时,执行成功
});
V1.1.0版本以后可以使用await获取结果
var ret = await stream.xack(mystream, group_name, id);

stream.xread

  • xread阻塞监听数据(可监听多个streams)
const redis = require('redis');
const streams = require('redis5-stream');
//数据库配置文件
const config = require('./config/config');
client = redis.createClient(config.port,config.host);
if(config.redis.auth) client.auth(config.redis.auth);
var stream = new streams(client, 'common,login');
stream.xread((res) => {
    console.log('监听数据并执行业务代码',res);
})

其他

  • 支持streams.del()操作
  • 预计2020年1月23日提供程序DEMO

Dependents (0)

Package Sidebar

Install

npm i redis5-stream

Weekly Downloads

42

Version

1.1.0

License

ISC

Unpacked Size

7.28 kB

Total Files

3

Last publish

Collaborators

  • nodein