flinkflatmap的简单介绍

本篇文章给大家谈谈flinkflatmap,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Flink状态管理和恢复机制

1、什么是状态?

2、Flink状态类型有哪几种?

3、状态有什么作用?

4、如何使用状态,实现什么样的API?

5、什么是checkpoint与savepoint?

6、如何使用checkpoint与savepoint?

7、checkpoint原理是什么?

8、checkpint存储到hdfs上又是什么意思?

1 增量计算

聚合操作、机器学习训练模型迭代运算时保存当前模型等等

2 容错

Job故障重启、升级

定义: 某task或者operator 在某一时刻的在内存中的状态。

而checkpoint是,对于这个中间结果进行一次快照。

作用:State是可以被记录的,在失败的情况下可以恢复。

checkpoint则表示了一个Flink Job,在一个特定时刻的一份全局状态快照,即包含了一个job下所有task/operator某时刻的状态。

比如任务挂掉的时候或被手动停止的时候,可以从挂掉的点重新继续消费。

基本类型:Operator state、Keyed state

特殊的 Broadcast State

适用场景:

增量计算:

1聚合操作

2机器学习训练模型迭代运算时保存当段锋前模型

等等

容错:

Job故障重启

使用状态,必须使用RichFunction,因为状态是使用RuntimeContext访问的,只能在RichFunction中访问

假设现在存在输入源数据格式为(EventID,Value)

输出数据,直接flatMap即可,无状态。

如果要输出某EventID最大值/最小值等,HashMap是否可以?

程序一旦Crash,如何恢复?

答案:Flink提供了一套状态保存的方法,不需要借助第三方存储系统来解决状态存储问题。

Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。相比较而言,在一个operator上,可能有很多个key,从而对应多个keyed state。

所以一个并行度为4的source,即有4个实例,那么就会有4个状态

举例:Flink中的Kafka Connector,就使用了operator state。有几个并行度,就会有几个connector实例,消费的分区不一样,它会在每个connector实例中,保存该实例中消费topic的所有(partition,offset)映射。

数据结构:ListStateT

一般编码过程:实现CheckpointedFunction接口,必须实现两个函数,分别是:

initializeState和snapshotState

如何保存状态?

通常是定义一个private transient ListStateLong checkPointList;

注意:使用Operator State最好不要在keyBy之后使用,另外不要将太大的state存放到这个里面。

是基于KeyStream之上的状态,keyBy之后的Operator State。

那么,一个并行度为3的keyed Opreator有几个状态,这个就不一定是3了,这里有几个状态是由keyby之后有几个key所决定的。

案例:有一个事件流Tuple2[eventId,val],求不同的事件eventId下灶坦,相邻3个val的平均值,事件流如下:

(1,4),(2,3),(3,1),(1,2),(3,2),(1,2),(2,2),(2,9)

那么事件1:8/3=2

那么事件2:14/3=4

Keyed State的数据结构类型有:

ValueState握辩晌T:update(T)

ListStateT:add(T)、get(T)和clear(T)

ReducingStateT:add(T)、reduceFunction()

MapStateUK,UV:put(UK,UV)、putAll(MapUK,UV)、get(UK)

FlatMapFunction是无状态函数;RichFlatMapFunction是有状态函数

这里没有实现CheckpointedFunction接口,而是直接调用方法 getRuntimeContext(),然后使用getState方法来获取状态值。

特殊场景: 来自一个流的一些数据需要广播到所有下游任务,在这些任务中,这些数据被本地存储并且用于处理另一个流上的所有处理元素 。例如:一个低吞吐量流,其中包含一组规则,我们希望对来自另一个流的所有元素按照规则进行计算

典型应用:常规事件流.connect(规则流)

常规事件流.connect(配置流)

1 创建常规事件流DataStream或者KeyedDataStream

2 创建BroadcastedStream:创建规则流/配置流(低吞吐)并广播

3 连接两个Stream并实现计算处理

process(可以是BroadcastProcessFunction 或者 KeyedBroadcastProcessFunction )

BroadcastProcessFunction:

processElement(...):负责处理非广播流中的传入元素

processBroadcastElement(...):负责处理广播流中的传入元素(如规则),一般广播流的元素添加到状态里去备用,processElement处理业务数据时就可以使用

ReadOnlyContext和Context:

ReadOnlyContext对Broadcast State只有只读权限,Conetxt有写权限

KeyedBroadcastProcessFunction:

注意:

1 Flink之间没有跨Task的通信

2 每个任务的广播状态的元素顺序有可能不一样

