Spark SQL Dive (1) - When do Spark SQL runtime parameters take effect in AQE (Spark 3)?
As a giant distributed computing engine, Apache Spark is controlled by tons of parameters.
While most parameters are static and pre-configured when the SparkContext
is initialized, a portion of SQL-related parameters can be updated at runtime, especially when Adaptive Query Execution (AQE) is enabled.
In this blog, I will walk through a selected list of SQL runtime parameters and show when they take effect with AQE, based on this commit for Spark 3.5.
spark.sql.inMemoryColumnarStorage.batchSize
Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data.
10000 by default. Applied in
- postStageCreationRules: `columnarRules`
spark.sql.files.maxPartitionBytes
The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
128MB by default. Applied to the `DataSourceScan` operators (`DataSourceScanExec`). Since the parameter is not used per QueryStage, it does not need to be re-tuned in AQE.
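To see how this parameter shapes partition sizes, here is a small Python sketch of the split-size calculation Spark's file sources roughly follow (mirroring the logic of `FilePartition.maxSplitBytes`; the function name and the `openCostInBytes` default shown here are my own simplification):

```python
def max_split_bytes(total_bytes, num_files, default_parallelism,
                    max_partition_bytes=128 * 1024 * 1024,
                    open_cost_in_bytes=4 * 1024 * 1024):
    # Each file "costs" openCostInBytes extra, so many tiny files
    # are not packed too densely into a single partition.
    padded = total_bytes + num_files * open_cost_in_bytes
    bytes_per_core = padded // default_parallelism
    # Never exceed maxPartitionBytes, but also never go below openCostInBytes.
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))
```

For a 10GB dataset in 80 files with a parallelism of 200, the computed split size lands well under 128MB, so `spark.sql.files.maxPartitionBytes` is only the upper bound, not the guaranteed partition size.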
spark.sql.adaptive.autoBroadcastJoinThreshold (spark.sql.autoBroadcastJoinThreshold)
Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Setting this value to -1 disables broadcasting. The default value is the same as `spark.sql.autoBroadcastJoinThreshold`. Note that this config is used only in the adaptive framework.
The same as `spark.sql.autoBroadcastJoinThreshold` by default (10MB). Applied in
- the logical `SparkOptimizer`: `InjectRuntimeFilter`
- the logical `AQEOptimizer`: `DynamicJoinSelection`
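The decision itself is simple; what AQE changes is the input: the size comes from accurate runtime statistics of the finished query stage rather than static estimates. A minimal sketch (the function name is hypothetical):

```python
def can_broadcast(build_side_bytes, threshold=10 * 1024 * 1024):
    """Return True if the build side is small enough to broadcast.

    A negative threshold (-1) disables broadcast joins entirely.
    """
    return threshold >= 0 and build_side_bytes <= threshold
```

Because the runtime size can be far smaller than the pre-execution estimate, AQE may pick a broadcast join that the static planner rejected.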
spark.sql.shuffle.partitions
Configures the number of partitions to use when shuffling data for joins or aggregations.
200 by default. Applied in `EnsureRequirements`. However, when AQE is on, the actual number of partitions will be adjusted based on the size distribution to re-balance the data. This parameter is also applied in
- queryStagePreparationRules: `OptimizeSkewedJoin`
spark.sql.adaptive.coalescePartitions.minPartitionSize
The minimum size of shuffle partitions after coalescing. Its value can be at most 20% of
spark.sql.adaptive.advisoryPartitionSizeInBytes
. This is useful when the target size is ignored during partition coalescing, which is the default case.
1MB by default; applied in
- queryStageOptimizerRules: `CoalesceShufflePartitions`
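A simplified Python sketch of what `CoalesceShufflePartitions` does with the target size and this minimum (real Spark also handles multiple shuffles and partition specs at once; the names below are my own):

```python
def coalesce_partitions(sizes, target_size, min_partition_size):
    """Greedily pack adjacent shuffle partitions until adding one more
    would exceed the advisory target size. A too-small trailing group is
    folded into its neighbor so no output partition falls below the
    minimum size."""
    groups, current, current_size = [], [], 0
    for i, size in enumerate(sizes):
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(i)
        current_size += size
    if current:
        if groups and sum(sizes[j] for j in current) < min_partition_size:
            groups[-1].extend(current)  # merge tiny leftover into previous group
        else:
            groups.append(current)
    return groups
```

Note that only adjacent partitions are merged, which is why the result depends on the order of the shuffle partition indices, not just their sizes.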
spark.sql.adaptive.advisoryPartitionSizeInBytes
The advisory size in bytes of the shuffle partition during adaptive optimization (when
`spark.sql.adaptive.enabled`
is true). It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partitions.
64MB by default; applied in
- `OptimizeSkewedJoin`
- `OptimizeSkewInRebalancePartitions`
- `CoalesceShufflePartitions`
- the logical `AQEOptimizer`: `DynamicJoinSelection`
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor
A partition will be merged during splitting if its size is smaller than this factor multiplied by `spark.sql.adaptive.advisoryPartitionSizeInBytes`.
0.2 by default. Applied in
- queryStageOptimizerRules: `OptimizeSkewInRebalancePartitions`
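The role of this factor can be sketched as follows (a simplified take on the splitting logic in `ShufflePartitionsUtil`; it assumes we split one skewed partition by its per-map output sizes, and the function name is my own):

```python
def split_by_target(map_sizes, target_size, small_factor=0.2):
    """Cut a skewed partition into chunks of roughly target_size by
    walking its per-map output sizes; a tiny trailing chunk (smaller
    than small_factor * target_size) is folded into the previous one.
    Returns the start index of each chunk."""
    small = target_size * small_factor
    starts, cur = [0], 0
    for i, s in enumerate(map_sizes):
        if cur > 0 and cur + s > target_size:
            starts.append(i)  # start a new chunk at map output i
            cur = 0
        cur += s
    if len(starts) > 1 and cur < small:
        starts.pop()  # trailing chunk too small: merge it into the previous one
    return starts
```

Without the factor, a skewed partition of, say, 65MB split against a 64MB advisory size would produce a useless 1MB tail chunk; the factor folds it back.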
spark.sql.adaptive.skewJoin.skewedPartitionFactor
A partition is considered skewed if its size is larger than this factor multiplied by the median partition size and also larger than `spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes`.
5.0 by default. Applied in
- queryStagePreparationRules: `OptimizeSkewedJoin`
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
A partition is considered skewed if its size in bytes is larger than this threshold and also larger than `spark.sql.adaptive.skewJoin.skewedPartitionFactor` multiplied by the median partition size. Ideally, this config should be set larger than `spark.sql.adaptive.advisoryPartitionSizeInBytes`.
256MB by default. Applied in
- queryStagePreparationRules: `OptimizeSkewedJoin`
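Putting the factor and the threshold together, the skew test applied by `OptimizeSkewedJoin` is roughly the following (a sketch using the default values; the function name is my own):

```python
def is_skewed(size, median_size, factor=5.0, threshold=256 * 1024 * 1024):
    # A partition is skewed only if it is BOTH factor x the median size
    # AND above the absolute byte threshold: the relative test avoids
    # splitting when everything is uniformly large, and the absolute
    # test avoids splitting partitions that are small in absolute terms.
    return size > median_size * factor and size > threshold
```

For example, a 300MB partition against a 50MB median is skewed (over 5x the median and over 256MB), but a 100MB partition against a 10MB median is not, because it never crosses the absolute threshold.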