flinkcep(flinkcep动态改变times次数)
本篇文章给大家谈谈flinkcep,以及flinkcep动态改变times次数对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
【Flink】Flink cep
[TOC]
什么是 CEP:
CEP 的特征如下:
市场上有多种 CEP 的解决方案,例如Spark、Samza、Beam等,但他们都没有提供专门的库支持。然而,Flink提供了专门的CEP库。
Flink CEP 包含如下组件:Event Stream、Pattern定义、Pattern检测和生成Alert。
简单来说一下,其实可卜判以把使用 flink CEP 当做平时用的型物改正则表达式,cep中的 Pattern 就是定义的正则表达式,flink 中的DataStream 就是正则表达式中待匹配的字符串,flink 通过DataStream 和 自定义的Pattern进行匹配,生成一个经过过滤之后的DataStream。
基于自定义的pattern,可以做很多工作,比如监控报警、风控、反爬等等。
处理事件的规则,被叫作模式(Pattern)。Flink CEP提供了Pattern API用于对输入流数据进行复杂事件规则定义,用来提取符合规则的事件序列。
模式大致分为两类:
个体模式(Individual Patterns):组成复杂规则的每一个单独的模式定义,就是个体模式。
组合模式(Combining Patterns,也叫模式序列):很多个体模式组合起来,就形成了整个的模式序列。
个体模式包括单例模式和循环模式。单例模式只接收一个事件,而循环模式可以接收多个事件。
可以在一个个体模式后追加量词,也就是指定循环次数。
每个模式都需要指定触发条件,作为模式是否接受事件进入的判断依据。
CEP中的个体模式主要通过调用.where()、.or()和.until()来指定条件。按不同的调用方式,可以分成以下几类:
简单条件: 通过.where()方法对事件中的字段进行判断筛选,决定是否接收该事件
组合条件: 将简单的条件进行合并,or()方法表示或逻辑相连,where的直蚂橘接组合就相当于与and。
终止条件: 如果使用了oneOrMore或者oneOrMore.optional,建议使用.until()作为终止条件,以便清理状态。
迭代条件: 能够对模式之前所有接收的事件进行处理,调用.where((value,ctx) = {…}),可以调用ctx.getEventForPattern("name")
了解了独立模式,现在看看如何将它们组合成一个完整的模式序列。
模式序列必须以初始模式开始,如下所示:
接下来,可以通过指定它们之间所需的连续条件,为模式序列添加更多模式。 Flink CEP 支持 事件之间 以下形式的邻接:
要在连续模式之间应用它们,可以使用:
除了以上模式序列外,还可以定义“不希望出现某种近邻关系”:
需要注意 :
宽松的连续性(Relaxed contiguity )意味着仅匹配第一个匹配事件,而具有非确定性的松弛连续性(non-deterministic relaxed contiguity),将为同一个开始发出多个匹配。 例如模式“a b”,给定事件序列“a”,“c”,“b1”,“b2”将给出以下结果:
也可以为模式定义时间约束以使其有效。 例如,可以通过pattern.within()方法定义模式应在10秒内发生。 处理和事件时间都支持时间模式。
注意模式: 序列只能有一个时间约束。 如果在不同的单独模式上定义了多个这样的约束,则应用最小的约束。
可以在循环模式中应用与上一节中讨论的相同的连续条件。
连续性将应用于接受到这种模式的元素之间。 为了举例说明上述情况,模式序列“a b + c”(“a”后跟一个或多个“b”的任何(非确定性宽松)序列,后跟“c”),输入“a” “,”“b1”,“d1”,“b2”,“d2”,“b3”“c”将产生以下结果:
对于循环模式(例如oneOrMore()和times()), 默认是宽松的连续性 。 如果想要严格的连续性,必须使用continuous()调用显式指定它,如果想要非确定性的松弛连续性,可以使用allowCombinations()调用。
consecutive()
与oneOrMore()和times()结合使用,并在匹配事件之间强加严格的连续性,即任何不匹配的元素都会中断匹配(像next())。
如果不应用,则使用松弛的连续性(如followBy())。
为输入序列生成以下匹配项:C D A1 A2 A3 D A4 B.
allowCombinations()
与oneOrMore()和times()一起使用,并在匹配事件之间强加非确定性的松散连续性(像followAyAny())。
如果不应用,则使用宽松的连续性(像followBy())。
将为输入序列生成以下匹配项:C D A1 A2 A3 D A4 B.
指定要查找的模式序列后,就可以将其应用于输入流以检测潜在匹配。调用CEP.pattern(),给定输入流和模式,就能得到一个PatternStream。
创建PatternStream之后,就可以应用select或者flatSelect方法,从检测到的事件序列中提取事件了。
select()方法需要输入一个select function作为参数,每个成功匹配的事件序列都会调用它。
select()以一个Map[String,Iterable[IN]]来接收匹配到的事件序列,其中key就是每个模式的名称,而value就是所有接收到的事件的Iterable类型。
每当模式具有通过within关键字附加的窗口长度时,部分事件序列可能因为超过窗口长度而被丢弃。 要对超时的部分匹配进行操作,可以使用TimedOutPartialMatchHandler接口。
这是来自尚硅谷的一个例子:检测一个用户在3秒内连续登陆失败。
首先要导入依赖:
LoginLog.csv 内容如下:
也可看我 github:
Flink CEP详解
Flink难点:彻底明白CEP4,组合模式、循环模式介绍
Flink CEP greedy条件的理解
Flink 使用介绍相关文档目录
总的来说,Flink CEP greedy的含义为对于pattern中多个不互斥的条件,优先去匹配哪个。
例如:
下面我们分两种情况来测试:
完整程序为:
输入统一为:
输出为:
程序和情况1相同,但是pattern去掉greedy,如下所示:
对于同样的输入,输码李出为:
我们发现不使用greedy的时候多了两个输出:
"aaa"数册这个数据既符合条件a又符合条件b。对于贪婪(greedy)的情况,优先使用条件a来匹配。"aaa"迟毕迟符合条件a,所以greedy匹配的结果中"aaa"元素都必须作为条件a使用。但如果没有指定greedy,"aaa"既可以当做a又可以当做b。这样不使用greedy的时候为什么会多两个输出就不难理解了。
动态Flink CEP规则构想
场景
我们在写CEP规则的时候动态的逻辑大致可以分为三个部分:
1、通过条件过滤数据
2、事件链(Pattern)
3、每条事件匹配的条件(Condition)
所以可以构想开发一个通用的Job,然后凳此镇通过一个配置文件来达到规则的触发。
设计
1、首扒好先为了方便处理数据,我们先把数据都转成JSONObject,方便后续直接使用
2、所有涉及到条件判断的部分,我们都可以使用Jexl来实现(关于jexl的使用我已经有过介绍了)
3、整个事件链的描述需要定枣粗义一套DSL语言然后来解析(前端可以实现可视化的配置)
实现
1、条件的过滤
这里的Jexl是我对jexl的封装
2、事件条件的判断
提示:
功能FLINK CEP SQL本事是实现的,这里主要是探讨Flink SQL大致实现的过程
[img]关于flinkcep和flinkcep动态改变times次数的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。