flinkrocksdb的简单介绍

本篇文章给大家谈谈flinkrocksdb,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Flink 容错机制主要是 状态的保存和恢复,涉及 state backends 状态后端、checkpoint 和 savepoint,还有 Job 和 Task 的错误恢复 。

Flink 状态后端是指 保存 Checkpoint 数据的容器 ,其分类有 MemoryStateBackend、FsStateBackend、RocksDBStateBackend ,状态的分类有 operator state 和 keyed state 。

Flink 状态保存和恢复主要依靠 Checkpoint 机制和 Savepoint 机制,两者的区别如下表所示。

快照的概念来源于相片,指照相馆的一种冲洗过程短搜隐的照片。在计算机领域, 快照是数据存储的某一时刻的状态记录 。 Flink Snapshot 快照是指作业状态的全局一致记录 。一个完整的快照是包括 source 算子的状态(例如,消费 kafka partition 的 offset)、状态算子的缓存数据和 sink 算子的状态(批量缓存数据、事务数据等)。

Checkpoint 检查点可以自动产生快照,用于Flink 故障恢复 。Checkpoint 具有分布式、异步、增量的特点。

Savepoint 保存点是用户手动触发的,保存全量的作业状态数据 。一般使用场景是作业的升级、作业的并发度缩放、迁移集群等。

Flink 是采用轻量级的分布式异步快照,其实现是采用栅栏 barrier 作为 checkpoint 的传递信号,与业务数据一样无差别地传递下去 ,目的是使得数据流被切分成微批,进行 checkpoint 保存为 snapshot。当 barrier 经过流图节点的时候,Flink 进行 checkpoint 保存状态数据。

如下图所示,checkpoint n 包含每个算子的状态,该状态是指checkpoint n 之前的全部事件,而不包含它之后戚誉的所有事件。

针对用户作业出现故障而导致结果丢失或者重复的问题,Flink 提供3种语义:

① At-Least-Once 最少一次 :不会丢失数据,但可能会有重复结果。

② Exactly-Once 精确一次 :checkpoint barrier 对齐机制可以保障精确一次。

① FailureRateRestartStrategy :允许在指定时间间隔内的最大失败次数,同时可以设置重启延时时间。

② FixedDelayRestartStrategy :允许指定的失败次数,同时可以设置重启延时时间。

③ NoRestartStrategy :不需要重启,即 Job 直接失败。

④ ThrowingRestartStrategy :不需要重启,直接抛异常。

Job Restart 策略可以通过 env 设置。

上述策略的父类接口是RestartStrategy,其关键是restart(重启操作)。

① RestartAllStrategy :重启全部 task,默认策略。

② RestartIndividualStrategy :恢复单个 task。如果该 task 没有source,可能导致数据丢失。

③ NoOpFailoverStrategy :不恢复 task。

上述策略的父类接口是FailoverStrategy,其关键是Factory的create(创建 strategy)、onTaskFailure(处理错误)。

如何高漏段产生可靠的全局一致性快照是分布式系统的难点,其传统方案是使用的全局时钟,但存在单点故障、数据不一致等可靠性问题 。为了解决该问题, Chandy-Lamport 算法采用 marker 的传播来代替全局时钟 。

① 进程 Pi 记录自己的进程状态,同时生产一个标识信息 marker(与正常 message 不同),通过 ouput channel 发送给系统里面的其他进程。

② 进程 Pi 开始记录所有 input channel 接收到的 message

进程 Pj 从 input channel Ckj 接收到 marker。如果 Pj 还没有记录自己的进程状态,则 Pj 记录自己的进程状态,向 output channel 发送 marker;否则 Pj 正在记录自己的进程状态(该 marker 之前的 message)。

所有的进程都收到 marker 信息并且记录下自己的状态和 channel 的状态(包含的 message)。

Flink 的分布式异步快照实现了Chandy Lamport 算法,其核心思想是 在 source 插入 barrier 代替 Chandy-Lamport 算法中的 marker,通过控制 barrier 的同步来实现 snapshot 的备份和 Exactly-Once 语义 。

Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint。

source task向下游广播barrier。

当source task备份完自己的状态后,会将备份数据的地址(state handle)通知 Checkpoint Coordinator。

map和sink task收集齐上游source的barrier n,执行本地快照。下面例子是RocksDB增量Checkpoint 的流程:首先RocksDB会全量保存到磁盘上(红色大三角表示),然后Flink会从中选择没有上传的文件进行持久化备份(紫色小三角)。

map和sink task在完成Checkpoint 之后,将状态地址state handle返回通知 Coordinator。

当Checkpoint Coordinator收到全部task的state handle,就确定该Checkpoint已完成,并向持久化存储中备份一个Checkpoint Meta(元数据,包括该checkpoint状态数据的备份地址)。

