包含sparkaqe的词条

本篇文章给大家谈谈sparkaqe,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

2022-02-24-Spark-44(性能调优通用调优)

计算负载主要由 Executors 承担,Driver 主要负责分布式调度,调优空间有限,因此对 Driver 端的配置项我们不作考虑

通过如下参数就可以明确有多少 CPU 资源被划拨给 Spark 用于分布式计算。

spark.cores.max 集群

spark.executor.cores Executor

spark.task.cpus 计算任务

并行度

spark.default.parallelism 并行度

spark.sql.shuffle.partitions 用于明确指定数据关联或聚合操作中 Reduce 端的分区数量。

在平衡 Execution memory 与 Storage memory 的时候,如果 RDD 缓存是刚需,我们就把 spark.memory.storageFraction 调大,并且在应用中优先把缓存灌满,再把计算逻辑应用在缓存数据之上。除此之外,我们还可以同时调整 spark.rdd.compress 和 spark.memory.storageFraction 来缓和 Full GC 的冲击

spark.local.dir 这个配置项,这个参数允许开发者设置磁盘目录,该目录用于存储 RDD cache 落盘数据块和 Shuffle 中间文件。如果你的经费比较充胡谈裕,有条件在计算节点中配备足量的 SSD 存储,甚至是更多的内存资源,完全可以把 SSD 上的文件系统目录,或是内存文件系统添加到 spark.local.dir 配置项中去,从而提供更好的 I/O 性能。

Configuration - Spark 3.2.1

自 1.6 版本之后,Spark 统一采用 Sort shuffle manager 来管理 Shuffle 操作,在 Sort shuffle manager 的管理机制下,无论计算结果本身是否需要排序,Shuffle 计算过程在 Map 阶段和 Reduce 阶段都会引入排序操作。

在不需要聚合,也不需要排序的计算场景中,我们就可以通过设置 spark.shuffle.sort.bypassMergeThreshold 的参数,来改变 Reduce 端的并行度(默认乎凯值是 200)。当 Reduce 端的分区数小于这个设置值的时候,我们就能避免 Shuffle 在计算过程引入排序。

Spark SQL 下面的配置项还是蛮多的,其中对执行性能贡献最大的,当属 AQE(Adaptive query execution,自适应查询引擎)引入的那 3 个特性了,也就是自动分区合并、自动数据倾斜处理和 Join 策略调整。

AQE 事先并不判断哪些分区足够小,而是按照分区编号进行扫描,当扫描量超过“目标尺寸”时,就合并一次,那么,“目标尺寸”由什么来决定的呢?Spark 提供了两个配置项来共同决定分区合并的“目裤顷碰标尺寸”,分区合并的目标尺寸取 advisoryPartitionSizeInBytes 与 partitionSize (每个分区的平均大小)之间的最小值。

我们来举个例子。假设,Shuffle 过后数据大小为 20GB,minPartitionNum 设置为 200,反推过来,每个分区的尺寸就是 20GB / 200 = 100MB。再假设,advisoryPartitionSizeInBytes 设置为 200MB,最终的目标分区尺寸就是取(100MB,200MB)之间的最小值,也就是 100MB。因此你看,并不是你指定了 advisoryPartitionSizeInBytes 是多少

首先,分区尺寸必须要大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes 参数的设定值,才有可能被判定为倾斜分区。然后,AQE 统计所有数据分区大小并排序,取中位数作为放大基数,尺寸大于中位数一定倍数的分区会被判定为倾斜分区,中位数的放大倍数也是由参数 spark.sql.adaptive.skewJoin.skewedPartitionFactor(默认值是 5 倍) 控制。

实际上指的是,把会引入 Shuffle 的 Join 方式,如 Hash Join、Sort Merge Join,“降级”(Demote)为 Broadcast Join。