3 Broadcast State保存在内存中(并不在RocksDB)

第三章 flink流处理API - map和flatmap

map有映射的意思, 作用是可以把一个输入的数据转为另外一个数据(比如唯腊把小写字母转换为大写字母, 数字转换成他的相反数等)。

DataStream → DataStream: 输入一个参数产生一个参数。

实现MapFunction接口指腔滑后,实现这个接口中的map方法, 接入参数表示输入数据, return的结果表示转换后的数据。

如: 数据翻倍等操作: dataStream.map { x = x * 2 }

通过源码可以看到他的实际返回值是SingleOutputStreamOperator, 属于DataStream的子类。

flat有平坦的意思,和map结合起来表示把把输入的数据打平映射。 作用是可以把一个输入的数据转为0-N条数据(比如把一个单词中所有的字母拆出来)。

DataStream → DataStream: 输入一个参数,产生0个、1个或者多个输出.

实现FlatMapFunction接口后,实现这个接口中的flatMap方法, 第一个接入参数表示输入数据 ,第二个接入参数是一个数据收集器对象:如果希望圆派输出该数据,就调用CollectorString的collect将数据收集输出。

如:这个 flatmap 的功能是将句子中的单词拆分出来: dataStream.flatMap { str = str.split(" ") }

通过源码可以看到他的实际返回值是SingleOutputStreamOperator, 属于DataStream的子类。

[img]

有状态的函数和操作在处理各个元素或者事件时存储数据,使得state称为任何类型的复杂操作的关键构建部件,例如:

当一个应用程序搜索某些特定的事件模式时,状态会保存截止到目前为止遇到过的事件的顺序;

当每分钟聚合事件时,状态会保存挂慧衫铅起的聚合

当通过数据点来训练机器学习模型时,状态会保存当前版本的模型参数

为了使state容错,Flink需要识别state并 checkpoint 它, 在许多情况下,Flink还管理着应用程序的状态,这意味着Flink处理内存管理(如果需要,可能会将内存中的数据溢出到磁盘)来保存非常大的state。

这篇文档介绍了在开发应用程序时如何使用Flink的state 抽象概念。

在Flink中有两个基本的state:Keyed state和 Operator state

Keyed State 总是与key相关,并且只能应用于 KeyedStream 的函数和操作中。前好

你可以认为 Keyed State 是一个已经分区或者划分的,每个state分区对应一个key的 Operator State , 每个 keyed-state 逻辑上与一个并行操作实例, 键( parallel-operator-instance, key )绑定在一起,由于每个key属于唯一一个键控算子( keyed operator )的并行实例,我们可以简单地看作是 operator, key 。

Keyed State 可以进一步的组成 Key Group , Key Group 是Flink重新分配 Keyed State 的最小单元,这里有跟定义的最大并行数一样多的 Key Group ,在运行时 keyed operator 的并行实例与key一起为一个或者多个 Key Group 工作。

使用 Operator State (或者非键控的state)的话,每个算子状态绑定到一个并行算子实例中。 Kafka Connector 就是在Flink中使用 Operator State 的一个很好的例子,每个 Kafka consumer 的并行实例保存着一个 topic 分区和偏移量的map作为它的 Operator State 。

当并行数发生变化时, Operator State 接口支持在并行操作实例中进行重新分配,这里有多种方法来进行重分配。

Keyed State和 Operator State存在两种形式:托管的和原生的。

托管的State( Managed State )由Flink运行时控制的数据结构表示, 例如内部哈希表或者RocksDB,例子是"ValueSate", "ListState"等。Flink运行时会对State编码并将它们写入checkpoint中。

原生State( Raw State )是算子保存它们自己的数据结构的state,当checkpoint时,它们仅仅写一串byte数组到checkpoint中。Flink并不知道State的数据结构,仅能看到原生的byte数组。

所有的数据流函数都可以使用托管state,但是原生state接口只能在实现operator时才能使用。使用托管State(而不是原生state)被推荐使用是因为使用托管state,当并行度发生变化时,Flink可以自动地重新分配state,同时还能更好地管塌源理内存。

托管的键控state接口可以访问所有当前输入元素的key范围内的不同类型的state,这也就意味着这种类型的state只能被通过stream.keyBy(...)创建的KeyedStream使用。

现在我们首先来看一下可用的不同类型的state,然后在看它们是如何在程序中使用的,可用State的原形如下:

ValueStateT:这里保存了一个可以更新和检索的值(由上述输入元素的key所限定,所以一个操作的每个key可能有一个值)。这个值可以使用 update(T) 来更新,使用 T value() 来获取。

ListStateT:这个保存了一个元素列表,你可以追加元素以及获取一个囊括当前所保存的所有元素的 Iterable ,元素可以通过调用 add(T) 来添加,而 Iterable 可以调用 IterableT get() 来获取。

ReducingStateT:这个保存了表示添加到state的所值的聚合的当个值,这个接口与ListState类似,只是调用add(T)添加的元素使用指定的ReduceFunction聚合成一个聚合值。

FoldingStateT, ACC:这将保存表示添加到状态的所有值的聚合的单个值,与ReducingState相反,聚合的数据类型可能跟添加到State的元素的数据类型不同,这个接口与ListState类似,只是调用add(T)添加的元素使用指定的FoldFunction折叠成一个聚合值。

MapStateT:这个保存了一个映射列表,你可以添加key-value对到state中并且检索一个包含所有当前保存的映射的Iterable。映射可以使用 put(UK, UV) 或者 putAll(MapUK, UV) 来添加。与key相关的value,可以使用 get(UK) 来获取,映射的迭代、keys及values可以分别调用 entries() , keys() 和 values() 来获取。

所有类型的state都有一个 clear() 方法来清除当前活动的key(及输入元素的key)的State。

注意: FoldingState会在下一个Flink版本中启用,并在将来的版本中彻底删除,将提供更加一般的替代方案。

值得注意的是这些State对象仅用于与State进行接口,State并不只保存在内存中,也可能会在磁盘中或者其他地方,第二个需要注意的是从State中获取的值依赖于输入元素的key,因此如果涉及的key不同,那么在一次调用用户函数中获得的值可能与另一次调用的值不同。

为了获得一个State句柄,你需要创建一个 StateDescriptor ,这个 StateDescriptor 保存了state的名称(接下来我们会讲到,你可以创建若干个state,但是它们必须有唯一的值以便你能够引用它们),State保存的值的类型以及用户自定义函数如:一个 ReduceFunction 。根据你想要检索的state的类型,你可以创建一个 ValueStateDescriptor , 一个 ListStateDescriptor , 一个 ReducingStateDescriptor , 一个 FoldingStateDescriptor 或者一个 MapStateDescriptor

State可以通过 RuntimeContext 来访问,所以只能在富函数中使用。 RichFunction 中的 RuntimeContext 有以下这些方法来访问state:

ValueStateT getState(ValueStateDescriptorT)

ReducingStateT getReducingState(ReduceingStateDescriptorT)

ListStateT getListState(ListStateDescriptorT)

FoldingStateT, ACC getFoldingState(FoldingStateDescriptorT, ACC)

MapStateUK, UV getMapState(MapStateDescriptorUK, UV)

这个 FlatMapFunction 例子展示了所有部件如何组合在一起:

这个例子实现了一个简单的计数器,我们使用元组的第一个字段来进行分组(这个例子中,所有的key都是1),这个函数将计数和运行时总和保存在一个ValueState中,一旦计数大于2,就会发出平均值并清理state,因此我们又从0开始。请注意,如果我们在第一个字段中具有不同值的元组,则这将为每个不同的输入键保持不同的state值。

除了上述接口之外,Scala API还具有快捷方式在KeyedStream上通过有状态的 map() 或 flatMap() 函数获取当个ValueState, 用户定义的Function以一个Option形式来获取ValueState的当前值,并且必须返回一个更新的值来更新State。

为了使用托管的算子State,有状态的函数可以实现更加通用的CheckpointedFunction接口或者ListCheckpointT extends Serializable接口

CheckpointedFunction接口可以通过不同的重分区模式来访问非键控的state,它需要实现两个方法:

无论何时执行checkpoint, snapshotState() 都会被调用,相应地,每次初始化用户定义的函数时,都会调用对应的 initializeState() ,当函数首次初始化时,或者当该函数实际上是从较早的检查点进行恢复时调用的。鉴于此, initializeState() 不仅是不同类型的状态被初始化的地方,而且还是state恢复逻辑的地方。

目前列表式托管算子状态是支持的,State要求是一个可序列化的彼此独立的列表,因此可以在重新调整后重新分配,换句话说,这些对象是可重新分配的非键控state的最小粒度。根据状态的访问方法,定义了一下重分配方案:

Even-split redistribution :每个算子返回一个State元素列表,

Union redistribution :每个算子返回一个State元素列表,

关于flinkflatmap和的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表