Flink常见异常和错误信息小结

周末了,不想搞长篇大论,就写写这样的流水账吧。

Flink的常见异常众多,不可能面面俱到,所以想到哪儿写到哪儿,有漏掉的之后再补充。

这不是个显式错误,但是JDK版本过低很有可能会导致Flink作业出现各种莫名其妙的问题,因此在生产环境中建议采用JDK 8的较高update(我们使用的是181)。

该信息不甚准确,因为绝大多数情况下都不是JAR包本身有毛病,而是在作业提交过程中出现异常退出了。因此需要查看本次提交产生的客户端日志(默认位于$FLINK_HOME/logs目录下),再根据其中的信息定位并解决问题。

一般都是因为用户依赖第三方包的版本与Flink框架依赖的版本有冲突导致。如果是采用Maven做项目管理的话,可参照我之前写的 这篇文章 来解决冲突。

就是字面意思,YARN集群内没有足够的资源启动Flink作业。检查一下当前YARN集群的状态、正在运行的YARN App以及Flink作业所处的队列,释放一些资源或者加入新的资源。

slot分配请求超时,是因为TaskManager申请资源时无法正常获得,按照上一条的思路检查即可。

TaskManager的Container因为使用资源超限被kill掉了。首先需要保证每个slot分配到的内存量足够,特殊情况下可以手动配置SlotSharingGroup来减少单个slot中共享Task的数量。如果资源没问题,那么多半就是程序内部发生了内存泄露。建议仔细查看TaskManager日志,并按处理JVM OOM问题的常规操作来排查。

TaskManager心跳超时。有可能是TaskManager已经失败,如果没有失败,那么有可能是因为网络不好导致JobManager没能收到心迟橘跳信号,或者TaskManager忙于GC,无法发送心跳信号。JobManager会重启心跳超时的TaskManager,如果频繁出现此异常,应该通过日志进一步定位问题所在。

Flink on YARN的其他问题,还可以参考 这篇 ,非常有帮助。

该异常几乎都是由于程序业务逻辑有误,或者数据流里存在未处理好的脏数据导致的,继续向下追溯异常栈一般就可以看到具体的出错原因,比较常见的如POJO内有空字段,或者抽取事件时间的时间戳为null等。

很多童鞋拿着这两条异常信息来求助,大轿但实际上它们只是表示BufferPool、MemoryManager这些Flink运行时组件被销毁,亦即作业已经失败。具体的原因多种多样,根据经验,一般是上一条描述的情况居多(即Could not forward element to next operator错误会伴随出现),其次是JDK版本问题。具体情况还是要根据TaskManager日志具体分析。

Akka超时导致,一般有两种原因:一是集群负载比较大或者网络比较拥塞,二是业务逻辑同步调用耗时的外部服务。如果负载或网络问题无法彻底缓解,需考虑调大 akka.ask.timeout 参数的值(默认只有10秒);另外,调用外部服务时尽量异步操作(Async I/O)。

这个异常我们应该都不陌生,首先检查系统 ulimit -n 的文件描述符限制,再注意检查程序内是否有资源(如各种连接池的连接)未及时释放。值得注意的是,Flink使用RocksDB状态后端也有可能会抛出这个异常,此时需修改flink-conf.yaml中的 state.backend.rocksdb.files.open 参数,如果不限制,可以码仿团改为-1。

关于文件描述符的一些有趣知识,可以参见之前我写的 这一篇 。

在Flink内使用Java Lambda表达式时,由于类型擦除造成的副作用(详情见 这篇文章 ),注意调用returns()方法指定被擦除的类型。

在当前检查点还未做完时,收到了更新的检查点的barrier,表示当前检查点不再需要而被取消掉,一般不需要特殊处理。

首先应检查 CheckpointConfig.setCheckpointTimeout() 方法设定的检查点超时,如果设的太短,适当改长一点。另外就是考虑发生了反压或数据倾斜,或者barrier对齐太慢。具体思路不再赘述,看官可以参考 这篇文章 ,非常详细。

我们知道Flink的状态是按key组织并保存的,如果程序逻辑内改了keyBy()逻辑或者key的序列化逻辑,就会导致检查点/保存点的数据无法正确恢复。所以如果必须要改key相关的东西,就弃用之前的状态数据吧。

在1.9之前的Flink版本中,如果我们使用RocksDB状态后端,并且更改了自用MapState的schema,恢复作业时会抛出此异常,表示不支持更改schema。这个问题已经在 FLINK-11947 解决,升级版本即可。