在 Spark 发布 AQE 之前,开发者可以利用 spark.sql.autoBroadcastJoinThreshold 配置项对数据关联操作进行主动降级。这个参数的默认值是 10MB,参与 Join 的两张表中只要有一张数据表的尺寸小于 10MB

不过,autoBroadcastJoinThreshold 这个参数虽然好用,但是有两个让人头疼的短板。一是可靠性较差。尽管开发者明确设置了广播阈值,而且小表数据量在阈值以内,但 Spark 对小表尺寸的误判时有发生,导致 Broadcast Join 降级失败。二来,预先设置广播阈值是一种静态的优化机制,它没有办法在运行时动态对数据关联进行降级调整。

AQE 很好地解决了这两个头疼的问题。首先,AQE 的 Join 策略调整是一种动态优化机制,对于刚才的两张大表,AQE 会在数据表完成过滤操作之后动态计算剩余数据量,当数据量满足广播条件时,AQE 会重新调整逻辑执行计划,在新的逻辑计划中把 Shuffle Joins 降级为 Broadcast Join。再者,运行时的数据量估算要比编译时准确得多,因此 AQE 的动态 Join 策略调整相比静态优化会更可靠、更稳定。

每个 Map Task 生成的数据文件,都包含所有 Reduce Task 所需的部分数据。因此,任何一个 Reduce Task 要想完成计算,必须先从所有 Map Task 的中间文件里去拉取属于自己的那部分数据。索引文件正是用于帮助判定哪部分数据属于哪个 Reduce Task。Reduce Task 通过网络拉取中间文件的过程,实际上就是不同 Stages 之间数据分发的过程。

显然,Shuffle 中数据分发的网络开销,会随着 Map Task 与 Reduce Task 的线性增长,呈指数级爆炸。

Shuffle Joins

第一步就是对参与关联的左右表分别进行 Shuffle,Shuffle 的分区规则是先对 Join keys 计算哈希值,再把哈希值对分区数取模。Shuffle 完成之后,第二步就是在同一个 Executors 内,Reduce task 就可以对 userID 一致的记录进行关联操作。

Broadcast Join

使用广播阈值配置项让 Spark 优先选择 Broadcast Joins 的关键,就是要确保至少有一张表的存储尺寸小于广播阈值(数据表在磁盘上的存储大小,同一份数据在内存中的存储大小往往会比磁盘中的存储大小膨胀数倍)

Spark 将内存分成了 Execution Memory 和 Storage Memory 两类,分别用于分布式任务执行和 RDD 缓存。其中,RDD 缓存虽然最终占用的是 Storage Memory,但在 RDD 展开(Unroll)之前,计算任务消耗的还是 Execution Memory。因此,Spark 中 CPU 与内存的平衡,其实就是 CPU 与执行内存之间的协同与配比。

并行度指的是为了实现分布式计算,分布式数据集被划分出来的份数。并行度明确了数据划分的粒度:并行度越高,数据的粒度越细,数据分片越多,数据越分散。并行度可以通过两个参数来设置,分别是 spark.default.parallelism 和 spark.sql.shuffle.partitions。前者用于设置 RDD 的默认并行度,后者在 Spark SQL 开发框架下,指定了 Shuffle Reduce 阶段默认的并行度。并发度呢?Executor 的线程池大小由参数 spark.executor.cores 决定,每个任务在执行期间需要消耗的线程数由 spark.task.cpus 配置项给定。两者相除得到的商就是并发度,也就是同一时间内,一个 Executor 内部可以同时运行的最大任务数量。又因为,spark.task.cpus 默认数值为 1,并且通常不需要调整,所以,并发度基本由 spark.executor.cores 参数敲定。就 Executor 的线程池来说,尽管线程本身可以复用,但每个线程在同一时间只能计算一个任务,每个任务负责处理一个数据分片。因此,在运行时,线程、任务与分区是一一对应的关系。

对于 User Memory 内存区域来说,使用 空间去重复存储同样的数据,本身就是降低了内存的利用率

