spark调优(spark调优工具)

本篇文章给大家谈谈spark调优,以及spark调优工具对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Spark Core 性能调优之配置进程参数

        Spark on YARN模式下,有Driver、ApplicationMaster、Executor三种进程。在任务调度和运行的过程中,Driver和岁御敬Executor承担了很大的责任,而ApplicationMaster主要负责container的启停。

        因而Driver和Executor的参数配置对spark应用的执行有着很大的影响意义。用户可通过如下操作对Spark集群性能做优化。

1. 配置Driver内存。

Driver负责任务的调度,和Executor、AM之间的消息通信。当任务数变多,任务平行度增大时,Driver内存都需要相应增大。可以根据实际任务数量的多少,为Driver设置一个合适的内存。

● 将“spark-defaults.conf”中的“spark.driver.memory”配置项或者“spark-env.sh”中的“SPARK_DRIVER_MEMORY”配置项设置为合适大小。

● 在使用spark-submit命令时,添加“--driver-memory MEM”参数设置内存。乎慎

2. 配置Executor个数。

每个Executor每个核同时能跑一个task,所以增加了Executor的个数相当于增大了任务的并发度。在资源充足的情况下,可以相应增加Executor的个数,以提高运行效率。

● 将 “spark-defaults.conf” 中的 “spark.executor.instance” 配置项或者 “spark-env.sh” 中的 “SPARK_EXECUTOR_INSTANCES” 配置项设置为合适大小。还可以设置动态资源调度功能进行优化,详情请参见   。

● 在使用spark-submit命令时,添加“--num-executors NUM”参数设置Executor个数。

3. 配置Executor核数。

每个Executor多个核同时能跑多个task,相当于增大了任务的并发度。但是由于所有核共用Executor的内存,所以要在内存和核数之间做好平衡。

● 将“spark-defaults.conf”中的“spark.executor.cores”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_CORES”配置项设置为合适大小。

● 在拆渗使用spark-submit命令时,添加“--executor-cores NUM”参数设置核数。

4. 配置Executor内存。

Executor的内存主要用于任务执行、通信等。当一个任务很大的时候,可能需要较多资源,因而内存也可以做相应的增加;当一个任务较小运行较快时,就可以增大并发度减少内存。

● 将“spark-defaults.conf”中的“spark.executor.memory”配置项或者“spark-env.sh”中的“SPARK_EXECUTOR_MEMORY”配置项设置为合适大小。

● 在使用spark-submit命令时,添加“--executor-memory MEM”参数设置内存。

在执行spark wordcount计算中。1.6T数据,250个executor。

在默认参数下执行失败,出现Futures timed out 和 OOM 错误。

因为数据量大,task数多,而wordcount每个task都比较小,完成速度快。当task数多时driver端相应的一些对象就变大了,而且每个task完成时executor和driver都要通信,这就会导致由于内存不足,进程之间通信断连等问题。

当把Driver的内存设置到4g时,应用成功跑完。

使用ThriftServer执行TPC-DS测试套,默认参数配置下也报了很多错误:Executor

Lost等。而当配置Driver内存为30g,executor核数为2,executor个数为125,executor内存为6g时,所有任务才执行成功。

2022-02-24-Spark-44(性能调优通用调优)

计算负载主要由 Executors 承担,Driver 主要负责分布式调度,调优空间有限,因此对 Driver 端的配置项我们不作考虑

通过如下参数就可以明确有多少 CPU 资源被划拨给 Spark 用于分布式计算。

spark.cores.max 集群

spark.executor.cores Executor

spark.task.cpus 计算任务

并行度

spark.default.parallelism 并行度

spark.sql.shuffle.partitions 用于明确指定数据关联或聚合操作中 Reduce 端的分区数量。

在平衡 Execution memory 与 Storage memory 的时候,如果 RDD 缓存是刚需,我们就把 spark.memory.storageFraction 调大,并且在应用中优先把缓存灌满,再把计算逻辑应用在缓存数据之上。除此之外,我们还可以同时调整 spark.rdd.compress 和 spark.memory.storageFraction 来缓和 Full GC 的冲击

spark.local.dir 这个配置项,这个参数允许开发者设置磁盘目录,该目录用于存储 RDD cache 落盘数据块和 Shuffle 中间文件。如果你的经费比较充胡谈裕,有条件在计算节点中配备足量的 SSD 存储,甚至是更多的内存资源,完全可以把 SSD 上的文件系统目录,或是内存文件系统添加到 spark.local.dir 配置项中去,从而提供更好的 I/O 性能。

Configuration - Spark 3.2.1

