sparkjoin(sparkjoin例子)

本篇文章给大家谈谈sparkjoin,以及sparkjoin例子对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Spark的join什么情况下可以避免shuffle?

Spark的join操作可能触发shuffle操作。shuffle操作要经过磁盘IO,网络传输,对性能影响比较大。本文聊一聊Spark的join在哪些情况下可以避免shuffle过程。

针对Spark DataFrame/DataSet的join,可以通过broadcast join和bucket join来避免shuffle操作。

Broadcast join很好理解,小表被分发到所有executors,所以不需要做shuffle就可以完成join. Spark SQL控制自动broadcast join的参数是:spark.sql.autoBroadcastJoinThreshold , 默认为10MB. 就是说当join中的一张表的size小于10MB时,spark会自动将其封装为broadcast发送到枣核所有结点,然后进行broadcast join. 当然也可以手动将join中的某张表转化成broadcast : 

                 sparkSession.sparkContext.broadcast(df)

Bucket join其实就是将要join的两张表按照join columns(或join columns的子集)根据相同的partitioner预先做好分区,并将这些分区信息存储到catalog中(比如HiveExternalCatalog);然后在读取这两张表并做join时,spark根据bucket信息将两张表的相同partition进行join即可,从而避免了shuffle的伏轿过程。注意,这里是避免了shuffle过程,并没有完全避免网络传输,由于两张表的相同partition不一定在同一台机器上,所以这里仍需要对其中一张表的partition进行网络传输。关于spark bucketing的原理和使用细节可以参见这个 视频 。

笔者这里想讨论的是PairRDDFunctions类的join方法。在RDD对象中有一个隐式转换可以将rdd转换成PairRDDFunctions对象,这样就可以直接在rdd对象上调用join方法:

先来看看PairRDDFunctions的join方法:

PairRDDFunctions有多个重载的join方法,上面这个只带一个RDD对象的参数,我们接着看它调用的另一个重载的join方法:

可以看到,RDD的join实现是由cogroup方法完成的,cogroup完后得到的是类型为RDD[(K, (Iterable[V], Iterable[W]))]的rdd对象,其中K为key的类型,V为第一张join表的value类型,W为第二张join表的value类型;然后,调用flatMapValues将其转换成RDD[(K, V, W)]的rdd对象。

下面来看看PairRDDFunctions.cogroup方法的实现:

cogroup中生成了CoGroupedRDD对象,所以关键是这个RDD的getDependencies方法返回的dependencies中是否存在shuffle dependency.

看看这个RDD的getDependencies方法:

其中的rdds就是进行cogroup的rdd序列,也就是PairRDDFunctions.cogroup方法中传入的 Seq(self, other)  .

重点来了,对于所有参与cogroup的rdd,如果它的partitioner和结果CoGroupedRDD的partitioner相同,则该rdd会成为CoGroupedRDD的一个oneToOne窄依赖,否则就是一凳厅掘个shuffle依赖,即宽依赖。

我们知道,只有宽依赖才会触发shuffle,所以RDD的join可以避免shuffle的条件是: 参与join的所有rdd的partitioner都和结果rdd的partitioner相同。

那么,结果rdd的partitioner是怎么确定的呢?上文讲到PairRDDFunctions.join方法有多个重载,其中就有可以指定partitioner的重载,如果没有指定,则使用默认的partitioner,看看默认的partitioner是怎么确定的:

简单地说就是:

1. 如果父rdds中有可用的合格的partitioner,则直接使用其中分区数最大的那个partitioner;

2. 如果没有,则根据默认分区数生成HashPartitioner.

至于怎样的partitioner是合格的,请读者阅读上面的Partitioner.defaultPartitioner方法和Partitioner.isEligiblePartitioner方法。

RDD的compute方法是真正计算得到数据的方法,我们来看看CoGroupedRDD的compute方法是怎么实现的:

可以看到,CoGroupedRDD的数据是根据不同的依赖从父rdd中获取的:

1. 对于窄依赖,直接调用父rdd的iterator方法获取对应partition的数据

2. 对于宽依赖,从shuffleManager获取shuffleReader对象进行读取,这里就是shuffle read了

还有一个重点是读取多个父rdds的数据后,怎么将这些数据根据key进行cogroup?

这里用到了ExternalAppendOnlyMap来构建key和grouped values的映射。先来看看createExternalMap的实现:

相关类型定义如下:

可以看到,ExternalAppendOnlyMap的构造函数的参数是是三个方法参数:

1. createCombiner : 对每个key创建用于合并values的combiner数据结构,在这里就是一个CoGroup的数据,数组大小就是dependencies的数量

2. mergeValue : 将每个value合并到对应key的combiner数据结构中,在这里就是将一个CoGroupValue对象添加到其所在rdd对应的CoGroup中

3. mergeCombiners : 合并相同key的两个combiner数据结构,在这里就是合并两个CoGroupCombiner对象

CoGroupedRDD.compute会调用ExternalAppendOnlyMap.insertAll方法将从父rdds得到的数据一个一个地插入到ExternalAppendOnlyMap对象中进行合并。

最后,以这个ExternalAppendOnlyMap对象作为参数构造InterruptibleIterator,这个iterator会被调用者用于访问CoGroupedRDD的单个partition的所有数据。

