百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

Flink实时计算应用实践:问题剖析及解决方案

suiw9 2025-01-20 16:21 34 浏览 0 评论

1.简介

随公司业务的发展,对实时计算的需求也越来越多,目前Flink已广泛应用于实时ETL,实时数仓、特征工程和在线数据服务等业务场景。

本文首先简单介绍了Flink实时计算基本概念,然后引出基于Flink进行实时计算开发过程当中碰到的一些问题,通过对这些常见实时计算问题的产生原因进行深入分析,进而给出相应问题的解决方案。

2.Flink基本概念

Flink是一个分布式大数据处理引擎和框架,可对有无界和有界数据流进行有状态的计算,能够部署在各种集群环境中,对各种规模大小的数据进行快速计算。

在进行Flink应用开发前,我们先理解流treams、状态State、时间Time ,Watermark等Flink基础语义概念以及Flink 兼顾灵活性和方便性的多层次API。

2.1 流(Stream)

Stream分为有限数据流与无限数据流,unbounded stream 是有始无终的数据流,即无限数据流;而bounded stream 是限定大小的有始有终的数据集合,即有限数据流,二者的区别在于无限数据流的数据会随时间的推演而持续增加,计算持续进行且不存在结束的状态,相对的有限数据流数据大小固定,计算最终会完成并处于结束的状态。

2.2 状态(State)

状态是计算过程中的产生的计算结果,在容错恢复和Checkpoint中有重要的作用。流计算在本质上是Incremental Processing,因此需要将计算的中间结果保存到状态中,例如按天统计每个区域的GMV,因此当订单数据到达时,需要将前面计算的该订单所属区域的GMV数据从状态中读取出来,然后再与该订单额进行累加,累加完后将结果再次更新到状态中;Flink正是通过将计算中间结果保存到状态后端中,才保证在整个分布式系统运行失败或者挂掉的情况下做到Exactly-once,增强容灾的效果。图2-1是算子在聚合计算时,读写state状态示意图。

2.3 时间语义(Time)

Flink的时间语义分为Event time、Ingestion time和Processing time。Flink的无限数据流是一个持续的过程,时间是我们判断业务状态是否滞后,数据处理是否及时的重要依据。

Event Time是在事件生产端产生该事件的时间,例如用户下的订单,订单的下单时间就是事件产生的时间Event Time。

Ingestion time是事件进入Flink的时间。

Processing time是事件被处理时,当前的系统时间。

2.4 Watermark

流处理从事件产生,到流经source,再到operator算子,中间是有一个过程和时间的,虽然大部分情况下,流到operator算子的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、分布式等原因,导致乱序的产生。

因此一旦出现乱序,如果只根据Event Time来决定window的运行,不能保证在窗口关闭之前,所有的数据都能到达,但我们又不能无限期的等待下去,此时我们需要有一个机制来保存在一个特定时间后,必须触发窗口计算,这一个机制就是watermark。Watermark特点如下:

Watermark是一种衡量Event Time进展的机制。

Watermark是用于处理乱序事件的, 它与window相结合,可以正确处理乱序事件。

watermark是单调递增的。watermark = Math.max(eventTime - dalayTime ,watermark), 一旦watermark大于等于某个窗口的end_time时,就会触发窗口的计算,因此Watermark就是用触发窗口计算的。图2-3是无序数据流中的watermark示例,它的最大乱序时间为2。

Watermark具有传递性。如果一个算子上游有多个算子,则该算子的watermark值为取上游所有流入该算子的watermark的最小值。图2-4为watermark的传递性图。

2.5 API层

API 通常分为三层,由上而下可分为SQL 、Table API、DataStream API、ProcessFunction三层,API的表达能力及业务抽象能力都非常强大,但越接近SQL层,表达能力会逐步减弱,抽象能力会增强,反之,ProcessFunction 层API的表达能力非常强,可以进行多种灵活方便的操作,但抽象能力也相对越小。图2-5为Flink API的层结构图。

3.Flink实时计算常见问题分析

3.1 数据乱序问题分析

数据乱序是指Flink在使用Event Time处理流式数据时,由于分布式或网络原因,导致数据到达处理机制进行处理时并不是按照数据产生的时间先后顺序到达的。导致数据乱序的场景如下:

