flinkcep(flinkcep延时)

本篇文章给大家谈谈flinkcep,以及flinkcep延时对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Flink架构、原理

Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用的功能。

现有的开源计算方案,会把流处理和批处理作为两种不同的应用类型,因为它们所提供的SLA(Service-Level-Aggreement)是完全不相同的:流处理一般需要支持低延迟、Exactly-once保证,而批处理需要支持高吞吐、高效处理。

Flink从另一个视角看待流处理和批处理,将二者统一起来:Flink是完全支持流处理,也就是说作为流处理看待时输入数据流是无界的; 批处理被作为一种特殊的流处理,只是它的输入数据流被定义为有界的。

Flink流处理特性:

Flink以层级式系统形式组件其软件栈,不同层的栈建立在其下层基础上,并且各层接受程序不同层的抽象形式。

1. 流、转换、操作符

Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。

Flink程序被执行的时候闭做滚,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。

2. 并行数据流

一个Stream可以被分成多个Stream分区(Stream Partitions),一个Operator可以被分成多个Operator Subtask,每一个Operator Subtask是在不同的线程中独立执行的。一个Operator的并行度,等于Operator Subtask的个数,一个Stream的并行度总是等于生成它的Operator的并行度。

One-to-one模式

比如从Source[1]到map()[1],它保持了Source的分区特性(Partitioning)和分区内元素处理的有序性,也就是说map()[1]的Subtask看胡或到数据流中记录的顺序,与Source[1]中看到的记录顺序是一致的。

Redistribution模式

这种模式改变了输入数据流的分区,比如从map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的轿余Subtask向下游的多个不同的Subtask发送数据,改变了数据流的分区,这与实际应用所选择的Operator有关系。

3.任务、操作符链

Flink分布式执行环境中,会将多个Operator Subtask串起来组成一个Operator Chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行。

4. 时间

处理Stream中的记录时,记录中通常会包含各种典型的时间字段:

Event Time:表示事件创建时间

Ingestion Time:表示事件进入到Flink Dataflow的时间

Processing Time:表示某个Operator对事件进行处理的本地系统时间

Flink使用WaterMark衡量时间的时间,WaterMark携带时间戳t,并被插入到stream中。

5. 窗口

Flink支持基于时间窗口操作,也支持基于数据的窗口操作:

窗口分类:

Tumbling/Sliding Time Window

// Stream of (sensorId, carCnt)

val vehicleCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts

// key stream by sensorId

.keyBy(0)

// tumbling time window of 1 minute length

.timeWindow(Time.minutes(1))

// compute sum over carCnt

.sum(1)

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts

.keyBy(0)

// sliding time window of 1 minute length and 30 secs trigger interval

.timeWindow(Time.minutes(1), Time.seconds(30))

.sum(1)

Tumbling/Sliding Count Window

// Stream of (sensorId, carCnt)

val vehicleCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts

// key stream by sensorId

.keyBy(0)

// tumbling count window of 100 elements size

.countWindow(100)

// compute the carCnt sum

.sum(1)

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts

.keyBy(0)

// sliding count window of 100 elements size and 10 elements trigger interval

.countWindow(100, 10)

.sum(1)

自定义窗口

基本操作:

6. 容错

Barrier机制:

对齐:

当Operator接收到多个输入的数据流时,需要在Snapshot Barrier中对数据流进行排列对齐:

基于Stream Aligning操作能够实现Exactly Once语义,但是也会给流处理应用带来延迟,因为为了排列对齐Barrier,会暂时缓存一部分Stream的记录到Buffer中,尤其是在数据流并行度很高的场景下可能更加明显,通常以最迟对齐Barrier的一个Stream为处理Buffer中缓存记录的时刻点。在Flink中,提供了一个开关,选择是否使用Stream Aligning,如果关掉则Exactly Once会变成At least once。

CheckPoint:

Snapshot并不仅仅是对数据流做了一个状态的Checkpoint,它也包含了一个Operator内部所持有的状态,这样才能够在保证在流处理系统失败时能够正确地恢复数据流处理。状态包含两种:

7. 调度

在JobManager端,会接收到Client提交的JobGraph形式的Flink Job,JobManager会将一个JobGraph转换映射为一个ExecutionGraph,ExecutionGraph是JobGraph的并行表示,也就是实际JobManager调度一个Job在TaskManager上运行的逻辑视图。

物理上进行调度,基于资源的分配与使用的一个例子:

8. 迭代

机器学习和图计算应用,都会使用到迭代计算,Flink通过在迭代Operator中定义Step函数来实现迭代算法,这种迭代算法包括Iterate和Delta Iterate两种类型。

Iterate

Iterate Operator是一种简单的迭代形式:每一轮迭代,Step函数的输入或者是输入的整个数据集,或者是上一轮迭代的结果,通过该轮迭代计算出下一轮计算所需要的输入(也称为Next Partial Solution),满足迭代的终止条件后,会输出最终迭代结果。