自 1.6 版本之后,Spark 统一采用 Sort shuffle manager 来管理 Shuffle 操作,在 Sort shuffle manager 的管理机制下,无论计算结果本身是否需要排序,Shuffle 计算过程在 Map 阶段和 Reduce 阶段都会引入排序操作。

在不需要聚合,也不需要排序的计算场景中,我们就可以通过设置 spark.shuffle.sort.bypassMergeThreshold 的参数,来改变 Reduce 端的并行度(默认乎凯值是 200)。当 Reduce 端的分区数小于这个设置值的时候,我们就能避免 Shuffle 在计算过程引入排序。

Spark SQL 下面的配置项还是蛮多的,其中对执行性能贡献最大的,当属 AQE(Adaptive query execution,自适应查询引擎)引入的那 3 个特性了,也就是自动分区合并、自动数据倾斜处理和 Join 策略调整。

AQE 事先并不判断哪些分区足够小,而是按照分区编号进行扫描,当扫描量超过“目标尺寸”时,就合并一次,那么,“目标尺寸”由什么来决定的呢?Spark 提供了两个配置项来共同决定分区合并的“目裤顷碰标尺寸”,分区合并的目标尺寸取 advisoryPartitionSizeInBytes 与 partitionSize (每个分区的平均大小)之间的最小值。

我们来举个例子。假设,Shuffle 过后数据大小为 20GB,minPartitionNum 设置为 200,反推过来,每个分区的尺寸就是 20GB / 200 = 100MB。再假设,advisoryPartitionSizeInBytes 设置为 200MB,最终的目标分区尺寸就是取(100MB,200MB)之间的最小值,也就是 100MB。因此你看,并不是你指定了 advisoryPartitionSizeInBytes 是多少

首先,分区尺寸必须要大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 参数的设定值,才有可能被判定为倾斜分区。然后,AQE 统计所有数据分区大小并排序,取中位数作为放大基数,尺寸大于中位数一定倍数的分区会被判定为倾斜分区,中位数的放大倍数也是由参数 spark.sql.adaptive.skewJoin.skewedPartitionFactor(默认值是 5 倍) 控制。

实际上指的是,把会引入 Shuffle 的 Join 方式,如 Hash Join、Sort Merge Join,“降级”(Demote)为 Broadcast Join。

在 Spark 发布 AQE 之前,开发者可以利用 spark.sql.autoBroadcastJoinThreshold 配置项对数据关联操作进行主动降级。这个参数的默认值是 10MB,参与 Join 的两张表中只要有一张数据表的尺寸小于 10MB

不过,autoBroadcastJoinThreshold 这个参数虽然好用,但是有两个让人头疼的短板。一是可靠性较差。尽管开发者明确设置了广播阈值,而且小表数据量在阈值以内,但 Spark 对小表尺寸的误判时有发生,导致 Broadcast Join 降级失败。二来,预先设置广播阈值是一种静态的优化机制,它没有办法在运行时动态对数据关联进行降级调整。

AQE 很好地解决了这两个头疼的问题。首先,AQE 的 Join 策略调整是一种动态优化机制,对于刚才的两张大表,AQE 会在数据表完成过滤操作之后动态计算剩余数据量,当数据量满足广播条件时,AQE 会重新调整逻辑执行计划,在新的逻辑计划中把 Shuffle Joins 降级为 Broadcast Join。再者,运行时的数据量估算要比编译时准确得多,因此 AQE 的动态 Join 策略调整相比静态优化会更可靠、更稳定。

每个 Map Task 生成的数据文件,都包含所有 Reduce Task 所需的部分数据。因此,任何一个 Reduce Task 要想完成计算,必须先从所有 Map Task 的中间文件里去拉取属于自己的那部分数据。索引文件正是用于帮助判定哪部分数据属于哪个 Reduce Task。Reduce Task 通过网络拉取中间文件的过程,实际上就是不同 Stages 之间数据分发的过程。

显然,Shuffle 中数据分发的网络开销,会随着 Map Task 与 Reduce Task 的线性增长,呈指数级爆炸。

Shuffle Joins

第一步就是对参与关联的左右表分别进行 Shuffle,Shuffle 的分区规则是先对 Join keys 计算哈希值,再把哈希值对分区数取模。Shuffle 完成之后,第二步就是在同一个 Executors 内,Reduce task 就可以对 userID 一致的记录进行关联操作。

Broadcast Join

使用广播阈值配置项让 Spark 优先选择 Broadcast Joins 的关键,就是要确保至少有一张表的存储尺寸小于广播阈值(数据表在磁盘上的存储大小,同一份数据在内存中的存储大小往往会比磁盘中的存储大小膨胀数倍)

