stream-computation
TypeScript icon, indicating that this package has built-in type declarations

1.1.8 • Public • Published

流计算

  1. 无限数据集

  2. 有状态

  3. 计算结束后数据丢弃

  4. 事件驱动

流式计算是把数据汇集到有限的数据集中,就像是把源源不断的水流收集到各个桶中。 其中的一个原因是数据分析的需求,例如很多时候,人们关心的是“过去五分钟一共有多少次点击”、“过去一分钟最大的一笔订单是多少”之类的问题。 这种需求就是要对一个有限的数据集进行统计分析。 而从技术出发,像排序、聚合等数据操作,也都只是在有限的数据集上清晰的定义和成熟的算法。

窗口

这种有限的数据集也被称作是窗口

什么时候去关闭并提交一个窗口,什么的情况下值得我们重新提交一个窗口。

什么是事件驱动型应用?

事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。

DataSource

DataStream

Window

Transformation

Operator

每隔5分钟输出最近一小时内同时在线用户数

用户平均使用速率

每隔5分钟用户行为分析

输入器连接数据源 => 数据流

想好输入输出,各个接口之间的交互情况

假设一条日志到达数据源,输入器通过订阅获取到这台数据,通知计算器进行计算,计算器存放计算结果,并在下条数据来临时更新结果

输入器

  • 负责连接数据源,从数据源中获取数据

  • 当获取到数据时通知计算器

计算器

  • 接收输入器消息通知,进行业务逻辑计算,计算结果发送给输出器

  • 维护时间窗口(保留一定时间范围内的数据),比如1小时

  • 维护时间切片(这个时间范围内的数据按某个时间大小切片),比如5分钟

  • 支持对时间窗口里的数据做聚合操作

  • 通知输出器输出

输出器

  • 接受计算器的计算结果

  • 将数据根据配置落地,推送,邮件日报

  • 一份输出可能对应着多个计算器的计算结果

调度器

  • 读取配置文件

  • 注入属性

interface Input { DataStream GetDataStream()

} DataStream ds = Input.fromDataSource()

// 输出器接入数据源,等待数据源中返回数据,一旦有数据,通知计算器计算 class Input {

public void fromDataSource(DataStream ds) {
    ds.wait().then(function(msg) {
        // 当获取到一条消息时,通知计算器
        msg.notifyAllCalculator()
    })
}

}

// 维护着一个时间范围内的数据 class Calculator {

public Window window

private void updateWindowTime(Message msg) {}
public abstract compute(Message msg);

public void onMessage(Message msg) {
    // 更新时间窗口的范围
    updateWindowTime(msg)
    // 计算消息落到哪个时间窗口
}

}

需求:

  1. 正常日报内容输出

  2. 实时监控各种数据

对于需要用到历史数据的指标怎么计算?

  1. 数据回流,利用老数据计算某个指标,工具

提供数据回流功能,将数据重新流入计算系统

  1. 自定义实时监控

各模块的依赖关系

计算器 -> 算法

存放在algorithm文件夹里的都是算法

以下操作都需要什么改动

增加输入器

输入器需要做抽象,适配不同数据源

数据源:

  • 消息队列

    topic -> msg

  • 数据库

    msg

不同的数据源的数据不一样,怎么搞定

增加计算器

计算器的结构是稳定的,主要是算法不同,不同算法的输入输出都不一样

增加输出器

core

  1. 提供给输入器api,告知输入器要给我哪些数据(input接口)

  2. 管理所有的算法列表,给算法传递数据

  3. 算法的输出也可以是input接口

  4. 4个列表(输出器列表,算法列表,值列表,输入器列表)

算法

  1. 定义

  2. 输入输出

输出器

  1. 定义我所需要的值

  2. 输出方式

sys_login 计算在线用户数

数据源管理 DataSourceManager

给输入器的数据: MQ里的一条数据,SQL的一个结果集

DataSourceManager 订阅着所有的主题,主题消息一到,调用输入器的onMessage方法

subscribeEvent -> onEvent -> input.onMessage()

数据库

计算器

计算维度

如何计算各个唯独的维度,如何存储,怎么告诉

