你好,我是陶辉。

上节课我们介绍了在有边界的存量数据上进行的 MapReduce 离线计算,这节课我们来看看对于无边界数据,怎样实时地完成流式计算。

对于不再变化的存量数据,可以通过分而治之的 MapReduce 技术将数据划分到多台主机上并行计算,由于待处理的数据量很大,我们只能获得分钟级以上的时延。当面对持续实时产生动态数据的场景时,业务上通常需要在秒级时延中及时地拿到运算结果。

比如,商家为了拉新促活,会为特定的用户群体(比如新用户或者不活跃用户)推出优惠活动,为了防止“羊毛党”通过大量主机并行地**“薅羊毛”**,系统要能实时地聚合分析所有优惠券的使用者特点,再基于业务规则及时地封掉“羊毛党”帐号或者 IP 地址,才可以控制住风险范围,提高营销活动的收益。那对于整个系统持续生成的大量订单数据,怎样才能提供秒级的聚合分析结果呢?

最初的流式计算方案,是在时间维度上定期地将数据分片,再基于 MapReduce 思想在空间维度的多台主机上实现并行计算,这样也能获得实时计算结果。然而,对每片数据执行批量计算,想要在秒级甚至毫秒级拿到计算结果并不容易。当网络不稳定时,数据会因为报文延误而乱序,简单的基于时序分片会导致计算结果失真。当数据之间具有明显的业务关系时,固定的时间窗口更是难以得到预期的分析结果。

接下来我们就深入学习一下流式计算的工作原理,以及流式计算常用的数据分片窗口。

流式计算是如何实现的?

在数据库、HDFS 等分布式系统中存放的静态数据,由于拥有清晰的边界,所以被称为 InBound Data 有边界数据。然而,线上运行中的互联网产品生命周期并不确定,它产生的数据有明确的开始,却没有截止时间点。对于这样有始无终的实时数据流,我们把它称为 OutBound Data 无边界数据,如下图所示:

从业务需求上看,有边界数据与无边界数据的计算目的是完全不同的。比如对于分布式监控系统,我们需要基于 IP 地址、用户帐号、请求类型等许多特征进行定时的聚合统计,例如获取每分钟内所有请求处理时延的平均值、中位数、最大值等,监控系统性能。此时,可以根据请求执行结果的产生时间对数据进行分片计算。比如,下表中有 7 条监控数据,需要求出每分钟请求时延的平均值。

如果我们按照分钟整点对数据进行分片,就可以在 02:00 时对蓝色的消息 1、2 求出窗口内的平均时延 192 毫秒,并立刻返回结果。之后当接收完红色的消息 3、4、5 后,在第 2 分钟结束时再对 3 个数字求出平均值。以此类推。

**这种设计思想就是基于固定时间窗口的批处理解决方案。**当然,并不是一定要等到时间窗口结束时,才对这一批次的所有数据统一计算。我们完全可以在每个消息到来时,就计算出中间状态,当所在的时间窗口结束时,再将中间状态转换为最终结果。仍然以上表为例,我们可以在每个监控事件到达时,计算出请求时延和以及当前窗口内的事件个数,这样,在窗口结束时我们只需要将时延和除以事件个数,就能得到平均值。

因此,中间状态可以更均衡地使用计算资源,提高流式计算的整体性能。我们既可以把中间状态放在内存中,也可以把它持久化到本地磁盘中获取更高的可用性,为了方便计算节点的调度,我们通常还会将备份状态存放至远端的数据库。

图片来源:https://flink.apache.org/

当然,流式计算最主要的性能提升思路,还是基于 MapReduce 思想,将同一窗口的数据从空间维度中分发到不同的计算节点进行并行的 Map 计算,再将 Map 映射出的结果 Reduce 为最终结果。由于流式计算天然是基于消息事件驱动的,因此它往往直接从 Kafka 等消息队列中获取输入数据,如同[第 27 讲] 的介绍,消息队列很容易协助流式计算实现数据拆分。

到这里,我们已经看到了实现流式计算的基本思路,其中基于固定时间窗口的数据划分方式还有很大的改进空间,目前它还无法解决较为复杂的有状态计算。所谓有状态计算,是指在时间窗口内,不同的消息之间会互相作用并影响最终的计算结果,比如求平均值就是这样一个例子,每个新到达的数据都会影响中间状态值。

相反,无状态计算处理到达的数据时,并不涉及窗口内的其他数据,处理流程要简单的多。例如,当监控到请求时延超过 3 秒时,就产生一条告警。此时,只需要单独地判断每个消息中的时延数据,就能够得到计算结果。