对于存储级别来说,实际开发中最常用到的有两个,MEMORY_ONLY 和 MEMORY_AND_DISK,它们分别是 RDD 缓存和 DataFrame 缓存的默认存储级别。对于缓存计算来说,它分为 3 个步骤,第一步是 Unroll,把 RDD 数据分片的 Iterator 物化为对象值,第二步是 Transfer,把对象值封装为 MemoryEntry,第三步是把 BlockId、MemoryEntry 价值对注册到 LinkedHashMap 数据结构。另外,当数据缓存需求远大于 Storage Memory 区域的空间供给时,Spark 利用 LinkedHashMap 数据结构提供的特性,会遵循 LRU 和兔子不吃窝边草这两个基本原则来清除内存空间:LRU:按照元素的访问顺序,优先清除那些“最近最少访问”的 BlockId、MemoryEntry 键值对兔子不吃窝边草:在清除的过程中,同属一个 RDD 的 MemoryEntry 拥有“赦免权”

PROCESS_LOCAL:任务与数据同在一个 JVM 进程中

NODE_LOCAL:任务与数据同在一个计算节点,数据可能在磁盘上或是另一个 JVM 进程中

RACK_LOCAL:任务与数据不在同一节点,但在同一个物理机架上

ANY:任务与数据是跨机架、甚至是跨 DC(Data Center,数据中心)的关系访问数据源是否会引入网络开销,取决于任务与数据的本地性关系,也就是任务的本地性级别

Shuffle 作为大多数计算场景的“性能瓶颈担当”,确实是网络开销的罪魁祸首。根据“能省则省”的开发原则,我们自然要想尽办法去避免 Shuffle。

在数据通过网络分发之前,我们可以利用 Kryo Serializer 序列化器,提升序列化字节的存储效率,从而有效降低在网络中分发的数据量,整体上减少网络开销。

2022-02-26-Spark-45(性能调优通用SQL调优)

RDD 的核心痛点是优化空间有限,它指的是 RDD 高阶算子中封装的函数对于 Spark 来说完全透明,因此 Spark 对于计算逻辑的优化无从下手。相比 RDD,DataFrame 是携带 Schema 的分布式数据集,只腔升手能封装结构化数据。DataFrame 的算子大多数都是普通的标量函数,以消费数据列为主。但是,DataFrame 更弱的表示能力和表达能力,反而为 Spark 引擎的内核优化打开了全新的空间。根据 DataFrame 简单的标量算子和明确的 Schema 定义,借助 Catalyst 优化器和 Tungsten,Spark SQL 有能力在运行时,构建起一套端到端的优化机制。这套机制运用启发式的规则与策略和运行时的执行信息,将原本次优、甚至是低效的查询计划转换为高效的执行计划,从而提升端到端的执行性能

这里的 Cache 指的就是我们常说的分布式数据缓存。想要对数据进行缓存,你可以调用 DataFrame 的.cache 或.persist,或是在 SQL 语句中使用“cache table”关键字。Cache Manager 其实很简单,它的主要职责是维护与缓存有关的信息。具体来说,Cache Manager 维护了一个 Mapping 映射字典,字典的 Key 是逻辑计划,Value 是对应的 Cache 元信息。当 Catalyst 尝试对逻辑计划做优化时,会先尝试对 Cache Manager 查找,看看当前的逻辑计划或是逻辑计划分支,是否已经被记录在 Cache Manager 的字典里。如果在字典中可以查到当前计划或是分支,Catalyst 就用 InMemoryRelation 节点来替换整个计划或是计划的一部分,从而充分利用已有的缓存数据做优化。

从 Spark Plan 到 Physical Plan 的转换,需要几组叫做 Preparation Rules 的规则。这些规则坚守最后一班岗,负责生成 Physical Plan。那么,这些规则都是什么,它们都做了哪些事情呢?

AQE 是 Spark SQL 的一种动态优化机制,在运行伍嫌时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。

