sparkjoin(sparkjoin例子)
本篇文章给大家谈谈sparkjoin,以及sparkjoin例子对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、Spark的join什么情况下可以避免shuffle?
- 2、Spark Join 的三种方式
- 3、spark的join和sql的join的区别
- 4、Spark利用广播变量简化大表和小表的join操作
Spark的join什么情况下可以避免shuffle?
Spark的join操作可能触发shuffle操作。shuffle操作要经过磁盘IO,网络传输,对性能影响比较大。本文聊一聊Spark的join在哪些情况下可以避免shuffle过程。
针对Spark DataFrame/DataSet的join,可以通过broadcast join和bucket join来避免shuffle操作。
Broadcast join很好理解,小表被分发到所有executors,所以不需要做shuffle就可以完成join. Spark SQL控制自动broadcast join的参数是:spark.sql.autoBroadcastJoinThreshold , 默认为10MB. 就是说当join中的一张表的size小于10MB时,spark会自动将其封装为broadcast发送到枣核所有结点,然后进行broadcast join. 当然也可以手动将join中的某张表转化成broadcast :
sparkSession.sparkContext.broadcast(df)
Bucket join其实就是将要join的两张表按照join columns(或join columns的子集)根据相同的partitioner预先做好分区,并将这些分区信息存储到catalog中(比如HiveExternalCatalog);然后在读取这两张表并做join时,spark根据bucket信息将两张表的相同partition进行join即可,从而避免了shuffle的伏轿过程。注意,这里是避免了shuffle过程,并没有完全避免网络传输,由于两张表的相同partition不一定在同一台机器上,所以这里仍需要对其中一张表的partition进行网络传输。关于spark bucketing的原理和使用细节可以参见这个 视频 。
笔者这里想讨论的是PairRDDFunctions类的join方法。在RDD对象中有一个隐式转换可以将rdd转换成PairRDDFunctions对象,这样就可以直接在rdd对象上调用join方法:
先来看看PairRDDFunctions的join方法:
PairRDDFunctions有多个重载的join方法,上面这个只带一个RDD对象的参数,我们接着看它调用的另一个重载的join方法:
可以看到,RDD的join实现是由cogroup方法完成的,cogroup完后得到的是类型为RDD[(K, (Iterable[V], Iterable[W]))]的rdd对象,其中K为key的类型,V为第一张join表的value类型,W为第二张join表的value类型;然后,调用flatMapValues将其转换成RDD[(K, V, W)]的rdd对象。
下面来看看PairRDDFunctions.cogroup方法的实现:
cogroup中生成了CoGroupedRDD对象,所以关键是这个RDD的getDependencies方法返回的dependencies中是否存在shuffle dependency.
看看这个RDD的getDependencies方法:
其中的rdds就是进行cogroup的rdd序列,也就是PairRDDFunctions.cogroup方法中传入的 Seq(self, other) .
重点来了,对于所有参与cogroup的rdd,如果它的partitioner和结果CoGroupedRDD的partitioner相同,则该rdd会成为CoGroupedRDD的一个oneToOne窄依赖,否则就是一凳厅掘个shuffle依赖,即宽依赖。
我们知道,只有宽依赖才会触发shuffle,所以RDD的join可以避免shuffle的条件是: 参与join的所有rdd的partitioner都和结果rdd的partitioner相同。
那么,结果rdd的partitioner是怎么确定的呢?上文讲到PairRDDFunctions.join方法有多个重载,其中就有可以指定partitioner的重载,如果没有指定,则使用默认的partitioner,看看默认的partitioner是怎么确定的:
简单地说就是:
1. 如果父rdds中有可用的合格的partitioner,则直接使用其中分区数最大的那个partitioner;
2. 如果没有,则根据默认分区数生成HashPartitioner.
至于怎样的partitioner是合格的,请读者阅读上面的Partitioner.defaultPartitioner方法和Partitioner.isEligiblePartitioner方法。
RDD的compute方法是真正计算得到数据的方法,我们来看看CoGroupedRDD的compute方法是怎么实现的:
可以看到,CoGroupedRDD的数据是根据不同的依赖从父rdd中获取的:
1. 对于窄依赖,直接调用父rdd的iterator方法获取对应partition的数据
2. 对于宽依赖,从shuffleManager获取shuffleReader对象进行读取,这里就是shuffle read了
还有一个重点是读取多个父rdds的数据后,怎么将这些数据根据key进行cogroup?
这里用到了ExternalAppendOnlyMap来构建key和grouped values的映射。先来看看createExternalMap的实现:
相关类型定义如下:
可以看到,ExternalAppendOnlyMap的构造函数的参数是是三个方法参数:
1. createCombiner : 对每个key创建用于合并values的combiner数据结构,在这里就是一个CoGroup的数据,数组大小就是dependencies的数量
2. mergeValue : 将每个value合并到对应key的combiner数据结构中,在这里就是将一个CoGroupValue对象添加到其所在rdd对应的CoGroup中
3. mergeCombiners : 合并相同key的两个combiner数据结构,在这里就是合并两个CoGroupCombiner对象
CoGroupedRDD.compute会调用ExternalAppendOnlyMap.insertAll方法将从父rdds得到的数据一个一个地插入到ExternalAppendOnlyMap对象中进行合并。
最后,以这个ExternalAppendOnlyMap对象作为参数构造InterruptibleIterator,这个iterator会被调用者用于访问CoGroupedRDD的单个partition的所有数据。
本文简单地介绍了DataFrame/DataSet如何避免join中的shuffle过程,并根据源码详述了RDD的join操作的具体实现细节,分析了RDD的join在什么情况下可以避免shuffle过程。
1. 源码版本:2.4.0
2. 水平有限,如有错误,望读者指出
[img]Spark Join 的三种方式
Spark join的三种掘氏方式:
1.broadcast hash join:将其中一张较小的表通过广播的方式,由driver发送到各个executor,大表正常被分成多个区,每个分区的数据和本地的广播变量进行join(相坦歼当于每个executor上都有一份小表的数据,并且这份数据是在内存中的,过来的分区中的数据和这份数据进行join)。broadcast适用于表很小,可以直接被广播的场景;
2.shuffle
hash join:一旦小表比较大,此时就不适合使用broadcast hash join了。这种情况下,可以对两张表分别进行shuffle,将相同key的数据分到一个分区中,然后分区和分区之间进行join。相当于将两张表都分成了若干小份,小份和小份之间进行hash join,充分利用集群资源。
上面介绍的方式只对于两张表有一张是小表的情况适用,而对于两张大表,但当两个表都非常大时,显然无论哪种都会对计算内存造成很大的压力。这是因为判信散join时两者采取都是hash join,是将一侧的数据完全加载到内存中,使用hash code取join key相等的记录进行连接。
3 .SparkSQL对两张大表join采用了全新的算法-sort-merge join,整个过程分为三个步骤:
(1).shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理
(2).sort阶段:对单个分区节点的两表数据,分别进行排序
(3).merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则继续取更小一边的key。
spark的join和sql的join的区别
spark的join和sql的join的区别
没区别,inner join 是内连接 join默认就是inner join。
Table A
aid adate
1 a1
2 a2
3 a3
TableB
bid bdate
1 b1
2 b2
4 b4
两个表a,b相连接,要取出id相同的字段
select * from a inner join b on a.aid = b.bid这是仅取出匹配的数据.
此时的取出的是:
1 a1 b1
2 a2 b2
那么left join 指:
select * from a left join b on a.aid = b.bid
首先取出a表中所有数拆激据,然后再加上与a,b匹配的的数据
此时的取出的是:
1 a1 b1
2 a2 b2
3 a3 空字符
同样的也有right join
指的是首先取出b表中所有数据,然后再加上与a,b匹配的的数据
此时的取出的是:
1 a1 b1
2 a2 b2
4 空字符 b4
LEFT JOIN 或 LEFT OUTER JOIN。
左向外联接的结果集包括 LEFT OUTER 子句中指定的左表的所有行,而不仅仅是联接列所匹配的行。如果左表的某行在右表中没有匹配行,则在相关联的结果集行中右表的所有选择列表列均为空值
二. left join/right join/inner join操作演示
表A记录如下:
aID aNum
1 a20050111
2 a20050112
3 a20050113
4 a20050114
5 a20050115
表B记录如下:
bID bName
1 2006032401
2 2006032402
3 2006032403
4 2006032404
8 2006032408
实验如下:
1. left join
sql语句如下:
SELECT * FROM A
LEFT JOIN B
ON A.aID = B.bID
结果如下:
aID aNum bID bName
1 a20050111 1 2006032401
2 a20050112 2 2006032402
3 a20050113 3 2006032403
4 a20050114 4 2006032404
5 a20050115 NULL NULL
(所影响的行数为 5 行)
结果说明:
left join是以A表的记录为基础的,A可以看成左表,B可以看成右表,left join是以左表为旅虚袜准的.
换句话说,左表(A)的记录将会全部表示誉陆出来,而右表(B)只会显示符合搜索条件的记录(例子中为: A.aID = B.bID).
B表记录不足的地方均为NULL.
2. right join
sql语句如下:
SELECT * FROM A
RIGHT JOIN B
ON A.aID = B.bID
结果如下:
aID aNum bID bName
1 a20050111 1 2006032401
2 a20050112 2 2006032402
3 a20050113 3 2006032403
4 a20050114 4 2006032404
NULL NULL 8 2006032408
(所影响的行数为 5 行)
Spark利用广播变量简化大表和小表的join操作
两个RDD进行join操作(即 rdd1.join(rdd2)) 会导致shuffle ,这是因为join操作会对key一致的key-vlaue对进行合并,而** key相同的key-value对不太可能会在同一个partition , 因此很有可能是需要进行经过网络进行shuffle的,而shuffle会产生许多中间数据(小文件)并涉及到网络传输,这些通常比较耗时,Spark中要尽量避免shuffle 。
优化方法: 将小RDD的数据通过broadcast到每个executor中,各大RDD partition分别和小RDD做join操作 。庆败
具体是:在driver端将小RDD转换成数组array并broadcast到做差陆各executor端,然后再各executor task中对各partion的大RDD的key-value对和小rdd的key-value对进行join;由于 每个executor端都有完整的小RDD ,因此小RDD的各partition 不需要shuffle 到RDD的各partition,小RDD广播到大RDD的各partition后, 各partition分别进行join,最后再执行reduce , 所有分纯顷区的join结果汇总到driver端 。
如有错误,敬请指正!
关于sparkjoin和sparkjoin例子的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。