计算器如何表达我计算的东西是什么

输出器

输出器怎么规定输出的内容,怎么表述

输出的

输出器告诉core,我需要哪些数据

日报输出器,

需要定时一天

  1. 怎么保证计算结果不丢失
  • 写入文件?

  • 定期写入数据库?

  1. 输出器应该是各项指标的混合,整理成需要的格式后按某种方式输出(邮件,入数据库)

Core

Blackboard

  • 保存需要实时显示的各种指标

EventEmitter

  • 提供给计算器计算完后发出事件

  • 接受计算完成事件,通知黑板刷新,并按需要将计算结果流回core

数据提供器

数据源

  • kafka

  • Mysql

计算器

一个计算器只计算一个指标,多个计算器可以计算多个值

时间窗口

计算器计算的结果属于某个时间范围,计算需要的中间数据以及计算的结构都只属于某个时间窗口,当窗口销毁时,应该重新计算

临时数据

为了计算某个指标,计算器通常需要对收到的数据产生一份临时的中间数据

输出数据

标记该计算器计算的结果的输出位置和格式,再对每一条新数据在原有的临时数据上做计算,最后将某个指标的值输出,每次计算完毕发出事件通知core, 计算器的输出是单个值

输出器

一个输出器输出的内容是某个事件黑板上N个指标的整合输出,它只管去跟core要数据,要到数据后发出去就行嘞

输出途径

  • 数据库

  • 邮件

输出时间

什么时候去输出,日报按天,小时,分钟,秒

输出结构

所有的指标需指定一定唯一名字,是否可以不要结构,只有单一的值

新增用户 | 活跃用户

维度

  • 小时

  • 平台

  • 版本

  • 渠道

  • 国家

      // 日期	2019-07-31
      // US新增	30
      // US活跃	257
      // US前日次留	21.21%
      // US连接用户数	86
      // US连接次数	254
      // US平均连接次数	2.95
      // US矿机Crash	0
      // US连通率	18.0%
      // US虚拟分配成功率	22.0%
      // US矿机连接成功率	93.0%
      // US数据获取成功率	90.0%
      // US矿机流量统计	
      // 上行:61.73G
      // 下行:78.14G
      // US用户平均使用流量	122.34MB
      // US平均上行速度	0.31 KB/S
      // US平均下行速度	17.77 KB/S
      // US用户平均使用时长	115.5 min
      // 注册成功率	73%
      // 登陆成功率	-- 
    

存储过程的作用

  • ot_user_login_hour 每小时计算活跃

  • ot_user_conn_detail_hour 按小时计算连接用户前四项数据

  • ot_queryClientSessionConn_hour 计算矿机连接成功率

  • ot_business_line_data 更新业务线平台级联关系

  • ot_user_keep 计算每日留存

select queryTime, count(DISTINCT username) nums from ot_user_login_hour t where 1=1 and queryTime >= ? and queryTime <= ? group by queryTime order by queryTime desc

流式计算的难点

时间

有限的内存和无限的数据

错误恢复

内存计算

需求:

正常日报内容输出

将现有利用数据库计算的一系列指标,迁移到流式计算框架中,目前有如下指标:

  • [x] US新增

  • [x] US活跃

  • [x] US前日次留

  • [x] US连接用户数

  • [x] US连接次数

  • [x] US平均连接次数

  • [x] US虚拟分配成功率

  • [x] US矿机连接成功率

  • [x] US数据获取成功率

  • [x] 矿机crash次数

  • [x] US流量统计

  • [x] US上下行速度

  • [x] US用户平均使用流量

  • [x] US用户平均使用时长

  • [x] 注册成功率

  • [ ] 登陆成功率

实时监控各种数据

  • [ ] 可为某个指标设置阈值,超过时触发事件

命令行工具

  • [x] 可将数据库的历史数据重新流入计算框架,重新计算某指标

自定义监控任务

如果启动后,考虑恢复数据

考虑定时任务,时间间隔

  • 分钟

  • 小时

  • 天(按天定点输出)

当一个时间窗口关闭的时候,记录下状态值

{ "time": { "begin": "", "end": "" }, "metrics": {} }