DemoteBroadcastHashJoin 规则的作用,是把 Shuffle Joins 降级为 Broadcast Joins。需要注意的是,这个规则仅适用于 Shuffle Sort Merge Join 这种关联机制,其他机制如 Shuffle Hash Join、Shuffle Nested Loop Join 都不支持。

在 Reduce 阶段,当 Reduce Task 从全网把数据分片拉回,AQE 按照分区编号的顺序,依次把小于目标尺寸的分区合并在一起

在 Reduce 阶段,当 Reduce Task 所需处理的分区尺寸大于一定阈值时,利用 OptimizeSkewedJoin 策略,AQE 会把大分区拆成多个小分区。

相比于谓词下推,分区剪裁往往能更好地提升磁盘访问的 I/O 效率。谓词下推操作往往是根据文件注脚中的统计信息完成对文件的过滤,过滤效果取决于文件中内容的“纯度”。分区剪裁则不同,它的分区表可以把包含不同内容的文件,隔离到不同的文件系统目录下。这样一来,包含分区键的过滤条件能够以文件系统目录为粒度对磁盘文件进行过滤,从而大幅提笑帆升磁盘访问的 I/O 效率。

动态分区剪裁运作的背后逻辑,是把维度表中的过滤条件,通过关联关系传导到事实表,来完成事实表的优化。在数据关联的场景中,开发者要想利用好动态分区剪裁特性,需要注意 3 点:

NLJ 是采用“嵌套循环”的方式来实现关联的。也就是说,NLJ 会使用内、外两个嵌套的 for 循环依次扫描外表和内表中的数据记录,判断关联条件是否满足

SMJ 的思路是先排序、再归并。具体来说,就是参与 Join 的两张表先分别按照 Join Key 做升序排序。然后,SMJ 会使用两个独立的游标对排好序的两张表完成归并关联。

先用 Hash Key 取代 Join Keys,再清除内表冗余数据。Hash Key 实际上是 Join Keys 拼接之后的哈希值。既然存在哈希运算,我们就必须要考虑哈希冲突的问题。

AQE 允许 Spark SQL 在运行时动态地调整 Join 策略。我们刚好可以利用这个特性,把最初制定的 SMJ 策略转化为 BHJ 策略

当参与 Join 的两张表尺寸相差悬殊且小表数据分布均匀的时候,SHJ 往往比 SMJ 的执行效率更高。这种情况下,我们不妨使用 Join Hints 来强制 Spark SQL 去选择 SHJ 策略进行关联计算

先把一个复杂任务拆解成多个简单任务,再合并多个简单任务的计算结果。首先,我们要根据两张表的尺寸大小区分出外表和内表。一般来说,内表是尺寸较小的那一方。然后,我们人为地在内表上添加过滤条件,把内表划分为多个不重复的完整子集。接着,我们让外表依次与这些子集做关联,得到部分计算结果。最后,再用 Union 操作把所有的部分结果合并到一起,得到完整的计算结果,这就是端到端的关联计算。

“分而治之”中一个关键的环节就是内表拆分,我们要求每一个子表的尺寸相对均匀,且都小到可以放进广播变量。拆分的关键在于拆分列的选取,为了让子表足够小,拆分列的基数(Cardinality)要足够大才行。

对于外表参与的每一个子关联,在逻辑上,我们完全可以只扫描那些与内表子表相关的外表数据,并不需要每次都扫描外表的全量数据。

有了 AQE 的自动倾斜处理特性,在应对数据倾斜问题的时候,我们确实能够大幅节省开发成本。不过,天下没有免费的午餐,AQE 的倾斜处理是以 Task 为粒度的,这意味着原本 Executors 之间的负载倾斜并没有得到根本改善

“两阶段 Shuffle”