就酱吧,民那晚安(不是

[img]

Flink的可靠性保证 - 状态存储

与批计算相比,State是流计算特有的,批计算的failover机制,是失败后重新计算;流计算在大多数场景下是增量计算,数据逐条处理,每次计算是在上一次计算结果之上进行处理的,这就要求对上一次的计算结果进行存储,当因为机器,网络,脏数据等原因导致程序错误的时候,可以重启Job进行state恢复。Flink就是基于state存储,通过CheckPoint机制来保证数据的准确性。

此外,State存储的内容还有流计算过程中计算节点的中间结果或元数据属性,比如Window方面的操作,需要累加数据;在aggregation过程中的中间聚合结果;在以Apache Kafka作为数据源时候,记录已经读取数据的offset等

Flink内部有三种state的存储实现,如果不做配置,Flink默认茄租指使用的是MemoryStateBackend,三种实现分别是:

1 基于内存的MemoryStateBackend- 在debug模式使用,在生产模式下不建议使用;

2基于HDFS的FsStateBackend –基于分布式文件系统的持久化,每次读写都产生网络IO,整体性能不太好;

3基于RocksDB的RocksDBStateBackend - 本地文件+异步HDFS持久化,当前版本在生产环境下使用的

选择用RocksDB+HDFS的方式进行State的存储,State存储分两个阶段,首先本地存储到RocksDB,然后异步的颤配同步到远程的HDFS。 这样的而设计既消除了HeapStateBackend的局限(内存有限,宕机数据丢失),也减型或少了纯分布式存储的网络IO开销。RocksDBStateBackend存储value的大小是有限制的,

RocksDB’s的bridge API是基于byte[]的,所以这种state存储支持的每个key的value最大不超过2^31,有些merge操作的值可能会超过2^31 bytes,这点要注意。

StateBackend的控制粒度到job级别,如果想为所有job设置StateBackend,可以通过更改flink-conf.yaml文件里state.backend的值 ,上述3类sate backend对应的值是;

jobmanager(MemoryStateBackend),filesystem(FsStateBackend),和 rocksdb(RocksDBStateBackend);也可为单独的job设置StateBackend的方法:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

KeyedState - 这里面的key就是我们用KeyBy(x)里面的key,key与key之间的State是不可见的。KeyedState只能用在KeyedStream上的数据处理上。OperatorState是和Operator联系在一起的,比如Source Connector的实现中会用OperatorState来记录source数据读取的offset。

无论选用何种stateBackend,这些state都是优先存在本机上,当计算并行度发生变化,这些state也会被重新分发到不同机器上去。

OperatorState的分发,需要在Source Connector中实现,重点是把source的partition重新分配,并把之前记录的每个partition的offset也告诉新分配到的Source Connector。

KeyedState存的数据量比较大,如果调整并发度,copy的东西可能比较多,Flink为了避免过多的拷贝,采用了一个keygroup的机制。每个key通过hash方法分配到不同的keygroup中,当并发度调整的时候,调整粒度是keygroup,也就是一个key通过hash后所在的keygroup保持不变。

managed state是我们常用的那些ValueState,ListState,MapState等,这些State类型,由Flink控制它们的数据结构和存取方法。

Raw state是在自己实现operator的时候使用,相当于自定义state类型,自己控制数据结构和存取方式。

State可以手动地删除已存的值,也可以设置Time-To-Live (TTL),让state过期自动失效。存取State前,要先创建StateDescriptor,StateDescriptor含有state名称和state值的数据类型,有时候还需要自定义函数。在keyedstream上应用的函数,存取state的时候,key是由Flink自动提供的,直接使用xxx.value()函数就可以取到当前key对应的值;由Flink自动控制的话,可以统一控制state和stream的分区。

欢迎阅读,有问题可以通过邮件kaiyuan0989爱特163.com一起探讨。

5 一文看完flink的内存管理

1)java对象的存储密度比较竖冲低,对象主要包含 对象粗纤清头,对象数据,对齐填充。 其中对齐填充是没用的,纯粹是为了让对象的大小到达8的倍数

2)Full GC非常影响性能,对大数据量的计算来说,fullGC可能会持续很久(秒级甚至分钟级)

3)OOM导致JVM崩溃,因为是大数据计算,很有可能会分配岩前出大的对象。

4)缓存未命中,CPU在进行计算时,会先从CPU的缓存中抓取数据,但是jvm堆上的内存不是连续的,会导致CPU缓存不命中,CPU空转,影响效率。

5)传输过程,要序列化和反序列化

Flink将对象存储在堆外内存中,或者存在 memorySegment上

memorySegment: 

1 翻译为内存段,存储序列化后的对象

2 它是一段固定长度的内存(大小为32KB)

3 是FLink中最小的内存分配单元

4 读写非常高效,很多算子可以直接操作其二进制数据,不需要反序列化

5 Java本身自带的序列化和反序列化的功能,但是辅助信息占用空间比较大,在序列化对象时记录了过多的类信息。

