sparksql(sparksql的程序入口)

本篇文章给大家谈谈sparksql,以及sparksql的程序入口对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Spark SQL CBO 基于代价的优化

Spark CBO 背景

本文将介绍 CBO,它充分考虑了数据本身的特点(如大小、分布)以及操作算子的特点(中间结果集的分布及大小)及代价,从而更好的选择执行代价最小的物理执行计划,即 SparkPlan。

Spark CBO 原理

CBO 原理是计算所有可能的物理计划的代价,并挑选出代价最小的物理执行计划。其核心在于评估一个给定的物理执行计划的代价。

物理执行计划是一个树状结构,其代价等于每个执行宴袜节点的代价总合,如下图所示。

而每个执行节点的代价,分为两个部分

每个操作算子的代价相对固定,可用规则来描述。而执行节点输出数据集的大小与分布,分为两个部分:1) 初始数据集,也即原始表,其数据集的大小与分布可直接通过统计得到;2)中间节点输出数据集的大小与分布可由其输入数据集的信息与操作本身的特点推算。

所以,最终主要需要解决两个问题

Statistics 收集

通过如下 SQL 语句,可计算出整个表的记录总数以及总大小

从如下示例中,Statistics 一行可见, customer 表数据总大小为 37026233 字节,即 35.3MB,总记录数为 28万,与事实相符。

通过如下 SQL 语句,可计算出指定列的统计信息

从如下示例可见,customer 表的 c_customer_sk 列最小值为 1, 最大值为 280000,null 值个数为 0,不同值个数为 274368,平均列长度为 8,最大列长度为 8。

除上述示例中的统计信息外,Spark CBO 还直接等高直方图。在上例中,histogram 为 NULL。其原因是,spark.sql.statistics.histogram.enabled 默认值为 false,也即 ANALYZE 时默认不计算及存储 histogram。

下例中,通过 SET spark.sql.statistics.histogram.enabled=true; 启用 histogram 后,完整的统计信息如下。

从上图可见,生成的 histogram 为 equal-height histogram,且高度为 1102.36,bin 数为 254。其中 bin 个数可由 spark.sql.statistics.histogram.numBins 配置。对于每个 bin,匀记录其最小值,最大值,以及 distinct count。

值得注意的是,这里的 distinct count 并不是精确值,而是通过 HyperLogLog 计算出来的近似值。使用 HyperLogLog 的原因有二

算子对数据集影响估计

对于中间算子,可以根据输入数据集的统计信息以及算子的特性,可以估算出输出数据集的统计结果。

本节以 Filter 为例说明算子对数据集的影响。

对于常见的 Column A value B Filter,可通过如下方式估算输出中间结果的统计信息山祥链

上述估算的前提是,字段 A 数据均匀分布。但很多时候,数据分布并不均匀,且当数据倾斜严重是,上述估算误差较大。此时,可充逗孙分利用 histogram 进行更精确的估算

启用 Historgram 后,Filter Column A value B 的估算方法为

在上图中,B.value = 15,A.min = 0,A.max = 32,bin 个数为 10。Filter 后 A.ndv = ndv(B.value) = ndv(15)。该值可根据 A 15 的 5 个 bin 的 ndv 通过 HyperLogLog 合并而得,无须重新计算所有 A 15 的数据。

算子代价估计

SQL 中常见的操作有 Selection(由 select 语句表示),Filter(由 where 语句表示)以及笛卡尔乘积(由 join 语句表示)。其中代价最高的是 join。

Spark SQL 的 CBO 通过如下方法估算 join 的代价

其中 rows 即记录行数代表了 CPU 代价,size 代表了 IO 代价。weight 由 *spark.sql.cbo.joinReorder.card.weight *决定,其默认值为 0.7。

Build侧选择

对于两表Hash Join,一般选择小表作为build size,构建哈希表,另一边作为 probe side。未开启 CBO 时,根据表原始数据大小选择 t2 作为build side

而开启 CBO 后,基于估计的代价选择 t1 作为 build side。更适合本例

优化 Join 类型

Spark SQL 中,Join 可分为 Shuffle based Join 和 BroadcastJoin。Shuffle based Join 需要引入 Shuffle,代价相对较高。BroadcastJoin 无须 Join,但要求至少有一张表足够小,能通过 Spark 的 Broadcast 机制广播到每个 Executor 中。

在不开启 CBO 中,Spark SQL 通过 spark.sql.autoBroadcastJoinThreshold 判断是否启用 BroadcastJoin。其默认值为 10485760 即 10 MB。