本文简单地介绍了DataFrame/DataSet如何避免join中的shuffle过程,并根据源码详述了RDD的join操作的具体实现细节,分析了RDD的join在什么情况下可以避免shuffle过程。

1. 源码版本:2.4.0

2. 水平有限,如有错误,望读者指出

[img]

Spark Join 的三种方式

Spark join的三种掘氏方式:

1.broadcast hash join:将其中一张较小的表通过广播的方式,由driver发送到各个executor,大表正常被分成多个区,每个分区的数据和本地的广播变量进行join(相坦歼当于每个executor上都有一份小表的数据,并且这份数据是在内存中的,过来的分区中的数据和这份数据进行join)。broadcast适用于表很小,可以直接被广播的场景;

2.shuffle

hash join:一旦小表比较大,此时就不适合使用broadcast hash join了。这种情况下,可以对两张表分别进行shuffle,将相同key的数据分到一个分区中,然后分区和分区之间进行join。相当于将两张表都分成了若干小份,小份和小份之间进行hash join,充分利用集群资源。

上面介绍的方式只对于两张表有一张是小表的情况适用,而对于两张大表,但当两个表都非常大时,显然无论哪种都会对计算内存造成很大的压力。这是因为判信散join时两者采取都是hash join,是将一侧的数据完全加载到内存中,使用hash code取join key相等的记录进行连接。

3 .SparkSQL对两张大表join采用了全新的算法-sort-merge join,整个过程分为三个步骤:

(1).shuffle阶段:将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理

(2).sort阶段:对单个分区节点的两表数据,分别进行排序

(3).merge阶段:对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同join key就merge输出,否则继续取更小一边的key。

spark的join和sql的join的区别

spark的join和sql的join的区别

没区别,inner join 是内连接 join默认就是inner join。

Table A

aid adate

1 a1

2 a2

3 a3

TableB

bid bdate

1 b1

2 b2

4 b4

两个表a,b相连接,要取出id相同的字段

select * from a inner join b on a.aid = b.bid这是仅取出匹配的数据.

此时的取出的是:

1 a1 b1

2 a2 b2

那么left join 指:

select * from a left join b on a.aid = b.bid

首先取出a表中所有数拆激据,然后再加上与a,b匹配的的数据

此时的取出的是:

1 a1 b1

2 a2 b2

3 a3 空字符

同样的也有right join

指的是首先取出b表中所有数据,然后再加上与a,b匹配的的数据

此时的取出的是:

1 a1 b1

2 a2 b2

4 空字符 b4

LEFT JOIN 或 LEFT OUTER JOIN。

左向外联接的结果集包括 LEFT OUTER 子句中指定的左表的所有行,而不仅仅是联接列所匹配的行。如果左表的某行在右表中没有匹配行,则在相关联的结果集行中右表的所有选择列表列均为空值

二. left join/right join/inner join操作演示

表A记录如下:

aID aNum

1 a20050111

2 a20050112

3 a20050113

4 a20050114

5 a20050115

表B记录如下:

bID bName

1 2006032401

2 2006032402

3 2006032403

4 2006032404

8 2006032408

实验如下:

1. left join

sql语句如下:

SELECT * FROM A

LEFT JOIN B

ON A.aID = B.bID

结果如下:

aID aNum bID bName

1 a20050111 1 2006032401

2 a20050112 2 2006032402

3 a20050113 3 2006032403

4 a20050114 4 2006032404

5 a20050115 NULL NULL

(所影响的行数为 5 行)

结果说明:

left join是以A表的记录为基础的,A可以看成左表,B可以看成右表,left join是以左表为旅虚袜准的.

换句话说,左表(A)的记录将会全部表示誉陆出来,而右表(B)只会显示符合搜索条件的记录(例子中为: A.aID = B.bID).

B表记录不足的地方均为NULL.

2. right join

sql语句如下:

SELECT * FROM A

RIGHT JOIN B

ON A.aID = B.bID

结果如下:

aID aNum bID bName

1 a20050111 1 2006032401

2 a20050112 2 2006032402

3 a20050113 3 2006032403

4 a20050114 4 2006032404

NULL NULL 8 2006032408

(所影响的行数为 5 行)

Spark利用广播变量简化大表和小表的join操作

两个RDD进行join操作(即 rdd1.join(rdd2)) 会导致shuffle ,这是因为join操作会对key一致的key-vlaue对进行合并,而** key相同的key-value对不太可能会在同一个partition , 因此很有可能是需要进行经过网络进行shuffle的,而shuffle会产生许多中间数据(小文件)并涉及到网络传输,这些通常比较耗时,Spark中要尽量避免shuffle 。

优化方法: 将小RDD的数据通过broadcast到每个executor中,各大RDD partition分别和小RDD做join操作 。庆败

具体是:在driver端将小RDD转换成数组array并broadcast到做差陆各executor端,然后再各executor task中对各partion的大RDD的key-value对和小rdd的key-value对进行join;由于 每个executor端都有完整的小RDD ,因此小RDD的各partition 不需要shuffle 到RDD的各partition,小RDD广播到大RDD的各partition后, 各partition分别进行join,最后再执行reduce , 所有分纯顷区的join结果汇总到driver端 。

如有错误,敬请指正!

关于sparkjoin和sparkjoin例子的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表