1、kafka多分区导致数据乱序

图3-1中的数字表示数据产生的时间顺序,数据生产端按数据产生时间将数据推送到kafka,因由kafka存在多个分区,在kafka的默认分发机制下,数据到流动如图3-1所示:

图3-1所示,假如Flink的source的并行度与kafka的分区一致,数据经过Flink的source后,再经过keyby分区开窗后,数据就会存在乱序情况,例如key为0的分区算子,窗口接受到的数据顺序是:1,11,26,2。

2、由于网络延迟导致数据乱序

当存在多个数据生产端时,每一个生产端都将产生的数据,通过网络传输将数据发送到kafka,这里为了说明网络延迟,将kafka的分区设置为1。由于每一个生产端到kafka端走的网络路径的不同,可能存在有一些存在链路阻塞,长短等情况,从而导致数据到kafak的时间并不是按照数据产生的时间顺序到达的。图3-2为网络原因导致数据乱序的示意图。

3.2 Flink大状态场景及问题分析

Flink大状态形成的主是因为flink在计算过程中,存在大量的计算中间结果。那么在那些场景下会使Flink的状态比较大,目前在我们实践中主要有下面几种场景:

1、数据量大且开窗的时间比较长,例如开1天,1周甚至1个月的窗口。这种开窗时间比较长,要缓存的数据就比较大,因此状态也会比较大。例如按天统计UV数据。

2、数据量比较大的二条流join时相互等待的时间比较长,例如订单数据与物流的配送数据做join得到订单的物流状态,这种情况下,订单数据可能要等待物流配送数据最大时间要1天左右。这也会导致二条流的状态比较大。

3、Group By分组的key的数据量大,计算的指标项非常多或计算的步骤多复杂度高,这种情况会导致要保存大量的中间计算结果,从而导致状态比较大。

图3-3是一个后端曝光流与前端用户点击流join场景下checkpoint信息图,在二条流的数据相互等待30分钟的情况下,checkpoint就已经比较大了,如果等待时间更长,状态会更大。

在状态比较大的场景下,可能存在以下问题:

1、Flink的堆外内存超额使用,导致Yarn将Task Manager(以下简称TM)的容器kill掉,导致任务Failover。这种情况主要是Flink对RocksDB的使用内存没有限制,当状态越来越大的时,堆外内存使用量就会超额使用,一旦TM的内存超过Yarn分配给TM所在容器的内存,就会导致Yarn将容器kill掉。这种情况我们可以通过Yarn的日志分析得到原因。

2、Checkpoint状态大,Checkpoint时间长。这种情况,我们可以首先从Sub Task checkpoint信息入手。如图3-4所示:

先看checkpoint Data Size大小,如果状态很大,再看End to End Duration时间,如果时间比较大,比如做一次checkpoint要花十几甚至几十分钟的话,我们接着再看具体的sub Task checkpoint的时间耗时情况。

1、Checkpoint时sub Task的Latest Acknowledgement的状态一直为n/a

这种情况下要去分析是任务是否存在背压的情况。我们可以通过Flink Web UI查看算子merics的inPoolUsage和outPoolUsage的使用率。如果outPoolUsage的使用低而inPoolUsage使用率高,说明背压是由该算子产生的。如果inPoolUsage使用率低,而outPoolUsage使用率很高则说明是下游算子计算处理不及时导致上游算子背压。继续按上面的方法排查,最终找到产生背压的算子

2、关注Sync Duration和Async Duration时间

◇Sync Duration时间很长,说明磁盘IO可能成为性能瓶颈

◇Async Duration时间很长,说明是网络IO可能成为性能瓶颈或是数据同步线程太少

3.3 数据倾斜问题

数据倾斜由于数据分布不均匀,造成数据大量的集中到一点,造成数据热点。在这种Flink实时计算场景中,我们会发现在某个算子的处理上,一些sub task处理的数据量很大,有一些sub task处理的数据量很小,从flink的集群界面上,我们也可以看到算子的背压情况,在这种场景下,我们通过增加算子的并行度,并不能改善算子的背压情况。例如PV的场景中,有一些网页的点击量很高,而有一些网页的点量很少,从而导致某些sub task背压严重。

