[Spark][Version Checksum] Read Protocol, Metadata, and ICT directly from the Checksum during Snapshot construction #3920

Merged · 7 commits · Dec 6, 2024
43 changes: 42 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -147,7 +147,9 @@ class Snapshot(
"checkpointVersion" -> logSegment.checkpointProvider.version,
"durationMs" -> (System.currentTimeMillis() - startTime),
"exceptionMessage" -> exception.map(_.getMessage).getOrElse(""),
"exceptionStackTrace" -> exception.map(_.getStackTrace.mkString("\n")).getOrElse("")
"exceptionStackTrace" ->
exception.map(_.getStackTrace.mkString("\n")).getOrElse(""),
"isCRCPresent" -> checksumOpt.isDefined
)
)
}
@@ -377,6 +379,41 @@ class Snapshot(

override def protocol: Protocol = _reconstructedProtocolMetadataAndICT.protocol

/**
* Tries to retrieve the protocol, metadata, and in-commit timestamp (if needed) from the
* checksum file. If the checksum file is not present, or if the protocol or metadata is
* missing, this will return None.
*/
protected def getProtocolMetadataAndIctFromCrc():
Option[Array[ReconstructedProtocolMetadataAndICT]] = {
if (!spark.sessionState.conf.getConf(
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED)) {
return None
}
checksumOpt.map(c => (c.protocol, c.metadata, c.inCommitTimestampOpt)).flatMap {
case (p: Protocol, m: Metadata, ict: Option[Long]) =>
Some(Array((p, null, None), (null, m, None), (null, null, ict))
.map(ReconstructedProtocolMetadataAndICT.tupled))

case (p, m, _) if p != null || m != null =>
// One was missing from the .crc file... warn and fall back to an optimized query
val protocolStr = Option(p).map(_.toString).getOrElse("null")
val metadataStr = Option(m).map(_.toString).getOrElse("null")
recordDeltaEvent(
deltaLog,
opType = "delta.assertions.missingEitherProtocolOrMetadataFromChecksum",
data = Map(
"version" -> version.toString, "protocol" -> protocolStr, "source" -> metadataStr))
logWarning(log"Either protocol or metadata is null from checksum; " +
log"version:${MDC(DeltaLogKeys.VERSION, version)} " +
log"protocol:${MDC(DeltaLogKeys.PROTOCOL, protocolStr)} " +
log"metadata:${MDC(DeltaLogKeys.DELTA_METADATA, metadataStr)}")
Review comment on lines +409 to +410 (Collaborator):
Why is the one called PROTOCOL but the other DELTA_METADATA? Why not just METADATA?

Reply (Collaborator):
update: There are already several METADATA_xxx keys, I guess this avoids ambiguity?

None

case _ => None // both missing... fall back to an optimized query
}
}

/**
* Pulls the protocol and metadata of the table from the files that are used to compute the
* Snapshot directly--without triggering a full state reconstruction. This is important, because
@@ -394,6 +431,10 @@
Array[ReconstructedProtocolMetadataAndICT] = {
import implicits._

getProtocolMetadataAndIctFromCrc().foreach { protocolMetadataAndIctFromCrc =>
return protocolMetadataAndIctFromCrc
}

val schemaToUse = Action.logSchema(Set("protocol", "metaData", "commitInfo"))
val checkpointOpt = checkpointProvider.topLevelFileIndex.map { index =>
deltaLog.loadIndex(index, schemaToUse)
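For readers skimming the Snapshot.scala change above: the new method prefers the protocol, metadata, and in-commit timestamp recorded in the version checksum, and only falls back to log replay when either action is absent. Below is a minimal, hedged Scala sketch of that pattern using simplified stand-in types (`Crc`, `Protocol`, `Metadata`, and `ReconstructedPMAndICT` are hypothetical here, not the actual Delta classes); it mirrors the three-row shape that the replay path produces.

```scala
// Sketch only: simplified stand-ins for the Delta types.
case class Protocol(minReaderVersion: Int, minWriterVersion: Int)
case class Metadata(id: String)
case class Crc(protocol: Protocol, metadata: Metadata, inCommitTimestampOpt: Option[Long])
case class ReconstructedPMAndICT(protocol: Protocol, metadata: Metadata, ictOpt: Option[Long])

def fromCrc(checksumOpt: Option[Crc]): Option[Array[ReconstructedPMAndICT]] =
  checksumOpt.flatMap { c =>
    (Option(c.protocol), Option(c.metadata)) match {
      case (Some(p), Some(m)) =>
        // Both actions present in the .crc: no Spark job over the checkpoint is needed.
        // The three rows mirror what full log replay would have returned.
        Some(Array(
          ReconstructedPMAndICT(p, null, None),
          ReconstructedPMAndICT(null, m, None),
          ReconstructedPMAndICT(null, null, c.inCommitTimestampOpt)))
      case _ =>
        // Protocol or metadata missing: fall back to the optimized replay query.
        None
    }
  }
```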
@@ -57,6 +57,7 @@ trait DeltaLogKeysBase {
case object DATA_FILTER extends LogKeyShims
case object DATE extends LogKeyShims
case object DELTA_COMMIT_INFO extends LogKeyShims
case object DELTA_METADATA extends LogKeyShims
case object DIR extends LogKeyShims
case object DURATION extends LogKeyShims
case object END_INDEX extends LogKeyShims
@@ -1144,6 +1144,15 @@ trait DeltaSQLConfBase {
.intConf
.createOptional

val USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED =
buildConf("readProtocolAndMetadataFromChecksum.enabled")
.internal()
.doc("If enabled, delta log snapshot will read the protocol, metadata, and ICT " +
"(if applicable) from the checksum file and use those to avoid a spark job over the " +
"checkpoint for the two rows of protocol and metadata")
.booleanConf
.createWithDefault(true)

val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED =
buildConf("checkpoint.exceptionThrowing.enabled")
.internal()
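A quick note on the new flag added above: it is internal and defaults to true, so no action is needed to benefit from the fast path; the tests in this PR pin it explicitly. A hedged usage sketch follows — the full `spark.databricks.delta.` prefix is an assumption based on how `DeltaSQLConf.buildConf` composes keys.

```scala
// Inside a Delta test suite, pin the checksum fast path off for a specific scenario:
withSQLConf(
  DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "false") {
  // Snapshots built here read protocol/metadata via log replay rather than the .crc file.
}

// Outside test helpers, the same toggle via the session conf (key prefix assumed):
spark.conf.set("spark.databricks.delta.readProtocolAndMetadataFromChecksum.enabled", "false")
```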
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta

import java.io.File
import java.net.URI
import java.util.UUID

import scala.concurrent.duration._

@@ -557,7 +558,8 @@ class CheckpointsSuite
for (v2Checkpoint <- Seq(true, false))
withTempDir { tempDir =>
val source = new File(DeletionVectorsSuite.table1Path) // this table has DVs in two versions
val target = new File(tempDir, "insertTest")
val targetName = s"insertTest_${UUID.randomUUID().toString.replace("-", "")}"
val target = new File(tempDir, targetName)

// Copy the source2 DV table to a temporary directory, so that we do updates to it
FileUtils.copyDirectory(source, target)
@@ -767,7 +769,8 @@ class CheckpointsSuite
}
}

def checkIntermittentError(tempDir: File, lastCheckpointMissing: Boolean): Unit = {
def checkIntermittentError(
tempDir: File, lastCheckpointMissing: Boolean, crcMissing: Boolean): Unit = {
// Create a table with commit version 0, 1 and a checkpoint.
val tablePath = tempDir.getAbsolutePath
spark.range(10).write.format("delta").save(tablePath)
@@ -783,6 +786,13 @@
if (lastCheckpointMissing) {
fs.delete(log.LAST_CHECKPOINT)
}
// Delete CRC file based on test configuration.
if (crcMissing) {
// Delete all CRC files
(0L to log.update().version).foreach { version =>
fs.delete(FileNames.checksumFile(log.logPath, version))
}
}

// In order to trigger an intermittent failure while reading checkpoint, this test corrupts
// the checkpoint temporarily so that json/parquet checkpoint reader fails. The corrupted
@@ -804,9 +814,36 @@
assert(fs.getFileStatus(tempPath).getLen === checkpointFileStatus.getLen)

DeltaLog.clearCache()
if (!crcMissing) {
// When CRC is present, then P&M will be taken from CRC and snapshot will be initialized
// without needing a checkpoint. But the underlying checkpoint provider points to a
// corrupted checkpoint and so any query/state reconstruction on this will fail.
intercept[Exception] {
sql(s"SELECT * FROM delta.`$tablePath`").collect()
DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot.validateChecksum()
}
val snapshot = DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot
intercept[Exception] {
snapshot.allFiles.collect()
}
// Undo the corruption
assert(fs.delete(checkpointFileStatus.getPath, true))
assert(fs.rename(tempPath, checkpointFileStatus.getPath))

// Once the corruption is undone, the queries start passing on top of the same snapshot.
// This tests that we have not cached the intermittent error in the underlying checkpoint
// provider.
sql(s"SELECT * FROM delta.`$tablePath`").collect()
assert(DeltaLog.forTable(spark, tablePath).update() === snapshot)
return
}
// When CRC is missing, then P&M will be taken from checkpoint which is temporarily
// corrupted, so we will end up creating a new snapshot without using checkpoint and the
// query will succeed.
sql(s"SELECT * FROM delta.`$tablePath`").collect()
val snapshot = DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot
snapshot.computeChecksum
snapshot.validateChecksum()
assert(snapshot.checkpointProvider.isEmpty)
}
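
In short, the two branches of checkIntermittentError assert complementary behavior. A hedged sketch of the CRC-present case, reusing only APIs that already appear in this diff (the table path is hypothetical and the sketch assumes its checkpoint is corrupted): snapshot construction succeeds because protocol and metadata come from the checksum, while anything that needs full state reconstruction still hits the corrupted checkpoint.

```scala
// Sketch of the CRC-present branch (hypothetical path; assumes a corrupted checkpoint):
val log = DeltaLog.forTable(spark, "/tmp/delta/table-with-corrupted-checkpoint")
val snapshot = log.unsafeVolatileSnapshot
assert(snapshot.checksumOpt.isDefined)                 // P&M served from the .crc fast path
intercept[Exception] { snapshot.allFiles.collect() }   // state reconstruction still fails
```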

@@ -926,7 +963,13 @@ class CheckpointsSuite
for (lastCheckpointMissing <- BOOLEAN_DOMAIN)
testDifferentCheckpoints("intermittent error while reading checkpoint should not" +
s" stick to snapshot [lastCheckpointMissing: $lastCheckpointMissing]") { (_, _) =>
withTempDir { tempDir => checkIntermittentError(tempDir, lastCheckpointMissing) }
withTempDir { tempDir =>
withSQLConf(
DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false"
) {
checkIntermittentError(tempDir, lastCheckpointMissing, crcMissing = true)
}
}
}

test("validate metadata cleanup is not called with createCheckpointAtVersion API") {
@@ -150,7 +150,10 @@ class ChecksumSuite
test("Checksum validation should happen on checkpoint") {
withSQLConf(
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true",
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true"
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true",
// Disabled for this test because with it enabled, a corrupted Protocol
// or Metadata will trigger a failure earlier than the full validation.
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "false"
) {
withTempDir { tempDir =>
spark.range(10).write.format("delta").save(tempDir.getCanonicalPath)
@@ -210,7 +213,8 @@ class ChecksumSuite
DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> "true",
DeltaSQLConf.DELTA_CHECKSUM_MISMATCH_IS_FATAL.key -> "false",
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true",
DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false"
DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_ENABLED.key -> "false",
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true"
) {
withTempDir { tempDir =>
import testImplicits._
@@ -50,6 +50,7 @@ class DeltaAllFilesInCrcSuite
.set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_THRESHOLD_FILES.key, "10000")
// needed for DELTA_ALL_FILES_IN_CRC_ENABLED
.set(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key, "true")
.set(DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key, "true")
// Turn on verification by default in the tests
.set(DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key, "true")
// Turn off force verification for non-UTC timezones by default in the tests
@@ -197,6 +198,7 @@ class DeltaAllFilesInCrcSuite

// We will see all files in CRC verification failure.
// This will trigger the incremental commit verification which will fail.
assert(filterUsageRecords(records, "delta.assertions.mismatchedAction").size === 1)
val allFilesInCrcValidationFailureRecords =
filterUsageRecords(records, "delta.allFilesInCrc.checksumMismatch.differentAllFiles")
assert(allFilesInCrcValidationFailureRecords.size === 1)
@@ -209,9 +211,8 @@
assert(eventData("filesCountFromStateReconstruction").toLong ===
expectedFilesCountFromCrc + 1)
assert(eventData("incrementalCommitCrcValidationPassed").toBoolean === false)
val expectedValidationFailureMessage = "Number of files - Expected: 1 Computed: 2"
assert(eventData("errorForIncrementalCommitCrcValidation").contains(
expectedValidationFailureMessage))
"The metadata of your Delta table could not be recovered"))
}
}
}
@@ -411,6 +411,9 @@ class DeltaLogSuite extends QueryTest
}
}

val checksumFilePath = FileNames.checksumFile(log.logPath, log.snapshot.version)
removeProtocolAndMetadataFromChecksumFile(checksumFilePath)

{
// Create an incomplete checkpoint without the action and overwrite the
// original checkpoint
@@ -791,6 +794,62 @@ class DeltaLogSuite extends QueryTest
}
}

test("checksum file should contain protocol and metadata") {
withSQLConf(
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true",
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true"
) {
withTempDir { dir =>
val path = new Path("file://" + dir.getAbsolutePath)
val log = DeltaLog.forTable(spark, path)

val txn = log.startTransaction()
val files = (1 to 10).map(f => createTestAddFile(encodedPath = f.toString))
txn.commitManually(files: _*)
val metadata = log.snapshot.metadata
val protocol = log.snapshot.protocol
DeltaLog.clearCache()

val readLog = DeltaLog.forTable(spark, path)
val checksum = readLog.snapshot.checksumOpt.get
assert(checksum.metadata != null)
assert(checksum.protocol != null)
assert(checksum.metadata.equals(metadata))
assert(checksum.protocol.equals(protocol))
}
}
}

test("checksum reader should be able to read incomplete checksum file without " +
"protocol and metadata") {
withSQLConf(
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true",
DeltaSQLConf.USE_PROTOCOL_AND_METADATA_FROM_CHECKSUM_ENABLED.key -> "true"
) {
withTempDir { dir =>
val path = new Path("file://" + dir.getAbsolutePath)
val log = DeltaLog.forTable(spark, path)

val txn = log.startTransaction()
val files = (1 to 10).map(f => createTestAddFile(encodedPath = f.toString))
txn.commitManually(files: _*)
val metadata = log.snapshot.metadata
val protocol = log.snapshot.protocol
DeltaLog.clearCache()
val checksumFilePath = FileNames.checksumFile(log.logPath, 0L)
removeProtocolAndMetadataFromChecksumFile(checksumFilePath)

val readLog = DeltaLog.forTable(spark, path)
val checksum = readLog.snapshot.checksumOpt.get
assert(checksum.metadata == null)
assert(checksum.protocol == null)

// check we are still able to read protocol and metadata from checkpoint
assert(readLog.snapshot.metadata.equals(metadata))
assert(readLog.snapshot.protocol.equals(protocol))
}
}
}
}

class CoordinatedCommitsBatchBackfill1DeltaLogSuite extends DeltaLogSuite {
@@ -16,7 +16,8 @@

package org.apache.spark.sql.delta

import java.io.File
import java.io.{BufferedReader, File, InputStreamReader}
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap

@@ -32,6 +33,8 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils}
import org.apache.spark.sql.delta.util.FileNames
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.delta.tables.{DeltaTable => IODeltaTable}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.fs.Path
@@ -171,6 +174,29 @@ trait DeltaTestUtilsBase {
}
}

/**
* Removes the protocol and metadata fields from a JSON-format checksum file.
*/
def removeProtocolAndMetadataFromChecksumFile(checksumFilePath : Path): Unit = {
// scalastyle:off deltahadoopconfiguration
val fs = checksumFilePath.getFileSystem(
SparkSession.getActiveSession.map(_.sessionState.newHadoopConf()).get
)
// scalastyle:on deltahadoopconfiguration
if (!fs.exists(checksumFilePath)) return
val stream = fs.open(checksumFilePath)
val reader = new BufferedReader(new InputStreamReader(stream, UTF_8))
val content = reader.readLine()
stream.close()
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
val map = mapper.readValue(content, classOf[Map[String, String]])
val partialContent = mapper.writeValueAsString(map.-("protocol").-("metadata")) + "\n"
val output = fs.create(checksumFilePath, true)
output.write(partialContent.getBytes(UTF_8))
output.close()
}

protected def getfindTouchedFilesJobPlans(plans: Seq[Plans]): SparkPlan = {
// The expected plan for touched file computation is of the format below.
// The data column should be pruned from both leaves.
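The helper above rewrites the single-line JSON checksum through the Hadoop FileSystem API. For illustration, here is a hedged, standalone sketch of the same Jackson-based key stripping against a local file via java.nio — an assumption made only to keep the example self-contained; the real helper keeps using the table's FileSystem and Hadoop Path.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.{Files, Paths}

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Drop the "protocol" and "metadata" keys from a single-line JSON .crc file on local disk.
def stripProtocolAndMetadata(crcPath: String): Unit = {
  val path = Paths.get(crcPath)
  if (!Files.exists(path)) return
  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)
  val content = new String(Files.readAllBytes(path), UTF_8).trim
  val map = mapper.readValue(content, classOf[Map[String, Any]])
  val stripped = mapper.writeValueAsString(map - "protocol" - "metadata") + "\n"
  Files.write(path, stripped.getBytes(UTF_8))
}
```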