并且该判断基于参与 Join 的表的原始大小。

在下图示例中,Table 1 大小为 1 TB,Table 2 大小为 20 GB,因此在对二者进行 join 时,由于二者都远大于自动 BroatcastJoin 的阈值,因此 Spark SQL 在未开启 CBO 时选用 SortMergeJoin 对二者进行 Join。

而开启 CBO 后,由于 Table 1 经过 Filter 1 后结果集大小为 500 GB,Table 2 经过 Filter 2 后结果集大小为 10 MB 低于自动 BroatcastJoin 阈值,因此 Spark SQL 选用 BroadcastJoin。

优化多表 Join 顺序

未开启 CBO 时,Spark SQL 按 SQL 中 join 顺序进行 Join。极端情况下,整个 Join 可能是 left-deep tree。在下图所示 TPC-DS Q25 中,多路 Join 存在如下问题,因此耗时 241 秒。

开启 CBO 后, Spark SQL 将执行计划优化如下

优化后的 Join 有如下优势,因此执行时间降至 71 秒

总结

5万人关注的大数据成神之路,不来了解一下吗?

5万人关注的大数据成神之路,真的不来了解一下吗?

5万人关注的大数据成神之路,确定真的不来了解一下吗?

[img]

sparkSQL和spark有什么区别?

Spark为结构化数据处理引入了一没侍个称为Spark SQL的编程模块。简而言之,sparkSQL是Spark的前身,是在Hadoop发展过程中,为了给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具。

sparkSQL提供了一个称为DataFrame(数据框)的编程抽象,DF的底层仍然是RDD,并且可以充当分布式SQL查询引擎。

SparkSql有哪些特点呢?

1)引脊察带入了新的RDD类型SchemaRDD,可以像传统数据库定义表一样来定樱芦义SchemaRDD。

2)在应用程序中可以混合使用不同来源的数据,如可以将来自HiveQL的数据和来自SQL的数据进行Join操作。

3)内嵌了查询优化框架,在把SQL解析成逻辑执行计划之后,最后变成RDD的计算。

spark -SQL 配置参数

官网:

或者缓存dataFrame

或者CACHE TABLE

可通过两种配置方式开启缓存数据功能:

使和穗用SQLContext的setConf方法

执行SQL命令 SET key=value

用到的配置

-- spark.sql.autoBroadcastJoinThreshold, broadcast表的最大值10M,当这是为-1时, broadcasting不可用,内存允许的情况下加大这个值

-- spark.sql.shuffle.partitions 当join或者聚合产生shuffle操作时, partitions的数量, 这个值可以调大点, 我一般配置500, 切分更多的task, 有助于数据倾斜的减缓, 但是如果task越多, shuffle数据量也会迹碰增多

对于broadcast join模式,会将小于spark.sql.autoBroadcastJoinThreshold值(默认为10M)的表广播到其他计算节点,不走shuffle过程,所以会更加高效唤州卜。

官网的原话是这个样子:

The BROADCAST hint guides Spark to broadcast each specified table when joining them with another table or view. When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred, even if the statistics is above the configuration spark.sql.autoBroadcastJoinThreshold. When both sides of a join are specified, Spark broadcasts the one having the lower statistics. Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join) support BHJ. When the broadcast nested loop join is selected, we still respect the hint.

注意 : 确定broadcast hash join的决定性因素是hive的表统计信息一定要准确。并且,由于视图是没有表统计信息的,所以所有的视图在join时都不会被广播。所以至少要有一张hive表。

------------------------待完善------------------------

Spark Sql 函数使用

round -  保留数据精度 

如 round(col("col1"),0) 对闹孙应数值为 21.23 -液高链 21.0 ;21.73 - 22.0

如 round(col("col1"),1) 对应数值为 21.23 -念顷 21.2

如 round(col("col1"),-1) 对应数值为 21.23 - 20.0

Spark Sql 源码剖析(二): TreeNode

使用 object CurrentOrigin 为 TreeNodes 提供一个可以查找上下文的地方,比如当前正在解析哪行 code。

object CurrentOrigin 主燃颤含要包含一个 private val value = new ThreadLocal[Origin]() ,目前 CurrentOrigin 仅在 parser 中使用,在 visit 每个节点的时候都会使用,记录当前 parse 的节点是哪行哪列

另外,从 value 是 ThreadLocal 类型可以看出,在 Spark SQL 中,parse sql 时都是在单独的 thread 里进行的(不同的 sql 不同的 thread)

返回该节点的 seq of children,children 是不可变的。有三种情况:

查找第一个符合 f 条件(比如某个类型的)的 TreeNode,先序遍历。

将函数 f 递归应用洞猛于节点及其子节点

与 foreach 不同的是,foreach 先应用于 parent,再应用与 child;而 foreachUp 是先应用于 child 再应用与 parent

调用 foreach,foreach 中应用的函数是 ret += f(_) ,最终返回一个 seq,包含将 f 通过 foreach 方式应用于所有节点并 add 到 ret。其中 f 本身是 BaseType = A 类型

原理与 map 一致,只是 f 变成了 BaseType = TraversableOnce[A]

PartialFunction#lift :将 partial func 转换为一个返回 Option 结果的函数。将 pf 函数应用于符合 pf 定义的节点(即 pf.lift(node)返回的 Option 不是 None )并都皮笑 add 到 ret = new collection.mutable.ArrayBuffer[B] 以 Seq 形式返回

以 Seq 的形式返回 tree 的所有叶子节点

def collectFirst[B](pf: PartialFunction[BaseType, B]): Option[B] :注意,因为可能没有符合 pf 定义的节点,所有返回的 Option 可能是 None

相当于 productIterator.map(f).toArray ,即对于 productIterator 每个元素执行 f 然后将 ret 组成一个 arr 返回

注意:TreeNode 没有实现 Product 相关方法,都由其子类自行实现

使用 new children 替换并返回该节点的拷贝。该方法会对 productElement 每个元素进行模式匹配,根据节点类型及一定规则进行替换。

调用 transformDown

rule: PartialFunction[BaseType, BaseType]

返回 f 应用于所有子节点(非递归,一般将递归操作放在调用该函数的地方)后该节点的 copy。其内部的原理是调用 mapProductIterator,对每一个 productElement(i) 进行各种模式匹配,若能匹配上某个再根据一定规则进行转换,核心匹配转换如下:

以上都是适用于有 children 的 node,如果是 children 为 null 的 node 直接返回

反射生成节点副本

返回该类型 TreeNode 的 name,默认为 class name;注意,会移除物理操作的 Exec$ 前缀

所有应该以该节点内嵌套树表示的 nodes,比如,可以被用来表示 sub-queries

(children ++ innerChildren).toSet[TreeNode[_]]

主要用于交互式 debug,返回该 tree 指定下标的节点,num 可以在 numberedTreeString 找到。最终调用的

我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻:

Spark SQL(十):Hive On Spark

Hive是目前大数据领域,事实上的SQL标准。其底层默认是基于MapReduce实现的,但是由于MapReduce速度实在比较慢,因此这几年,陆续出来了新的SQL查询引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。

Spark SQL与Hive On Spark是不一样的。Spark SQL是Spark自己研发出来的针对各种数据源,包括Hive、JSON、Parquet、JDBC、RDD等都可以执行查询的,一套基于Spark计算引擎的查询引擎。因此它是Spark的一个项目,只不过提供了逗闭针对Hive执行查询的工功能而已,适合在一些使用Spark技术栈的大数据应用类系统中使用。

而Hive On Spark,是Hive的一个项目,它是将Spark作为底层的查询引擎(不通过MapReduce作为唯一的查询引擎)。Hive On Spark,只适用于Hive,在可预见的未来,很有可能Hive默认的底层引擎就从MapReduce切换为Spark了;适合于将原有早粗的Hive数据仓库以及数据统计分析替山睁裂换为Spark引擎,作为全公司通用的大数据统计分析引擎。

Hive On Spark做了一些优化:

1、Map Join

Spark SQL默认对join是支持使用broadcast机制将小表广播到各个节点上,以进行join的。但是问题是,这会给Driver和Worker带来很大的内存开销。因为广播的数据要一直保留在Driver内存中。所以目前采取的是,类似乎MapReduce的Distributed Cache机制,即提高HDFS replica factor的复制因子,以让数据在每个计算节点上都有一个备份,从而可以在本地进行数据读取。

2、Cache Table

对于某些需要对一张表执行多次操作的场景,Hive On Spark内部做了优化,即将要多次操作的表cache到内存中,以便于提升性能。但是这里要注意,并不是对所有的情况都会自动进行cache。所以说,Hive On Spark还有很多不完善的地方。

Hive QL语句 =

语法分析 = AST =

生成逻辑执行计划 = Operator Tree =

优化逻辑执行计划 = Optimized Operator Tree =

生成物理执行计划 = Task Tree =

优化物理执行计划 = Optimized Task Tree =

执行优化后的Optimized Task Tree

关于sparksql和sparksql的程序入口的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表