SparkSQL AQE 八股文
AQE(Adaptive Query Execution,自适应查询执行) 是 sparksql 的一种动态优化机制,当开启 aqe 后,每当 shuffle map 阶段执行完毕,aqe 都会结合这个阶段 shuffle map 输出的中间文件的统计信息,基于既定的规则动态调整逻辑和物理执行计划,来完成运行时的优化
* shuffle map / reduce 阶段 同 shuffle write / read
aqe 默认是关闭(从 spark 3.2 开始默认开启),通过下面这个参数开启
# 开启 AQE set spark.sql.adaptive.enabled = true;
AQE既定的包括1个逻辑优化规则和3个物理优化策略
AQE 主要有下面三个特性
Join 策略调整:SortMergeJoin -> BroadcastHashJoin
对于这个策略,aqe有一个逻辑计划,一个物理计划
逻辑计划:对于 Sort Merge Join,如果表统计信息满足中间文件小于广播阈值并且非空分区文件比例大于阈值,就会降级(Demote)为 Broadcast Hash Join
# broadcast 表阈值,默认为 10MB set spark.sql.autoBroadcastJoinThreshold=10485760; # 允许降级为 broadcast join 的最小非空分区比例,默认20% set spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0.2;
AQE 需要等到两张表都完成 shuffle write,才能拿到统计信息,来决定要不要降级到 broadcast join
物理计划:省去 shuffle 的网络分发环节,reduce task 就近读取本地节点的中间文件,完成广播小表的关联
自动分区合并:自动添加到物理执行计划
在 reduce 阶段,当 reduce task 把全量数据拉回,aqe 会按照分区编号的顺序,依次把小于目标尺寸的分区合并在一起,由下面两个参数共同决定,这个物理计划会被自动添加
# 启用自动分区合并,默认开启 spark.sql.adaptive.coalescePartitions.enabled = true; # 分区合并后的推荐大小 set spark.sql.adaptive.advisoryPartitionSizeInBytes = 64MB; # 分区合并后,分区数不能低于这个值 set spark.sql.adaptive.coalescePartitions.minPartitionNum = 默认的并行度;
假设shuffle 后数据大小为 20G,minPartitionNum 为 200,每个分区大小就是 20G / 200 = 100MB,如果 advisoryPartitionSizeInBytes 设置的是 200MB,目标分区大小就是 min(100MB, 200MB) = 100MB
自动倾斜处理:适用场景有限
在 reduce 阶段执行,在同一个 Executor 内部,可以把由一个 task 处理的大分区拆分成多个小分区由多个 task 计算,不能解决不同 Executor 之间的负载均衡问题
# 开启自动倾斜处理,默认开启 set spark.sql.adaptive.skewJoin.enabled = true; # 判断分区是否倾斜的比例 set spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5; # 判断分区是否倾斜的最低阈值 set spark.sql.adaptive.skewJoin.skewedPartitionThresholdlnBytes = 256MB; # 拆分分区的大小 set spark.sql.adaptive.advisoryPartitionSizelnBytes = 64MB;
AQE 统计所有分区大小,取中位数,大于中位数 skewedPartitionFactor 倍数的才有可能判断为倾斜分区
然后这个分区还必须大于 skewedPartitionThresholdlnBytes 这个大小,才能最终被 aqe 判断为倾斜分区
倾斜分区会按照 advisoryPartitionSizelnBytes 的大小拆分
假如左右两表都有倾斜,左表拆成 M 个分区,右表拆成 N 个分区,那么两张表都需要保持 M * N 个分区才能保证关联逻辑的一致性,当 M 和 N 逐渐变大,aqe 自动倾斜处理的计算开销将非常大
哪些场景AQESkewedJoin不支持
AQESkewedJoin功能并不能处理所有发生数据倾斜的Join,这是由它的实现逻辑所决定的。
第一,如果倾斜的分区的大部分数据来自于上游的同一个Mapper,AQESkewedJoin无法处理,原因是Spark不支持ReduceTask只读取上游Mapper的一个block的部分数据。
第二,如果Join的发生倾斜的一侧存在Agg或者Window这类有指定requiredChildDistribution的算子,那么
SkewedJoin优化无法处理,因为将分区切分会破坏RDD的outputPartitioning,导致不再满足requiredChildDistribution.
第三,对于Outer/SemiJoin,AQESkewedJoin是无法处理非 Outer/Semi侧的数据倾斜。比如,对于LeftOuterJoin,SkewedJoin无法处理右侧的数据倾斜。
第四,AQE无法处理倾斜的BroadcastHashJoin。
所以 AQE 处理数据倾斜只适用于关联中有一边倾斜,或者有倾斜但是数据分布比较均匀
如果 key 分布悬殊较大,或者两边都有大量的倾斜,要衡量使用 aqe 还是手动处理更节省计算开销spark
#Java##数据分析##数据人offer决赛圈怎么选##数据人的面试交流地##实习需要主动找活干吗?#