流程伪代码:

IterationState state = getInitialState();

while (!terminationCriterion()) {

state = step(state);

}

setFinalState(state);

Delta Iterate

Delta Iterate Operator实现了增量迭代。

流程伪代码:

IterationState workset = getInitialState();

IterationState solution = getInitialSolution();

while (!terminationCriterion()) {

(delta, workset) = step(workset, solution);

solution.update(delta)

}

setFinalState(solution);

最小值传播:

9. Back Pressure监控

流处理系统中,当下游Operator处理速度跟不上的情况,如果下游Operator能够将自己处理状态传播给上游Operator,使得上游Operator处理速度慢下来就会缓解上述问题,比如通过告警的方式通知现有流处理系统存在的问题。

Flink Web界面上提供了对运行Job的Backpressure行为的监控,它通过使用Sampling线程对正在运行的Task进行堆栈跟踪采样来实现。

默认情况下,JobManager会每间隔50ms触发对一个Job的每个Task依次进行100次堆栈跟踪调用,过计算得到一个比值,例如,radio=0.01,表示100次中仅有1次方法调用阻塞。Flink目前定义了如下Backpressure状态:

OK: 0 = Ratio = 0.10

LOW: 0.10 Ratio = 0.5

HIGH: 0.5 Ratio = 1

1. Table

Flink的Table API实现了使用类SQL进行流和批处理。

详情参考:

2. CEP

Flink的CEP(Complex Event Processing)支持在流中发现复杂的事件模式,快速筛选用户感兴趣的数据。

详情参考:

3. Gelly

Gelly是Flink提供的图计算API,提供了简化开发和构建图计算分析应用的接口。

详情参考:

4. FlinkML

FlinkML是Flink提供的机器学习库,提供了可扩展的机器学习算法、简洁的API和工具简化机器学习系统的开发。

详情参考:

明天更新部署与测试

本文仅代表个人的观点,如果阐述的不好欢迎大家指导纠正,在此感激不尽。

[TOC]

什么是 CEP:

CEP 的特征如下:

市场上有多种 CEP 的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的库支持。然而,Flink提供了专门的CEP库。

Flink CEP 包含如下组件:Event Stream、Pattern定义、Pattern检测和生成Alert。

简单来说一下,其实可卜判以把使用 flink CEP 当做平时用的型物改正则表达式,cep中的 Pattern 就是定义的正则表达式,flink 中的DataStream 就是正则表达式中待匹配的字符串,flink 通过DataStream 和 自定义的Pattern进行匹配,生成一个经过过滤之后的DataStream。

基于自定义的pattern,可以做很多工作,比如监控报警、风控、反爬等等。

处理事件的规则,被叫作模式(Pattern)。Flink CEP提供了Pattern API用于对输入流数据进行复杂事件规则定义,用来提取符合规则的事件序列。

模式大致分为两类:

个体模式(Individual Patterns):组成复杂规则的每一个单独的模式定义,就是个体模式。

组合模式(Combining Patterns,也叫模式序列):很多个体模式组合起来,就形成了整个的模式序列。

个体模式包括单例模式和循环模式。单例模式只接收一个事件,而循环模式可以接收多个事件。

可以在一个个体模式后追加量词,也就是指定循环次数。

每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据。

CEP中的个体模式主要通过调用.where()、.or()和.until()来指定条件。按不同的调用方式,可以分成以下几类:

简单条件: 通过.where()方法对事件中的字段进行判断筛选,决定是否接收该事件

组合条件: 将简单的条件进行合并,or()方法表示或逻辑相连,where的直蚂橘接组合就相当于与and。

终止条件: 如果使用了oneOrMore或者oneOrMore.optional,建议使用.until()作为终止条件,以便清理状态。

迭代条件: 能够对模式之前所有接收的事件进行处理,调用.where((value,ctx) = {…}),可以调用ctx.getEventForPattern("name")

了解了独立模式,现在看看如何将它们组合成一个完整的模式序列。

模式序列必须以初始模式开始,如下所示:

接下来,可以通过指定它们之间所需的连续条件,为模式序列添加更多模式。 Flink CEP 支持 事件之间 以下形式的邻接:

要在连续模式之间应用它们,可以使用:

除了以上模式序列外,还可以定义“不希望出现某种近邻关系”:

需要注意 :

宽松的连续性(Relaxed contiguity )意味着仅匹配第一个匹配事件,而具有非确定性的松弛连续性(non-deterministic relaxed contiguity),将为同一个开始发出多个匹配。 例如模式“a b”,给定事件序列“a”,“c”,“b1”,“b2”将给出以下结果:

也可以为模式定义时间约束以使其有效。 例如,可以通过pattern.within()方法定义模式应在10秒内发生。 处理和事件时间都支持时间模式。

注意模式: 序列只能有一个时间约束。 如果在不同的单独模式上定义了多个这样的约束,则应用最小的约束。

