From 828c6dfc312003143af40eae1d4698419f2bb922 Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Thu, 5 Dec 2024 00:13:45 +0530 Subject: [PATCH 1/7] read p&m from the CRC --- .../org/apache/spark/sql/delta/Checksum.scala | 7 ++- .../org/apache/spark/sql/delta/Snapshot.scala | 39 ++++++++++++ .../sql/delta/logging/DeltaLogKeys.scala | 1 + .../sql/delta/sources/DeltaSQLConf.scala | 9 +++ .../spark/sql/delta/ChecksumSuite.scala | 3 +- .../sql/delta/DeltaAllFilesInCrcSuite.scala | 1 + .../spark/sql/delta/DeltaLogSuite.scala | 56 +++++++++++++++++ .../spark/sql/delta/DeltaTestUtils.scala | 28 ++++++++- .../sql/delta/InCommitTimestampSuite.scala | 61 +++++++++++++++++++ .../sql/delta/SnapshotManagementSuite.scala | 6 ++ 10 files changed, 207 insertions(+), 4 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala index 150b57c0376..d2bf124606f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala @@ -795,8 +795,11 @@ trait ValidateChecksum extends DeltaLogging { self: Snapshot => } } - compareAction(checksum.metadata, computedStateToCheckAgainst.metadata, "Metadata", "metadata") - compareAction(checksum.protocol, computedStateToCheckAgainst.protocol, "Protocol", "protocol") + if (spark.sessionState.conf.getConf( + DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED)) { + compareAction(checksum.metadata, computedStateToCheckAgainst.metadata, "Metadata", "metadata") + compareAction(checksum.protocol, computedStateToCheckAgainst.protocol, "Protocol", "protocol") + } compare( checksum.tableSizeBytes, computedStateToCheckAgainst.sizeInBytes, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 8340b4aaab7..62aec833a9c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -377,6 +377,41 @@ class Snapshot( override def protocol: Protocol = _reconstructedProtocolMetadataAndICT.protocol + /** + * Tries to retrieve the protocol, metadata, and in-commit-timestamp (if needed) from the + * checksum file. If the checksum file is not present or if the protocol or metadata is missing + * this will return None. + */ + protected def getProtocolMetadataAndIctFromCrc(): + Option[Array[ReconstructedProtocolMetadataAndICT]] = { + if (!spark.sessionState.conf.getConf( + DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED)) { + return None + } + checksumOpt.map(c => (c.protocol, c.metadata, c.inCommitTimestampOpt)).flatMap { + case (p: Protocol, m: Metadata, ict: Option[Long]) => + Some(Array((p, null, None), (null, m, None), (null, null, ict)) + .map(ReconstructedProtocolMetadataAndICT.tupled)) + + case (p, m, _) if p != null || m != null => + // One was missing from the .crc file... 
warn and fall back to an optimized query
+        val protocolStr = Option(p).map(_.toString).getOrElse("null")
+        val metadataStr = Option(m).map(_.toString).getOrElse("null")
+        recordDeltaEvent(
+          deltaLog,
+          opType = "delta.assertions.missingEitherProtocolOrMetadataFromChecksum",
+          data = Map(
+            "version" -> version.toString, "protocol" -> protocolStr, "metadata" -> metadataStr))
+        logWarning(log"Either protocol or metadata is null from checksum; " +
+          log"version:${MDC(DeltaLogKeys.VERSION, version)} " +
+          log"protocol:${MDC(DeltaLogKeys.PROTOCOL, protocolStr)} " +
+          log"metadata:${MDC(DeltaLogKeys.DELTA_METADATA, metadataStr)}")
+        None
+
+      case _ => None // both missing... fall back to an optimized query
+    }
+  }
+
   /**
    * Pulls the protocol and metadata of the table from the files that are used to compute the
    * Snapshot directly--without triggering a full state reconstruction. This is important, because
@@ -394,6 +429,10 @@ class Snapshot(
       Array[ReconstructedProtocolMetadataAndICT] = {
     import implicits._
 
+    getProtocolMetadataAndIctFromCrc().foreach { protocolMetadataAndIctFromCrc =>
+      return protocolMetadataAndIctFromCrc
+    }
+
     val schemaToUse = Action.logSchema(Set("protocol", "metaData", "commitInfo"))
     val checkpointOpt = checkpointProvider.topLevelFileIndex.map { index =>
       deltaLog.loadIndex(index, schemaToUse)
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala b/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala
index 1b63dfdd39a..ad48fad8dd9 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala
@@ -57,6 +57,7 @@ trait DeltaLogKeysBase {
   case object DATA_FILTER extends LogKeyShims
   case object DATE extends LogKeyShims
   case object DELTA_COMMIT_INFO extends LogKeyShims
+  case object DELTA_METADATA extends LogKeyShims
   case object DIR extends LogKeyShims
   case object DURATION extends LogKeyShims
   case object END_INDEX extends LogKeyShims
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 1c46473a56b..c1630fd5230 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1144,6 +1144,15 @@ trait DeltaSQLConfBase {
       .intConf
       .createOptional
 
+  val USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED =
+    buildConf("readProtocolAndMetadataFromChecksum.enabled")
+      .internal()
+      .doc("If enabled, the Delta log snapshot will read the protocol, metadata, and ICT " +
+        "(if applicable) from the checksum file and use those to avoid a Spark job over the " +
+        "checkpoint for the two rows of protocol and metadata.")
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED =
     buildConf("checkpoint.exceptionThrowing.enabled")
       .internal()
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala
index 5e4e9f2f62e..0ab4479e255 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala
@@ -210,7 +210,8 @@ class ChecksumSuite
       DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> "true",
       DeltaSQLConf.DELTA_CHECKSUM_MISMATCH_IS_FATAL.key -> "false",
       DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true",
-
DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false" + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false", + DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true" ) { withTempDir { tempDir => import testImplicits._ diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala index 0f7ad328747..1e95156fe3e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala @@ -50,6 +50,7 @@ class DeltaAllFilesInCrcSuite .set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES.key, "10000") // needed for DELTA_ALL_FILES_IN_CRC_ENABLED .set(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key, "true") + .set(DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key, "true") // Turn on verification by default in the tests .set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key, "true") // Turn off force verification for non-UTC timezones by default in the tests diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala index 1423d85db0e..e8c9e31f939 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala @@ -791,6 +791,62 @@ class DeltaLogSuite extends QueryTest } } + test("checksum file should contain protocol and metadata") { + withSQLConf( + DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true", + DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true" + ) { + withTempDir { dir => + val path = new Path("file://" + dir.getAbsolutePath) + val log = DeltaLog.forTable(spark, path) + + val txn = log.startTransaction() + val files = (1 to 10).map(f => createTestAddFile(encodedPath = f.toString)) + txn.commitManually(files: _*) + val metadata = log.snapshot.metadata + val protocol = log.snapshot.protocol + DeltaLog.clearCache() + + val readLog = DeltaLog.forTable(spark, path) + val checksum = readLog.snapshot.checksumOpt.get + assert(checksum.metadata != null) + assert(checksum.protocol != null) + assert(checksum.metadata.equals(metadata)) + assert(checksum.protocol.equals(protocol)) + } + } + } + + test("checksum reader should be able to read incomplete checksum file without " + + "protocol and metadata") { + withSQLConf( + DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true", + DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true" + ) { + withTempDir { dir => + val path = new Path("file://" + dir.getAbsolutePath) + val log = DeltaLog.forTable(spark, path) + + val txn = log.startTransaction() + val files = (1 to 10).map(f => createTestAddFile(encodedPath = f.toString)) + txn.commitManually(files: _*) + val metadata = log.snapshot.metadata + val protocol = log.snapshot.protocol + DeltaLog.clearCache() + val checksumFilePath = FileNames.checksumFile(log.logPath, 0L) + removeProtocolAndMetadataFromChecksumFile(checksumFilePath) + + val readLog = DeltaLog.forTable(spark, path) + val checksum = readLog.snapshot.checksumOpt.get + assert(checksum.metadata == null) + assert(checksum.protocol == null) + + // check we are still able to read protocol and metadata from checkpoint + assert(readLog.snapshot.metadata.equals(metadata)) + assert(readLog.snapshot.protocol.equals(protocol)) + } + } + } } class 
CoordinatedCommitsBatchBackfill1DeltaLogSuite extends DeltaLogSuite { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala index 8b50e37fd07..5d2c67c3182 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala @@ -16,7 +16,8 @@ package org.apache.spark.sql.delta -import java.io.File +import java.io.{BufferedReader, File, InputStreamReader} +import java.nio.charset.StandardCharsets.UTF_8 import java.util.Locale import java.util.concurrent.ConcurrentHashMap @@ -32,6 +33,8 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} import org.apache.spark.sql.delta.util.FileNames +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import io.delta.tables.{DeltaTable => IODeltaTable} import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.fs.Path @@ -171,6 +174,29 @@ trait DeltaTestUtilsBase { } } + /** + * Remove protocol and metadata fields from checksum file of json format + */ + def removeProtocolAndMetadataFromChecksumFile(checksumFilePath : Path): Unit = { + // scalastyle:off deltahadoopconfiguration + val fs = checksumFilePath.getFileSystem( + SparkSession.getActiveSession.map(_.sessionState.newHadoopConf()).get + ) + // scalastyle:on deltahadoopconfiguration + if (!fs.exists(checksumFilePath)) return + val stream = fs.open(checksumFilePath) + val reader = new BufferedReader(new InputStreamReader(stream, UTF_8)) + val content = reader.readLine() + stream.close() + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + val map = mapper.readValue(content, classOf[Map[String, String]]) + val partialContent = mapper.writeValueAsString(map.-("protocol").-("metadata")) + "\n" + val output = fs.create(checksumFilePath, true) + output.write(partialContent.getBytes(UTF_8)) + output.close() + } + protected def getfindTouchedFilesJobPlans(plans: Seq[Plans]): SparkPlan = { // The expected plan for touched file computation is of the format below. // The data column should be pruned from both leaves. diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala index c981672ffca..8ff40dc40d6 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala @@ -179,6 +179,8 @@ class InCommitTimestampSuite } test("Missing CommitInfo should result in a DELTA_MISSING_COMMIT_INFO exception") { + // Make sure that we don't retrieve the time from the CRC. + withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { withTempDir { tempDir => spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) val deltaLog = @@ -206,10 +208,13 @@ class InCommitTimestampSuite "featureName" -> InCommitTimestampTableFeature.name, "version" -> "1")) } + } } test("Missing CommitInfo.commitTimestamp should result in a " + "DELTA_MISSING_COMMIT_TIMESTAMP exception") { + // Make sure that we don't retrieve the time from the CRC. 
+ withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { withTempDir { tempDir => spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) val deltaLog = @@ -243,6 +248,7 @@ class InCommitTimestampSuite "DELTA_MISSING_COMMIT_TIMESTAMP", parameters = Map("featureName" -> InCommitTimestampTableFeature.name, "version" -> "1")) } + } } test("InCommitTimestamp is equal to snapshot.timestamp") { @@ -395,8 +401,28 @@ class InCommitTimestampSuite } } + test("snapshot.timestamp should be read from the CRC") { + withTempDir { tempDir => + var deltaLog: DeltaLog = null + var timestamp = -1L + val usageRecords = Log4jUsageLogger.track { + spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) + DeltaLog.clearCache() // Clear the post-commit snapshot from the cache. + deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf()) + assert(fs.exists(FileNames.checksumFile(deltaLog.logPath, 0))) + timestamp = deltaLog.snapshot.timestamp + } + assert(timestamp == getInCommitTimestamp(deltaLog, 0)) + // No explicit read. + assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty) + } + } + test("postCommitSnapshot.timestamp should be populated by protocolMetadataAndICTReconstruction " + "when the table has no checkpoints") { + // Make sure that we don't retrieve the time from the CRC. + withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { withTempDir { tempDir => var deltaLog: DeltaLog = null var timestamp = -1L @@ -410,10 +436,13 @@ class InCommitTimestampSuite // No explicit read. assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty) } + } } test("snapshot.timestamp should be populated by protocolMetadataAndICTReconstruction " + "during cold reads of checkpoints + deltas") { + // Make sure that we don't retrieve the time from the CRC. + withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { withTempDir { tempDir => var deltaLog: DeltaLog = null var timestamp = -1L @@ -434,10 +463,13 @@ class InCommitTimestampSuite // No explicit read. assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty) } + } } test("snapshot.timestamp cannot be populated by protocolMetadataAndICTReconstruction " + "during cold reads of checkpoints") { + // Make sure that we don't retrieve the time from the CRC. + withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { withTempDir { tempDir => var deltaLog: DeltaLog = null var timestamp = -1L @@ -452,9 +484,37 @@ class InCommitTimestampSuite assert(timestamp == getInCommitTimestamp(deltaLog, 0)) assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").length == 1) } + } + } + + test("snapshot.timestamp is read from file when CRC doesn't have ICT and " + + "the latest version has a checkpoint") { + withTempDir { tempDir => + var deltaLog: DeltaLog = null + var timestamp = -1L + spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) + deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + deltaLog.createCheckpointAtVersion(0) + // Remove the ICT from the CRC. + InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, 0, None) + val usageRecords = Log4jUsageLogger.track { + DeltaLog.clearCache() // Clear the post-commit snapshot from the cache. 
+ deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + timestamp = deltaLog.snapshot.timestamp + } + assert(deltaLog.snapshot.checkpointProvider.version == 0) + assert(timestamp == getInCommitTimestamp(deltaLog, 0)) + val ictReadLog = filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").head + val blob = JsonUtils.fromJson[Map[String, String]](ictReadLog.blob) + assert(blob("version") == "0") + assert(blob("checkpointVersion") == "0") + assert(blob("isCRCPresent") == "true") + } } test("Exceptions during ICT reads from file should be logged") { + // Make sure that we don't retrieve the time from the CRC. + withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { withTempDir { tempDir => spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) val deltaLog = @@ -486,6 +546,7 @@ class InCommitTimestampSuite assert(blob("exceptionMessage").startsWith("[DELTA_MISSING_COMMIT_INFO]")) assert(blob("exceptionStackTrace").contains(Snapshot.getClass.getName.stripSuffix("$"))) } + } } test("DeltaHistoryManager.getActiveCommitAtTimeFromICTRange") { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala index ebdcda11cc8..b0a6732e198 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala @@ -49,6 +49,12 @@ import org.apache.spark.storage.StorageLevel class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with SharedSparkSession with DeltaSQLCommandTest with CoordinatedCommitsBaseSuite { + protected override def sparkConf = { + // Disable loading protocol and metadata from checksum file. Otherwise, creating a Snapshot + // won't touch the checkpoint file and we won't be able to retry. + super.sparkConf + .set(DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key, "false") + } /** * Truncate an existing checkpoint file to create a corrupt file. 
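Annotation (outside the patches): PATCH 1/7 makes Snapshot serve the protocol and metadata, and the in-commit timestamp when present, directly from the version checksum (checksumOpt), falling back to the existing optimized Spark query whenever either action is missing from the .crc. A rough, non-authoritative sketch of the caller-visible effect follows; it assumes a `spark` session with Delta Lake on the classpath, a purely illustrative table path, and that the fully qualified conf key carries DeltaSQLConf's usual "spark.databricks.delta." prefix.

    // Sketch only: exercises the new read path under the assumptions stated above.
    import org.apache.spark.sql.delta.DeltaLog

    // Assumed fully qualified key for USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.
    spark.conf.set("spark.databricks.delta.readProtocolAndMetadataFromChecksum.enabled", "true")

    DeltaLog.clearCache()
    val log = DeltaLog.forTable(spark, "/tmp/delta/events") // hypothetical table path
    val snapshot = log.update()
    // If the latest version's .crc contains both protocol and metadata, these come from the
    // checksum without a log-replay Spark job; otherwise the optimized query still runs.
    println(snapshot.protocol)
    println(snapshot.metadata)
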
From 83ad1494c04855f737655f4bcaa71c58b9813e5e Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Thu, 5 Dec 2024 02:40:00 +0530 Subject: [PATCH 2/7] fix test --- .../main/scala/org/apache/spark/sql/delta/Checksum.scala | 7 ++----- .../main/scala/org/apache/spark/sql/delta/Snapshot.scala | 4 +++- .../scala/org/apache/spark/sql/delta/ChecksumSuite.scala | 5 ++++- .../apache/spark/sql/delta/InCommitTimestampSuite.scala | 2 ++ 4 files changed, 11 insertions(+), 7 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala index d2bf124606f..150b57c0376 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala @@ -795,11 +795,8 @@ trait ValidateChecksum extends DeltaLogging { self: Snapshot => } } - if (spark.sessionState.conf.getConf( - DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED)) { - compareAction(checksum.metadata, computedStateToCheckAgainst.metadata, "Metadata", "metadata") - compareAction(checksum.protocol, computedStateToCheckAgainst.protocol, "Protocol", "protocol") - } + compareAction(checksum.metadata, computedStateToCheckAgainst.metadata, "Metadata", "metadata") + compareAction(checksum.protocol, computedStateToCheckAgainst.protocol, "Protocol", "protocol") compare( checksum.tableSizeBytes, computedStateToCheckAgainst.sizeInBytes, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 62aec833a9c..9a93fdb05fd 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -147,7 +147,9 @@ class Snapshot( "checkpointVersion" -> logSegment.checkpointProvider.version, "durationMs" -> (System.currentTimeMillis() - startTime), "exceptionMessage" -> exception.map(_.getMessage).getOrElse(""), - "exceptionStackTrace" -> exception.map(_.getStackTrace.mkString("\n")).getOrElse("") + "exceptionStackTrace" -> + exception.map(_.getStackTrace.mkString("\n")).getOrElse(""), + "isCRCPresent" -> checksumOpt.isDefined ) ) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala index 0ab4479e255..0d89ae989f7 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala @@ -150,7 +150,10 @@ class ChecksumSuite test("Checksum validation should happen on checkpoint") { withSQLConf( DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true", - DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true" + DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true", + // Disabled for this test because with it enabled, a corrupted Protocol + // or Metadata will trigger a failure earlier than the full validation. 
+ DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "false" ) { withTempDir { tempDir => spark.range(10).write.format("delta").save(tempDir.getCanonicalPath) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala index 8ff40dc40d6..acc3b7119fa 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala @@ -51,6 +51,8 @@ class InCommitTimestampSuite override def beforeAll(): Unit = { super.beforeAll() spark.conf.set(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.defaultTablePropertyKey, "true") + spark.conf.set(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key, "true") + spark.conf.set(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key, "true") } test("Enable ICT on commit 0") { From ad55adbebe4a61a7bda3d64a776513e3f732f282 Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Thu, 5 Dec 2024 02:46:49 +0530 Subject: [PATCH 3/7] fix indentation --- .../sql/delta/InCommitTimestampSuite.scala | 262 +++++++++--------- 1 file changed, 132 insertions(+), 130 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala index acc3b7119fa..5597f41042e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala @@ -183,33 +183,34 @@ class InCommitTimestampSuite test("Missing CommitInfo should result in a DELTA_MISSING_COMMIT_INFO exception") { // Make sure that we don't retrieve the time from the CRC. withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { - withTempDir { tempDir => - spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) - val deltaLog = - DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) - deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate) - // Remove CommitInfo from the commit. - val commit1Path = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(1) - val actions = deltaLog.store.readAsIterator(commit1Path, deltaLog.newDeltaHadoopConf()) - val actionsWithoutCommitInfo = actions.filterNot(Action.fromJson(_).isInstanceOf[CommitInfo]) - deltaLog.store.write( - commit1Path, - actionsWithoutCommitInfo, - overwrite = true, - deltaLog.newDeltaHadoopConf()) - - DeltaLog.clearCache() - val latestSnapshot = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).snapshot - val e = intercept[DeltaIllegalStateException] { - latestSnapshot.timestamp + withTempDir { tempDir => + spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) + val deltaLog = + DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate) + // Remove CommitInfo from the commit. 
+ val commit1Path = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(1) + val actions = deltaLog.store.readAsIterator(commit1Path, deltaLog.newDeltaHadoopConf()) + val actionsWithoutCommitInfo = + actions.filterNot(Action.fromJson(_).isInstanceOf[CommitInfo]) + deltaLog.store.write( + commit1Path, + actionsWithoutCommitInfo, + overwrite = true, + deltaLog.newDeltaHadoopConf()) + + DeltaLog.clearCache() + val latestSnapshot = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).snapshot + val e = intercept[DeltaIllegalStateException] { + latestSnapshot.timestamp + } + checkError( + e, + "DELTA_MISSING_COMMIT_INFO", + parameters = Map( + "featureName" -> InCommitTimestampTableFeature.name, + "version" -> "1")) } - checkError( - e, - "DELTA_MISSING_COMMIT_INFO", - parameters = Map( - "featureName" -> InCommitTimestampTableFeature.name, - "version" -> "1")) - } } } @@ -217,39 +218,39 @@ class InCommitTimestampSuite "DELTA_MISSING_COMMIT_TIMESTAMP exception") { // Make sure that we don't retrieve the time from the CRC. withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { - withTempDir { tempDir => - spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) - val deltaLog = - DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) - deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate) - // Remove CommitInfo.commitTimestamp from the commit. - val commit1Path = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(1) - val actions = deltaLog.store.readAsIterator( - commit1Path, - deltaLog.newDeltaHadoopConf()).toList - val actionsWithoutCommitInfoCommitTimestamp = - actions.map(Action.fromJson).map { - case ci: CommitInfo => - ci.copy(inCommitTimestamp = None).json - case other => - other.json - }.toIterator - deltaLog.store.write( - commit1Path, - actionsWithoutCommitInfoCommitTimestamp, - overwrite = true, - deltaLog.newDeltaHadoopConf()) - - DeltaLog.clearCache() - val latestSnapshot = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).snapshot - val e = intercept[DeltaIllegalStateException] { - latestSnapshot.timestamp + withTempDir { tempDir => + spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) + val deltaLog = + DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate) + // Remove CommitInfo.commitTimestamp from the commit. 
+ val commit1Path = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(1) + val actions = deltaLog.store.readAsIterator( + commit1Path, + deltaLog.newDeltaHadoopConf()).toList + val actionsWithoutCommitInfoCommitTimestamp = + actions.map(Action.fromJson).map { + case ci: CommitInfo => + ci.copy(inCommitTimestamp = None).json + case other => + other.json + }.toIterator + deltaLog.store.write( + commit1Path, + actionsWithoutCommitInfoCommitTimestamp, + overwrite = true, + deltaLog.newDeltaHadoopConf()) + + DeltaLog.clearCache() + val latestSnapshot = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).snapshot + val e = intercept[DeltaIllegalStateException] { + latestSnapshot.timestamp + } + checkError( + e, + "DELTA_MISSING_COMMIT_TIMESTAMP", + parameters = Map("featureName" -> InCommitTimestampTableFeature.name, "version" -> "1")) } - checkError( - e, - "DELTA_MISSING_COMMIT_TIMESTAMP", - parameters = Map("featureName" -> InCommitTimestampTableFeature.name, "version" -> "1")) - } } } @@ -425,19 +426,19 @@ class InCommitTimestampSuite "when the table has no checkpoints") { // Make sure that we don't retrieve the time from the CRC. withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { - withTempDir { tempDir => - var deltaLog: DeltaLog = null - var timestamp = -1L - spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) - DeltaLog.clearCache() - val usageRecords = Log4jUsageLogger.track { - deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) - timestamp = deltaLog.snapshot.timestamp + withTempDir { tempDir => + var deltaLog: DeltaLog = null + var timestamp = -1L + spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) + DeltaLog.clearCache() + val usageRecords = Log4jUsageLogger.track { + deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + timestamp = deltaLog.snapshot.timestamp + } + assert(timestamp == getInCommitTimestamp(deltaLog, 0)) + // No explicit read. + assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty) } - assert(timestamp == getInCommitTimestamp(deltaLog, 0)) - // No explicit read. - assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty) - } } } @@ -445,26 +446,26 @@ class InCommitTimestampSuite "during cold reads of checkpoints + deltas") { // Make sure that we don't retrieve the time from the CRC. withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { - withTempDir { tempDir => - var deltaLog: DeltaLog = null - var timestamp = -1L - spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) - deltaLog = DeltaLog - .forTable(spark, new Path(tempDir.getCanonicalPath)) - deltaLog.createCheckpointAtVersion(0) - deltaLog.startTransaction().commit(Seq(createTestAddFile("c1")), ManualUpdate) + withTempDir { tempDir => + var deltaLog: DeltaLog = null + var timestamp = -1L + spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) + deltaLog = DeltaLog + .forTable(spark, new Path(tempDir.getCanonicalPath)) + deltaLog.createCheckpointAtVersion(0) + deltaLog.startTransaction().commit(Seq(createTestAddFile("c1")), ManualUpdate) - val usageRecords = Log4jUsageLogger.track { - DeltaLog.clearCache() // Clear the post-commit snapshot from the cache. - deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) - timestamp = deltaLog.snapshot.timestamp + val usageRecords = Log4jUsageLogger.track { + DeltaLog.clearCache() // Clear the post-commit snapshot from the cache. 
+ deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + timestamp = deltaLog.snapshot.timestamp + } + assert(deltaLog.snapshot.checkpointProvider.version == 0) + assert(deltaLog.snapshot.version == 1) + assert(timestamp == getInCommitTimestamp(deltaLog, 1)) + // No explicit read. + assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty) } - assert(deltaLog.snapshot.checkpointProvider.version == 0) - assert(deltaLog.snapshot.version == 1) - assert(timestamp == getInCommitTimestamp(deltaLog, 1)) - // No explicit read. - assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty) - } } } @@ -472,20 +473,20 @@ class InCommitTimestampSuite "during cold reads of checkpoints") { // Make sure that we don't retrieve the time from the CRC. withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { - withTempDir { tempDir => - var deltaLog: DeltaLog = null - var timestamp = -1L - spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) - DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).createCheckpointAtVersion(0) - val usageRecords = Log4jUsageLogger.track { - DeltaLog.clearCache() // Clear the post-commit snapshot from the cache. - deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) - timestamp = deltaLog.snapshot.timestamp + withTempDir { tempDir => + var deltaLog: DeltaLog = null + var timestamp = -1L + spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) + DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).createCheckpointAtVersion(0) + val usageRecords = Log4jUsageLogger.track { + DeltaLog.clearCache() // Clear the post-commit snapshot from the cache. + deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + timestamp = deltaLog.snapshot.timestamp + } + assert(deltaLog.snapshot.checkpointProvider.version == 0) + assert(timestamp == getInCommitTimestamp(deltaLog, 0)) + assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").length == 1) } - assert(deltaLog.snapshot.checkpointProvider.version == 0) - assert(timestamp == getInCommitTimestamp(deltaLog, 0)) - assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").length == 1) - } } } @@ -517,37 +518,38 @@ class InCommitTimestampSuite test("Exceptions during ICT reads from file should be logged") { // Make sure that we don't retrieve the time from the CRC. withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { - withTempDir { tempDir => - spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) - val deltaLog = - DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) - deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate) - // Remove CommitInfo from the commit. 
- val commit1Path = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(1) - val actions = deltaLog.store.readAsIterator(commit1Path, deltaLog.newDeltaHadoopConf()) - val actionsWithoutCommitInfo = actions.filterNot(Action.fromJson(_).isInstanceOf[CommitInfo]) - deltaLog.store.write( - commit1Path, - actionsWithoutCommitInfo, - overwrite = true, - deltaLog.newDeltaHadoopConf()) - - DeltaLog.clearCache() - val latestSnapshot = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).snapshot - val usageRecords = Log4jUsageLogger.track { - try { - latestSnapshot.timestamp - } catch { - case _ : DeltaIllegalStateException => () + withTempDir { tempDir => + spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) + val deltaLog = + DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate) + // Remove CommitInfo from the commit. + val commit1Path = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(1) + val actions = deltaLog.store.readAsIterator(commit1Path, deltaLog.newDeltaHadoopConf()) + val actionsWithoutCommitInfo = + actions.filterNot(Action.fromJson(_).isInstanceOf[CommitInfo]) + deltaLog.store.write( + commit1Path, + actionsWithoutCommitInfo, + overwrite = true, + deltaLog.newDeltaHadoopConf()) + + DeltaLog.clearCache() + val latestSnapshot = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).snapshot + val usageRecords = Log4jUsageLogger.track { + try { + latestSnapshot.timestamp + } catch { + case _ : DeltaIllegalStateException => () + } } + val ictReadLog = filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").head + val blob = JsonUtils.fromJson[Map[String, String]](ictReadLog.blob) + assert(blob("version") == "1") + assert(blob("checkpointVersion") == "-1") + assert(blob("exceptionMessage").startsWith("[DELTA_MISSING_COMMIT_INFO]")) + assert(blob("exceptionStackTrace").contains(Snapshot.getClass.getName.stripSuffix("$"))) } - val ictReadLog = filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").head - val blob = JsonUtils.fromJson[Map[String, String]](ictReadLog.blob) - assert(blob("version") == "1") - assert(blob("checkpointVersion") == "-1") - assert(blob("exceptionMessage").startsWith("[DELTA_MISSING_COMMIT_INFO]")) - assert(blob("exceptionStackTrace").contains(Snapshot.getClass.getName.stripSuffix("$"))) - } } } From 2baa9b73b3578a10b4ed830272d6c3e08e761977 Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Thu, 5 Dec 2024 05:15:01 +0530 Subject: [PATCH 4/7] fix test --- .../org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala index 1e95156fe3e..29594434e4f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala @@ -198,6 +198,7 @@ class DeltaAllFilesInCrcSuite // We will see all files in CRC verification failure. // This will trigger the incremental commit verification which will fail. 
+ assert(filterUsageRecords(records, "delta.assertions.mismatchedAction").size === 1) val allFilesInCrcValidationFailureRecords = filterUsageRecords(records, "delta.allFilesInCrc.checksumMismatch.differentAllFiles") assert(allFilesInCrcValidationFailureRecords.size === 1) @@ -210,9 +211,8 @@ class DeltaAllFilesInCrcSuite assert(eventData("filesCountFromStateReconstruction").toLong === expectedFilesCountFromCrc + 1) assert(eventData("incrementalCommitCrcValidationPassed").toBoolean === false) - val expectedValidationFailureMessage = "Number of files - Expected: 1 Computed: 2" assert(eventData("errorForIncrementalCommitCrcValidation").contains( - expectedValidationFailureMessage)) + "The metadata of your Delta table could not be recovered")) } } } From 4a160cf2026b74ef0a7c3a4d649b7baabf6f828b Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Thu, 5 Dec 2024 09:28:11 +0530 Subject: [PATCH 5/7] fix deltalogsuite --- .../test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala index e8c9e31f939..d07eeb9eff8 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala @@ -411,6 +411,9 @@ class DeltaLogSuite extends QueryTest } } + val checksumFilePath = FileNames.checksumFile(log.logPath, log.snapshot.version) + removeProtocolAndMetadataFromChecksumFile(checksumFilePath) + { // Create an incomplete checkpoint without the action and overwrite the // original checkpoint From b7f4845c761484fdfad9b600820cc130a58302df Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Thu, 5 Dec 2024 21:20:15 +0530 Subject: [PATCH 6/7] make folder name random --- .../apache/spark/sql/delta/CheckpointsSuite.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala index 27a4855f60e..cdae5bc8a45 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta import java.io.File import java.net.URI +import java.util.UUID import scala.concurrent.duration._ @@ -557,7 +558,8 @@ class CheckpointsSuite for (v2Checkpoint <- Seq(true, false)) withTempDir { tempDir => val source = new File(DeletionVectorsSuite.table1Path) // this table has DVs in two versions - val target = new File(tempDir, "insertTest") + val targetName = s"insertTest_${UUID.randomUUID().toString.replace("-", "")}" + val target = new File(tempDir, targetName) // Copy the source2 DV table to a temporary directory, so that we do updates to it FileUtils.copyDirectory(source, target) @@ -926,7 +928,13 @@ class CheckpointsSuite for (lastCheckpointMissing <- BOOLEAN_DOMAIN) testDifferentCheckpoints("intermittent error while reading checkpoint should not" + s" stick to snapshot [lastCheckpointMissing: $lastCheckpointMissing]") { (_, _) => - withTempDir { tempDir => checkIntermittentError(tempDir, lastCheckpointMissing) } + withTempDir { tempDir => + withSQLConf( + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false" + ) { + checkIntermittentError(tempDir, lastCheckpointMissing) + } + } } test("validate metadata cleanup is not called with createCheckpointAtVersion API") 
{

From 203bdf22f717438ff79ceaf167ce606487c23f35 Mon Sep 17 00:00:00 2001
From: Dhruv Arya
Date: Thu, 5 Dec 2024 23:58:19 +0530
Subject: [PATCH 7/7] fix

---
 .../spark/sql/delta/CheckpointsSuite.scala    | 39 ++++++++++++++++++-
 1 file changed, 37 insertions(+), 2 deletions(-)

diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
index cdae5bc8a45..9983beb1888 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
@@ -769,7 +769,8 @@ class CheckpointsSuite
     }
   }
 
-  def checkIntermittentError(tempDir: File, lastCheckpointMissing: Boolean): Unit = {
+  def checkIntermittentError(
+      tempDir: File, lastCheckpointMissing: Boolean, crcMissing: Boolean): Unit = {
     // Create a table with commit version 0, 1 and a checkpoint.
     val tablePath = tempDir.getAbsolutePath
     spark.range(10).write.format("delta").save(tablePath)
@@ -785,6 +786,13 @@ class CheckpointsSuite
     if (lastCheckpointMissing) {
       fs.delete(log.LAST_CHECKPOINT)
     }
+    // Delete CRC files based on the test configuration.
+    if (crcMissing) {
+      // Delete all CRC files.
+      (0L to log.update().version).foreach { version =>
+        fs.delete(FileNames.checksumFile(log.logPath, version))
+      }
+    }
 
     // In order to trigger an intermittent failure while reading checkpoint, this test corrupts
     // the checkpoint temporarily so that json/parquet checkpoint reader fails. The corrupted
@@ -806,9 +814,36 @@ class CheckpointsSuite
     assert(fs.getFileStatus(tempPath).getLen === checkpointFileStatus.getLen)
 
     DeltaLog.clearCache()
+    if (!crcMissing) {
+      // When the CRC is present, P&M will be taken from the CRC and the snapshot will be
+      // initialized without needing a checkpoint. But the underlying checkpoint provider points
+      // to a corrupted checkpoint, so any query/state reconstruction on top of it will fail.
+      intercept[Exception] {
+        sql(s"SELECT * FROM delta.`$tablePath`").collect()
+        DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot.validateChecksum()
+      }
+      val snapshot = DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot
+      intercept[Exception] {
+        snapshot.allFiles.collect()
+      }
+      // Undo the corruption.
+      assert(fs.delete(checkpointFileStatus.getPath, true))
+      assert(fs.rename(tempPath, checkpointFileStatus.getPath))
+
+      // Once the corruption is undone, the queries start passing again on top of the same
+      // snapshot. This tests that we have not cached the intermittent error in the underlying
+      // checkpoint provider.
+      sql(s"SELECT * FROM delta.`$tablePath`").collect()
+      assert(DeltaLog.forTable(spark, tablePath).update() === snapshot)
+      return
+    }
+    // When the CRC is missing, P&M will be taken from the checkpoint, which is temporarily
+    // corrupted, so we will end up creating a new snapshot without using the checkpoint and
+    // the query will succeed.
     sql(s"SELECT * FROM delta.`$tablePath`").collect()
     val snapshot = DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot
     snapshot.computeChecksum
+    snapshot.validateChecksum()
     assert(snapshot.checkpointProvider.isEmpty)
   }
 
@@ -932,7 +967,7 @@ class CheckpointsSuite
       withSQLConf(
         DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false"
       ) {
-        checkIntermittentError(tempDir, lastCheckpointMissing)
+        checkIntermittentError(tempDir, lastCheckpointMissing, crcMissing = true)
      }
    }
  }
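
Annotation (outside the patches): the test-only helper removeProtocolAndMetadataFromChecksumFile and the new DeltaLogSuite cases assume the .crc is a single JSON line whose top-level keys include "protocol" and "metadata". A minimal sketch for confirming that locally is below; it assumes a `spark` session with Delta enabled and a scratch path of your choice, and it only uses APIs already exercised in the series (DeltaLog, FileNames.checksumFile, newDeltaHadoopConf).

    // Sketch only: write one commit, then confirm the .crc for that version carries
    // both the protocol and the metadata actions.
    import scala.io.Source
    import org.apache.spark.sql.delta.DeltaLog
    import org.apache.spark.sql.delta.util.FileNames

    val path = "/tmp/delta/crc-check" // hypothetical scratch path
    spark.range(10).write.format("delta").mode("overwrite").save(path)

    val log = DeltaLog.forTable(spark, path)
    val crcPath = FileNames.checksumFile(log.logPath, log.update().version)
    val fs = crcPath.getFileSystem(log.newDeltaHadoopConf())
    val crcJson = Source.fromInputStream(fs.open(crcPath)).mkString
    assert(crcJson.contains("\"protocol\"") && crcJson.contains("\"metadata\""),
      s"expected protocol and metadata in $crcPath")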