“两阶段 Shuffle”指的是,通过“加盐、Shuffle、关联、聚合”与“去盐化、Shuffle、聚合”这两个阶段的计算过程,在不破坏原有关联关系的前提下,在集群范围内以 Executors 为粒度平衡计算负载 。

ntekeystentry报错

某银行客户的cluster里同时安装了Spectrum Symphony + Spectrum Conductor,属于multihomed模式。这种安装和配置是支持的,详情可以参考IBM文档。

出于安全要求,他们在tier 2和tier 3启用了TLS,详情参考IBM文档。结果是,在tier 3一切顺利,访问网页没有问题;但是在tier 2却遇到了问题,报错如下。

"Failed to retrieve the Spark applications. Connection refused. Ensure that either the required IBM Spectrum Conductor services are running (ascd and REST) or SSL is configured properly."

因为只有tier 2才有问题而tier 3没问题问题,而且tier 2和tier 3的certificate都放在相同的keystore里,所以我们有理由怀疑可能tier 3的certificate配置出错了。当然,脑子里先要有关于certificate的相关知识,不然可能也不会怀疑到这。SSL certificate相关知识可以参考我的这篇"一文读懂HTTP, HTTPS, SSL和TLS"讲解。

于是,我们可以通过扮友下面的念困步骤来测试certificate的配置。

openssl s_client -CAfile /path/to/target/keystore/file -connect target_FQDN:target_port

 针对tier 3上,测试得到的结果如下,连接状态是CONNECTED,certificate chain和厅高槐certificate都可以返回来,没问题。

$openssl s_client -CAfile /opt/sym/certificates/truststore.pem -connect bens3-a1.svr.us.jpm.net:8643

CONNECTED(00000003)

depth=2 DC = NET, DC = JPMCHASE, DC = EXCHAD, CN = JPMCROOTCA

verify return:1

depth=1 DC = net, DC = jpmchase, DC = exchad, CN = PSIN0P551

verify return:1

depth=0 C = US, ST = NJ, L = Jersey City, O = JPMorg, OU = Compute Backbone, CN = bens3-a1.svr.us.jpm.net

verify return:1

---

Certificate chain

0 s:/C=US/ST=NJ/L=Jersey City/O=JPMorg /OU=Compute Backbone/CN=bens3-a1.svr.us.jpm.net

i:/DC=net/DC=jpmchase/DC=exchad/CN=PSIN0P551

1 s:/DC=net/DC=jpmchase/DC=exchad/CN=PSIN0P551

i:/DC=NET/DC=JPMCHASE/DC=EXCHAD/CN=JPMCROOTCA

---

Server certificate

-----BEGIN CERTIFICATE-----

MIIHszCCBZugAwIBAgITRQAC1Y89tfV7k9/q/gABAALVjzANBgkqhkiG9w0BAQsF

ADBbMRMwEQYKCZImiZPyLGQBGRYDbmV0MRgwFgYKCZImiZPyLGQBGRYIanBtY2hh

c2UxFjAUBgoJkiaJk/IsZAEZFgZleGNoYWQxEjAQBgNVBAMTCVBTSU4wUDU1MTAe

Fw0xOTEwMDIxMjU0MDZaFw0yMTEwMDExMjU0MDZaMIGNMQswCQYDVQQGEwJVUzEL

MAkGA1UECBMCTkoxFDASBgNVBAcTC0plcnNleSBDaXR5MRcwFQYDVQQKEw5KUE1v

cmdhbiBDaGFzZTEZMBcGA1UECxMQQ29tcHV0ZSBCYWNrYm9uZTEnMCUGA1UEAxMe

Y2JiZW5zMy1hMS5zdnIudXMuanBtY2hhc2UubmV0MIIBIjANBgkqhkiG9w0BAQEF

AAOCAQ8AMIIBCgKCAQEAo/khQh8MHdTkTuKa7eO7Qigx9UuqRlZ+lMQImtZxhiEQ