在真实的业务场景中,有状态计算还要更复杂。比如,对两个不同的数据源(可以理解为数据库中的表)做 join 连接时,采用内连接、外连接这两种不同的连接方式,就会影响到我们的时间窗口长度。再比如,当不同的事件具有逻辑关系时,窗口长度则应该由业务规则确定,不同的请求可能拥有不等的窗口大小。接下来,我们再来看看流式计算中的几种常见窗口。

如何通过窗口确定待计算的数据?

首先来看滑动窗口,它是从固定窗口衍生出的一种窗口。我们继续延续求每分钟平均值的例子,当业务上需要更平滑的曲线时,可以通过每 20 秒求最近 1 分钟请求时延的平均值实现,这就是滑动窗口,其中窗口长度则是 1 分钟,但每次计算完并不会淘汰窗口中的全部数据,而只是将步长向后移动 20 秒,即只淘汰最早 20 秒中的数据。当窗口长度与步长一致时,滑动窗口就退化成了固定窗口。

当然,我们还可以把窗口的计量单位从时间改为事件个数,此时可以称为计数窗口。仍然延续上面的例子,固定计数窗口可以改为求每 100 个访问记录的平均时延,滑动计数窗口可以改为每 10 条记录中求最近 100 个记录的平均时延。由于消息本身是有时序的,所以这些都可以称为时间驱动的窗口。事实上,还有另外一种事件驱动的窗口与此完全不同,如下图所示:

固定窗口、滑动窗口并不会解析业务字段,区别对待图中不同的 Key 关键字,这就很难解决以下这类场景:当需要统计用户在一个店铺内浏览的商品数量时,就需要针对用户的店铺停留时长来设计动态的窗口大小。毕竟不同的用户在不同的店铺内停留时长不可能相同,此时,动态的窗口大小可以通过事件来驱动,我们称为会话窗口。

事实上,我们还面临着信息统计准确性上的问题。在基于时间驱动的窗口中,这里的时间其实是事件到达流式系统时产生的系统处理时间,而不是事件发生的时间。仍然以访问日志为例,每条日志都有明确的请求访问时间,但在分布式系统传输时,由于网络波动的传输时延,以及各主机节点应用层的处理时延,这些事件到达流式计算框架的顺序已经发生了变化。如果仍然以固定的时间窗口来处理,就会得到错误的统计结果。

为了避免乱序事件扰乱统计结果,我们可以使用水位线 Watermark 减少乱序概率。比如下图中,消息队列中的数字表示事件时间,其中事件 7 先于事件 3、5 到达了流式计算系统:

如果设置了水位 4,窗口就不再以事件顺序严格划分,而是通过水位上的时间来划分窗口,这样事件 7 就会放在第 2 个窗口中处理:

当然,并不是有了水位线,第 1 个窗口就会无限制的等下去。在经历一个时间段后,第 1 个窗口会认定窗口关闭(这未必准确),它会处理 3、1、3、2 这 4 个事件。基于业务规则,下一个水位被设置为 9:

这样第 2 个窗口会处理 6、5、7 事件,而事件 9 就放在了第 3 个窗口中处理:

以此类推。根据业务特性和经验值,找到最大乱序时间差,再基于此设置合适的水位线,就能减轻乱序事件的影响。

小结

这一讲我们介绍了流式计算的实现原理,以及常用的几种分片窗口。

对于无边界的实时数据流,我们可以在时间维度上将其切分到不同的窗口中,再将每个窗口内的数据从空间维度上分发到不同的节点并行计算,在窗口结束时汇总结果,这就实现了流式计算。Apache Flink、Spark、Storm 等开源产品都是这样的流式计算框架。

通过不同的窗口划分规则,可以实现不同的计算目的,包括以时间驱动的固定窗口、滑动窗口和计数窗口,以及以事件驱动的会话窗口。为了避免乱序事件的影响,还可以通过携带超时时间的 Watermark 水位,基于事件发生时间更精准地划分窗口。

思考题

最后,留给你一道讨论题。你知道 Lambda 架构吗?它通过分开部署的 MapReduce、流式计算系统,分别完成离线计算与实时流计算,如下图所示:

图片来源:https://www.oreilly.com/radar/questioning-the-lambda-architecture/

这套系统的 IT 成本很高,因此大家致力于使用一套系统同时解决这两个问题。你认为这种解决方案是如何实现的?你又是如何看待流式计算发展方向的?欢迎你在留言区与大家一起探讨。

感谢阅读,如果你觉得这节课让你有所收获,也欢迎你把今天的内容分享给你的朋友。