-
无限数据集
-
有状态
-
计算结束后数据丢弃
-
事件驱动
流式计算是把数据汇集到有限的数据集中,就像是把源源不断的水流收集到各个桶中。 其中的一个原因是数据分析的需求,例如很多时候,人们关心的是“过去五分钟一共有多少次点击”、“过去一分钟最大的一笔订单是多少”之类的问题。 这种需求就是要对一个有限的数据集进行统计分析。 而从技术出发,像排序、聚合等数据操作,也都只是在有限的数据集上清晰的定义和成熟的算法。
这种有限的数据集也被称作是窗口
什么时候去关闭并提交一个窗口,什么的情况下值得我们重新提交一个窗口。
事件驱动型应用是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
DataSource
DataStream
Window
Transformation
Operator
每隔5分钟输出最近一小时内同时在线用户数
用户平均使用速率
每隔5分钟用户行为分析
输入器连接数据源 => 数据流
想好输入输出,各个接口之间的交互情况
假设一条日志到达数据源,输入器通过订阅获取到这台数据,通知计算器进行计算,计算器存放计算结果,并在下条数据来临时更新结果
-
负责连接数据源,从数据源中获取数据
-
当获取到数据时通知计算器
-
接收输入器消息通知,进行业务逻辑计算,计算结果发送给输出器
-
维护时间窗口(保留一定时间范围内的数据),比如1小时
-
维护时间切片(这个时间范围内的数据按某个时间大小切片),比如5分钟
-
支持对时间窗口里的数据做聚合操作
-
通知输出器输出
-
接受计算器的计算结果
-
将数据根据配置落地,推送,邮件日报
-
一份输出可能对应着多个计算器的计算结果
-
读取配置文件
-
注入属性
interface Input { DataStream GetDataStream()
} DataStream ds = Input.fromDataSource()
// 输出器接入数据源,等待数据源中返回数据,一旦有数据,通知计算器计算 class Input {
public void fromDataSource(DataStream ds) {
ds.wait().then(function(msg) {
// 当获取到一条消息时,通知计算器
msg.notifyAllCalculator()
})
}
}
// 维护着一个时间范围内的数据 class Calculator {
public Window window
private void updateWindowTime(Message msg) {}
public abstract compute(Message msg);
public void onMessage(Message msg) {
// 更新时间窗口的范围
updateWindowTime(msg)
// 计算消息落到哪个时间窗口
}
}
-
正常日报内容输出
-
实时监控各种数据
对于需要用到历史数据的指标怎么计算?
- 数据回流,利用老数据计算某个指标,工具
提供数据回流功能,将数据重新流入计算系统
- 自定义实时监控
计算器 -> 算法
存放在algorithm文件夹里的都是算法
输入器需要做抽象,适配不同数据源
数据源:
-
消息队列
topic -> msg
-
数据库
msg
不同的数据源的数据不一样,怎么搞定
计算器的结构是稳定的,主要是算法不同,不同算法的输入输出都不一样
-
提供给输入器api,告知输入器要给我哪些数据(input接口)
-
管理所有的算法列表,给算法传递数据
-
算法的输出也可以是input接口
-
4个列表(输出器列表,算法列表,值列表,输入器列表)
-
定义
-
输入输出
-
定义我所需要的值
-
输出方式
sys_login 计算在线用户数
数据源管理 DataSourceManager
给输入器的数据: MQ里的一条数据,SQL的一个结果集
DataSourceManager 订阅着所有的主题,主题消息一到,调用输入器的onMessage方法
subscribeEvent -> onEvent -> input.onMessage()
数据库
如何计算各个唯独的维度,如何存储,怎么告诉
计算器如何表达我计算的东西是什么
输出器怎么规定输出的内容,怎么表述
输出的
输出器告诉core,我需要哪些数据
日报输出器,
需要定时一天
- 怎么保证计算结果不丢失
-
写入文件?
-
定期写入数据库?
- 输出器应该是各项指标的混合,整理成需要的格式后按某种方式输出(邮件,入数据库)
- 保存需要实时显示的各种指标
-
提供给计算器计算完后发出事件
-
接受计算完成事件,通知黑板刷新,并按需要将计算结果流回core
-
kafka
-
Mysql
一个计算器只计算一个指标,多个计算器可以计算多个值
计算器计算的结果属于某个时间范围,计算需要的中间数据以及计算的结构都只属于某个时间窗口,当窗口销毁时,应该重新计算
为了计算某个指标,计算器通常需要对收到的数据产生一份临时的中间数据
标记该计算器计算的结果的输出位置和格式,再对每一条新数据在原有的临时数据上做计算,最后将某个指标的值输出,每次计算完毕发出事件通知core, 计算器的输出是单个值
一个输出器输出的内容是某个事件黑板上N个指标的整合输出,它只管去跟core要数据,要到数据后发出去就行嘞
-
数据库
-
邮件
什么时候去输出,日报按天,小时,分钟,秒
所有的指标需指定一定唯一名字,是否可以不要结构,只有单一的值
维度
-
天
-
小时
-
平台
-
版本
-
渠道
-
国家
// 日期 2019-07-31 // US新增 30 // US活跃 257 // US前日次留 21.21% // US连接用户数 86 // US连接次数 254 // US平均连接次数 2.95 // US矿机Crash 0 // US连通率 18.0% // US虚拟分配成功率 22.0% // US矿机连接成功率 93.0% // US数据获取成功率 90.0% // US矿机流量统计 // 上行:61.73G // 下行:78.14G // US用户平均使用流量 122.34MB // US平均上行速度 0.31 KB/S // US平均下行速度 17.77 KB/S // US用户平均使用时长 115.5 min // 注册成功率 73% // 登陆成功率 --
-
ot_user_login_hour 每小时计算活跃
-
ot_user_conn_detail_hour 按小时计算连接用户前四项数据
-
ot_queryClientSessionConn_hour 计算矿机连接成功率
-
ot_business_line_data 更新业务线平台级联关系
-
ot_user_keep 计算每日留存
select queryTime, count(DISTINCT username) nums from ot_user_login_hour t where 1=1 and queryTime >= ? and queryTime <= ? group by queryTime order by queryTime desc
将现有利用数据库计算的一系列指标,迁移到流式计算框架中,目前有如下指标:
-
[x] US新增
-
[x] US活跃
-
[x] US前日次留
-
[x] US连接用户数
-
[x] US连接次数
-
[x] US平均连接次数
-
[x] US虚拟分配成功率
-
[x] US矿机连接成功率
-
[x] US数据获取成功率
-
[x] 矿机crash次数
-
[x] US流量统计
-
[x] US上下行速度
-
[x] US用户平均使用流量
-
[x] US用户平均使用时长
-
[x] 注册成功率
-
[ ] 登陆成功率
- [ ] 可为某个指标设置阈值,超过时触发事件
- [x] 可将数据库的历史数据重新流入计算框架,重新计算某指标
如果启动后,考虑恢复数据
考虑定时任务,时间间隔
-
分钟
-
小时
-
天(按天定点输出)
当一个时间窗口关闭的时候,记录下状态值
{ "time": { "begin": "", "end": "" }, "metrics": {} }
由于计算数据都是存放在计算器的temp和result里,都在某个时间段的时间窗口里, 所以如果需要恢复,就是恢复计算器的window,包括里面的temp和result
当重新启动的时候,收到第一条消息时,如果消息的时间仍在上次的时间窗口内,则恢复数据
那什么时候去保存数据
黑板上的指标按时间段来分,以满足不同的输出器的输出需要
{ "五分钟": { "begin": "", "end": "", "metrics": {} }, "十分钟": {}, "一小时": {}, "一天": {} }
对应算法计算的指标填入对应的时间段内
日报输出器查询黑板时,直接查询一天的数据,所以应该算法要对应计算到一天里去
时间窗口的区间怎么定义?
时间窗口的时间以服务器的时间来定,不再以第一条日志的时间作为起始值
启动时获取时间戳作为起始时间,再获取下一个整点的时间作为结束时间
计算器的数据怎么保存,防止重启
每一分钟定时将window的数据写入磁盘 data
"ActiveUserAlgorithm": { "begin": "", "end": "", "temp": "" "result": "" }
数据怎么保存?怎么恢复?如何将算法和数据对应起来,一个算法可能有多个实例
唯一标示: 算法名称 + 时间段
恢复时,查看算法对应时间段的数据是否存在,存在则恢复
场景:
总有一些指标是不适用与流式计算的,比如现在遇到的留存,对于这类指标,怎么计算方便
昨日留存率 = (昨日新增人 && 今天活跃人) / 昨日新增人数
像类似这种指标,需要用到历史数据,考虑范围加大,怎么算15日留存,难道必须等系统运行15天吗
是否通过某种方式,将需要用到的历史数据缓存到系统内
输出器:
当前所采集的数据只能是来源与黑板,如果可以来源与DB,问题是否可以解决
另外一个思路,多窗口是否能解决问题,那多窗口的时候,输出器怎么去哪里取数据, 数据在历史窗口里
写了一个bug,历史数据怎么处理
流入系统的数据类型
-
实时数据
-
无限数据
-
按原来的方式,固定窗口大小,时间取整,整5分钟,30分钟,一小时,一天
-
-
历史数据
-
有限数据
-
按数据的大小,数据输入有多少,就计算多少的量
-
今天完成的几个功能
-
算法兼容两种数据类型
-
实时无限
要按时间切片来算数据
-
历史有限
可以直接以数据量来算
-
-
留存
-
保留多窗口
-
或增加一种组件,接入历史数据
-
-
完善工具
将昨天的新增用户通过 某种方式 导入进系统,就可以算了,
数据一定在数据库,
- output的时候需要有一份外部数据,外部数据的更新需要一定的时间间隔
./kafka-console-consumer.sh --bootstrap-server kafkas:9092 --topic user_register --from-beginning
./kafka-console-producer.sh --broker-list kafkas:9092 --topic user_register
'{"accountname":"test@qq.com","action":"register","appname":"CoolLine","appversion":"1.6.xx","channelName":"googleplay","cityen":"","cityzh":"","continentsen":"","continentszh":"","countryen":"","countryZh":"美国","createTimestamp":1566376441,"deviceid":"edb3d8ce8df595ad","isPrivilegedUser":"true","line":"CoolLine","mail":"test@qq.com","pkgname":"cc.coolline.client","platform":"Android","provinceen":"","provincezh":"","pt":"2019-08-21","registerip":"10.244.2.64","registertime":"2019-08-21 16:34:01","registertype":"mail","userid":19348,"userName":"test@qq.com","userpwd":"86A8F132223D033619389988E663F6C2","userstate":0,"virtualCountryCode":"AF"}'
按任务计算
- 我去哪里取数据
查询一段时间,数据量大的问题,分批查,分批输入系统