g9vpEhZk193G9IRuV8lVHbV6fMe6WYCuSGP0V+ZF1OVe5XtmFnWNNW5FS8WyApk3

hcSeWeeI6QDArMutidpya30a21UUv+ZxoOdnEDwAvMjoWBS6caJPiRnKQ77TXl+J

HHVv2Q6SDCSQiwuLxRZzD+c637bJXvvw0Tt1YKwcijp0DBwGmZotdvONulEJNvtM

J7Pn8bhgWoVC7UkM1TY6M4xikJgFHh+AlT0+Z+tYfGMbu7aPUBjO61f2Qq9KSouT

n6di8ule0c9hntat+JS1bDHz9Czd0IcmIfNpGeS8cwIDAQABo4IDOzCCAzcwKQYD

VR0RBCIwIIIeY2JiZW5zMy1hMS5zdnIudXMuanBtY2hhc2UubmV0MB0GA1UdDgQW

BBTi/DxO6VfcEMq8ZqpNiDDpPeaQ7jAfBgNVHSMEGDAWgBQy3mGo4/el4t5HICuq

......

cyxDTj1TZXJ2aWNlcyxDTj1Db25maWd1cmF0aW9uLERDPWV4Y2hhZCxEQz1qcG1j

aGFzZSxEQz1uZXQ/Y2VydGlmaWNhdGVSZXZvY2F0aW9uTGlzdD9iYXNlP29iamVj

dENsYXNzPWNSTERpc3RyaWJ1dGlvblBvaW50MIIBLgYIKwYBBQUHAQEEggEgMIIB

HDA5BggrBgEFBQcwAoYtaHR0cDovL2FkY3MuanBtY2hhc2UubmV0L2NybC9QU0lO

MFA1NTEoMSkuY3J0MCkGCCsGAQUFBzABhh1odHRwOi8vYWRjcy5qcG1jaGFzZS5u

ZXQvb2NzcDCBswYIKwYBBQUHMAKGgaZsZGFwOi8vL0NOPVBTSU4wUDU1MSxDTj1B

SUEsQ049UHVibGljJTIwS2V5JTIwU2VydmljZXMsQ049U2VydmljZXMsQ049Q29u

ZmlndXJhdGlvbixEQz1leGNoYWQsREM9anBtY2hhc2UsREM9bmV0P2NBQ2VydGlm

aWNhdGU/YmFzZT9vYmplY3RDbGFzcz1jZXJ0aWZpY2F0aW9uQXV0aG9yaXR5MA4G

A1UdDwEB/wQEAwIFoDA8BgkrBgEEAYI3FQcELzAtBiUrBgEEAYI3FQiBg5o1g/PQ

QIKBkwyC3ZYCk506RIOUsA+Etq87AgFkAgEKMB0GA1UdJQQWMBQGCCsGAQUFBwMB

BggrBgEFBQcDAjAnBgkrBgEEAYI3FQoEGjAYMAoGCCsGAQUFBwMBMAoGCCsGAQUF

BwMCMA0GCSqGSIb3DQEBCwUAA4ICAQASNIP+nc1/TAYpIzY45C+c69pFlv0QupDq

ovOs9uPz/4oiGfwLaXmVVYmmUZIdlH8QaR4v/AYGkbYnej9BAHX7/NynevTT908v

VjFMb0GNkGC+KgCOaEeLv5fR9/x2xoVFOyztjysHnDjvi1A5VcyTqRiZynwOzrMZ

jtLS/jtI/65K7yDTYQDLATuUWmi3xcl0QyV11bxgDeU6ggOu1w/SyiFPPng9mWEA

UfE8yIWiXTrEZlKo00tV8L5x6vizq4sBQTxbuOuDJbqTCJKkZUv+GQuvWuwcPcFi

xRZboWVOaZ6v9i3HOv1Yd7mCjkT67rC2lzqPgxpZAD2ew9/LTmtTQYRc7iWUUBPb

