flink实战(Flink实战 java)
本篇文章给大家谈谈flink实战,以及Flink实战 java对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、Flink实战之FileSystem-parquet支持ZSTD压缩
- 2、Flink实战双流join之interval Join
- 3、Flink实战之Kafka To Hive
- 4、flink实战教程-集群的部署
- 5、Flink重点难点:维表关联理论和Join实战
Flink实战之FileSystem-parquet支持ZSTD压缩
ZSTD压缩算法是现在最流行压缩算法了,有着高压缩比和压缩性能强的有点,已经被各大框架所使用。
目前hadoop 3.1.0版本已经支持ZSTD算法。所以敏腔可以使用Flink写HDFS时使用这个算法。具体如何操作请往下看
Flink1.11已经支持hadoop 3.x+版本了,所以Flink依赖型配的hadoop shaded包版本不达标的需要升级。
由于从Flink1.11开始,官方已经不提供hadoop shaded支持,可以使用CLASS_PATH方式 参考 。
但是任然可以自己编译桥租衫hadoop 3.1版本的shaded包,具体步骤如下:
同时hadoop集群版本也要支持ZSTD,升级集群或者将ZSTD功能merge到低版本中。
现在就可以写flink任务了,DDL如下:
原因:线上环境是hadoop2.6版本 还不支持zstd压缩,之后在一个测试环境上进行,该环境还是2.6版本但是将3.1的特性merge了进来。
[img]Flink实战双流join之interval Join
前面学习的Window Join必须要在一个Window中进行JOIN,那如果没有Window如何处理呢?
interval join也是使用相同的key来join两个流(流A、流B),并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。也高判就是:流B的元素的时间戳 ≥ 流A的元素时间戳 + 下界,且流B的元素的时间戳 ≤ 流A的元素时间戳 + 上界。
我们来看Flink官方的一张图。
我们看到,流A的每一个元素,都会和流B的一定时间范围的元素进行JOIN。
其中,上界和下界可以是负数,也可以是整戚仔改数。Interval join目前只支持INNER JOIN。将连接后的元素传递给ProcessJoinFunction时,时间戳变为两个元素中最大的那个时间戳。
注意:
Interval Join只支持事件时间。
运行结果:
join() 和 coGroup() 都是基于窗口做关联的。但是在某些情况下,两条流的数据步调未必一致。例如,订单流的数据有可能在点击流的购买动作发生之后很久才被写入,如果用窗口来圈定,很容易 join 不上。所以 Flink 又提供了"Interval join"的语义,按照指定字段以及右流相对左流偏移的时间区间进行关联。interval join 也是 inner join,虽然不需要开窗,但是需戚旁要用户指定偏移区间的上下界,并且只支持事件时间。
按照指定字段以及右流相对左流偏移的时间区间进行关联,即:
right.timestamp ∈ [left.timestamp + lowerBound; left.timestamp + upperBound]
由上可见,interval join 与 window join 不同,是两个 KeyedStream 之上的操作,并且需要调用 between() 方法指定偏移区间的上下界。如果想令上下界是开区间,可以调用 upperBoundExclusive()/lowerBoundExclusive() 方法。
interval join 的实现原理
以下是 KeyedStream.process(ProcessJoinFunction) 方法调用的重载方法的逻辑。
Flink实战之Kafka To Hive
传统的入库任务一般借助于MapReduce或者差困Spark来写hive表,一般都是天级别最多小时级别的任务。随着实时性要求越来越高宏纯,传统的入库不太能满足需求。Flink完全基于流式处理,同时也支持了写Hive表虚绝念。本文介绍一下如果通过FlinkSQL实现kafka数据入库hive,并能够实时可查。
由于写hive表必须基于hive catalog,所以需要 注册hive catalog 。同时可以在一个job内切换catalog,如果我们不想把kafka的source table注册到hive metastore里面,那么就可以使用memory catalog。
完整SQL如下
以上sql需要借助 sql submit 来提交。
对于已有的hive表,同样也是可以写入的。但是得通过
alter table table_name set tblproperties('property_name'='new_value');语法将flink需要用到的属性设置进去。
flink实战教程-集群的部署
这种模式我们一般是在用IDE调试程序的时候用到,当我们在本地用IDE开发程序的时候,执行main方法,flink会在本地启动一个包含jobmanager和taskmanager的进程的minicluster,程序运行完成之后,这个cluster进程退出。
这种模式就是直接在物理机上启动flink集群。我们可以通过 {FLINK_HOME}/conf/flink-conf.yaml.
此外,我们可以用 ${FLINK_HOME}/bin/taskmanager.sh start 再启动一个taskmanager。
这时我们通过jps命令查看一下启动的进程
我们看到这时候启动了两个taskmanager
这种部署模式对flink集群的资源管理是flink自己维护的,在生产环境下用的不多,所以我们也不做过多描述.
启动集群的命令如下:
这个命令有很多的参数,可以在后面加 -h 看下,我这里着重介绍一下 -d参数。
加上知昌腔-d之后,指的是隔离模式,也就是启动之后和客户端就断了联系,如果要停止集群,需要通过yarn application -kill {applicationId} 来停止集群.
提交成功之后,我们会在yarn的管理页面看到一个类似的任务
这个启动命令也有很多的参数,我就不一一讲解了,我用大白话讲讲我认为最核心的几个参数。
第二,通过命令行来停止:
这个时候需要指定迅团yarn applicationId和flink job id
第三,通过程序来停止
如果我们做了一个实时平台这样的系搭衫统,就不能手工通过命令行来停止了,可以调用相应的api来停止任务.
这种模式是在flink 1.11 版本中提供的,flink的yarn per job模式启动的时候会把本地的flink的jar和用户的jar都上传到hdfs,这个过程非常的消耗网络的带宽,如果同时有多个人提交任务的话,那么对网络的影响就更大,此外,每次提交任务的时候flink的jar包是一样的,也不用每次都拷来拷去的,所以flink提供了一种新的application模式,可以把flink的jar和用户的jar都预先放到hdfs上,这样就能省去yarn per job模式提交任务的jar包拷贝工作,节省了带宽,加快了提交任务的速度.
具体的命令如下:
-yD yarn.provided.lib.dirs :用来指定存放flink jar的目录
最后一个参数是用户的jar在hdfs上的路径.
说一下题外话,其实我们当时在做实时平台的时候,这个提交慢的问题我也发现了,当时我的想法是先启动一个flink集群,然后再把程序的JobGraph提交到这个yarn集群,不过后来嘛,由于 * %%$$# ^ 的原因,也没弄.
对于把服务容器化,也越来越成为一种趋势,所以k8s部署也越来越受大家的重视。 对于k8s部署flink这块说实话我研究的不是很深,也就不多说了。
我们还可以将程序部署到mesos或者使用docker,这个我没有去实际调研过,但是从flink的邮件列表大家沟通的问题或者是网上查到的资料看,这种模式部署应该不多,所以这里就不详细描述了。
Flink重点难点:维表关联理论和Join实战
Flink 案例实战演练
Flink维表Join实践
常见的维表Join方式有四种:
下面分别使用这四种方式来实现一个join的需求,这个需求是:一个主流中数据是用户信息,字段包括用户姓名、城市id;维表是城市数据,字段包括城市ID、城市名称。要求用户表与城市表关联,输出为:用户名称、城市ID、城市名称。
用户表表结构如下:
城市余拆枣维表表结构如下:
通过定义一个类实现RichMapFunction,在open()中读取维表数据加载到内存中,在probe流map()方法中与维表数据进行关联。RichMapFunction中open方法里加载维表数据到内存的方式特点如下:
这种方式是将维表数据存储在Redis、HBase、MySQL等外部存御贺储中,实时流在关联维表数据的时候实时去外部存储中查询,这种方式特点如下:
(1) 使用cache来减轻访问压力
可以使用缓存来存储一部分常访问的维表数据,以减少访问外部系统的次数,比如使用guava Cache。
下面是一个例子:
(2) 使用异步IO来提高访问吞吐量
Flink与外部存储系统进行读写操作的时候可以使用同步方式,也就是发送一个请求后等待外部系统响应,然后再发送第二个读写请求,这样的方式吞吐量比较低,可以用提高并行度的方式来提高吞吐量,但是并行度多了也就导致了进程数量多了,占用了大量的资源。
Flink中可以使用异步IO来读写外部系统,这要求外部系统客户端支持异步IO,不过目前很多系统都支持异步IO客户端。但是如果使用异步就要涉及到三个问题:
下面是一个实例,演示了试用异步IO来访问维表:
利用Flink的Broadcast State将维度数据流广播到下游做join操作。特点如下:
下面是一个实例:
Temporal table是持续变化表上某一时刻的视图,Temporal table function是一个表函数,传递一个时间参数,返回Temporal table这一指定时刻的视图。
可以将维度数据流映射为Temporal table,主流与这个Temporal table进行关联,可以关联到某一个版本(历史上某一个时刻)的维度数据。
Temporal table function join的特点如下:
(1) ProcessingTime的一个实例
(2) EventTime的一个实例
结果输出为:
通竖拆过结果可以看到,根据主流中的EventTime的时间,去维表流中取响应时间版本的数据。
(3) Kafka Source的EventTime实例
依次向user和city两个topic中写入数据:
测试得到的输出如下:
关于flink实战和Flink实战 java的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。