diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala
index 6879c66c20..7e35b9dae1 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.util.ScalaExtensions._
 
 import org.apache.spark.sql.{Column, DataFrame}
 import org.apache.spark.sql.functions.{coalesce, col, collect_set, count, last, lit, sum}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.Utils
 
 
@@ -145,8 +146,16 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot =>
    * A Map of alias to aggregations which needs to be done to calculate the `computedState`
    */
   protected def aggregationsToComputeState: Map[String, Column] = {
-    lazy val histogramAgg =
+    // The FileSizeHistogram computation doesn't work without whole stage codegen.
+    val avoidHistogramComputationDueToNoWholeStageCodeGen =
+      !spark.conf.get(SQLConf.WHOLESTAGE_CODEGEN_ENABLED)
+    val histogramAgg = if (
+      fileSizeHistogramEnabled && !avoidHistogramComputationDueToNoWholeStageCodeGen
+    ) {
       FileSizeHistogramUtils.histogramAggregate(coalesce(col("add.size"), lit(-1L)).expr)
+    } else {
+      lit(null).cast(FileSizeHistogram.schema)
+    }
     Map(
       // sum may return null for empty data set.
       "sizeInBytes" -> coalesce(sum(col("add.size")), lit(0L)),
@@ -159,8 +168,7 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot =>
       "domainMetadata" -> collect_set(col("domainMetadata")),
       "metadata" -> last(col("metaData"), ignoreNulls = true),
       "protocol" -> last(col("protocol"), ignoreNulls = true),
-      "fileSizeHistogram" ->
-        (if (fileSizeHistogramEnabled) histogramAgg else lit(null).cast(FileSizeHistogram.schema))
+      "fileSizeHistogram" -> histogramAgg
     )
   }
 