flink版本(Flink版本查看)
本篇文章给大家谈谈flink版本,以及Flink版本查看对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、Flink commons-cli no such Method 问题排查
- 2、flink1.12.1扩展flink-sql 支持写入到sqlserver
- 3、flink 高版本能在低版本的环境上跑吗
- 4、Pyflink 本地开发与测试
- 5、Flink新特性之非对齐检查点(unaligned checkpoint)简介
Flink commons-cli no such Method 问题排查
Flink 使用介绍相关文档目录
Flink on yarn 模式提交任务早消异常,解析运行参数的时候抛出 NoSucnMethodError ,如下图所示:
NoSuchMethod 问题通常为依赖版本冲突问题导致。排查顺序:
先检查使用的Flink版本依赖的 commons-cli 版本。查看 flink-parent 的 pom.xml :
如例子中所示,可知Flink本身依赖的 commons-cli 版本为1.5.0。
然后排查 hadoop classpath 命令输出,例如:
这里面我们发现tez引入了 commons-cli 包,版本为1.2,可能存在冲突。需要进一步检查是否是版本差异导致对应class没有所需的方法。
最后检查job项目的依赖。在作业项目目录执行:
检查是悉顷否有 commons-cli 依赖,以及它的版本。如果有,需陆陆知要exclude掉这个依赖。
[img]flink1.12.1扩展flink-sql 支持写入到sqlserver
目前业务上有同步数据到sqlServer的需求明数,但是flink1.12.1版本的JdbcDialects不支持SqlServerDialect,
科学上网后发现袋鼠云的flinkStreamSql已经有支持sqlserver,那就开始动手,参考实现一波
主要实现getUpsertStatement的方法,本来以为能直接copy一波flinkStreamSql 的实现,结果发现
报错 SQL statement must not contain ? character.
查看源码发现, flink在构建mysql的Statement,是先把需要替换的字段前面拼接了 : fieldNames,然后在org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl类的parseNamedStatement 替换成 ?号, 既然如此,就针对了buildDualQueryStatement进行修改
完整的SqlServerDialect文件
最后替换原有的flink-jar包后,就可搭槐租以用类似知兆flink定义mysql的ddl进行定义表了
注意url写法为:jdbc:jtds:sqlserver://xxx:1433;databaseName=master;
[flinkStreamSQL链接]
flink 高版本能在低版本的环境上跑吗
不行
flink1.5.1版本的本地环境安装部署,该版本要求jdk版本1.8以棚行滑上。 因此,带好版链腊本搞得flink是无法在低环境运行的。
Pyflink 本地开发与测试
首先,准备 python 虚拟环境。2020年11月3日时的 pyflink 的最高版本为 1.11.2,请开发者按照实际需要或者线上环境要求来指定 pyflink 版本。
setup-pyflink-virtual-env.sh 脚本会自动安装 miniconda 并在当前目录下创建虚拟环境文件夹 venv,然后自动安装 apache-flink 模块后压缩虚拟环境为 venv.zip。
Pyflink 脚本开发请认真参考 官方文档v1.11 。
Pyflink 正在快速发展的阶段,每次版本更新都会增加很多新的特性,同时会取消旧特性,因此务必确保开发渗厅时所参考的文档与本地 pyflink 版本一致。
Flink 中的 Jar 包是 connector 的扩伏拆展,允许在 flink 脚本中连接和使用各种数据存储工具,包括:
Pyflink 默认支持有限的几种 jar 包,如有特殊需要(例如以 json 格式来消费 kafka 里的数据),需要手动指定脚本依赖的 jar 包所在的路径。
已知有 3 种方式来指定 jar 包依赖。
在脚本中完成 TableEnvironment 的初始化后,添加下面的脚本以指定 jar 包路径(多个 jar 包的路径用 ; 隔开)。
注意,本地环境的 jar 包路径与线上环境的 jar 包路径可能不同,因此每次提交到线上时还需要修改脚本中的路径为对应的路径。
找到 pyflink 模块的安装路径,以及对应的 lib 目录。
然后使用 cp 命令复制 jar 包到 lib 目录下即可。
这种方法一次运行,一劳永逸,比较适合本地开发。
这种方式不适用于本地开发,而是用于提交到集群上时指定 jar 包的路径,但为了较为系统地介绍 jar 包依赖的指定方式,故在此介绍。
命令如下:
通过 -j 参数来指定一个 jar 包路径,多个 jar 包则使用多个 -j 。
Flink 支持使用 local-singleJVM 模式 来进行本缺喊枣地测试,即只需简单的执行 Python xxxx.py 命令,pyflink 就会默认启动一个 local-singleJVM 的 flink 环境来执行作业。
在运行过程中,可以另起终端,输入 jps 来查看 java 进程。
Flink新特性之非对齐检查点(unaligned checkpoint)简介
在食用本文之前,建议看官先充分食用这两篇文章: 《Chandy-Lamport分布式快照算行腊法小记》 与 《深入理解Flink的轻量级异步屏障快照(ABS)算法》 。
在Flink的检查点机制中,屏障(barrier)是划分快照(状态)的边界。在启用exactly once语义的条件下,当一个算子有多个输入流时,需要等待所有输入流中当前检查点N的屏障都到达其输入缓冲区,才能安全地触发检查点,否则检查点N的快照数据和检查点N + 1的快照数据就会混在一起。图示如下。
屏障对齐不仅保证了状态的准确性,洞仿还巧妙地消去了原生C-L算法中记录输入流状态的步骤(之前说过,即使作业执行计划是有环图,也只需要记录回边流的状态),十分轻量级。
但是,屏障对齐是阻塞式的,在作业出现反压时可能会成为不定时炸弹。我们知道,检查点屏障是从Source端产生并源源不断地向下游流动的。如果作业出现反压(哪怕整个DAG中的一条链路反压),数据流动的速度减慢,屏障到达下游算子的延迟就会变大,进而影响到检查点完成的延时(变大甚至超时失败)。如果反压长久不能得到解决,快照数据与实际数据之间的差距就越来越明显,一旦作业failover,势必丢失较多的处理进度。另一方面,作业恢复后需要重新处理的数据又会积压,加重反压,造成恶性循环。
为了规避风险,Flink 1.11版本中通过 FLIP-76 引入了非对齐检查点(unaligned checkpoint)的feature,下面简要介绍之。
顾名思义,非对齐检查点取消了屏障对齐操作。其流程图示如下。
简单解说:
a) 当算子的所有输入流中的第一个屏障到达算子的输入缓冲区时,立即将这个屏障发往下游(输出缓冲区)。
b) 由于第一个屏障没有被阻塞,它的步调会比较快,超过一部分缓冲区中的数据。算子会标记两部分数据:一是屏障首先到达的那条流中被超过的数据,二是其他流中位于当前检查点屏障之前的所有数据(当然也包括进入了输入缓冲区的数据),如下图中标黄的部分所示。
c) 将上述两部分数据连同算子的状态一起做异步快照。
由此可见,非对齐检查点的机制与原生C-L算法更为相似一些(即需要由算子来记录输入流的状态)。它与对齐检查点的区别主要有三:
显然,即使再考虑反压的情况,屏障也不会因为输入流速度变慢而堵在各个算子的入口处,而是能比较顺畅地由Source端直达Sink端,从而缓解检查点失败超时的现象。
既然不同检查点的数据都混在一起了,非对齐检查点还能保证exactly once语义吗?答案是肯定的。当任务从非对齐检查点恢复时,除了对齐检查点也会涉及到的Source端重放和算子的计算状态恢复之外,未对齐的流数据也会被恢复到各个链路,档颤滑三者合并起来就是能够保证exactly once的完整现场了。
非对齐检查点目前仍然作为试验性的功能存在,并且它也不是十全十美的(所谓优秀的implementation往往都要考虑trade-off),主要缺点有二:
所以,官方当前推荐仅将它应用于那些容易产生反压且I/O压力较小(比如原始状态不太大)的作业中。随着后续版本的打磨,非对齐检查点肯定会更加好用。
还有其他事情要做,民那晚安晚安。
关于flink版本和Flink版本查看的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。