由于计算数据都是存放在计算器的temp和result里,都在某个时间段的时间窗口里, 所以如果需要恢复,就是恢复计算器的window,包括里面的temp和result

当重新启动的时候,收到第一条消息时,如果消息的时间仍在上次的时间窗口内,则恢复数据

那什么时候去保存数据

黑板

黑板上的指标按时间段来分,以满足不同的输出器的输出需要

{ "五分钟": { "begin": "", "end": "", "metrics": {} }, "十分钟": {}, "一小时": {}, "一天": {} }

对应算法计算的指标填入对应的时间段内

日报输出器查询黑板时,直接查询一天的数据,所以应该算法要对应计算到一天里去

时间窗口的区间怎么定义?

时间窗口的时间以服务器的时间来定,不再以第一条日志的时间作为起始值

启动时获取时间戳作为起始时间,再获取下一个整点的时间作为结束时间

计算器的数据怎么保存,防止重启

每一分钟定时将window的数据写入磁盘 data

"ActiveUserAlgorithm": { "begin": "", "end": "", "temp": "" "result": "" }

数据怎么保存?怎么恢复?如何将算法和数据对应起来,一个算法可能有多个实例

唯一标示: 算法名称 + 时间段

恢复时,查看算法对应时间段的数据是否存在,存在则恢复

场景:

总有一些指标是不适用与流式计算的,比如现在遇到的留存,对于这类指标,怎么计算方便

昨日留存率 = (昨日新增人 && 今天活跃人) / 昨日新增人数

像类似这种指标,需要用到历史数据,考虑范围加大,怎么算15日留存,难道必须等系统运行15天吗

是否通过某种方式,将需要用到的历史数据缓存到系统内

输出器:

当前所采集的数据只能是来源与黑板,如果可以来源与DB,问题是否可以解决

另外一个思路,多窗口是否能解决问题,那多窗口的时候,输出器怎么去哪里取数据, 数据在历史窗口里

写了一个bug,历史数据怎么处理

流入系统的数据类型

  • 实时数据

    • 无限数据

    • 按原来的方式,固定窗口大小,时间取整,整5分钟,30分钟,一小时,一天

  • 历史数据

    • 有限数据

    • 按数据的大小,数据输入有多少,就计算多少的量

今天完成的几个功能

  1. 算法兼容两种数据类型

    • 实时无限

      要按时间切片来算数据

    • 历史有限

      可以直接以数据量来算

  2. 留存

    • 保留多窗口

    • 或增加一种组件,接入历史数据

  3. 完善工具

将昨天的新增用户通过 某种方式 导入进系统,就可以算了,

数据一定在数据库,

  1. output的时候需要有一份外部数据,外部数据的更新需要一定的时间间隔

./kafka-console-consumer.sh --bootstrap-server kafkas:9092 --topic user_register --from-beginning

./kafka-console-producer.sh --broker-list kafkas:9092 --topic user_register

'{"accountname":"test@qq.com","action":"register","appname":"CoolLine","appversion":"1.6.xx","channelName":"googleplay","cityen":"","cityzh":"","continentsen":"","continentszh":"","countryen":"","countryZh":"美国","createTimestamp":1566376441,"deviceid":"edb3d8ce8df595ad","isPrivilegedUser":"true","line":"CoolLine","mail":"test@qq.com","pkgname":"cc.coolline.client","platform":"Android","provinceen":"","provincezh":"","pt":"2019-08-21","registerip":"10.244.2.64","registertime":"2019-08-21 16:34:01","registertype":"mail","userid":19348,"userName":"test@qq.com","userpwd":"86A8F132223D033619389988E663F6C2","userstate":0,"virtualCountryCode":"AF"}'

按任务计算

- 我去哪里取数据

查询一段时间,数据量大的问题,分批查,分批输入系统

数据延迟到达

数据到达了,被我丢弃未处理

Readme

Keywords

none

Package Sidebar

Install

npm i stream-computation

Weekly Downloads

19

Version

1.1.8

License

ISC

Unpacked Size

330 kB

Total Files

287

Last publish

Collaborators

  • 3ssr