可以在循环模式中应用与上一节中讨论的相同的连续条件。

连续性将应用于接受到这种模式的元素之间。 为了举例说明上述情况,模式序列“a b + c”(“a”后跟一个或多个“b”的任何(非确定性宽松)序列,后跟“c”),输入“a” “,”“b1”,“d1”,“b2”,“d2”,“b3”“c”将产生以下结果:

对于循环模式(例如oneOrMore()和times()), 默认是宽松的连续性 。 如果想要严格的连续性,必须使用continuous()调用显式指定它,如果想要非确定性的松弛连续性,可以使用allowCombinations()调用。

consecutive()

与oneOrMore()和times()结合使用,并在匹配事件之间强加严格的连续性,即任何不匹配的元素都会中断匹配(像next())。

如果不应用,则使用松弛的连续性(如followBy())。

为输入序列生成以下匹配项:C D A1 A2 A3 D A4 B.

allowCombinations()

与oneOrMore()和times()一起使用,并在匹配事件之间强加非确定性的松散连续性(像followAyAny())。

如果不应用,则使用宽松的连续性(像followBy())。

将为输入序列生成以下匹配项:C D A1 A2 A3 D A4 B.

指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配。调用CEP.pattern(),给定输入流和模式,就能得到一个PatternStream。

创建PatternStream之后,就可以应用select或者flatSelect方法,从检测到的事件序列中提取事件了。

select()方法需要输入一个select function作为参数,每个成功匹配的事件序列都会调用它。

select()以一个Map[String,Iterable[IN]]来接收匹配到的事件序列,其中key就是每个模式的名称,而value就是所有接收到的事件的Iterable类型。

每当模式具有通过within关键字附加的窗口长度时,部分事件序列可能因为超过窗口长度而被丢弃。 要对超时的部分匹配进行操作,可以使用TimedOutPartialMatchHandler接口。

这是来自尚硅谷的一个例子:检测一个用户在3秒内连续登陆失败。

首先要导入依赖:

LoginLog.csv 内容如下:

也可看我 github:

Flink CEP详解

Flink难点:彻底明白CEP4,组合模式、循环模式介绍

根据subtask的watermark发现延迟了10几分钟,然后查看是否有异常或者BackPressure的情况最终发现,source-watermarks-filter端三个subtask反压都显示High

重启多次,问题依然存在。

正常任务checkpoint时间端发现非常短

反压歼燃任务

大约可以看出来checkpoint做的时间过程,并且内部基本上是下游的subtask任务耗时比较长,因此初步怀疑是因为下游sink消费原因。

分析上游的subtask的Metrics

如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游。

outPoolUsage 和 inPoolUsage 同为低或同为高分别表明当前 Subtask 正常或处于被下游反压,这应该没有太多疑问。而比较有趣的是当 outPoolUsage 和 inPoolUsage 表现不同时,这可能是出于反压传导的中间状态或者表明该 Subtask 就是反压的根源。

如果一个 Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影响,所以可以排查它本身是反压根源的可能性。如果一个 Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,则表明它有可能是反压的根源。因为通常反压会传导至其上游,导致上游某些 Subtask 的 outPoolUsage 为高,我们可以根据这点来进一步判断。值得注意的是,反压有时是短暂的且影响不大,比如来自某个 Channel 的短暂网络延迟或者 TaskManager 的正常 GC,这种情况下我们可以不用处理。

可以分析出来上游分下游限速里。

通常来说茄改棚,floatingBuffersUsage 为高则表明反压正在传导至上游,而 exclusiveBuffersUsage 则表明了反压是否存在倾斜(floatingBuffersUsage 高、exclusiveBuffersUsage 低为有倾斜,因为少数 channel 占用了大部分的 Floating Buffer)。

可以看出来subtask的数据并不是特别的倾斜

调整sink to kafka为print打印控制台

发现仍然存在反压问题,排除sink写入kafka速度过慢问题,因原来写es存在延迟因此改为kafka,因此这一次先排除这个问题。

降低cep时间时间窗口大小,由3分钟-》1分钟-》20s 反压出现的时间越来越靠后,大体问题定位在cep算子相关,并且此时每次checkpoint的时间在增大,尽管state的大小相同但是时间成倍增大,故修改checkpoint相关配置继续测试发现问题仍然存在

分析线程taskmanager线程占比,发现cep算子占用cpu百分之94,故增大算子并发度3为6线程cpu占用降低如下健康状态

反压经历1个颤则时间也没有再出现,后续会持续跟进,并且会尝试调大cep的时间窗口,尝试配置出最佳实践

增大分区后发现数据倾斜严重,因为kafka分区为3,但是并行度为6因此cep算子的6个subtask数据倾斜严重,因此在添加source端执行reblance方法,强制轮询方式分配数据

可以看出来这里数据相比以前均匀很多

[img]

关于flinkcep和flinkcep延时的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表