4.实时计算常见问题的解决方案

4.1 数据乱序场景的处理

Flink处理数据乱序的方式有三种:

1、Watermark

我们在设置watermark时,可以设置一个最大的乱序时间,而watermark是以事件时间减去所允许的最大乱序时间作为watermark,因此相当于多给了数据一定的时间,然后关闭窗口,触发计算。

2、允许迟到(allowedLateness)

如果在watermark的基础上有的数据还是可能会迟到,这时我们可以再多给数据一定的可以迟到的时间,此时当watermark到达窗口大小时触发计算,但是不会关闭窗口,而是直到所允许的迟到时间后,才会真正关闭窗口。

3、侧输出流

当数据迟到的时间非常久,前两种都失效时使用,相当于迟到数据归放入一个分支流中进行单独计算。

4.1.1 Flink SQL处理乱序

◇创建kafka动态表

在创建Kafka动态表时,我们使用watermark for orderTime as orderTime – interval ‘2’MINUTE语句来说明提取的watermark为订单时间减2分钟。

◇开窗聚合计算

如果想每10秒输出一次计算结果,可以设置下面参数:

在SQL层面,目前只支持使用watermark来处理数据乱序问题。

4.2.1 DataStream API处理乱序

从上面代码可以看出,我们使用时间分配器时,使用WatermarkStrateg。

forBoundedOutOfOrderness(Duration.ofMillis(jobPrarameter.getOutOfBoundMs()))方法来设置数据的最大乱序时间。

在开窗口后,我们又使用DataStream的allowedLatenes(Time.seconds(30))来设置窗口延迟关闭时间为30秒。

如果数据在watermark大于等于窗口的结束时间+最大延迟时间之后才到达,这时,如果我们不做任何处理的话的数据就会被丢弃掉,如果当我们不想丢弃掉这些数据,我们还可以通过侧输出流来解决。解决步骤如下:

1、创建侧输出的Tag标志对象.

2、在allowedLateness之后,设置将数据输出到outputTag标志的侧输出流。

3、从窗口计算后得到的流中通过getSideOutput(outputTag)获取迟到数据的侧输出流。

4、得到迟到数据的侧输出流后,我们就可以根据业务需要对侧输出流的数据进行相应的处理。在下面的代码中我们将窗口聚合计算得到的流与侧输出流进行connect连接后,得到一个ConnectedStreams对象,在ConnectedStreams对象上使用flatMap方法的CoFlatMapFunction将侧输出流的数据累加到聚合计算的结果流上时进行输出,从而保证了聚合计算后的数据的准确性。

因此,与SQL相比,使用DataStream API我们可以更加灵活的处理数据,因此在处理数据乱序时,根据需要,我们可以同时使用三种处理乱序的方案来处理乱序数据。而SQL目前只支持使用Watermark的方式来处理乱序数据。

4.2 Flink大状态问题解决方案

在上面的Flink大状态场景及问题分析章节,我们可以知道大状态会导致如下问题:

1、内存超额使用

2、Checkpoint时间过长

以上问题,我们可以通过以下办法来解决:

4.2.1 减少状态大小

如果状态比较大,我们是否可以减少状态呢,答案是肯定的。例如当我们状态使用的POJO类对象时,我们可以使用ProtoStuff来减少对象的存储空间,从而减少对象序列化后的存储空间大小。下面是使用ProtoStuff进行序列化和反序列化的代码示例。

注:在目前使用的ProtoStuff的版本,由于不支持时间类型,所以对时间类型可以转换成字符串或Long类型来存储。

4.2.2 内存超额使用

如果TM是因为内存不足被kill掉,解决方案有二种:

1、想办法减少状态大小(如上面的第一点)。

2、增加TM的内存,如原来TM为4G,可以增加到6G,甚至更多。

4.2.3 Checkpoint时间长

◇修改checkpoint策略

默认情况下,flink checkpoint 是全量备份,当状态很大,全量备份会导致每次的checkpoint耗时会很长,开启checkpoint的增量配置,在可以显著减少每次checkpoint状态的大小,以及checkpoint的时间。修改flink-conf.yaml中的配置,将state.backend.incremental设置为true。

