From 8fb17a0160a937307d6fb9276a77403aeb7efc63 Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Thu, 5 Dec 2024 16:58:21 -0800 Subject: [PATCH] [Spark][Version Checksum] Read Protocol, Metadata, and ICT directly from the Checksum during Snapshot construction (#3920) #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Stacked over https://github.com/delta-io/delta/pull/3907. This PR makes the Checksum (if available) the source of truth for Protocol, Metadata, ICT during snapshot construction. This helps us avoid a Spark query and improves performance. ## How was this patch tested? Added some test cases to existing suites ## Does this PR introduce _any_ user-facing changes? No --- .../org/apache/spark/sql/delta/Snapshot.scala | 43 ++- .../sql/delta/logging/DeltaLogKeys.scala | 1 + .../sql/delta/sources/DeltaSQLConf.scala | 9 + .../spark/sql/delta/CheckpointsSuite.scala | 49 ++- .../spark/sql/delta/ChecksumSuite.scala | 8 +- .../sql/delta/DeltaAllFilesInCrcSuite.scala | 5 +- .../spark/sql/delta/DeltaLogSuite.scala | 59 ++++ .../spark/sql/delta/DeltaTestUtils.scala | 28 +- .../sql/delta/InCommitTimestampSuite.scala | 279 +++++++++++------- .../sql/delta/SnapshotManagementSuite.scala | 6 + 10 files changed, 371 insertions(+), 116 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 8340b4aaab7..9a93fdb05fd 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -147,7 +147,9 @@ class Snapshot( "checkpointVersion" -> logSegment.checkpointProvider.version, "durationMs" -> (System.currentTimeMillis() - startTime), "exceptionMessage" -> exception.map(_.getMessage).getOrElse(""), - "exceptionStackTrace" -> exception.map(_.getStackTrace.mkString("\n")).getOrElse("") + "exceptionStackTrace" -> + exception.map(_.getStackTrace.mkString("\n")).getOrElse(""), + "isCRCPresent" -> checksumOpt.isDefined ) ) } @@ -377,6 +379,41 @@ class Snapshot( override def protocol: Protocol = _reconstructedProtocolMetadataAndICT.protocol + /** + * Tries to retrieve the protocol, metadata, and in-commit-timestamp (if needed) from the + * checksum file. If the checksum file is not present or if the protocol or metadata is missing + * this will return None. + */ + protected def getProtocolMetadataAndIctFromCrc(): + Option[Array[ReconstructedProtocolMetadataAndICT]] = { + if (!spark.sessionState.conf.getConf( + DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED)) { + return None + } + checksumOpt.map(c => (c.protocol, c.metadata, c.inCommitTimestampOpt)).flatMap { + case (p: Protocol, m: Metadata, ict: Option[Long]) => + Some(Array((p, null, None), (null, m, None), (null, null, ict)) + .map(ReconstructedProtocolMetadataAndICT.tupled)) + + case (p, m, _) if p != null || m != null => + // One was missing from the .crc file... 
warn and fall back to an optimized query + val protocolStr = Option(p).map(_.toString).getOrElse("null") + val metadataStr = Option(m).map(_.toString).getOrElse("null") + recordDeltaEvent( + deltaLog, + opType = "delta.assertions.missingEitherProtocolOrMetadataFromChecksum", + data = Map( + "version" -> version.toString, "protocol" -> protocolStr, "source" -> metadataStr)) + logWarning(log"Either protocol or metadata is null from checksum; " + + log"version:${MDC(DeltaLogKeys.VERSION, version)} " + + log"protocol:${MDC(DeltaLogKeys.PROTOCOL, protocolStr)} " + + log"metadata:${MDC(DeltaLogKeys.DELTA_METADATA, metadataStr)}") + None + + case _ => None // both missing... fall back to an optimized query + } + } + /** * Pulls the protocol and metadata of the table from the files that are used to compute the * Snapshot directly--without triggering a full state reconstruction. This is important, because @@ -394,6 +431,10 @@ class Snapshot( Array[ReconstructedProtocolMetadataAndICT] = { import implicits._ + getProtocolMetadataAndIctFromCrc().foreach { protocolMetadataAndIctFromCrc => + return protocolMetadataAndIctFromCrc + } + val schemaToUse = Action.logSchema(Set("protocol", "metaData", "commitInfo")) val checkpointOpt = checkpointProvider.topLevelFileIndex.map { index => deltaLog.loadIndex(index, schemaToUse) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala b/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala index 1b63dfdd39a..ad48fad8dd9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/logging/DeltaLogKeys.scala @@ -57,6 +57,7 @@ trait DeltaLogKeysBase { case object DATA_FILTER extends LogKeyShims case object DATE extends LogKeyShims case object DELTA_COMMIT_INFO extends LogKeyShims + case object DELTA_METADATA extends LogKeyShims case object DIR extends LogKeyShims case object DURATION extends LogKeyShims case object END_INDEX extends LogKeyShims diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 1c46473a56b..c1630fd5230 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1144,6 +1144,15 @@ trait DeltaSQLConfBase { .intConf .createOptional + val USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED = + buildConf("readProtocolAndMetadataFromChecksum.enabled") + .internal() + .doc("If enabled, delta log snapshot will read the protocol, metadata, and ICT " + + "(if applicable) from the checksum file and use those to avoid a spark job over the " + + "checkpoint for the two rows of protocol and metadata") + .booleanConf + .createWithDefault(true) + val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED = buildConf("checkpoint.exceptionThrowing.enabled") .internal() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala index 27a4855f60e..9983beb1888 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta import java.io.File import java.net.URI +import java.util.UUID import scala.concurrent.duration._ @@ -557,7 +558,8 @@ class CheckpointsSuite for (v2Checkpoint <- 
Seq(true, false)) withTempDir { tempDir => val source = new File(DeletionVectorsSuite.table1Path) // this table has DVs in two versions - val target = new File(tempDir, "insertTest") + val targetName = s"insertTest_${UUID.randomUUID().toString.replace("-", "")}" + val target = new File(tempDir, targetName) // Copy the source2 DV table to a temporary directory, so that we do updates to it FileUtils.copyDirectory(source, target) @@ -767,7 +769,8 @@ class CheckpointsSuite } } - def checkIntermittentError(tempDir: File, lastCheckpointMissing: Boolean): Unit = { + def checkIntermittentError( + tempDir: File, lastCheckpointMissing: Boolean, crcMissing: Boolean): Unit = { // Create a table with commit version 0, 1 and a checkpoint. val tablePath = tempDir.getAbsolutePath spark.range(10).write.format("delta").save(tablePath) @@ -783,6 +786,13 @@ class CheckpointsSuite if (lastCheckpointMissing) { fs.delete(log.LAST_CHECKPOINT) } + // Delete CRC file based on test configuration. + if (crcMissing) { + // Delete all CRC files + (0L to log.update().version).foreach { version => + fs.delete(FileNames.checksumFile(log.logPath, version)) + } + } // In order to trigger an intermittent failure while reading checkpoint, this test corrupts // the checkpoint temporarily so that json/parquet checkpoint reader fails. The corrupted @@ -804,9 +814,36 @@ class CheckpointsSuite assert(fs.getFileStatus(tempPath).getLen === checkpointFileStatus.getLen) DeltaLog.clearCache() + if (!crcMissing) { + // When CRC is present, then P&M will be taken from CRC and snapshot will be initialized + // without needing a checkpoint. But the underlying checkpoint provider points to a + // corrupted checkpoint and so any query/state reconstruction on this will fail. + intercept[Exception] { + sql(s"SELECT * FROM delta.`$tablePath`").collect() + DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot.validateChecksum() + } + val snapshot = DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot + intercept[Exception] { + snapshot.allFiles.collect() + } + // Undo the corruption + assert(fs.delete(checkpointFileStatus.getPath, true)) + assert(fs.rename(tempPath, checkpointFileStatus.getPath)) + + // Once the corruption in undone, then the queries starts passing on top of same snapshot. + // This tests that we have not caches the intermittent error in the underlying checkpoint + // provider. + sql(s"SELECT * FROM delta.`$tablePath`").collect() + assert(DeltaLog.forTable(spark, tablePath).update() === snapshot) + return + } + // When CRC is missing, then P&M will be taken from checkpoint which is temporarily + // corrupted, so we will end up creating a new snapshot without using checkpoint and the + // query will succeed. 
sql(s"SELECT * FROM delta.`$tablePath`").collect() val snapshot = DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot snapshot.computeChecksum + snapshot.validateChecksum() assert(snapshot.checkpointProvider.isEmpty) } @@ -926,7 +963,13 @@ class CheckpointsSuite for (lastCheckpointMissing <- BOOLEAN_DOMAIN) testDifferentCheckpoints("intermittent error while reading checkpoint should not" + s" stick to snapshot [lastCheckpointMissing: $lastCheckpointMissing]") { (_, _) => - withTempDir { tempDir => checkIntermittentError(tempDir, lastCheckpointMissing) } + withTempDir { tempDir => + withSQLConf( + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false" + ) { + checkIntermittentError(tempDir, lastCheckpointMissing, crcMissing = true) + } + } } test("validate metadata cleanup is not called with createCheckpointAtVersion API") { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala index 5e4e9f2f62e..0d89ae989f7 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala @@ -150,7 +150,10 @@ class ChecksumSuite test("Checksum validation should happen on checkpoint") { withSQLConf( DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true", - DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true" + DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true", + // Disabled for this test because with it enabled, a corrupted Protocol + // or Metadata will trigger a failure earlier than the full validation. + DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "false" ) { withTempDir { tempDir => spark.range(10).write.format("delta").save(tempDir.getCanonicalPath) @@ -210,7 +213,8 @@ class ChecksumSuite DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> "true", DeltaSQLConf.DELTA_CHECKSUM_MISMATCH_IS_FATAL.key -> "false", DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true", - DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false" + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false", + DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true" ) { withTempDir { tempDir => import testImplicits._ diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala index 0f7ad328747..29594434e4f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaAllFilesInCrcSuite.scala @@ -50,6 +50,7 @@ class DeltaAllFilesInCrcSuite .set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES.key, "10000") // needed for DELTA_ALL_FILES_IN_CRC_ENABLED .set(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key, "true") + .set(DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key, "true") // Turn on verification by default in the tests .set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key, "true") // Turn off force verification for non-UTC timezones by default in the tests @@ -197,6 +198,7 @@ class DeltaAllFilesInCrcSuite // We will see all files in CRC verification failure. // This will trigger the incremental commit verification which will fail. 
+ assert(filterUsageRecords(records, "delta.assertions.mismatchedAction").size === 1) val allFilesInCrcValidationFailureRecords = filterUsageRecords(records, "delta.allFilesInCrc.checksumMismatch.differentAllFiles") assert(allFilesInCrcValidationFailureRecords.size === 1) @@ -209,9 +211,8 @@ class DeltaAllFilesInCrcSuite assert(eventData("filesCountFromStateReconstruction").toLong === expectedFilesCountFromCrc + 1) assert(eventData("incrementalCommitCrcValidationPassed").toBoolean === false) - val expectedValidationFailureMessage = "Number of files - Expected: 1 Computed: 2" assert(eventData("errorForIncrementalCommitCrcValidation").contains( - expectedValidationFailureMessage)) + "The metadata of your Delta table could not be recovered")) } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala index 1423d85db0e..d07eeb9eff8 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala @@ -411,6 +411,9 @@ class DeltaLogSuite extends QueryTest } } + val checksumFilePath = FileNames.checksumFile(log.logPath, log.snapshot.version) + removeProtocolAndMetadataFromChecksumFile(checksumFilePath) + { // Create an incomplete checkpoint without the action and overwrite the // original checkpoint @@ -791,6 +794,62 @@ class DeltaLogSuite extends QueryTest } } + test("checksum file should contain protocol and metadata") { + withSQLConf( + DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true", + DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true" + ) { + withTempDir { dir => + val path = new Path("file://" + dir.getAbsolutePath) + val log = DeltaLog.forTable(spark, path) + + val txn = log.startTransaction() + val files = (1 to 10).map(f => createTestAddFile(encodedPath = f.toString)) + txn.commitManually(files: _*) + val metadata = log.snapshot.metadata + val protocol = log.snapshot.protocol + DeltaLog.clearCache() + + val readLog = DeltaLog.forTable(spark, path) + val checksum = readLog.snapshot.checksumOpt.get + assert(checksum.metadata != null) + assert(checksum.protocol != null) + assert(checksum.metadata.equals(metadata)) + assert(checksum.protocol.equals(protocol)) + } + } + } + + test("checksum reader should be able to read incomplete checksum file without " + + "protocol and metadata") { + withSQLConf( + DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true", + DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true" + ) { + withTempDir { dir => + val path = new Path("file://" + dir.getAbsolutePath) + val log = DeltaLog.forTable(spark, path) + + val txn = log.startTransaction() + val files = (1 to 10).map(f => createTestAddFile(encodedPath = f.toString)) + txn.commitManually(files: _*) + val metadata = log.snapshot.metadata + val protocol = log.snapshot.protocol + DeltaLog.clearCache() + val checksumFilePath = FileNames.checksumFile(log.logPath, 0L) + removeProtocolAndMetadataFromChecksumFile(checksumFilePath) + + val readLog = DeltaLog.forTable(spark, path) + val checksum = readLog.snapshot.checksumOpt.get + assert(checksum.metadata == null) + assert(checksum.protocol == null) + + // check we are still able to read protocol and metadata from checkpoint + assert(readLog.snapshot.metadata.equals(metadata)) + assert(readLog.snapshot.protocol.equals(protocol)) + } + } + } } class CoordinatedCommitsBatchBackfill1DeltaLogSuite extends DeltaLogSuite { diff --git 
a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala index 8b50e37fd07..5d2c67c3182 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala @@ -16,7 +16,8 @@ package org.apache.spark.sql.delta -import java.io.File +import java.io.{BufferedReader, File, InputStreamReader} +import java.nio.charset.StandardCharsets.UTF_8 import java.util.Locale import java.util.concurrent.ConcurrentHashMap @@ -32,6 +33,8 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} import org.apache.spark.sql.delta.util.FileNames +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import io.delta.tables.{DeltaTable => IODeltaTable} import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.fs.Path @@ -171,6 +174,29 @@ trait DeltaTestUtilsBase { } } + /** + * Remove protocol and metadata fields from checksum file of json format + */ + def removeProtocolAndMetadataFromChecksumFile(checksumFilePath : Path): Unit = { + // scalastyle:off deltahadoopconfiguration + val fs = checksumFilePath.getFileSystem( + SparkSession.getActiveSession.map(_.sessionState.newHadoopConf()).get + ) + // scalastyle:on deltahadoopconfiguration + if (!fs.exists(checksumFilePath)) return + val stream = fs.open(checksumFilePath) + val reader = new BufferedReader(new InputStreamReader(stream, UTF_8)) + val content = reader.readLine() + stream.close() + val mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + val map = mapper.readValue(content, classOf[Map[String, String]]) + val partialContent = mapper.writeValueAsString(map.-("protocol").-("metadata")) + "\n" + val output = fs.create(checksumFilePath, true) + output.write(partialContent.getBytes(UTF_8)) + output.close() + } + protected def getfindTouchedFilesJobPlans(plans: Seq[Plans]): SparkPlan = { // The expected plan for touched file computation is of the format below. // The data column should be pruned from both leaves. diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala index c981672ffca..5597f41042e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala @@ -51,6 +51,8 @@ class InCommitTimestampSuite override def beforeAll(): Unit = { super.beforeAll() spark.conf.set(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.defaultTablePropertyKey, "true") + spark.conf.set(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key, "true") + spark.conf.set(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key, "true") } test("Enable ICT on commit 0") { @@ -179,69 +181,76 @@ class InCommitTimestampSuite } test("Missing CommitInfo should result in a DELTA_MISSING_COMMIT_INFO exception") { - withTempDir { tempDir => - spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) - val deltaLog = - DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) - deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate) - // Remove CommitInfo from the commit. 
- val commit1Path = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(1) - val actions = deltaLog.store.readAsIterator(commit1Path, deltaLog.newDeltaHadoopConf()) - val actionsWithoutCommitInfo = actions.filterNot(Action.fromJson(_).isInstanceOf[CommitInfo]) - deltaLog.store.write( - commit1Path, - actionsWithoutCommitInfo, - overwrite = true, - deltaLog.newDeltaHadoopConf()) - - DeltaLog.clearCache() - val latestSnapshot = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).snapshot - val e = intercept[DeltaIllegalStateException] { - latestSnapshot.timestamp + // Make sure that we don't retrieve the time from the CRC. + withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { + withTempDir { tempDir => + spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) + val deltaLog = + DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate) + // Remove CommitInfo from the commit. + val commit1Path = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(1) + val actions = deltaLog.store.readAsIterator(commit1Path, deltaLog.newDeltaHadoopConf()) + val actionsWithoutCommitInfo = + actions.filterNot(Action.fromJson(_).isInstanceOf[CommitInfo]) + deltaLog.store.write( + commit1Path, + actionsWithoutCommitInfo, + overwrite = true, + deltaLog.newDeltaHadoopConf()) + + DeltaLog.clearCache() + val latestSnapshot = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).snapshot + val e = intercept[DeltaIllegalStateException] { + latestSnapshot.timestamp + } + checkError( + e, + "DELTA_MISSING_COMMIT_INFO", + parameters = Map( + "featureName" -> InCommitTimestampTableFeature.name, + "version" -> "1")) } - checkError( - e, - "DELTA_MISSING_COMMIT_INFO", - parameters = Map( - "featureName" -> InCommitTimestampTableFeature.name, - "version" -> "1")) } } test("Missing CommitInfo.commitTimestamp should result in a " + "DELTA_MISSING_COMMIT_TIMESTAMP exception") { - withTempDir { tempDir => - spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) - val deltaLog = - DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) - deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate) - // Remove CommitInfo.commitTimestamp from the commit. - val commit1Path = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(1) - val actions = deltaLog.store.readAsIterator( - commit1Path, - deltaLog.newDeltaHadoopConf()).toList - val actionsWithoutCommitInfoCommitTimestamp = - actions.map(Action.fromJson).map { - case ci: CommitInfo => - ci.copy(inCommitTimestamp = None).json - case other => - other.json - }.toIterator - deltaLog.store.write( - commit1Path, - actionsWithoutCommitInfoCommitTimestamp, - overwrite = true, - deltaLog.newDeltaHadoopConf()) - - DeltaLog.clearCache() - val latestSnapshot = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).snapshot - val e = intercept[DeltaIllegalStateException] { - latestSnapshot.timestamp + // Make sure that we don't retrieve the time from the CRC. + withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { + withTempDir { tempDir => + spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) + val deltaLog = + DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate) + // Remove CommitInfo.commitTimestamp from the commit. 
+ val commit1Path = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(1) + val actions = deltaLog.store.readAsIterator( + commit1Path, + deltaLog.newDeltaHadoopConf()).toList + val actionsWithoutCommitInfoCommitTimestamp = + actions.map(Action.fromJson).map { + case ci: CommitInfo => + ci.copy(inCommitTimestamp = None).json + case other => + other.json + }.toIterator + deltaLog.store.write( + commit1Path, + actionsWithoutCommitInfoCommitTimestamp, + overwrite = true, + deltaLog.newDeltaHadoopConf()) + + DeltaLog.clearCache() + val latestSnapshot = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).snapshot + val e = intercept[DeltaIllegalStateException] { + latestSnapshot.timestamp + } + checkError( + e, + "DELTA_MISSING_COMMIT_TIMESTAMP", + parameters = Map("featureName" -> InCommitTimestampTableFeature.name, "version" -> "1")) } - checkError( - e, - "DELTA_MISSING_COMMIT_TIMESTAMP", - parameters = Map("featureName" -> InCommitTimestampTableFeature.name, "version" -> "1")) } } @@ -395,15 +404,16 @@ class InCommitTimestampSuite } } - test("postCommitSnapshot.timestamp should be populated by protocolMetadataAndICTReconstruction " + - "when the table has no checkpoints") { + test("snapshot.timestamp should be read from the CRC") { withTempDir { tempDir => var deltaLog: DeltaLog = null var timestamp = -1L - spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) - DeltaLog.clearCache() val usageRecords = Log4jUsageLogger.track { + spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) + DeltaLog.clearCache() // Clear the post-commit snapshot from the cache. deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf()) + assert(fs.exists(FileNames.checksumFile(deltaLog.logPath, 0))) timestamp = deltaLog.snapshot.timestamp } assert(timestamp == getInCommitTimestamp(deltaLog, 0)) @@ -412,37 +422,84 @@ class InCommitTimestampSuite } } + test("postCommitSnapshot.timestamp should be populated by protocolMetadataAndICTReconstruction " + + "when the table has no checkpoints") { + // Make sure that we don't retrieve the time from the CRC. + withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { + withTempDir { tempDir => + var deltaLog: DeltaLog = null + var timestamp = -1L + spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) + DeltaLog.clearCache() + val usageRecords = Log4jUsageLogger.track { + deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + timestamp = deltaLog.snapshot.timestamp + } + assert(timestamp == getInCommitTimestamp(deltaLog, 0)) + // No explicit read. + assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty) + } + } + } + test("snapshot.timestamp should be populated by protocolMetadataAndICTReconstruction " + "during cold reads of checkpoints + deltas") { - withTempDir { tempDir => - var deltaLog: DeltaLog = null - var timestamp = -1L - spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) - deltaLog = DeltaLog - .forTable(spark, new Path(tempDir.getCanonicalPath)) - deltaLog.createCheckpointAtVersion(0) - deltaLog.startTransaction().commit(Seq(createTestAddFile("c1")), ManualUpdate) + // Make sure that we don't retrieve the time from the CRC. 
+ withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { + withTempDir { tempDir => + var deltaLog: DeltaLog = null + var timestamp = -1L + spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) + deltaLog = DeltaLog + .forTable(spark, new Path(tempDir.getCanonicalPath)) + deltaLog.createCheckpointAtVersion(0) + deltaLog.startTransaction().commit(Seq(createTestAddFile("c1")), ManualUpdate) - val usageRecords = Log4jUsageLogger.track { - DeltaLog.clearCache() // Clear the post-commit snapshot from the cache. - deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) - timestamp = deltaLog.snapshot.timestamp + val usageRecords = Log4jUsageLogger.track { + DeltaLog.clearCache() // Clear the post-commit snapshot from the cache. + deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + timestamp = deltaLog.snapshot.timestamp + } + assert(deltaLog.snapshot.checkpointProvider.version == 0) + assert(deltaLog.snapshot.version == 1) + assert(timestamp == getInCommitTimestamp(deltaLog, 1)) + // No explicit read. + assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty) } - assert(deltaLog.snapshot.checkpointProvider.version == 0) - assert(deltaLog.snapshot.version == 1) - assert(timestamp == getInCommitTimestamp(deltaLog, 1)) - // No explicit read. - assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").isEmpty) } } test("snapshot.timestamp cannot be populated by protocolMetadataAndICTReconstruction " + "during cold reads of checkpoints") { + // Make sure that we don't retrieve the time from the CRC. + withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { + withTempDir { tempDir => + var deltaLog: DeltaLog = null + var timestamp = -1L + spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) + DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).createCheckpointAtVersion(0) + val usageRecords = Log4jUsageLogger.track { + DeltaLog.clearCache() // Clear the post-commit snapshot from the cache. + deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + timestamp = deltaLog.snapshot.timestamp + } + assert(deltaLog.snapshot.checkpointProvider.version == 0) + assert(timestamp == getInCommitTimestamp(deltaLog, 0)) + assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").length == 1) + } + } + } + + test("snapshot.timestamp is read from file when CRC doesn't have ICT and " + + "the latest version has a checkpoint") { withTempDir { tempDir => var deltaLog: DeltaLog = null var timestamp = -1L spark.range(1).write.format("delta").save(tempDir.getAbsolutePath) - DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).createCheckpointAtVersion(0) + deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + deltaLog.createCheckpointAtVersion(0) + // Remove the ICT from the CRC. + InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, 0, None) val usageRecords = Log4jUsageLogger.track { DeltaLog.clearCache() // Clear the post-commit snapshot from the cache. 
deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) @@ -450,41 +507,49 @@ class InCommitTimestampSuite } assert(deltaLog.snapshot.checkpointProvider.version == 0) assert(timestamp == getInCommitTimestamp(deltaLog, 0)) - assert(filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").length == 1) + val ictReadLog = filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").head + val blob = JsonUtils.fromJson[Map[String, String]](ictReadLog.blob) + assert(blob("version") == "0") + assert(blob("checkpointVersion") == "0") + assert(blob("isCRCPresent") == "true") } } test("Exceptions during ICT reads from file should be logged") { - withTempDir { tempDir => - spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) - val deltaLog = - DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) - deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate) - // Remove CommitInfo from the commit. - val commit1Path = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(1) - val actions = deltaLog.store.readAsIterator(commit1Path, deltaLog.newDeltaHadoopConf()) - val actionsWithoutCommitInfo = actions.filterNot(Action.fromJson(_).isInstanceOf[CommitInfo]) - deltaLog.store.write( - commit1Path, - actionsWithoutCommitInfo, - overwrite = true, - deltaLog.newDeltaHadoopConf()) - - DeltaLog.clearCache() - val latestSnapshot = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).snapshot - val usageRecords = Log4jUsageLogger.track { - try { - latestSnapshot.timestamp - } catch { - case _ : DeltaIllegalStateException => () + // Make sure that we don't retrieve the time from the CRC. + withSQLConf(DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false") { + withTempDir { tempDir => + spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) + val deltaLog = + DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)) + deltaLog.startTransaction().commit(Seq(createTestAddFile("1")), ManualUpdate) + // Remove CommitInfo from the commit. 
+ val commit1Path = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(1) + val actions = deltaLog.store.readAsIterator(commit1Path, deltaLog.newDeltaHadoopConf()) + val actionsWithoutCommitInfo = + actions.filterNot(Action.fromJson(_).isInstanceOf[CommitInfo]) + deltaLog.store.write( + commit1Path, + actionsWithoutCommitInfo, + overwrite = true, + deltaLog.newDeltaHadoopConf()) + + DeltaLog.clearCache() + val latestSnapshot = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath)).snapshot + val usageRecords = Log4jUsageLogger.track { + try { + latestSnapshot.timestamp + } catch { + case _ : DeltaIllegalStateException => () + } } + val ictReadLog = filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").head + val blob = JsonUtils.fromJson[Map[String, String]](ictReadLog.blob) + assert(blob("version") == "1") + assert(blob("checkpointVersion") == "-1") + assert(blob("exceptionMessage").startsWith("[DELTA_MISSING_COMMIT_INFO]")) + assert(blob("exceptionStackTrace").contains(Snapshot.getClass.getName.stripSuffix("$"))) } - val ictReadLog = filterUsageRecords(usageRecords, "delta.inCommitTimestamp.read").head - val blob = JsonUtils.fromJson[Map[String, String]](ictReadLog.blob) - assert(blob("version") == "1") - assert(blob("checkpointVersion") == "-1") - assert(blob("exceptionMessage").startsWith("[DELTA_MISSING_COMMIT_INFO]")) - assert(blob("exceptionStackTrace").contains(Snapshot.getClass.getName.stripSuffix("$"))) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala index ebdcda11cc8..b0a6732e198 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala @@ -49,6 +49,12 @@ import org.apache.spark.storage.StorageLevel class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with SharedSparkSession with DeltaSQLCommandTest with CoordinatedCommitsBaseSuite { + protected override def sparkConf = { + // Disable loading protocol and metadata from checksum file. Otherwise, creating a Snapshot + // won't touch the checkpoint file and we won't be able to retry. + super.sparkConf + .set(DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key, "false") + } /** * Truncate an existing checkpoint file to create a corrupt file.
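
## Illustrative sketch of the checksum fast path

To summarize the read path this patch adds: during snapshot construction, when a version checksum (`.crc`) file is available and contains both `protocol` and `metadata`, those actions (plus the in-commit timestamp, if present) are taken directly from the checksum, and the Spark job that would otherwise scan the checkpoint and delta files for those two rows is skipped; if either action is missing, the code records the `delta.assertions.missingEitherProtocolOrMetadataFromChecksum` event and falls back to the existing optimized query. The standalone sketch below only illustrates that decision, not Delta's actual `getProtocolMetadataAndIctFromCrc` implementation: the `protocol`/`metadata` JSON keys match what `removeProtocolAndMetadataFromChecksumFile` manipulates above, while the `inCommitTimestampOpt` key, the `CrcState` type, and the object name are assumptions made for the example.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical helper, not part of the patch: reads a version checksum file and
// decides whether it can serve as the source of truth for protocol/metadata/ICT.
object ChecksumFastPathSketch {

  // Simplified stand-in for the parsed checksum contents; the real code works
  // with Delta's Protocol/Metadata actions rather than raw parsed JSON values.
  final case class CrcState(
      protocol: Any,
      metadata: Any,
      inCommitTimestampOpt: Option[Long])

  private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

  /**
   * Returns Some(state) only when the .crc file exists and carries BOTH the
   * protocol and the metadata; otherwise returns None so the caller can fall
   * back to reconstructing them from the checkpoint and delta files.
   */
  def readFromCrc(crcPath: String): Option[CrcState] = {
    val path = Paths.get(crcPath)
    if (!Files.exists(path)) return None

    val json = new String(Files.readAllBytes(path), UTF_8)
    val fields = mapper.readValue(json, classOf[Map[String, Any]])

    // Key name for the in-commit timestamp is assumed for illustration.
    val ictOpt = fields.get("inCommitTimestampOpt").collect {
      case n: Number => n.longValue()
    }

    (fields.get("protocol"), fields.get("metadata")) match {
      case (Some(p), Some(m)) => Some(CrcState(p, m, ictOpt))
      case _                  => None // partial or absent .crc => fall back to log replay
    }
  }
}
```

The patch guards this behavior behind `DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED` (default `true`); assuming the standard `spark.databricks.delta.` prefix that `DeltaSQLConf.buildConf` applies, a session can opt out with `spark.conf.set("spark.databricks.delta.readProtocolAndMetadataFromChecksum.enabled", "false")`, which is how several of the suites above force the old reconstruction path.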