Flink实现了自己的序列化框架,使用TypeInformation表示每种数据类型,所以可以只保存一份对象Schema信息,节省存储空间。又因为对象类型固定,所以可以通过偏移量存取。

jobmanager.heap.size:1024m

jobmanager.memory.process.size:1600m

主要包含 堆内存和非堆内存,相对比较简单一些。

关于rocksDb内存管理:

由于rocksdb分配的是堆外内存,内存量理论上不受jvm控制。于是产生问题,如果进程的内存使用超过容器限定的量,就会被资源管理器杀死。

Flink内存管理

java所有数据类型对应的字节大小

java对象的组成 : 对象头,实例数据,对齐部分

jvm 序列化缺点

上面图为TaskManager内存模型,左边为细分的内存模型,右边为整体内存模型,该图摘自Flink官网

heap内存在jvm启动的时候申请的一块不变的内存区域,该内存实际上是Flink和task公用的一块区域,在flink层面通过闷衡控制来区分框架使用和task内存,heap内存管理起来是比较容易的,实际上non-heap的内存是难管理的一块,如果管理不当或者使用不当可能造成内存泄漏或者内存无限增长等问题

内存参数配置

在flink中对内存进行了抽象成了MemorySegment,�默认情况下,一个 MemorySegment 对应着一个 32KB 大小的内存块,这块内存既可以是堆上内存( byte数组) ,也可以是堆外内存(nio的ByteBufferr ) .

同时MemorySegment也提供了对二进制数据轿陵的操作方法,以及读取字节数组序列化以及序列化字节数组的方法等

下面是类继承图,该类有两MemorySegment实现类有两个分别为使用heap的以及混合的即有heap和non-heap,对于内存的访问有子类具体的实现

MemorySemgent是flink内存分配的最小单元了,对于数据夸MemorySemgent保存,那么对于上层的使用者来说闭罩戚,需要考虑考虑所有的细节,由于过于繁琐,所以在MemorySemgent上又抽象了一层内存也,内存也是在MemorySemgent数据访问上的视图,对数据输入和输出分别抽象为DataInputView/DataOutputView,有了这一层,上层使用者无需关心跨MemorySemgent的细节问题,内存也对自动处理跨MemorySemgent的内存操作

DataInputView

DataInputView继承DataInput,DataInputView是对MemorySemgent读取的抽象视图,提供一系列读取二进制数据不同类型的方法,AbstractPageInputView是DataInputView的一个抽象实现类,并且基本所有InputView都实现了该类,即所有实现该类的InputView都支持Page

InputView持有了多个MemorySemgent的引用(可以基于数组,list,deque等),这些MemorySemgent被视为一个内存页,可以顺序,随机等方式读取数据,要基于不同的实现类,实现类不同读取方式不同

方法图

DataOutputView

与DataInputView相对应,继承Output,并有一个拥有Page功能的抽象类(AbstractPagedOutputView),其大部outputView的实现都是继承自该抽象类,对一组MemorySemgent提供一个基于页的写入功能

方法图

类继承图

用于网络io数据的包装,每个buffer持有一个MemorySegment的引用,resultPartition写数据的时候,会向LocalBufferPool申请Buffer,会返回BufferBuilder,通过BufferBuilder想Buffe r实际写入的是MemorySegment 写数据

BufferBuilder是在上游Task中,负责想Buffer写入数据,BufferConsumer位于下游,与BufferBuilder相对应,用于消费Buffer的数据,每个bufferBuilder对应一个bufferConsumer

常用参数介绍

buffer申请

buffer回收

当buffer用完之后需要进行回收比如在netty的clientHandler收到响应之后进行处理就会把buffer回收掉,buffer回收之后并不会释放memorySegment,而是放回池中,变为可用内存,反复使用

flink托管的内存,托管内存使用堆外内存,用于批处理缓存排序等以及提供rocksDB内存

NetworkBufferPool是一个固定大小的MemorySegment实例吃,用于网络栈中,NettyBufferPool会为每个ResultPartition创建属于自己的LocalBufferPool,NettyBufferPool会作为全局的pool来提供内存,LocalBufferPool会通过限制来控制自己内存的申请,防止过多申请

LocalBufferPool继承关系,实现了bufferRecycler的接口,用于回收自己持有的buffer

在数据接收的时候会将数据封装成NettyBuffer,在数据发送的时候会通过BufferBilder向MemorySegment写入数据,然后通过BufferConsumer读取MemorySegment的数据

BufferManager主要用于为RemoteInputChannel提供buffer的,bufferManager在启动的时候会向全局bufferPool请求自己的独有buffer,当bufferManager的buffer不够的时候,则会向localBufferPool请求buffer,此时请求的buffer为浮动buffer

实际上提供的buffer是

关于flinkrocksdb和的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表