Spark 将内存分成了 Execution Memory 和 Storage Memory 两类,分别用于分布式任务执行和 RDD 缓存。其中,RDD 缓存虽然最终占用的是 Storage Memory,但在 RDD 展开(Unroll)之前,计算任务消耗的还是 Execution Memory。因此,Spark 中 CPU 与内存的平衡,其实就是 CPU 与执行内存之间的协同与配比。

并行度指的是为了实现分布式计算,分布式数据集被划分出来的份数。并行度明确了数据划分的粒度:并行度越高,数据的粒度越细,数据分片越多,数据越分散。并行度可以通过两个参数来设置,分别是 spark.default.parallelism 和 spark.sql.shuffle.partitions。前者用于设置 RDD 的默认并行度,后者在 Spark SQL 开发框架下,指定了 Shuffle Reduce 阶段默认的并行度。并发度呢?Executor 的线程池大小由参数 spark.executor.cores 决定,每个任务在执行期间需要消耗的线程数由 spark.task.cpus 配置项给定。两者相除得到的商就是并发度,也就是同一时间内,一个 Executor 内部可以同时运行的最大任务数量。又因为,spark.task.cpus 默认数值为 1,并且通常不需要调整,所以,并发度基本由 spark.executor.cores 参数敲定。就 Executor 的线程池来说,尽管线程本身可以复用,但每个线程在同一时间只能计算一个任务,每个任务负责处理一个数据分片。因此,在运行时,线程、任务与分区是一一对应的关系。

对于 User Memory 内存区域来说,使用 空间去重复存储同样的数据,本身就是降低了内存的利用率

对于存储级别来说,实际开发中最常用到的有两个,MEMORY_ONLY 和 MEMORY_AND_DISK,它们分别是 RDD 缓存和 DataFrame 缓存的默认存储级别。对于缓存计算来说,它分为 3 个步骤,第一步是 Unroll,把 RDD 数据分片的 Iterator 物化为对象值,第二步是 Transfer,把对象值封装为 MemoryEntry,第三步是把 BlockId、MemoryEntry 价值对注册到 LinkedHashMap 数据结构。另外,当数据缓存需求远大于 Storage Memory 区域的空间供给时,Spark 利用 LinkedHashMap 数据结构提供的特性,会遵循 LRU 和兔子不吃窝边草这两个基本原则来清除内存空间:LRU:按照元素的访问顺序,优先清除那些“最近最少访问”的 BlockId、MemoryEntry 键值对兔子不吃窝边草:在清除的过程中,同属一个 RDD 的 MemoryEntry 拥有“赦免权”

PROCESS_LOCAL:任务与数据同在一个 JVM 进程中

NODE_LOCAL:任务与数据同在一个计算节点,数据可能在磁盘上或是另一个 JVM 进程中

RACK_LOCAL:任务与数据不在同一节点,但在同一个物理机架上

ANY:任务与数据是跨机架、甚至是跨 DC(Data Center,数据中心)的关系访问数据源是否会引入网络开销,取决于任务与数据的本地性关系,也就是任务的本地性级别

Shuffle 作为大多数计算场景的“性能瓶颈担当”,确实是网络开销的罪魁祸首。根据“能省则省”的开发原则,我们自然要想尽办法去避免 Shuffle。

在数据通过网络分发之前,我们可以利用 Kryo Serializer 序列化器,提升序列化字节的存储效率,从而有效降低在网络中分发的数据量,整体上减少网络开销。

[img]

Spark应用 | Hive On Spark性能调优

我们迹岩公司yarn node节点的可用资源配置为:单台node节点可用资源数:核数33cores、内存110G。Hive on Spark任务的基础配置,主要配置对象包括:Executor和Driver内存,Executor配额,任务并行度。

配置参数为spark.executor.memory和spark.executor.cores。如果要最大化使用core,建议将core设置为4、5、6,且满足core的个数尽量可以整除yarn资源核数。yarn资源可用33核,建议spark.executor.cores设置为4,最多剩下一个core,如果设置为5,6都会有3个core剩余。 spark.executor.cores=4,由于总共有33个核,那么最大可以申请的executor数是8。总内存处以8,也即是 110/8,可以得到每个executor约13.75GB内存。

建议 spark.executor.memoryOverhead(spark的executor堆外内存)站总内大档存的 15%-20%。 那么最终 spark.executor.memoryOverhead=2.75 G 和spark.executor.memory=11 G

