Skip to content

Commit

Permalink
avoid computation without wscg
Browse files Browse the repository at this point in the history
  • Loading branch information
dhruvarya-db committed Dec 6, 2024
1 parent ae0ae4f commit 6255cc4
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.util.ScalaExtensions._

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{coalesce, col, collect_set, count, last, lit, sum}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.Utils


Expand Down Expand Up @@ -145,8 +146,16 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot =>
* A Map of alias to aggregations which needs to be done to calculate the `computedState`
*/
protected def aggregationsToComputeState: Map[String, Column] = {
lazy val histogramAgg =
// The FileSizeHistogram computation doesn't work without whole stage codegen.
val avoidHistogramComputationDueToNoWholeStageCodeGen =
!spark.conf.get(SQLConf.WHOLESTAGE_CODEGEN_ENABLED)
val histogramAgg = if (
fileSizeHistogramEnabled && !avoidHistogramComputationDueToNoWholeStageCodeGen
) {
FileSizeHistogramUtils.histogramAggregate(coalesce(col("add.size"), lit(-1L)).expr)
} else {
lit(null).cast(FileSizeHistogram.schema)
}
Map(
// sum may return null for empty data set.
"sizeInBytes" -> coalesce(sum(col("add.size")), lit(0L)),
Expand All @@ -159,8 +168,7 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot =>
"domainMetadata" -> collect_set(col("domainMetadata")),
"metadata" -> last(col("metaData"), ignoreNulls = true),
"protocol" -> last(col("protocol"), ignoreNulls = true),
"fileSizeHistogram" ->
(if (fileSizeHistogramEnabled) histogramAgg else lit(null).cast(FileSizeHistogram.schema))
"fileSizeHistogram" -> histogramAgg
)
}

Expand Down

0 comments on commit 6255cc4

Please sign in to comment.