add incremental computation of FileSizeHistogram
dhruvarya-db committed Dec 6, 2024
1 parent da162a0 commit ae0ae4f
Showing 8 changed files with 822 additions and 7 deletions.
43 changes: 42 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.FileSizeHistogram
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import org.apache.spark.sql.util.ScalaExtensions._
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path
@@ -136,6 +137,8 @@ trait RecordChecksum extends DeltaLogging {
* `versionToCompute - 1` or a snapshot. Note that the snapshot may
* belong to any version and this method will only use the snapshot if
* it corresponds to `versionToCompute - 1`.
* @param mustIncludeFileSizeHistogram True if the new checksum must include a file size
* histogram.
* @param includeAddFilesInCrc True if the new checksum should include [[AddFile]]s.
* @return Either the new checksum or an error code string if the checksum could not be computed.
*/
@@ -150,6 +153,7 @@
operationName: String,
txnIdOpt: Option[String],
previousVersionState: Either[Snapshot, VersionChecksum],
mustIncludeFileSizeHistogram: Boolean,
includeAddFilesInCrc: Boolean
): Either[String, VersionChecksum] = {
// scalastyle:on argcount
@@ -192,6 +196,9 @@
val oldCrcFiltered = oldCrcOpt
.filterNot(_.metadata == null)
.filterNot(_.protocol == null)
// If the old CRC doesn't have a file size histogram, it can't be used to generate
// the new CRC when `mustIncludeFileSizeHistogram` is set.
.filterNot(_.histogramOpt.isEmpty && mustIncludeFileSizeHistogram)

val oldCrc = oldCrcFiltered.getOrElse {
return Left("OLD_CRC_INCOMPLETE")
@@ -247,20 +254,28 @@
var numFiles = oldVersionChecksum.numFiles
var protocol = oldVersionChecksum.protocol
var metadata = oldVersionChecksum.metadata
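// Carry the previous version's histogram forward, cloning its mutable count arrays so
// the incremental inserts/removes below don't mutate the cached old checksum in place.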
val histogramOpt =
Option.when(spark.conf.get(DeltaSQLConf.DELTA_FILE_SIZE_HISTOGRAM_ENABLED)) {
oldVersionChecksum.histogramOpt.map { h =>
FileSizeHistogram(h.sortedBinBoundaries, h.fileCounts.clone(), h.totalBytes.clone())
}
}.flatten

var inCommitTimestamp: Option[Long] = None
actions.foreach {
case a: AddFile if !ignoreAddFiles =>
tableSizeBytes += a.size
numFiles += 1

histogramOpt.foreach(_.insert(a.size))

// extendedFileMetadata == true implies fields partitionValues, size, and tags are present
case r: RemoveFile if r.extendedFileMetadata == Some(true) =>
val size = r.size.get
tableSizeBytes -= size
numFiles -= 1

histogramOpt.foreach(_.remove(size))

case r: RemoveFile =>
// Report the failure to usage logs.
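This replay is the core of the incremental computation: rather than rebuilding the histogram from a full state reconstruction, the new checksum starts from the previous version's histogram and applies each AddFile/RemoveFile in the commit. A minimal sketch of that bookkeeping, using a hypothetical SketchHistogram rather than the real FileSizeHistogram class, and assuming bins are keyed by the greatest boundary not exceeding a file's size:

import java.util.Arrays

// Hypothetical sketch, not Delta's implementation: per-bin file counts and
// byte totals over a sorted array of bin boundaries.
final class SketchHistogram(sortedBinBoundaries: Array[Long]) {
  val fileCounts = new Array[Long](sortedBinBoundaries.length)
  val totalBytes = new Array[Long](sortedBinBoundaries.length)

  // Bin owning `size`: index of the greatest boundary <= size, or -1 if none.
  private def binFor(size: Long): Int = {
    val i = Arrays.binarySearch(sortedBinBoundaries, size)
    if (i >= 0) i else -(i + 1) - 1
  }

  def insert(size: Long): Unit = {
    val b = binFor(size)
    if (b >= 0) { fileCounts(b) += 1; totalBytes(b) += size }
  }

  def remove(size: Long): Unit = {
    val b = binFor(size)
    if (b >= 0) { fileCounts(b) -= 1; totalBytes(b) -= size }
  }
}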
@@ -367,7 +382,7 @@
setTransactions = setTransactions,
domainMetadata = domainMetadata,
allFiles = allFiles,
histogramOpt = None
histogramOpt = histogramOpt
))
}

@@ -794,6 +809,32 @@ trait ValidateChecksum extends DeltaLogging { self: Snapshot =>
detailedErrorMapForUsageLogs += ("domainMetadata" -> JsonUtils.toJson(eventData))
}
}
def compareFileSizeHistogram(
expectedOpt: Option[FileSizeHistogram],
foundOpt: Option[FileSizeHistogram]): Unit = {
val result = new ArrayBuffer[String]()
expectedOpt.zip(foundOpt).foreach {
// Only check if both expected and found are present. Missing histograms can happen
// when a commit is written by older writers or when histograms have been disabled in
// the Spark session conf.
case (expected, found) =>
if (expected.sortedBinBoundaries != found.sortedBinBoundaries) {
result.append("FileSizeHistogram mismatch in sorted bin boundaries")
}
if (!java.util.Arrays.equals(expected.totalBytes, found.totalBytes)) {
result.append("FileSizeHistogram mismatch in total bytes")
}
if (!java.util.Arrays.equals(expected.fileCounts, found.fileCounts)) {
result.append("FileSizeHistogram mismatch in file counts")
}
}
if (result.nonEmpty) {
errorMap += ("fileSizeHistogram" -> result.mkString("\n"))
}
}
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_FILE_SIZE_HISTOGRAM_ENABLED)) {
compareFileSizeHistogram(checksum.histogramOpt, computedStateToCheckAgainst.fileSizeHistogram)
}
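The asymmetry in the comparisons above is deliberate: sortedBinBoundaries appears to be a Scala collection with structural equality, while fileCounts and totalBytes are JVM long arrays, where == means reference equality. A quick illustration:

val a = Array(1L, 2L, 3L)
val b = Array(1L, 2L, 3L)
a == b                         // false: arrays compare by reference
java.util.Arrays.equals(a, b)  // true: element-wise comparison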

compareAction(checksum.metadata, computedStateToCheckAgainst.metadata, "Metadata", "metadata")
compareAction(checksum.protocol, computedStateToCheckAgainst.protocol, "Protocol", "protocol")
spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.delta.redirect.{RedirectFeature, TableRedirectConfig
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
import org.apache.spark.sql.delta.stats._
import org.apache.spark.sql.delta.stats.FileSizeHistogramUtils
import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, JsonUtils}
import org.apache.spark.sql.util.ScalaExtensions._
import io.delta.storage.commit._
@@ -1601,6 +1602,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite
var addFilesHistogram: Option[FileSizeHistogram] = None
var removeFilesHistogram: Option[FileSizeHistogram] = None
val assertDeletionVectorWellFormed = getAssertDeletionVectorWellFormedFunc(spark, op)
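// Note: these two histograms track only the files added/removed by this commit;
// the cumulative table-level histogram is taken from postCommitSnapshot below.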
if (spark.conf.get(DeltaSQLConf.DELTA_FILE_SIZE_HISTOGRAM_ENABLED)) {
addFilesHistogram = Some(FileSizeHistogramUtils.emptyHistogram)
removeFilesHistogram = Some(FileSizeHistogramUtils.emptyHistogram)
}
// Initialize everything needed to maintain auto-compaction stats.
partitionsAddedToOpt = Some(new mutable.HashSet[Map[String, String]])
val acStatsCollector = createAutoCompactStatsCollector()
@@ -1717,7 +1722,12 @@ trait OptimisticTransactionImpl extends TransactionalWrite
isolationLevel = Serializable.toString,
coordinatedCommitsInfo = createCoordinatedCommitsStats(),
numOfDomainMetadatas = numOfDomainMetadatas,
txnId = Some(txnId))
txnId = Some(txnId),
fileSizeHistogram =
postCommitSnapshot.fileSizeHistogram.map(FileSizeHistogramUtils.compress),
addFilesHistogram = addFilesHistogram.map(FileSizeHistogramUtils.compress),
removeFilesHistogram = removeFilesHistogram.map(FileSizeHistogramUtils.compress)
)

executionObserver.transactionCommitted()
recordDeltaEvent(deltaLog, DeltaLogging.DELTA_COMMIT_STATS_OPTYPE, data = stats)
@@ -2327,6 +2337,12 @@ trait OptimisticTransactionImpl extends TransactionalWrite
// variables. This is more efficient than a functional approach.
var numAbsolutePaths = 0
val distinctPartitions = new mutable.HashSet[Map[String, String]]
val (addFilesHistogram, removeFilesHistogram) =
if (spark.conf.get(DeltaSQLConf.DELTA_FILE_SIZE_HISTOGRAM_ENABLED)) {
(Some(FileSizeHistogramUtils.emptyHistogram), Some(FileSizeHistogramUtils.emptyHistogram))
} else {
(None, None)
}

var bytesNew: Long = 0L
var numAdd: Int = 0
@@ -2340,9 +2356,11 @@
numAdd += 1
if (a.pathAsUri.isAbsolute) numAbsolutePaths += 1
distinctPartitions += a.partitionValues
addFilesHistogram.foreach(_.insert(a.size))
if (a.dataChange) bytesNew += a.size
case r: RemoveFile =>
numRemove += 1
removeFilesHistogram.foreach(_.insert(r.size.getOrElse(0L)))
case c: AddCDCFile =>
numCdcFiles += 1
cdcBytesNew += c.size
@@ -2363,6 +2381,10 @@
val numFilesTotal = if (doCollectCommitStats) postCommitSnapshot.numOfFiles else -1L
val sizeInBytesTotal = if (doCollectCommitStats) postCommitSnapshot.sizeInBytes else -1L

val fileSizeHistogram = postCommitSnapshot
.checksumOpt
.flatMap(_.histogramOpt)
.map(FileSizeHistogramUtils.compress)
val stats = CommitStats(
startVersion = snapshot.version,
commitVersion = attemptVersion,
@@ -2392,6 +2414,9 @@
numPartitionColumnsInTable = postCommitSnapshot.metadata.partitionColumns.size,
isolationLevel = isolationLevel.toString,
coordinatedCommitsInfo = createCoordinatedCommitsStats(),
addFilesHistogram = addFilesHistogram.map(FileSizeHistogramUtils.compress),
removeFilesHistogram = removeFilesHistogram.map(FileSizeHistogramUtils.compress),
fileSizeHistogram = fileSizeHistogram,
numOfDomainMetadatas = numOfDomainMetadatas,
txnId = Some(txnId))
recordDeltaEvent(deltaLog, DeltaLogging.DELTA_COMMIT_STATS_OPTYPE, data = stats)
@@ -2553,6 +2578,8 @@
operationName = currentTransactionInfo.op.name,
txnIdOpt = Some(currentTransactionInfo.txnId),
previousVersionState = scala.Left(snapshot),
mustIncludeFileSizeHistogram =
spark.conf.get(DeltaSQLConf.DELTA_FILE_SIZE_HISTOGRAM_ENABLED),
includeAddFilesInCrc = allFilesInCrcWritePathEnabled
).toOption
}
spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -577,7 +577,10 @@ class Snapshot(
},
domainMetadata = checksumOpt.flatMap(_.domainMetadata)
.orElse(Option.when(_computedStateTriggered)(domainMetadata)),
histogramOpt = checksumOpt.flatMap(_.histogramOpt)
histogramOpt = Option.when(fileSizeHistogramEnabled) {
checksumOpt.flatMap(_.histogramOpt)
.orElse(Option.when(_computedStateTriggered)(fileSizeHistogram).flatten)
}.flatten
)
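The Option.when(...) {...}.flatten shape gates an already-optional value on a flag: the outer Option encodes "feature enabled", the inner one "histogram available". A small illustration with a hypothetical helper:

// Option.when(flag)(opt) yields Option[Option[T]]; flatten collapses it.
// Flag off => None; flag on but opt empty => None; otherwise Some(value).
def gate[T](enabled: Boolean)(opt: => Option[T]): Option[T] =
  Option.when(enabled)(opt).flatten

gate(enabled = true)(Some(42))   // Some(42)
gate(enabled = false)(Some(42))  // None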

/** Returns the data schema of the table, used for reading stats */
spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.delta.actions.DomainMetadata
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.FileSizeHistogram
import org.apache.spark.sql.delta.stats.FileSizeHistogramUtils
import org.apache.spark.sql.util.ScalaExtensions._

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{coalesce, col, collect_set, count, last, lit, sum}
@@ -65,6 +67,10 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot =>

// For implicits which re-use Encoder:
import implicits._

protected def fileSizeHistogramEnabled: Boolean =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_FILE_SIZE_HISTOGRAM_ENABLED)

/** Whether computedState is already computed or not */
@volatile protected var _computedStateTriggered: Boolean = false

@@ -139,6 +145,8 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot =>
* A map of alias to the aggregations that need to be done to calculate the `computedState`
*/
protected def aggregationsToComputeState: Map[String, Column] = {
lazy val histogramAgg =
FileSizeHistogramUtils.histogramAggregate(coalesce(col("add.size"), lit(-1L)).expr)
Map(
// sum may return null for empty data set.
"sizeInBytes" -> coalesce(sum(col("add.size")), lit(0L)),
@@ -151,7 +159,8 @@
"domainMetadata" -> collect_set(col("domainMetadata")),
"metadata" -> last(col("metaData"), ignoreNulls = true),
"protocol" -> last(col("protocol"), ignoreNulls = true),
"fileSizeHistogram" -> lit(null).cast(FileSizeHistogram.schema)
"fileSizeHistogram" ->
(if (fileSizeHistogramEnabled) histogramAgg else lit(null).cast(FileSizeHistogram.schema))
)
}
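FileSizeHistogramUtils.histogramAggregate (its implementation is not shown in this diff) folds add.size into a histogram struct during state reconstruction; the coalesce(col("add.size"), lit(-1L)) presumably maps non-AddFile rows to a sentinel the aggregate can ignore. For intuition only, an equivalent per-bin rollup can be sketched with plain DataFrame operations, assuming a filesDf with a size column:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Hypothetical illustration, not FileSizeHistogramUtils internals: bucket each
// file by the greatest bin boundary <= its size, then roll up per bin.
def sketchHistogram(filesDf: DataFrame): DataFrame = {
  val boundaries = Seq(0L, 1L << 20, 128L << 20) // example bins: 0, 1 MiB, 128 MiB
  val binCol = boundaries.tail.foldLeft(lit(boundaries.head)) { (acc, b) =>
    when(col("size") >= b, lit(b)).otherwise(acc)
  }
  filesDf.groupBy(binCol.as("bin"))
    .agg(count(lit(1)).as("fileCount"), sum("size").as("totalBytes"))
}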

@@ -165,7 +174,8 @@
def numOfMetadata: Long = computedState.numOfMetadata
def numOfProtocol: Long = computedState.numOfProtocol
def setTransactions: Seq[SetTransaction] = computedState.setTransactions
def fileSizeHistogram: Option[FileSizeHistogram] = computedState.fileSizeHistogram
def fileSizeHistogram: Option[FileSizeHistogram] =
Option.when(fileSizeHistogramEnabled)(computedState.fileSizeHistogram).flatten
def domainMetadata: Seq[DomainMetadata] = computedState.domainMetadata
protected[delta] def sizeInBytesIfKnown: Option[Long] = Some(sizeInBytes)
protected[delta] def setTransactionsIfKnown: Option[Seq[SetTransaction]] = Some(setTransactions)
@@ -185,7 +195,9 @@
setTransactions = Nil,
domainMetadata = Nil,
metadata = metadata,
protocol = protocol
protocol = protocol,
fileSizeHistogram =
Option.when(fileSizeHistogramEnabled)(FileSizeHistogramUtils.emptyHistogram)
)
}
}
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1153,6 +1153,15 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val DELTA_FILE_SIZE_HISTOGRAM_ENABLED =
buildConf("fileSizeHistogramMetrics.enabled")
.internal()
.doc(s"""When enabled, each delta transaction reports file size distribution histogram
|of all the files in the latest snapshot after the commit and histograms of new
|files added and old files removed.""".stripMargin)
.booleanConf
.createWithDefault(true)
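Because DeltaSQLConfBase registers keys under the spark.databricks.delta. prefix (assumed here from its usual buildConf convention), the feature can be toggled per session, e.g. to disable the histograms:

// Key prefix assumed from DeltaSQLConfBase's buildConf convention.
spark.conf.set("spark.databricks.delta.fileSizeHistogramMetrics.enabled", "false")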

val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED =
buildConf("checkpoint.exceptionThrowing.enabled")
.internal()