[Spark][Version Checksum] Read Protocol, Metadata, and ICT directly from the Checksum during Snapshot construction (#3920)

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->
Stacked over #3907.
This PR makes the Checksum (if available) the source of truth for the Protocol, Metadata, and ICT (in-commit timestamp) during Snapshot construction. Reading these directly from the checksum file avoids a Spark job over the checkpoint/commit files and improves snapshot construction performance.
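
At a high level, snapshot construction now checks the checksum file first and only falls back to the existing optimized Spark query when the CRC is absent or incomplete. The sketch below is a simplified, self-contained illustration of that fallback pattern; the types and the `readFromLogReplay` callback are stand-ins, not the actual `Snapshot` internals.

```scala
// Simplified sketch of the "CRC first, log replay second" lookup; all types
// here are illustrative stand-ins, not the real Delta Lake classes.
case class Protocol(minReaderVersion: Int, minWriterVersion: Int)
case class Metadata(id: String, schemaString: String)
case class VersionChecksum(
    protocol: Protocol,                  // may be null in old/incomplete .crc files
    metadata: Metadata,                  // may be null in old/incomplete .crc files
    inCommitTimestampOpt: Option[Long])

def protocolMetadataAndIct(
    checksumOpt: Option[VersionChecksum],
    readFromLogReplay: () => (Protocol, Metadata, Option[Long]),
    crcReadEnabled: Boolean): (Protocol, Metadata, Option[Long]) = {
  val fromCrc = if (!crcReadEnabled) None else checksumOpt.flatMap { crc =>
    // Only trust the .crc file when it carries BOTH protocol and metadata.
    (Option(crc.protocol), Option(crc.metadata)) match {
      case (Some(p), Some(m)) => Some((p, m, crc.inCommitTimestampOpt))
      case _                  => None // incomplete .crc: fall back to log replay
    }
  }
  // Expensive path (Spark job over checkpoint/commit files) only when necessary.
  fromCrc.getOrElse(readFromLogReplay())
}
```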

## How was this patch tested?

<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->
Added new test cases and extended existing ones in `DeltaLogSuite`, `CheckpointsSuite`, `ChecksumSuite`, and `DeltaAllFilesInCrcSuite`.

## Does this PR introduce _any_ user-facing changes?

<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
No
dhruvarya-db authored Dec 6, 2024
1 parent 1ee278a commit 8fb17a0
Showing 10 changed files with 371 additions and 116 deletions.
43 changes: 42 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ class Snapshot(
"checkpointVersion" -> logSegment.checkpointProvider.version,
"durationMs" -> (System.currentTimeMillis() - startTime),
"exceptionMessage" -> exception.map(_.getMessage).getOrElse(""),
"exceptionStackTrace" -> exception.map(_.getStackTrace.mkString("\n")).getOrElse("")
"exceptionStackTrace" ->
exception.map(_.getStackTrace.mkString("\n")).getOrElse(""),
"isCRCPresent" -> checksumOpt.isDefined
)
)
}
Expand Down Expand Up @@ -377,6 +379,41 @@ class Snapshot(

override def protocol: Protocol = _reconstructedProtocolMetadataAndICT.protocol

/**
* Tries to retrieve the protocol, metadata, and in-commit-timestamp (if needed) from the
 * checksum file. If the checksum file is not present, or if the protocol or metadata is missing,
 * this will return None.
*/
protected def getProtocolMetadataAndIctFromCrc():
Option[Array[ReconstructedProtocolMetadataAndICT]] = {
if (!spark.sessionState.conf.getConf(
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED)) {
return None
}
checksumOpt.map(c => (c.protocol, c.metadata, c.inCommitTimestampOpt)).flatMap {
case (p: Protocol, m: Metadata, ict: Option[Long]) =>
Some(Array((p, null, None), (null, m, None), (null, null, ict))
.map(ReconstructedProtocolMetadataAndICT.tupled))

case (p, m, _) if p != null || m != null =>
// One was missing from the .crc file... warn and fall back to an optimized query
val protocolStr = Option(p).map(_.toString).getOrElse("null")
val metadataStr = Option(m).map(_.toString).getOrElse("null")
recordDeltaEvent(
deltaLog,
opType = "delta.assertions.missingEitherProtocolOrMetadataFromChecksum",
data = Map(
"version" -> version.toString, "protocol" -> protocolStr, "source" -> metadataStr))
logWarning(log"Either protocol or metadata is null from checksum; " +
log"version:${MDC(DeltaLogKeys.VERSION, version)} " +
log"protocol:${MDC(DeltaLogKeys.PROTOCOL, protocolStr)} " +
log"metadata:${MDC(DeltaLogKeys.DELTA_METADATA, metadataStr)}")
None

case _ => None // both missing... fall back to an optimized query
}
}

/**
* Pulls the protocol and metadata of the table from the files that are used to compute the
* Snapshot directly--without triggering a full state reconstruction. This is important, because
Expand All @@ -394,6 +431,10 @@ class Snapshot(
Array[ReconstructedProtocolMetadataAndICT] = {
import implicits._

getProtocolMetadataAndIctFromCrc().foreach { protocolMetadataAndIctFromCrc =>
return protocolMetadataAndIctFromCrc
}

val schemaToUse = Action.logSchema(Set("protocol", "metaData", "commitInfo"))
val checkpointOpt = checkpointProvider.topLevelFileIndex.map { index =>
deltaLog.loadIndex(index, schemaToUse)
Expand Down
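
One Scala subtlety in the `Snapshot.scala` hunk above: `getProtocolMetadataAndIctFromCrc().foreach { ... return ... }` relies on a non-local `return`, which exits the enclosing reconstruction method rather than just the lambda. A minimal standalone illustration of the idiom (the names are made up):

```scala
// Early return from an Option: the `return` exits lookup(), not the closure.
def lookup(cached: Option[Int], recompute: () => Int): Int = {
  cached.foreach { hit =>
    return hit // short-circuits the whole method when a cached value exists
  }
  recompute() // only reached when cached is None
}

lookup(Some(42), () => sys.error("never called")) // 42
lookup(None, () => 7)                             // 7
```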
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ trait DeltaLogKeysBase {
case object DATA_FILTER extends LogKeyShims
case object DATE extends LogKeyShims
case object DELTA_COMMIT_INFO extends LogKeyShims
case object DELTA_METADATA extends LogKeyShims
case object DIR extends LogKeyShims
case object DURATION extends LogKeyShims
case object END_INDEX extends LogKeyShims
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,15 @@ trait DeltaSQLConfBase {
.intConf
.createOptional

val USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED =
buildConf("readProtocolAndMetadataFromChecksum.enabled")
.internal()
.doc("If enabled, delta log snapshot will read the protocol, metadata, and ICT " +
"(if applicable) from the checksum file and use those to avoid a spark job over the " +
"checkpoint for the two rows of protocol and metadata")
.booleanConf
.createWithDefault(true)

val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED =
buildConf("checkpoint.exceptionThrowing.enabled")
.internal()
Expand Down
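
For reference, the new flag above is an internal, per-session conf. The sketch below shows how it could be toggled, assuming a Delta test context where `spark`, `withSQLConf`, and `DeltaSQLConf` are in scope, and assuming the usual `spark.databricks.delta.` prefix that `DeltaSQLConf` confs carry (the prefix is an assumption here, not something this diff shows):

```scala
// Hypothetical session-level toggle; the fully qualified key assumes the
// standard "spark.databricks.delta." prefix applied by DeltaSQLConf.
spark.conf.set("spark.databricks.delta.readProtocolAndMetadataFromChecksum.enabled", "false")

// In tests (as the suites below do), the flag is flipped via withSQLConf:
withSQLConf(DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "false") {
  // Snapshot construction in this scope falls back to the optimized Spark query.
}
```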
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta

import java.io.File
import java.net.URI
import java.util.UUID

import scala.concurrent.duration._

Expand Down Expand Up @@ -557,7 +558,8 @@ class CheckpointsSuite
for (v2Checkpoint <- Seq(true, false))
withTempDir { tempDir =>
val source = new File(DeletionVectorsSuite.table1Path) // this table has DVs in two versions
val target = new File(tempDir, "insertTest")
val targetName = s"insertTest_${UUID.randomUUID().toString.replace("-", "")}"
val target = new File(tempDir, targetName)

// Copy the source2 DV table to a temporary directory, so that we do updates to it
FileUtils.copyDirectory(source, target)
Expand Down Expand Up @@ -767,7 +769,8 @@ class CheckpointsSuite
}
}

def checkIntermittentError(tempDir: File, lastCheckpointMissing: Boolean): Unit = {
def checkIntermittentError(
tempDir: File, lastCheckpointMissing: Boolean, crcMissing: Boolean): Unit = {
// Create a table with commit version 0, 1 and a checkpoint.
val tablePath = tempDir.getAbsolutePath
spark.range(10).write.format("delta").save(tablePath)
Expand All @@ -783,6 +786,13 @@ class CheckpointsSuite
if (lastCheckpointMissing) {
fs.delete(log.LAST_CHECKPOINT)
}
// Delete CRC file based on test configuration.
if (crcMissing) {
// Delete all CRC files
(0L to log.update().version).foreach { version =>
fs.delete(FileNames.checksumFile(log.logPath, version))
}
}

// In order to trigger an intermittent failure while reading checkpoint, this test corrupts
// the checkpoint temporarily so that json/parquet checkpoint reader fails. The corrupted
Expand All @@ -804,9 +814,36 @@ class CheckpointsSuite
assert(fs.getFileStatus(tempPath).getLen === checkpointFileStatus.getLen)

DeltaLog.clearCache()
if (!crcMissing) {
// When the CRC is present, P&M will be taken from the CRC and the snapshot will be initialized
// without needing a checkpoint. But the underlying checkpoint provider still points to a
// corrupted checkpoint, so any query/state reconstruction on this snapshot will fail.
intercept[Exception] {
sql(s"SELECT * FROM delta.`$tablePath`").collect()
DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot.validateChecksum()
}
val snapshot = DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot
intercept[Exception] {
snapshot.allFiles.collect()
}
// Undo the corruption
assert(fs.delete(checkpointFileStatus.getPath, true))
assert(fs.rename(tempPath, checkpointFileStatus.getPath))

// Once the corruption is undone, queries start passing on top of the same snapshot.
// This tests that we have not cached the intermittent error in the underlying checkpoint
// provider.
sql(s"SELECT * FROM delta.`$tablePath`").collect()
assert(DeltaLog.forTable(spark, tablePath).update() === snapshot)
return
}
// When the CRC is missing, P&M will be taken from the checkpoint, which is temporarily
// corrupted, so we will end up creating a new snapshot without using the checkpoint and the
// query will succeed.
sql(s"SELECT * FROM delta.`$tablePath`").collect()
val snapshot = DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot
snapshot.computeChecksum
snapshot.validateChecksum()
assert(snapshot.checkpointProvider.isEmpty)
}

Expand Down Expand Up @@ -926,7 +963,13 @@ class CheckpointsSuite
for (lastCheckpointMissing <- BOOLEAN_DOMAIN)
testDifferentCheckpoints("intermittent error while reading checkpoint should not" +
s" stick to snapshot [lastCheckpointMissing: $lastCheckpointMissing]") { (_, _) =>
withTempDir { tempDir => checkIntermittentError(tempDir, lastCheckpointMissing) }
withTempDir { tempDir =>
withSQLConf(
DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false"
) {
checkIntermittentError(tempDir, lastCheckpointMissing, crcMissing = true)
}
}
}

test("validate metadata cleanup is not called with createCheckpointAtVersion API") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,10 @@ class ChecksumSuite
test("Checksum validation should happen on checkpoint") {
withSQLConf(
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true",
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true"
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true",
// Disabled for this test because with it enabled, a corrupted Protocol
// or Metadata will trigger a failure earlier than the full validation.
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "false"
) {
withTempDir { tempDir =>
spark.range(10).write.format("delta").save(tempDir.getCanonicalPath)
Expand Down Expand Up @@ -210,7 +213,8 @@ class ChecksumSuite
DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> "true",
DeltaSQLConf.DELTA_CHECKSUM_MISMATCH_IS_FATAL.key -> "false",
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true",
DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false"
DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false",
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true"
) {
withTempDir { tempDir =>
import testImplicits._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class DeltaAllFilesInCrcSuite
.set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES.key, "10000")
// needed for DELTA_ALL_FILES_IN_CRC_ENABLED
.set(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key, "true")
.set(DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key, "true")
// Turn on verification by default in the tests
.set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key, "true")
// Turn off force verification for non-UTC timezones by default in the tests
Expand Down Expand Up @@ -197,6 +198,7 @@ class DeltaAllFilesInCrcSuite

// We will see an all-files-in-CRC verification failure.
// This will trigger the incremental commit verification, which will fail.
assert(filterUsageRecords(records, "delta.assertions.mismatchedAction").size === 1)
val allFilesInCrcValidationFailureRecords =
filterUsageRecords(records, "delta.allFilesInCrc.checksumMismatch.differentAllFiles")
assert(allFilesInCrcValidationFailureRecords.size === 1)
Expand All @@ -209,9 +211,8 @@ class DeltaAllFilesInCrcSuite
assert(eventData("filesCountFromStateReconstruction").toLong ===
expectedFilesCountFromCrc + 1)
assert(eventData("incrementalCommitCrcValidationPassed").toBoolean === false)
val expectedValidationFailureMessage = "Number of files - Expected: 1 Computed: 2"
assert(eventData("errorForIncrementalCommitCrcValidation").contains(
expectedValidationFailureMessage))
"The metadata of your Delta table could not be recovered"))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ class DeltaLogSuite extends QueryTest
}
}

val checksumFilePath = FileNames.checksumFile(log.logPath, log.snapshot.version)
removeProtocolAndMetadataFromChecksumFile(checksumFilePath)

{
// Create an incomplete checkpoint without the action and overwrite the
// original checkpoint
Expand Down Expand Up @@ -791,6 +794,62 @@ class DeltaLogSuite extends QueryTest
}
}

test("checksum file should contain protocol and metadata") {
withSQLConf(
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true",
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true"
) {
withTempDir { dir =>
val path = new Path("file://" + dir.getAbsolutePath)
val log = DeltaLog.forTable(spark, path)

val txn = log.startTransaction()
val files = (1 to 10).map(f => createTestAddFile(encodedPath = f.toString))
txn.commitManually(files: _*)
val metadata = log.snapshot.metadata
val protocol = log.snapshot.protocol
DeltaLog.clearCache()

val readLog = DeltaLog.forTable(spark, path)
val checksum = readLog.snapshot.checksumOpt.get
assert(checksum.metadata != null)
assert(checksum.protocol != null)
assert(checksum.metadata.equals(metadata))
assert(checksum.protocol.equals(protocol))
}
}
}

test("checksum reader should be able to read incomplete checksum file without " +
"protocol and metadata") {
withSQLConf(
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true",
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true"
) {
withTempDir { dir =>
val path = new Path("file://" + dir.getAbsolutePath)
val log = DeltaLog.forTable(spark, path)

val txn = log.startTransaction()
val files = (1 to 10).map(f => createTestAddFile(encodedPath = f.toString))
txn.commitManually(files: _*)
val metadata = log.snapshot.metadata
val protocol = log.snapshot.protocol
DeltaLog.clearCache()
val checksumFilePath = FileNames.checksumFile(log.logPath, 0L)
removeProtocolAndMetadataFromChecksumFile(checksumFilePath)

val readLog = DeltaLog.forTable(spark, path)
val checksum = readLog.snapshot.checksumOpt.get
assert(checksum.metadata == null)
assert(checksum.protocol == null)

// Check that we are still able to read protocol and metadata from the checkpoint.
assert(readLog.snapshot.metadata.equals(metadata))
assert(readLog.snapshot.protocol.equals(protocol))
}
}
}
}

class CoordinatedCommitsBatchBackfill1DeltaLogSuite extends DeltaLogSuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

package org.apache.spark.sql.delta

import java.io.File
import java.io.{BufferedReader, File, InputStreamReader}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap

Expand All @@ -32,6 +33,8 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
import org.apache.spark.sql.delta.util.FileNames
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.delta.tables.{DeltaTable => IODeltaTable}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -171,6 +174,29 @@ trait DeltaTestUtilsBase {
}
}

/**
 * Removes the protocol and metadata fields from a JSON-format checksum file.
 */
def removeProtocolAndMetadataFromChecksumFile(checksumFilePath: Path): Unit = {
// scalastyle:off deltahadoopconfiguration
val fs = checksumFilePath.getFileSystem(
SparkSession.getActiveSession.map(_.sessionState.newHadoopConf()).get
)
// scalastyle:on deltahadoopconfiguration
if (!fs.exists(checksumFilePath)) return
val stream = fs.open(checksumFilePath)
val reader = new BufferedReader(new InputStreamReader(stream, UTF_8))
val content = reader.readLine()
stream.close()
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
val map = mapper.readValue(content, classOf[Map[String, String]])
val partialContent = mapper.writeValueAsString(map.-("protocol").-("metadata")) + "\n"
val output = fs.create(checksumFilePath, true)
output.write(partialContent.getBytes(UTF_8))
output.close()
}

protected def getfindTouchedFilesJobPlans(plans: Seq[Plans]): SparkPlan = {
// The expected plan for touched file computation is of the format below.
// The data column should be pruned from both leaves.
Expand Down
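
As a rough illustration of what `removeProtocolAndMetadataFromChecksumFile` does to a single `.crc` line, the standalone sketch below strips the same two keys with Jackson; the JSON shown is abbreviated and made up, not the exact `VersionChecksum` schema.

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object StripCrcFieldsExample extends App {
  // Abbreviated, illustrative checksum line; the real file has more fields.
  val crcLine =
    """{"tableSizeBytes":1024,"numFiles":10,"protocol":{"minReaderVersion":1,"minWriterVersion":2},"metadata":{"id":"some-uuid"}}"""

  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)

  val asMap = mapper.readValue(crcLine, classOf[Map[String, Any]])
  // Dropping the two keys mirrors what the test helper writes back to disk.
  val stripped = mapper.writeValueAsString(asMap - "protocol" - "metadata")
  println(stripped) // e.g. {"tableSizeBytes":1024,"numFiles":10} (key order may vary)
}
```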