flinkcep(flinkcep动态改变times次数)

本篇文章给大家谈谈flinkcep,以及flinkcep动态改变times次数对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

[TOC]

什么是 CEP:

CEP 的特征如下:

市场上有多种 CEP 的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的库支持。然而,Flink提供了专门的CEP库。

Flink CEP 包含如下组件:Event Stream、Pattern定义、Pattern检测和生成Alert。

简单来说一下,其实可卜判以把使用 flink CEP 当做平时用的型物改正则表达式,cep中的 Pattern 就是定义的正则表达式,flink 中的DataStream 就是正则表达式中待匹配的字符串,flink 通过DataStream 和 自定义的Pattern进行匹配,生成一个经过过滤之后的DataStream。

基于自定义的pattern,可以做很多工作,比如监控报警、风控、反爬等等。

处理事件的规则,被叫作模式(Pattern)。Flink CEP提供了Pattern API用于对输入流数据进行复杂事件规则定义,用来提取符合规则的事件序列。

模式大致分为两类:

个体模式(Individual Patterns):组成复杂规则的每一个单独的模式定义,就是个体模式。

组合模式(Combining Patterns,也叫模式序列):很多个体模式组合起来,就形成了整个的模式序列。

个体模式包括单例模式和循环模式。单例模式只接收一个事件,而循环模式可以接收多个事件。

可以在一个个体模式后追加量词,也就是指定循环次数。

每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据。

CEP中的个体模式主要通过调用.where()、.or()和.until()来指定条件。按不同的调用方式,可以分成以下几类:

简单条件: 通过.where()方法对事件中的字段进行判断筛选,决定是否接收该事件

组合条件: 将简单的条件进行合并,or()方法表示或逻辑相连,where的直蚂橘接组合就相当于与and。

终止条件: 如果使用了oneOrMore或者oneOrMore.optional,建议使用.until()作为终止条件,以便清理状态。

迭代条件: 能够对模式之前所有接收的事件进行处理,调用.where((value,ctx) = {…}),可以调用ctx.getEventForPattern("name")

了解了独立模式,现在看看如何将它们组合成一个完整的模式序列。

模式序列必须以初始模式开始,如下所示:

接下来,可以通过指定它们之间所需的连续条件,为模式序列添加更多模式。 Flink CEP 支持 事件之间 以下形式的邻接:

要在连续模式之间应用它们,可以使用:

除了以上模式序列外,还可以定义“不希望出现某种近邻关系”:

需要注意 :

宽松的连续性(Relaxed contiguity )意味着仅匹配第一个匹配事件,而具有非确定性的松弛连续性(non-deterministic relaxed contiguity),将为同一个开始发出多个匹配。 例如模式“a b”,给定事件序列“a”,“c”,“b1”,“b2”将给出以下结果:

也可以为模式定义时间约束以使其有效。 例如,可以通过pattern.within()方法定义模式应在10秒内发生。 处理和事件时间都支持时间模式。

注意模式: 序列只能有一个时间约束。 如果在不同的单独模式上定义了多个这样的约束,则应用最小的约束。

可以在循环模式中应用与上一节中讨论的相同的连续条件。

连续性将应用于接受到这种模式的元素之间。 为了举例说明上述情况,模式序列“a b + c”(“a”后跟一个或多个“b”的任何(非确定性宽松)序列,后跟“c”),输入“a” “,”“b1”,“d1”,“b2”,“d2”,“b3”“c”将产生以下结果:

对于循环模式(例如oneOrMore()和times()), 默认是宽松的连续性 。 如果想要严格的连续性,必须使用continuous()调用显式指定它,如果想要非确定性的松弛连续性,可以使用allowCombinations()调用。

consecutive()

与oneOrMore()和times()结合使用,并在匹配事件之间强加严格的连续性,即任何不匹配的元素都会中断匹配(像next())。

如果不应用,则使用松弛的连续性(如followBy())。

为输入序列生成以下匹配项:C D A1 A2 A3 D A4 B.

allowCombinations()

与oneOrMore()和times()一起使用,并在匹配事件之间强加非确定性的松散连续性(像followAyAny())。

如果不应用,则使用宽松的连续性(像followBy())。

将为输入序列生成以下匹配项:C D A1 A2 A3 D A4 B.

指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配。调用CEP.pattern(),给定输入流和模式,就能得到一个PatternStream。

创建PatternStream之后,就可以应用select或者flatSelect方法,从检测到的事件序列中提取事件了。

select()方法需要输入一个select function作为参数,每个成功匹配的事件序列都会调用它。

select()以一个Map[String,Iterable[IN]]来接收匹配到的事件序列,其中key就是每个模式的名称,而value就是所有接收到的事件的Iterable类型。

每当模式具有通过within关键字附加的窗口长度时,部分事件序列可能因为超过窗口长度而被丢弃。 要对超时的部分匹配进行操作,可以使用TimedOutPartialMatchHandler接口。

这是来自尚硅谷的一个例子:检测一个用户在3秒内连续登陆失败。

首先要导入依赖:

LoginLog.csv 内容如下:

也可看我 github:

Flink CEP详解

Flink难点:彻底明白CEP4,组合模式、循环模式介绍

Flink 使用介绍相关文档目录

总的来说,Flink CEP greedy的含义为对于pattern中多个不互斥的条件,优先去匹配哪个。

例如:

下面我们分两种情况来测试:

完整程序为:

输入统一为:

输出为:

程序和情况1相同,但是pattern去掉greedy,如下所示:

对于同样的输入,输码李出为:

我们发现不使用greedy的时候多了两个输出:

"aaa"数册这个数据既符合条件a又符合条件b。对于贪婪(greedy)的情况,优先使用条件a来匹配。"aaa"迟毕迟符合条件a,所以greedy匹配的结果中"aaa"元素都必须作为条件a使用。但如果没有指定greedy,"aaa"既可以当做a又可以当做b。这样不使用greedy的时候为什么会多两个输出就不难理解了。

场景

我们在写CEP规则的时候动态的逻辑大致可以分为三个部分:

1、通过条件过滤数据

2、事件链(Pattern)

3、每条事件匹配的条件(Condition)

所以可以构想开发一个通用的Job,然后凳此镇通过一个配置文件来达到规则的触发。

设计

1、首扒好先为了方便处理数据,我们先把数据都转成JSONObject,方便后续直接使用

2、所有涉及到条件判断的部分,我们都可以使用Jexl来实现(关于jexl的使用我已经有过介绍了)

3、整个事件链的描述需要定枣粗义一套DSL语言然后来解析(前端可以实现可视化的配置)

实现

1、条件的过滤

这里的Jexl是我对jexl的封装

2、事件条件的判断

提示:

功能FLINK CEP SQL本事是实现的,这里主要是探讨Flink SQL大致实现的过程

[img]

关于flinkcep和flinkcep动态改变times次数的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表