注意:默认情况下 spark.executor.memoryOverhead = max(executorMemory * 0.10, 384M),正常情况下不需要手动设置spark堆外内存,如果spark任务出现如下报错,可以手动提高堆外内存大小。

注意:默认情况下 spark.executor.memoryOverhead = max(executorMemory * 0.10, 384M),正常情况下不需要手动设置spark堆外内存,如果spark任务出现如下报错,可以手动提高堆外内存大小。

Container killed by YARN for exceeding memory limits. 16.9 GB of 16 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

对于drvier的内存配置,主要有两个参数:

Driver的内存通常来说不设置,或者设置1G左右应该就够了。需要注意的是,如果需要使用collect算子将RDD的数据全部拉取到Driver端进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

配置参数为spark.executor.instances。该参数用于设置Spark作业总共要用多少个Executor进程来执行。

executor的数目是由每个节点运行的executor数目和集群的节点数共同决定。我们离线集群27个节点,那么离线spark任务使用的最大executor数就是 216(27*8). 最大数目可能比这个小点,因为driver也会消耗核数和内存。

该参数可以结合spark.executor.cores设置,默认单个spark任务最大不超过60cores,spark.executor.cores设置为4,则spark.executor.instances不超过15。

设置spark任务的并行度参数为spark.default.parallelism。spark任务每个stage的task个数=max(spark.default.parallelism, HDFS的block数量)。如滚州乱果不设置该参数,Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。spark默认spark.default.parallelism配置较少,如果task个数比较少的话,前面spark资源配置没有意义。官网建议:该参数设置为 num-executors * executor-cores的2~3倍较为合适。

当一个运行时间比较长的spark任务,如果分配给他多个Executor,可是却没有task分配给它,而此时有其他的yarn任务资源紧张,这就造成了很大的资源浪费和资源不合理的调度。动态资源调度就是为了解决这种场景,根据当前应用任务的负载情况,实时的增减Executor个数,从而实现动态分配资源,使整个Spark系统更加健康。

开启spark动态资源分配后,application会在task因没有足够资源被挂起的时候去动态申请资源。当任务挂起或等待spark.dynamicAllocation.schedulerBacklogTimeout(默认1s)的时间后,会开始动态资源分配;之后每隔spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(默认1s)时间申请一次,直到申请到足够的资源。每次申请的资源量是指数增长的,即1,2,4,8等。

当application的executor空闲时间超过spark.dynamicAllocation.executorIdleTimeout(默认60s)后,就会被回收。

使用场景:同一个SQL语句需要同时更新多个分区,类似于如下SQL语句:

Spark(二十二)Shuffle调优之调节Map端内存缓冲与Reduce端内存占比

Map端内存缓冲,Reduce端内存占比;很多资料、网上视频,都会说,这两个参数,是调节Shuffle性能的不二选择,很有效果的样子,实际上,不是这样的。

以实际的生产经验来说,这两个参数没有那么重要,往往来说,shuffle的性能不是因为这方面的原判碧因导致的

默认情况下,shuffle的map task,输出到磁盘文件的时候,统一都会先写入每个task自己关联的一个内存缓冲区。

这个缓冲区大小,默认是32kb。

每一次,当内存缓冲区满溢之后,才会进行spill操作,溢写操作,溢写到磁盘文件中去。

在实际生产环境中,我们在什么时候来调节两个参数?

2、看Spark UI,如果你的公司是决定采用standalone模式,那么狠简单,你的spark跑起来,会显示一个Spark UI的地址,4040的端口,进去看,依次点击进去,可以看到,你的每个stage的详情,有哪些executor,有哪些task,每个task的shuffle write和shuffle read的量,shuffle的磁盘和内存,读写的数据键缓量;如果是用的yarn模式来提交,课程最前面,从yarn的界面进去,点击对应的application,进入Spark UI,查看详情。

3、如果发现shuffle 磁盘的write和read,很大。这个时候,就意味着最好调节一些shuffle的参数。进行调优。首先当然是考虑开启map端输出文件合并机制。

4、调节上面说的那两个参数。调节的时候的原则。spark.shuffle.file.buffer,每次扩大一倍,然后看看效果,64,128;spark.shuffle.memoryFraction,每次提高0.1,看看效果。

5、不能调节的太大,太大了以后过犹不及,因为内存资源是有限的,你这里调节的太大了,其他环稿冲模节的内存使用就会有问题了。

6、调节了以后,效果?map task内存缓冲变大了,减少spill到磁盘文件的次数;reduce端聚合内存变大了,减少spill到磁盘的次数,而且减少了后面聚合读取磁盘文件的数量。

关于spark调优和spark调优工具的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表