9PRIIuf8sLp/9Lt06loVGe5saFvxG/ooGSfe2JwLvQUIg9HKhZNFaIvLdu6V/dXq

DLzYdEfhF7KuM2TzwIRETSahMadk6+z17OUlzu87aWPVBr7YRmBtupBC1J1QaFH6

tbmh5+56gAmSSvNt5l6yVGgZB0ooklTJYwkc9lH7NYzunzksaXPbVvjJEDUl+e6w

z2XIripgZRZfnOiGHrNPjuPuUGP2gPFfm7NViGUoOY11GzTzU2l2xFzSMlngvIwR

sq1waInp1NDkr0ue08l27NnwBurqmiXfP9KQsu7gpaj8RAXiq8afQpReCHV9Ra3X

Oj+YAovtzA==

-----END CERTIFICATE-----

subject=/C=US/ST=NJ/L=Jersey City/O=JPMorg/OU=Compute Backbone/CN=bens3-a1.svr.us.jpm.net

issuer=/DC=net/DC=jpmchase/DC=exchad/CN=PSIN0P551

---

No client certificate CA names sent

Peer signing digest: SHA512

Server Temp Key: ECDH, P-256, 256 bits

---

SSL handshake has read 4767 bytes and written 415 bytes

---

New, TLSv1/SSLv3, Cipher is ECDHE-RSA-AES256-GCM-SHA384

Server public key is 2048 bit

Secure Renegotiation IS supported

Compression: NONE

Expansion: NONE

No ALPN negotiated

SSL-Session:

Protocol : TLSv1.2

Cipher : ECDHE-RSA-AES256-GCM-SHA384

Session-ID: 5DF8DBBCAA37AC5D809C6831174368C0545E3E06A0E8BE2F6450F03C96DCA198

Session-ID-ctx:

Master-Key: 582ABE9363DE36147A845750A7199639CF8CC88D7C3C50EE3B3C7941EE9713F120DF8558504F41CECB6838C5B6E32C47

Key-Arg : None

Krb5 Principal: None

PSK identity: None

PSK identity hint: None

Start Time: 1576590268

Timeout : 300 (sec)

Verify return code: 0 (ok)

---

针对tier 2,测试的结果如下,得到sslv3 alert handshake failure的错误,无法返回server端的certificate chain和certificate。这更一步说明tier 3的证书配置有问题。

Spark原理 | V3.0 新特性

自适应查询执行(AQE)是Spark SQL中的一种优化技术,它利用运行时统计信旁派带息来选择最有效的查询执行计划,也就是说可以根据执行过程中的运芦中间数据优化后续执行,从而提高整体执行效率。核心特征有如下三点:

使用Spark SQL时,可通过spark.sql.shuffle.partitions 指定Shuffle时Partition个数,也就是Reducer个数。该参数决定了一个Spark SQL Job中包含的所有Shuffle的Partition个数。如下图所示,当该参数设置为3时,所有Shuffle中Reducer个数都为3。

这种方法存在如下问题:

假如Stage1的5个partition数据量分别为60MB,40MB,1MB,2MB,50MB。其中1MB与2MB的Partition明显过小,开启Adaptive Execution后

每个Reducer 读取一个或者多个Shuffle Write partition数据(如下图所示,Reducer 0 读取 Partition 0,Reducer 1 读取 Partition 1、2、3,Reducer2读取Partition4)

在运行时动态调整join策略,在满足条件的情况下,即一张表小于Broadcast阈值,可以讲SortMergeJoin转化为BroadcastHashJoin。羡灶

在运行时很容易地检测出有数据倾斜的partition,当执行某个stage时,我们收集该stage的每个mapper的shuffle数据大小和记录条数。如果某一个partition数据量或者记录条数超过中位数的N倍,并且大于某个预先配置的阈值,我们就认为这是一个数据倾斜的partition,需要进行特殊的处理。

[img]

关于sparkaqe和的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表