Null Pointer Micromanagement

    redis5-stream

    1.1.0 • Public • Published

    使用说明

    redis包其实对stream已经进行了封装,只是能参考的文献太少,并且传参结构不明确,经过我一天多的试错,把传参及调用都封装了一下,提交上npm供大家使用

    安装

    npm install redis
    npm install redis5-rstream

    更新日志

    V1.1.0

    • xack封装为promise,思考再三此处回调无太大意义(但回调方法依然保留),外层可直接使用await获取结果,具体可见用例
    • 增加xreadGroup、xread回调函数的同步阻塞,解决streams时效性内容的处理顺序隐患
    • 建议每个xreadGroup监听一个流或有时效性内容的多个streams,这样既能提高并发性能,又能保证数据按正确顺序获取并处理,监听多个streams启用多个数据库连接分别监听,近期会放出demo到github上

    V1.0.9

    • 解决了新建消费组历史数据无法获取bug,新建分组时不执行回调函数

    使用

    stream.write

    • 输入数据(支持链式操作)
    const redis = require('redis');
    const streams = require('redis5-stream');
    //数据库配置文件
    const config = require('./config/config');
    client = redis.createClient(config.port,config.host);
    if(config.redis.auth) client.auth(config.redis.auth);
    //输入数据: client为数据库对象,login为streams命名
    var stream = new streams(client, 'login');
    //结尾使用.quit()中断操作
    stream.write({test: '23'}).write({test: '1231223'}).quit();
     

    stream.xreadGroup()

    • xreadGroup监听消费组数据(可监听多个streams,自动对streams创建消费组,可对空streams创建消费组)
    const redis = require('redis');
    const streams = require('redis5-stream');
    //数据库配置文件
    const config = require('./config/config');
    client = redis.createClient(config.port,config.host);
    if(config.redis.auth) client.auth(config.redis.auth);
     
    //阻断监听消费组数据(循环监听,获取1条数据后执行业务代码,自动再次监听):
     
    var group_name = 'g';
    var consumer_name = 'c';
    var stream = new streams(client, 'common,login');
    stream.xreadGroup(group_name, consumer_name, (res) => {
        for (let val of res) {
            var streamName = val[0];
            var id = val[1][0][0];
            var valule = val[1][0][1];
            console.log('监听数据并执行业务代码',valule);
             //xack消费确认
            var ret = await stream.xack(streamName, group_name, id);
        }
        let id = str[0];
        let mystream = res[0][0];
        
       
    });

    stream.xack()

    • 确认消费组消费(xreadGroup获取streams消息后,必须要执行该命令)
    //第四个callback回调为可选参数
    stream.xack(mystream, group_name, id,(res) => {
        //res返回值为1时,执行成功
    });
    V1.1.0版本以后可以使用await获取结果
    var ret = await stream.xack(mystream, group_name, id);

    stream.xread

    • xread阻塞监听数据(可监听多个streams)
    const redis = require('redis');
    const streams = require('redis5-stream');
    //数据库配置文件
    const config = require('./config/config');
    client = redis.createClient(config.port,config.host);
    if(config.redis.auth) client.auth(config.redis.auth);
    var stream = new streams(client, 'common,login');
    stream.xread((res) => {
        console.log('监听数据并执行业务代码',res);
    })

    其他

    • 支持streams.del()操作
    • 预计2020年1月23日提供程序DEMO

    Install

    npm i redis5-stream

    DownloadsWeekly Downloads

    11

    Version

    1.1.0

    License

    ISC

    Unpacked Size

    7.28 kB

    Total Files

    3

    Last publish

    Collaborators

    • nodein