◇算子背压导致checkpoint时间过长

如果是某个算子的背压原因导致checkpoint时间很长,我们可以去优化该算子,找出最耗时的操作进行优化。下面是我遇到一些场景的优化方案:

1、与维度表join。这种情况下,一是加缓存并设置数据过期时间。二是使用Redis来存放维度表的数据。

2、数据倾斜。这一块我们将在数据倾斜一节专门来讲,这里就不再说了。

算子的计算逻辑复杂。将计算过程拆分到多个算子中执行,而不是在一个算子中完成所有的逻辑计算。

◇磁盘IO导致checkpoint时间过长

如果是因为磁盘IO问题,我们可以通过以下方案来解决:

1、检查磁盘是否为SSD磁盘, 如果不是,则建议使用SSD磁盘。

2、挂载多块磁盘,在flink-conf.yaml中通过配置state.backend.rocksdb.localdir来指定多个挂载目录。不同的目录挂载到不同的磁盘。

3、通过上面的操作后,还是存在IO问题,比如有的磁盘IO使用高,有的使用低。在这种情况下,可能是由于多个sub task共用同一块磁盘的问题,导致负载不均衡。解决这一个问题,通常的策略就是负载均衡策略。通用的负载均衡策略有hash, 随机以及轮循等策略。Flink默认使用的策略是随机策略,源码如下:

如果想使用其他策略,可以修改这里的源码。

◇数据传输导致checkpoint时间过长

Async Duration是指RocksDB将本地checkpoint的状态备份到其他持久化存储系统所花费的时间,其他存储系统如HDFS文件系统。如果是由于Async Duration时间长,先检查网络IO使用是否很高,如果不是很高,说明RocksDB将本地状态数据上传到HDFS时花费的时间太多,这时我们可以提高RocksDB的数据传输线程数量,因为RocksDB默认的数据传输线程数量为1,所以我们可以增加线程数量来减少Async Duration的时间。增加数据传输线程数量的代码设置如下:

4.3 数据倾斜问题解决方案

4.3.1 Local-Global Aggregation策略

Flink对于数据倾斜的解决方案是采用Local-Global Aggregation策略。Local-Global将一个组聚合分为两个阶段,即首先在上游进行局部聚合,然后在下游进行全局聚合。它类似于MapReduce的Combine + Reduce模式。它的原理如下图4-1所示。

Flink sql使用Local-Global Aggregation策略非常简单,只要设置如下配置即可:

4.3.2 Split-Distinct Aggregation

Local-Global 优化可有效消除一般聚合的数据倾斜,例如 SUM、COUNT、MAX、MIN、AVG。但是在处理distinct 的聚合时,它的性能并不令人满意。例如下面的SQL:

如果不同键(即user_id)的值稀疏,则 COUNT DISTINCT不擅长减少记录。即使启用了Local-Global优化,也无济于事。因为累加器仍然包含几乎所有的原始记录,全局聚合将成为瓶颈。如下图4-2左侧所示。

Split-Distinct Aggregation优化的想法是将不同的聚合(例如COUNT(DISTINCT col))分成两个级别。

第一步是按照day和bucket键值进行分组聚合。bucket键值等于HASH_CODE(user_id) % BUCKET_NUM。BUCKET_NUM默认1024,可通过table.optimizer.distinct-agg.split.bucket-num选项配置。

第二步按原始键(即day)分组,SUM用于聚合来自不同存储桶的 COUNT DISTINCT 值。因为相同的distinct key只会在同一个bucket中计算,所以变换是等价的。

上面的sql通过split-distinct aggregation优化后,SQL相当于下面的SQL:

Flink SQL要开启split-distinct aggregation优化,只需要设置下面的配置即可:

5.总结

本文深入分析了Flink实时计算应用实践中的常见问题:数据乱序,大状态作业优化,数据倾斜,并且对相应问题提出了可行的解决方案。后续我们将探讨Flink在智能风控,实时数据入湖,实时监控等其它业务场景的应用实践。

相关推荐

