flinkcep(flinkcep延时)
本篇文章给大家谈谈flinkcep,以及flinkcep延时对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
Flink架构、原理
Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用的功能。
现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为它们所提供的SLA(Service-Level-Aggreement)是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理。
Flink从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的; 批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。
Flink流处理特性:
Flink以层级式系统形式组件其软件栈,不同层的栈建立在其下层基础上,并且各层接受程序不同层的抽象形式。
1. 流、转换、操作符
Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。
Flink程序被执行的时候闭做滚,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。
2. 并行数据流
一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask,每一个Operator Subtask是在不同的线程中独立执行的。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度总是等于生成它的Operator的并行度。
One-to-one模式
比如从Source[1]到map()[1],它保持了Source的分区特性(Partitioning)和分区内元素处理的有序性,也就是说map()[1]的Subtask看胡或到数据流中记录的顺序,与Source[1]中看到的记录顺序是一致的。
Redistribution模式
这种模式改变了输入数据流的分区,比如从map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的轿余Subtask向下游的多个不同的Subtask发送数据,改变了数据流的分区,这与实际应用所选择的Operator有关系。
3.任务、操作符链
Flink分布式执行环境中,会将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行。
4. 时间
处理Stream中的记录时,记录中通常会包含各种典型的时间字段:
Event Time:表示事件创建时间
Ingestion Time:表示事件进入到Flink Dataflow的时间
Processing Time:表示某个Operator对事件进行处理的本地系统时间
Flink使用WaterMark衡量时间的时间,WaterMark携带时间戳t,并被插入到stream中。
5. 窗口
Flink支持基于时间窗口操作,也支持基于数据的窗口操作:
窗口分类:
Tumbling/Sliding Time Window
// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
// key stream by sensorId
.keyBy(0)
// tumbling time window of 1 minute length
.timeWindow(Time.minutes(1))
// compute sum over carCnt
.sum(1)
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding time window of 1 minute length and 30 secs trigger interval
.timeWindow(Time.minutes(1), Time.seconds(30))
.sum(1)
Tumbling/Sliding Count Window
// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
// key stream by sensorId
.keyBy(0)
// tumbling count window of 100 elements size
.countWindow(100)
// compute the carCnt sum
.sum(1)
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding count window of 100 elements size and 10 elements trigger interval
.countWindow(100, 10)
.sum(1)
自定义窗口
基本操作:
6. 容错
Barrier机制:
对齐:
当Operator接收到多个输入的数据流时,需要在Snapshot Barrier中对数据流进行排列对齐:
基于Stream Aligning操作能够实现Exactly Once语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐Barrier的一个Stream为处理Buffer中缓存记录的时刻点。在Flink中,提供了一个开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once。
CheckPoint:
Snapshot并不仅仅是对数据流做了一个状态的Checkpoint,它也包含了一个Operator内部所持有的状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。状态包含两种:
7. 调度
在JobManager端,会接收到Client提交的JobGraph形式的Flink Job,JobManager会将一个JobGraph转换映射为一个ExecutionGraph,ExecutionGraph是JobGraph的并行表示,也就是实际JobManager调度一个Job在TaskManager上运行的逻辑视图。
物理上进行调度,基于资源的分配与使用的一个例子:
8. 迭代
机器学习和图计算应用,都会使用到迭代计算,Flink通过在迭代Operator中定义Step函数来实现迭代算法,这种迭代算法包括Iterate和Delta Iterate两种类型。
Iterate
Iterate Operator是一种简单的迭代形式:每一轮迭代,Step函数的输入或者是输入的整个数据集,或者是上一轮迭代的结果,通过该轮迭代计算出下一轮计算所需要的输入(也称为Next Partial Solution),满足迭代的终止条件后,会输出最终迭代结果。
流程伪代码:
IterationState state = getInitialState();
while (!terminationCriterion()) {
state = step(state);
}
setFinalState(state);
Delta Iterate
Delta Iterate Operator实现了增量迭代。
流程伪代码:
IterationState workset = getInitialState();
IterationState solution = getInitialSolution();
while (!terminationCriterion()) {
(delta, workset) = step(workset, solution);
solution.update(delta)
}
setFinalState(solution);
最小值传播:
9. Back Pressure监控
流处理系统中,当下游Operator处理速度跟不上的情况,如果下游Operator能够将自己处理状态传播给上游Operator,使得上游Operator处理速度慢下来就会缓解上述问题,比如通过告警的方式通知现有流处理系统存在的问题。
Flink Web界面上提供了对运行Job的Backpressure行为的监控,它通过使用Sampling线程对正在运行的Task进行堆栈跟踪采样来实现。
默认情况下,JobManager会每间隔50ms触发对一个Job的每个Task依次进行100次堆栈跟踪调用,过计算得到一个比值,例如,radio=0.01,表示100次中仅有1次方法调用阻塞。Flink目前定义了如下Backpressure状态:
OK: 0 = Ratio = 0.10
LOW: 0.10 Ratio = 0.5
HIGH: 0.5 Ratio = 1
1. Table
Flink的Table API实现了使用类SQL进行流和批处理。
详情参考:
2. CEP
Flink的CEP(Complex Event Processing)支持在流中发现复杂的事件模式,快速筛选用户感兴趣的数据。
详情参考:
3. Gelly
Gelly是Flink提供的图计算API,提供了简化开发和构建图计算分析应用的接口。
详情参考:
4. FlinkML
FlinkML是Flink提供的机器学习库,提供了可扩展的机器学习算法、简洁的API和工具简化机器学习系统的开发。
详情参考:
明天更新部署与测试
本文仅代表个人的观点,如果阐述的不好欢迎大家指导纠正,在此感激不尽。
【Flink】Flink cep
[TOC]
什么是 CEP:
CEP 的特征如下:
市场上有多种 CEP 的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的库支持。然而,Flink提供了专门的CEP库。
Flink CEP 包含如下组件:Event Stream、Pattern定义、Pattern检测和生成Alert。
简单来说一下,其实可卜判以把使用 flink CEP 当做平时用的型物改正则表达式,cep中的 Pattern 就是定义的正则表达式,flink 中的DataStream 就是正则表达式中待匹配的字符串,flink 通过DataStream 和 自定义的Pattern进行匹配,生成一个经过过滤之后的DataStream。
基于自定义的pattern,可以做很多工作,比如监控报警、风控、反爬等等。
处理事件的规则,被叫作模式(Pattern)。Flink CEP提供了Pattern API用于对输入流数据进行复杂事件规则定义,用来提取符合规则的事件序列。
模式大致分为两类:
个体模式(Individual Patterns):组成复杂规则的每一个单独的模式定义,就是个体模式。
组合模式(Combining Patterns,也叫模式序列):很多个体模式组合起来,就形成了整个的模式序列。
个体模式包括单例模式和循环模式。单例模式只接收一个事件,而循环模式可以接收多个事件。
可以在一个个体模式后追加量词,也就是指定循环次数。
每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据。
CEP中的个体模式主要通过调用.where()、.or()和.until()来指定条件。按不同的调用方式,可以分成以下几类:
简单条件: 通过.where()方法对事件中的字段进行判断筛选,决定是否接收该事件
组合条件: 将简单的条件进行合并,or()方法表示或逻辑相连,where的直蚂橘接组合就相当于与and。
终止条件: 如果使用了oneOrMore或者oneOrMore.optional,建议使用.until()作为终止条件,以便清理状态。
迭代条件: 能够对模式之前所有接收的事件进行处理,调用.where((value,ctx) = {…}),可以调用ctx.getEventForPattern("name")
了解了独立模式,现在看看如何将它们组合成一个完整的模式序列。
模式序列必须以初始模式开始,如下所示:
接下来,可以通过指定它们之间所需的连续条件,为模式序列添加更多模式。 Flink CEP 支持 事件之间 以下形式的邻接:
要在连续模式之间应用它们,可以使用:
除了以上模式序列外,还可以定义“不希望出现某种近邻关系”:
需要注意 :
宽松的连续性(Relaxed contiguity )意味着仅匹配第一个匹配事件,而具有非确定性的松弛连续性(non-deterministic relaxed contiguity),将为同一个开始发出多个匹配。 例如模式“a b”,给定事件序列“a”,“c”,“b1”,“b2”将给出以下结果:
也可以为模式定义时间约束以使其有效。 例如,可以通过pattern.within()方法定义模式应在10秒内发生。 处理和事件时间都支持时间模式。
注意模式: 序列只能有一个时间约束。 如果在不同的单独模式上定义了多个这样的约束,则应用最小的约束。
可以在循环模式中应用与上一节中讨论的相同的连续条件。
连续性将应用于接受到这种模式的元素之间。 为了举例说明上述情况,模式序列“a b + c”(“a”后跟一个或多个“b”的任何(非确定性宽松)序列,后跟“c”),输入“a” “,”“b1”,“d1”,“b2”,“d2”,“b3”“c”将产生以下结果:
对于循环模式(例如oneOrMore()和times()), 默认是宽松的连续性 。 如果想要严格的连续性,必须使用continuous()调用显式指定它,如果想要非确定性的松弛连续性,可以使用allowCombinations()调用。
consecutive()
与oneOrMore()和times()结合使用,并在匹配事件之间强加严格的连续性,即任何不匹配的元素都会中断匹配(像next())。
如果不应用,则使用松弛的连续性(如followBy())。
为输入序列生成以下匹配项:C D A1 A2 A3 D A4 B.
allowCombinations()
与oneOrMore()和times()一起使用,并在匹配事件之间强加非确定性的松散连续性(像followAyAny())。
如果不应用,则使用宽松的连续性(像followBy())。
将为输入序列生成以下匹配项:C D A1 A2 A3 D A4 B.
指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配。调用CEP.pattern(),给定输入流和模式,就能得到一个PatternStream。
创建PatternStream之后,就可以应用select或者flatSelect方法,从检测到的事件序列中提取事件了。
select()方法需要输入一个select function作为参数,每个成功匹配的事件序列都会调用它。
select()以一个Map[String,Iterable[IN]]来接收匹配到的事件序列,其中key就是每个模式的名称,而value就是所有接收到的事件的Iterable类型。
每当模式具有通过within关键字附加的窗口长度时,部分事件序列可能因为超过窗口长度而被丢弃。 要对超时的部分匹配进行操作,可以使用TimedOutPartialMatchHandler接口。
这是来自尚硅谷的一个例子:检测一个用户在3秒内连续登陆失败。
首先要导入依赖:
LoginLog.csv 内容如下:
也可看我 github:
Flink CEP详解
Flink难点:彻底明白CEP4,组合模式、循环模式介绍
记一次 Flink 反压问题排查过程
根据subtask的watermark发现延迟了10几分钟,然后查看是否有异常或者BackPressure的情况最终发现,source-watermarks-filter端三个subtask反压都显示High
重启多次,问题依然存在。
正常任务checkpoint时间端发现非常短
反压歼燃任务
大约可以看出来checkpoint做的时间过程,并且内部基本上是下游的subtask任务耗时比较长,因此初步怀疑是因为下游sink消费原因。
分析上游的subtask的Metrics
如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游。
outPoolUsage 和 inPoolUsage 同为低或同为高分别表明当前 Subtask 正常或处于被下游反压,这应该没有太多疑问。而比较有趣的是当 outPoolUsage 和 inPoolUsage 表现不同时,这可能是出于反压传导的中间状态或者表明该 Subtask 就是反压的根源。
如果一个 Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影响,所以可以排查它本身是反压根源的可能性。如果一个 Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,则表明它有可能是反压的根源。因为通常反压会传导至其上游,导致上游某些 Subtask 的 outPoolUsage 为高,我们可以根据这点来进一步判断。值得注意的是,反压有时是短暂的且影响不大,比如来自某个 Channel 的短暂网络延迟或者 TaskManager 的正常 GC,这种情况下我们可以不用处理。
可以分析出来上游分下游限速里。
通常来说茄改棚,floatingBuffersUsage 为高则表明反压正在传导至上游,而 exclusiveBuffersUsage 则表明了反压是否存在倾斜(floatingBuffersUsage 高、exclusiveBuffersUsage 低为有倾斜,因为少数 channel 占用了大部分的 Floating Buffer)。
可以看出来subtask的数据并不是特别的倾斜
调整sink to kafka为print打印控制台
发现仍然存在反压问题,排除sink写入kafka速度过慢问题,因原来写es存在延迟因此改为kafka,因此这一次先排除这个问题。
降低cep时间时间窗口大小,由3分钟-》1分钟-》20s 反压出现的时间越来越靠后,大体问题定位在cep算子相关,并且此时每次checkpoint的时间在增大,尽管state的大小相同但是时间成倍增大,故修改checkpoint相关配置继续测试发现问题仍然存在
分析线程taskmanager线程占比,发现cep算子占用cpu百分之94,故增大算子并发度3为6线程cpu占用降低如下健康状态
反压经历1个颤则时间也没有再出现,后续会持续跟进,并且会尝试调大cep的时间窗口,尝试配置出最佳实践
增大分区后发现数据倾斜严重,因为kafka分区为3,但是并行度为6因此cep算子的6个subtask数据倾斜严重,因此在添加source端执行reblance方法,强制轮询方式分配数据
可以看出来这里数据相比以前均匀很多
[img]关于flinkcep和flinkcep延时的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。