Read protocol and metadata (P&M) from the CRC
dhruvarya-db committed Dec 4, 2024
1 parent 43ea689 commit af9c14c
Showing 10 changed files with 207 additions and 4 deletions.
@@ -795,8 +795,11 @@ trait ValidateChecksum extends DeltaLogging { self: Snapshot =>
}
}

-compareAction(checksum.metadata, computedStateToCheckAgainst.metadata, "Metadata", "metadata")
-compareAction(checksum.protocol, computedStateToCheckAgainst.protocol, "Protocol", "protocol")
+if (spark.sessionState.conf.getConf(
+    DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED)) {
+  compareAction(checksum.metadata, computedStateToCheckAgainst.metadata, "Metadata", "metadata")
+  compareAction(checksum.protocol, computedStateToCheckAgainst.protocol, "Protocol", "protocol")
+}
compare(
checksum.tableSizeBytes,
computedStateToCheckAgainst.sizeInBytes,
39 changes: 39 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -377,6 +377,41 @@ class Snapshot(

override def protocol: Protocol = _reconstructedProtocolMetadataAndICT.protocol

/**
* Tries to retrieve the protocol, metadata, and in-commit-timestamp (if needed) from the
* checksum file. If the checksum file is not present, or if the protocol or metadata is
* missing, this returns None.
*/
protected def getProtocolMetadataAndIctFromCrc():
Option[Array[ReconstructedProtocolMetadataAndICT]] = {
if (!spark.sessionState.conf.getConf(
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED)) {
return None
}
checksumOpt.map(c => (c.protocol, c.metadata, c.inCommitTimestampOpt)).flatMap {
case (p: Protocol, m: Metadata, ict: Option[Long]) =>
Some(Array((p, null, None), (null, m, None), (null, null, ict))
.map(ReconstructedProtocolMetadataAndICT.tupled))

case (p, m, _) if p != null || m != null =>
// One was missing from the .crc file... warn and fall back to an optimized query
val protocolStr = Option(p).map(_.toString).getOrElse("null")
val metadataStr = Option(m).map(_.toString).getOrElse("null")
recordDeltaEvent(
deltaLog,
opType = "delta.assertions.missingEitherProtocolOrMetadataFromChecksum",
data = Map(
"version" -> version.toString, "protocol" -> protocolStr, "source" -> metadataStr))
logWarning(log"Either protocol or metadata is null from checksum; " +
log"version:${MDC(DeltaLogKeys.VERSION, version)} " +
log"protocol:${MDC(DeltaLogKeys.PROTOCOL, protocolStr)} " +
log"metadata:${MDC(DeltaLogKeys.DELTA_METADATA, metadataStr)}")
None

case _ => None // both missing... fall back to an optimized query
}
}

/**
* Pulls the protocol and metadata of the table from the files that are used to compute the
* Snapshot directly--without triggering a full state reconstruction. This is important, because
@@ -394,6 +429,10 @@
Array[ReconstructedProtocolMetadataAndICT] = {
import implicits._

getProtocolMetadataAndIctFromCrc().foreach { protocolMetadataAndIctFromCrc =>
return protocolMetadataAndIctFromCrc
}

val schemaToUse = Action.logSchema(Set("protocol", "metaData", "commitInfo"))
val checkpointOpt = checkpointProvider.topLevelFileIndex.map { index =>
deltaLog.loadIndex(index, schemaToUse)
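
The fast path above hands results back in the same shape that log replay produces: three entries, each carrying only the protocol, only the metadata, or only the in-commit timestamp. A minimal sketch of that shape, with String placeholders standing in for the real Protocol and Metadata action types (the actual ReconstructedProtocolMetadataAndICT type lives in Snapshot.scala):

// Sketch only: mirrors how getProtocolMetadataAndIctFromCrc splits the .crc contents
// into a protocol-only row, a metadata-only row, and an ICT-only row.
case class ReconstructedRow(protocol: String, metadata: String, ict: Option[Long])

def splitCrc(protocol: String, metadata: String, ict: Option[Long]): Array[ReconstructedRow] =
  Array(
    ReconstructedRow(protocol, null, None),
    ReconstructedRow(null, metadata, None),
    ReconstructedRow(null, null, ict))
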
@@ -57,6 +57,7 @@ trait DeltaLogKeysBase {
case object DATA_FILTER extends LogKeyShims
case object DATE extends LogKeyShims
case object DELTA_COMMIT_INFO extends LogKeyShims
case object DELTA_METADATA extends LogKeyShims
case object DIR extends LogKeyShims
case object DURATION extends LogKeyShims
case object END_INDEX extends LogKeyShims
@@ -1144,6 +1144,15 @@ trait DeltaSQLConfBase {
.intConf
.createOptional

val USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED =
buildConf("readProtocolAndMetadataFromChecksum.enabled")
.internal()
.doc("If enabled, delta log snapshot will read the protocol, metadata, and ICT " +
"(if applicable) from the checksum file and use those to avoid a spark job over the " +
"checkpoint for the two rows of protocol and metadata")
.booleanConf
.createWithDefault(true)

val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED =
buildConf("checkpoint.exceptionThrowing.enabled")
.internal()
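
The new readProtocolAndMetadataFromChecksum flag is internal and on by default. A small sketch of turning it off for one session, for example to force snapshot construction down the old log-replay path while debugging; the full key assumes the usual spark.databricks.delta. prefix that DeltaSQLConf adds to its entries, so verify it against your build:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("crc-fast-path-toggle")
  .master("local[*]")
  .getOrCreate()

// Assumed full key: DeltaSQLConf prefixes conf names with "spark.databricks.delta.".
spark.conf.set("spark.databricks.delta.readProtocolAndMetadataFromChecksum.enabled", "false")
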
@@ -210,7 +210,8 @@ class ChecksumSuite
DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> "true",
DeltaSQLConf.DELTA_CHECKSUM_MISMATCH_IS_FATAL.key -> "false",
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true",
-DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false"
+DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false",
+DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true"
) {
withTempDir { tempDir =>
import testImplicits._
@@ -50,6 +50,7 @@ class DeltaAllFilesInCrcSuite
.set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES.key, "10000")
// needed for DELTA_ALL_FILES_IN_CRC_ENABLED
.set(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key, "true")
.set(DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key, "true")
// Turn on verification by default in the tests
.set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key, "true")
// Turn off force verification for non-UTC timezones by default in the tests
@@ -791,6 +791,62 @@ class DeltaLogSuite extends QueryTest
}
}

test("checksum file should contain protocol and metadata") {
withSQLConf(
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true",
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true"
) {
withTempDir { dir =>
val path = new Path("file://" + dir.getAbsolutePath)
val log = DeltaLog.forTable(spark, path)

val txn = log.startTransaction()
val files = (1 to 10).map(f => createTestAddFile(encodedPath = f.toString))
txn.commitManually(files: _*)
val metadata = log.snapshot.metadata
val protocol = log.snapshot.protocol
DeltaLog.clearCache()

val readLog = DeltaLog.forTable(spark, path)
val checksum = readLog.snapshot.checksumOpt.get
assert(checksum.metadata != null)
assert(checksum.protocol != null)
assert(checksum.metadata.equals(metadata))
assert(checksum.protocol.equals(protocol))
}
}
}

test("checksum reader should be able to read incomplete checksum file without " +
"protocol and metadata") {
withSQLConf(
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true",
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true"
) {
withTempDir { dir =>
val path = new Path("file://" + dir.getAbsolutePath)
val log = DeltaLog.forTable(spark, path)

val txn = log.startTransaction()
val files = (1 to 10).map(f => createTestAddFile(encodedPath = f.toString))
txn.commitManually(files: _*)
val metadata = log.snapshot.metadata
val protocol = log.snapshot.protocol
DeltaLog.clearCache()
val checksumFilePath = FileNames.checksumFile(log.logPath, 0L)
removeProtocolAndMetadataFromChecksumFile(checksumFilePath)

val readLog = DeltaLog.forTable(spark, path)
val checksum = readLog.snapshot.checksumOpt.get
assert(checksum.metadata == null)
assert(checksum.protocol == null)

// Check that protocol and metadata can still be read from the commit files (log replay).
assert(readLog.snapshot.metadata.equals(metadata))
assert(readLog.snapshot.protocol.equals(protocol))
}
}
}
}

class CoordinatedCommitsBatchBackfill1DeltaLogSuite extends DeltaLogSuite {
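
The checks these tests perform can be sketched against any table: open the log, take the latest snapshot's checksumOpt, and see whether protocol and metadata made it into the .crc. The path below is illustrative, and checksumOpt may only be accessible from code in the org.apache.spark.sql.delta package:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog

val spark = SparkSession.active
val log = DeltaLog.forTable(spark, new Path("file:///tmp/example-delta-table"))
log.update().checksumOpt match {
  case Some(crc) =>
    // Both fields are nullable members of the checksum case class.
    println(s"protocol in crc: ${Option(crc.protocol).isDefined}, " +
      s"metadata in crc: ${Option(crc.metadata).isDefined}")
  case None =>
    println("no checksum file for the latest version")
}
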
@@ -16,7 +16,8 @@

package org.apache.spark.sql.delta

-import java.io.File
+import java.io.{BufferedReader, File, InputStreamReader}
+import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap

@@ -32,6 +33,8 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
import org.apache.spark.sql.delta.util.FileNames
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.delta.tables.{DeltaTable => IODeltaTable}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path
@@ -171,6 +174,29 @@
}
}

/**
* Removes the protocol and metadata fields from a JSON-format checksum file.
*/
def removeProtocolAndMetadataFromChecksumFile(checksumFilePath : Path): Unit = {
// scalastyle:off deltahadoopconfiguration
val fs = checksumFilePath.getFileSystem(
SparkSession.getActiveSession.map(_.sessionState.newHadoopConf()).get
)
// scalastyle:on deltahadoopconfiguration
if (!fs.exists(checksumFilePath)) return
val stream = fs.open(checksumFilePath)
val reader = new BufferedReader(new InputStreamReader(stream, UTF_8))
val content = reader.readLine()
stream.close()
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
val map = mapper.readValue(content, classOf[Map[String, String]])
val partialContent = mapper.writeValueAsString(map.-("protocol").-("metadata")) + "\n"
val output = fs.create(checksumFilePath, true)
output.write(partialContent.getBytes(UTF_8))
output.close()
}

protected def getfindTouchedFilesJobPlans(plans: Seq[Plans]): SparkPlan = {
// The expected plan for touched file computation is of the format below.
// The data column should be pruned from both leaves.
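
A sketch of how removeProtocolAndMetadataFromChecksumFile is meant to be wired into a test (it is defined on DeltaTestUtilsBase, so the call assumes the trait is mixed in); the table path and version 0 are illustrative and mirror the DeltaLogSuite case earlier in this commit:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.util.FileNames

val spark = SparkSession.active
val log = DeltaLog.forTable(spark, new Path("file:///tmp/t"))
// Strip protocol/metadata from the version-0 .crc to simulate an incomplete checksum.
removeProtocolAndMetadataFromChecksumFile(FileNames.checksumFile(log.logPath, 0L))
// Clear the cache so the next DeltaLog.forTable builds a fresh Snapshot and has to
// fall back to log replay for protocol and metadata.
DeltaLog.clearCache()
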
@@ -179,6 +179,8 @@ class InCommitTimestampSuite
}

test("Missing CommitInfo should result in a DELTA_MISSING_COMMIT_INFO exception") {
// Make sure that we don't retrieve the time from the CRC.
withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") {
withTempDir { tempDir =>
spark.range(10).write.format("delta").save(tempDir.getAbsolutePath)
val deltaLog =
@@ -206,10 +208,13 @@
"featureName" -> InCommitTimestampTableFeature.name,
"version" -> "1"))
}
}
}

test("Missing CommitInfo.commitTimestamp should result in a " +
"DELTA_MISSING_COMMIT_TIMESTAMP exception") {
// Make sure that we don't retrieve the time from the CRC.
withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") {
withTempDir { tempDir =>
spark.range(10).write.format("delta").save(tempDir.getAbsolutePath)
val deltaLog =
Expand Down Expand Up @@ -243,6 +248,7 @@ class InCommitTimestampSuite
"DELTA_MISSING_COMMIT_TIMESTAMP",
parameters = Map("featureName" -> InCommitTimestampTableFeature.name, "version" -> "1"))
}
}
}

test("InCommitTimestamp is equal to snapshot.timestamp") {
@@ -395,8 +401,28 @@
}
}

test("snapshot.timestamp should be read from the CRC") {
withTempDir { tempDir =>
var deltaLog: DeltaLog = null
var timestamp = -1L
val usageRecords = Log4jUsageLogger.track {
spark.range(1).write.format("delta").save(tempDir.getAbsolutePath)
DeltaLog.clearCache() // Clear the post-commit snapshot from the cache.
deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf())
assert(fs.exists(FileNames.checksumFile(deltaLog.logPath, 0)))
timestamp = deltaLog.snapshot.timestamp
}
assert(timestamp == getInCommitTimestamp(deltaLog, 0))
// No explicit read.
assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty)
}
}

test("postCommitSnapshot.timestamp should be populated by protocolMetadataAndICTReconstruction " +
"when the table has no checkpoints") {
// Make sure that we don't retrieve the time from the CRC.
withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") {
withTempDir { tempDir =>
var deltaLog: DeltaLog = null
var timestamp = -1L
@@ -410,10 +436,13 @@
// No explicit read.
assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty)
}
}
}

test("snapshot.timestamp should be populated by protocolMetadataAndICTReconstruction " +
"during cold reads of checkpoints + deltas") {
// Make sure that we don't retrieve the time from the CRC.
withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") {
withTempDir { tempDir =>
var deltaLog: DeltaLog = null
var timestamp = -1L
@@ -434,10 +463,13 @@
// No explicit read.
assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty)
}
}
}

test("snapshot.timestamp cannot be populated by protocolMetadataAndICTReconstruction " +
"during cold reads of checkpoints") {
// Make sure that we don't retrieve the time from the CRC.
withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") {
withTempDir { tempDir =>
var deltaLog: DeltaLog = null
var timestamp = -1L
@@ -452,9 +484,37 @@
assert(timestamp == getInCommitTimestamp(deltaLog, 0))
assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").length == 1)
}
}
}

test("snapshot.timestamp is read from file when CRC doesn't have ICT and " +
"the latest version has a checkpoint") {
withTempDir { tempDir =>
var deltaLog: DeltaLog = null
var timestamp = -1L
spark.range(1).write.format("delta").save(tempDir.getAbsolutePath)
deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
deltaLog.createCheckpointAtVersion(0)
// Remove the ICT from the CRC.
InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, 0, None)
val usageRecords = Log4jUsageLogger.track {
DeltaLog.clearCache() // Clear the post-commit snapshot from the cache.
deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
timestamp = deltaLog.snapshot.timestamp
}
assert(deltaLog.snapshot.checkpointProvider.version == 0)
assert(timestamp == getInCommitTimestamp(deltaLog, 0))
val ictReadLog = filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").head
val blob = JsonUtils.fromJson[Map[String, String]](ictReadLog.blob)
assert(blob("version") == "0")
assert(blob("checkpointVersion") == "0")
assert(blob("isCRCPresent") == "true")
}
}

test("Exceptions during ICT reads from file should be logged") {
// Make sure that we don't retrieve the time from the CRC.
withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") {
withTempDir { tempDir =>
spark.range(10).write.format("delta").save(tempDir.getAbsolutePath)
val deltaLog =
@@ -486,6 +546,7 @@
assert(blob("exceptionMessage").startsWith("[DELTA_MISSING_COMMIT_INFO]"))
assert(blob("exceptionStackTrace").contains(Snapshot.getClass.getName.stripSuffix("$")))
}
}
}

test("DeltaHistoryManager.getActiveCommitAtTimeFromICTRange") {
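
The thread running through this suite's changes: on a cold read, snapshot.timestamp should come straight from the .crc whenever the in-commit timestamp was written there, and only fall back to an explicit commit-file read (the delta.inCommitTimestamp.read event asserted above) when it was not. A hedged sketch of observing the timestamp itself; the table path is illustrative:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog

val spark = SparkSession.active
DeltaLog.clearCache() // force a cold snapshot, as the tests above do
val log = DeltaLog.forTable(spark, new Path("file:///tmp/ict-table"))
// For an ICT-enabled table this is the in-commit timestamp of the latest version,
// ideally served from the checksum file rather than a separate log read.
println(log.snapshot.timestamp)
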
@@ -49,6 +49,12 @@ import org.apache.spark.storage.StorageLevel
class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with SharedSparkSession
with DeltaSQLCommandTest with CoordinatedCommitsBaseSuite {

protected override def sparkConf = {
// Disable loading protocol and metadata from checksum file. Otherwise, creating a Snapshot
// won't touch the checkpoint file and we won't be able to retry.
super.sparkConf
.set(DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key, "false")
}

/**
* Truncate an existing checkpoint file to create a corrupt file.