俄罗斯的 HTTPS 也要被废了?(俄罗斯网站关闭)

发布该推文的ScottHelme是一名黑客,SecurityHeaders和ReportUri的创始人、Pluralsight作者、BBC常驻黑客。他表示,CAs现在似乎正在停止为俄罗斯域名颁发...

如何强制所有流量使用 HTTPS一网上用户

如何强制所有流量使用HTTPS一网上用户使用.htaccess强制流量到https的最常见方法可能是使用.htaccess重定向请求。.htaccess是一个简单的文本文件,简称为“.h...

https和http的区别(https和http有何区别)

“HTTPS和HTTP都是数据传输的应用层协议,区别在于HTTPS比HTTP安全”。区别在哪里,我们接着往下看:...

快码住!带你十分钟搞懂HTTP与HTTPS协议及请求的区别

什么是协议?网络协议是计算机之间为了实现网络通信从而达成的一种“约定”或“规则”,正是因为这个“规则”的存在,不同厂商的生产设备、及不同操作系统组成的计算机之间,才可以实现通信。简单来说,计算机与网络...

简述HTTPS工作原理(简述https原理,以及与http的区别)

https是在http协议的基础上加了一层SSL(由网景公司开发),加密由ssl实现,它的目的是为用户提供对网站服务器的身份认证(需要CA),以至于保护交换数据的隐私和完整性,原理如图示。1、客户端发...

21、HTTPS 有几次握手和挥手?HTTPS 的原理什么是(高薪 常问)

HTTPS是3次握手和4次挥手,和HTTP是一样的。HTTPS的原理...

一次安全可靠的通信——HTTPS原理

为什么HTTPS协议就比HTTP安全呢?一次安全可靠的通信应该包含什么东西呢,这篇文章我会尝试讲清楚这些细节。Alice与Bob的通信...

为什么有的网站没有使用https(为什么有的网站点不开)

有的网站没有使用HTTPS的原因可能涉及多个方面,以下是.com、.top域名的一些见解:服务器性能限制:HTTPS使用公钥加密和私钥解密技术,这要求服务器具备足够的计算能力来处理加解密操作。如果服务...

HTTPS是什么?加密原理和证书。SSL/TLS握手过程

秘钥的产生过程非对称加密...

图解HTTPS「转」(图解http 完整版 彩色版 pdf)

我们都知道HTTPS能够加密信息,以免敏感信息被第三方获取。所以很多银行网站或电子邮箱等等安全级别较高的服务都会采用HTTPS协议。...

HTTP 和 HTTPS 有何不同?一文带你全面了解

随着互联网时代的高速发展,Web服务器和客户端之间的安全通信需求也越来越高。HTTP和HTTPS是两种广泛使用的Web通信协议。本文将介绍HTTP和HTTPS的区别,并探讨为什么HTTPS已成为We...

HTTP与HTTPS的区别,详细介绍(http与https有什么区别)

HTTP与HTTPS介绍超文本传输协议HTTP协议被用于在Web浏览器和网站服务器之间传递信息,HTTP协议以明文方式发送内容,不提供任何方式的数据加密,如果攻击者截取了Web浏览器和网站服务器之间的...

一文让你轻松掌握 HTTPS(https详解)

一文让你轻松掌握HTTPS原文作者:UC国际研发泽原写在最前:欢迎你来到“UC国际技术”公众号,我们将为大家提供与客户端、服务端、算法、测试、数据、前端等相关的高质量技术文章,不限于原创与翻译。...

如何在Spring Boot应用程序上启用HTTPS?

HTTPS是HTTP的安全版本,旨在提供传输层安全性(TLS)[安全套接字层(SSL)的后继产品],这是地址栏中的挂锁图标,用于在Web服务器和浏览器之间建立加密连接。HTTPS加密每个数据包以安全方...

一文彻底搞明白Http以及Https(http0)

早期以信息发布为主的Web1.0时代,HTTP已可以满足绝大部分需要。证书费用、服务器的计算资源都比较昂贵,作为HTTP安全扩展的HTTPS,通常只应用在登录、交易等少数环境中。但随着越来越多的重要...

取消回复欢迎 发表评论: