Commit 344e558: Managed Commit support for cold and hot snapshot
prakharjain09 committed Mar 15, 2024 (1 parent: 9a59c0a)

Showing 8 changed files with 582 additions and 75 deletions.
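
In brief: DeltaLog construction can now be seeded with an optional CommitStore, so both the cold snapshot (built when the DeltaLog is first created) and the hot snapshot (produced by subsequent updates) consult the managed commit store for un-backfilled commits. A minimal usage sketch, based only on the signatures in this diff (the table path and batch size are placeholders, and the InMemoryCommitStore import path is an assumption):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.managedcommit.{CommitStore, InMemoryCommitStore}

val spark: SparkSession = SparkSession.active

// Seed the DeltaLog with an initial commit store so that even the very first
// (cold) snapshot lists un-backfilled commits through it.
val commitStore: CommitStore = new InMemoryCommitStore(batchSize = 10)
val deltaLog = DeltaLog.forTable(
  spark,
  new Path("/tmp/delta/example-table"), // placeholder table path
  Map.empty[String, String],
  initialCommitStoreOpt = Some(commitStore),
  tableIdentifier = None)
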
spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala (40 changes: 36 additions & 4 deletions)
@@ -31,6 +31,7 @@ import com.databricks.spark.util.TagDefinitions._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.managedcommit.CommitStore
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.sources._
@@ -75,6 +76,7 @@ class DeltaLog private(
val dataPath: Path,
val options: Map[String, String],
val allOptions: Map[String, String],
val initialCommitStoreOpt: Option[CommitStore],
val clock: Clock
) extends Checkpoints
with MetadataCleanup
@@ -691,7 +693,12 @@ object DeltaLog extends DeltaLogging {

/** Helper for creating a log when it is stored at the root of the data. */
def forTable(spark: SparkSession, dataPath: String): DeltaLog = {
apply(spark, logPathFor(dataPath), Map.empty, new SystemClock)
apply(
spark,
logPathFor(dataPath),
options = Map.empty,
initialCommitStoreOpt = None,
clock = new SystemClock)
}

/** Helper for creating a log when it is stored at the root of the data. */
@@ -701,7 +708,8 @@ object DeltaLog extends DeltaLogging {

/** Helper for creating a log when it is stored at the root of the data. */
def forTable(spark: SparkSession, dataPath: Path, options: Map[String, String]): DeltaLog = {
apply(spark, logPathFor(dataPath), options, new SystemClock)
apply(
spark, logPathFor(dataPath), options, initialCommitStoreOpt = None, clock = new SystemClock)
}

/** Helper for creating a log when it is stored at the root of the data. */
@@ -733,8 +741,23 @@
apply(spark, logPathFor(new Path(table.location)), clock)
}

def forTable(
spark: SparkSession,
dataPath: Path,
options: Map[String, String],
initialCommitStoreOpt: Option[CommitStore],
tableIdentifier: Option[TableIdentifier]): DeltaLog = {
val result = apply(
spark,
logPathFor(dataPath),
options,
initialCommitStoreOpt,
new SystemClock)
result
}

private def apply(spark: SparkSession, rawPath: Path, clock: Clock = new SystemClock): DeltaLog =
apply(spark, rawPath, Map.empty, clock)
apply(spark, rawPath, options = Map.empty, initialCommitStoreOpt = None, clock = clock)


/** Helper for getting a log, as well as the latest snapshot, of the table */
@@ -756,7 +779,14 @@
spark: SparkSession,
dataPath: Path,
options: Map[String, String]): (DeltaLog, Snapshot) =
withFreshSnapshot { apply(spark, logPathFor(dataPath), options, _) }
withFreshSnapshot { clock =>
apply(
spark,
logPathFor(dataPath),
options,
initialCommitStoreOpt = None,
clock = clock)
}

/**
* Helper function to be used with the forTableWithSnapshot calls. Thunk is a
@@ -775,6 +805,7 @@
spark: SparkSession,
rawPath: Path,
options: Map[String, String],
initialCommitStoreOpt: Option[CommitStore],
clock: Clock
): DeltaLog = {
val fileSystemOptions: Map[String, String] =
@@ -803,6 +834,7 @@
dataPath = path.getParent,
options = fileSystemOptions,
allOptions = options,
initialCommitStoreOpt = initialCommitStoreOpt,
clock = clock
)
}

@@ -59,15 +59,15 @@ trait SnapshotManagement { self: DeltaLog =>

@volatile private[delta] var asyncUpdateTask: Future[Unit] = _

/** Use ReentrantLock to allow us to call `lockInterruptibly` */
protected val snapshotLock = new ReentrantLock()

/**
* Cached fileStatus for the latest CRC file seen in the deltaLog.
*/
@volatile protected var lastSeenChecksumFileStatusOpt: Option[FileStatus] = None
@volatile protected var currentSnapshot: CapturedSnapshot = getSnapshotAtInit

/** Use ReentrantLock to allow us to call `lockInterruptibly` */
protected val snapshotLock = new ReentrantLock()

/**
* Run `body` inside `snapshotLock` lock using `lockInterruptibly` so that the thread
* can be interrupted when waiting for the lock.
@@ -86,12 +86,16 @@
* initialization, or None if the directory was empty/missing.
*
* @param startingCheckpoint A checkpoint that we can start our listing from
* @param commitStoreOpt The initial commit store to use for getting the list of un-backfilled
* commits
*/
protected def getLogSegmentFrom(
startingCheckpoint: Option[LastCheckpointInfo]): Option[LogSegment] = {
startingCheckpoint: Option[LastCheckpointInfo],
commitStoreOpt: Option[CommitStore]): Option[LogSegment] = {
getLogSegmentForVersion(
versionToLoad = None,
lastCheckpointInfo = startingCheckpoint
lastCheckpointInfo = startingCheckpoint,
commitStoreOpt = commitStoreOpt
)
}

@@ -498,30 +502,17 @@
* file as a hint on where to start listing the transaction log directory. If the _delta_log
* directory doesn't exist, this method will return an `InitialSnapshot`.
*/
protected def getSnapshotAtInit: CapturedSnapshot = {
protected def getSnapshotAtInit: CapturedSnapshot = withSnapshotLockInterruptibly {
recordFrameProfile("Delta", "SnapshotManagement.getSnapshotAtInit") {
val currentTimestamp = clock.getTimeMillis()
val snapshotInitWallclockTime = clock.getTimeMillis()
val lastCheckpointOpt = readLastCheckpointFile()
createSnapshotAtInitInternal(
initSegment = getLogSegmentFrom(lastCheckpointOpt),
timestamp = currentTimestamp
)
}
}

protected def createSnapshotAtInitInternal(
initSegment: Option[LogSegment],
timestamp: Long): CapturedSnapshot = {
val snapshot = initSegment.map { segment =>
val snapshot = createSnapshot(
initSegment = segment,
checksumOpt = None)
snapshot
}.getOrElse {
logInfo(s"Creating initial snapshot without metadata, because the directory is empty")
new InitialSnapshot(logPath, this)
val snapshot = getUpdatedSnapshot(
oldSnapshotOpt = None,
initialSegmentForNewSnapshot = getLogSegmentFrom(lastCheckpointOpt, initialCommitStoreOpt),
initialCommitStore = initialCommitStoreOpt,
isAsync = false)
CapturedSnapshot(snapshot, snapshotInitWallclockTime)
}
CapturedSnapshot(snapshot, timestamp)
}

/**
@@ -907,48 +898,102 @@
*/
protected def updateInternal(isAsync: Boolean): Snapshot =
recordDeltaOperation(this, "delta.log.update", Map(TAG_ASYNC -> isAsync.toString)) {
val updateTimestamp = clock.getTimeMillis()
val updateStartTimeMs = clock.getTimeMillis()
val previousSnapshot = currentSnapshot.snapshot
val segmentOpt = getLogSegmentForVersion(
oldCheckpointProviderOpt = Some(previousSnapshot.checkpointProvider),
commitStoreOpt = previousSnapshot.commitStoreOpt)
installLogSegmentInternal(previousSnapshot, segmentOpt, updateTimestamp, isAsync)
val segmentOpt = inferNextLogSegmentFromPreviousSnapshot(previousSnapshot)
val newSnapshot = getUpdatedSnapshot(
oldSnapshotOpt = Some(previousSnapshot),
initialSegmentForNewSnapshot = segmentOpt,
initialCommitStore = previousSnapshot.commitStoreOpt,
isAsync = isAsync)
installSnapshot(newSnapshot, updateStartTimeMs)
}

/** Install the provided segmentOpt as the currentSnapshot on the cluster */
protected def installLogSegmentInternal(
previousSnapshot: Snapshot,
/**
* Updates and installs a new snapshot in the `currentSnapshot`.
* This method takes care of repeatedly creating new snapshots while the commit store changes.
* @param oldSnapshotOpt The previous snapshot, if any.
* @param initialSegmentForNewSnapshot The log segment constructed for the new snapshot.
* @param initialCommitStore The commit store used for constructing the
* `initialSegmentForNewSnapshot`.
* @param isAsync Whether the update is async.
* @return The new snapshot.
*/
protected def getUpdatedSnapshot(
oldSnapshotOpt: Option[Snapshot],
initialSegmentForNewSnapshot: Option[LogSegment],
initialCommitStore: Option[CommitStore],
isAsync: Boolean): Snapshot = {
var commitStoreUsed = initialCommitStore
var newSnapshot = getSnapshotForLogSegmentInternal(
oldSnapshotOpt,
initialSegmentForNewSnapshot,
isAsync
)
// If the commit store has changed, re-list the log through the new commit store so that
// we pick up the latest commits it knows about.
while (newSnapshot.version >= 0 && newSnapshot.commitStoreOpt != commitStoreUsed) {
commitStoreUsed = newSnapshot.commitStoreOpt
val segmentOpt = inferNextLogSegmentFromPreviousSnapshot(newSnapshot)
newSnapshot = getSnapshotForLogSegmentInternal(Some(newSnapshot), segmentOpt, isAsync)
}
newSnapshot
}

private def inferNextLogSegmentFromPreviousSnapshot(
previousSnapshot: Snapshot): Option[LogSegment] = {
getLogSegmentForVersion(
oldCheckpointProviderOpt = Some(previousSnapshot.checkpointProvider),
commitStoreOpt = previousSnapshot.commitStoreOpt)
}

/** Creates a Snapshot for the given `segmentOpt` */
protected def getSnapshotForLogSegmentInternal(
previousSnapshotOpt: Option[Snapshot],
segmentOpt: Option[LogSegment],
updateTimestamp: Long,
isAsync: Boolean): Snapshot = {
segmentOpt.map { segment =>
if (segment == previousSnapshot.logSegment) {
if (previousSnapshotOpt.exists(_.logSegment == segment)) {
// If no changes were detected, just refresh the timestamp
val timestampToUse = math.max(updateTimestamp, currentSnapshot.updateTimestamp)
currentSnapshot = currentSnapshot.copy(updateTimestamp = timestampToUse)
previousSnapshotOpt.get
} else {
val newSnapshot = createSnapshot(
initSegment = segment,
checksumOpt = None)
logMetadataTableIdChange(previousSnapshot, newSnapshot)
previousSnapshotOpt.foreach(logMetadataTableIdChange(_, newSnapshot))
logInfo(s"Updated snapshot to $newSnapshot")
replaceSnapshot(newSnapshot, updateTimestamp)
newSnapshot
}
}.getOrElse {
logInfo(s"No delta log found for the Delta table at $logPath")
replaceSnapshot(new InitialSnapshot(logPath, this), updateTimestamp)
logInfo(s"Creating initial snapshot without metadata, because the directory is empty")
new InitialSnapshot(logPath, this)
}
currentSnapshot.snapshot
}

/** Install `newSnapshot` as the current snapshot and return it. */
protected def replaceSnapshot(newSnapshot: Snapshot, updateTimestamp: Long): Unit = {
protected def installSnapshot(newSnapshot: Snapshot, updateTimestamp: Long): Snapshot = {
if (!snapshotLock.isHeldByCurrentThread) {
if (Utils.isTesting) {
throw new RuntimeException("DeltaLog snapshot replaced without taking lock")
}
recordDeltaEvent(this, "delta.update.unsafeReplace")
}
val oldSnapshot = currentSnapshot.snapshot
currentSnapshot = CapturedSnapshot(newSnapshot, updateTimestamp)
oldSnapshot.uncache()
if (currentSnapshot == null) {
// cold snapshot initialization
currentSnapshot = CapturedSnapshot(newSnapshot, updateTimestamp)
return newSnapshot
}
val CapturedSnapshot(oldSnapshot, oldTimestamp) = currentSnapshot
if (oldSnapshot == newSnapshot) {
// Same snapshot as before, so just refresh the timestamp
val timestampToUse = math.max(updateTimestamp, oldTimestamp)
currentSnapshot = CapturedSnapshot(newSnapshot, timestampToUse)
} else {
// Install the new snapshot and uncache the old one
currentSnapshot = CapturedSnapshot(newSnapshot, updateTimestamp)
oldSnapshot.uncache()
}
newSnapshot
}

/** Log a change in the metadata's table id whenever we install a newer version of a snapshot */
@@ -1022,8 +1067,7 @@
committedVersion)
logMetadataTableIdChange(previousSnapshot, newSnapshot)
logInfo(s"Updated snapshot to $newSnapshot")
replaceSnapshot(newSnapshot, updateTimestamp)
currentSnapshot.snapshot
installSnapshot(newSnapshot, updateTimestamp)
}
}

@@ -1085,6 +1129,9 @@
throw DeltaErrors.emptyDirectoryException(logPath.toString)
}
}

// Visible for testing
private[delta] def getCapturedSnapshot(): CapturedSnapshot = currentSnapshot
}

object SnapshotManagement {
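
Taken together, the SnapshotManagement changes funnel both the cold path (getSnapshotAtInit, now run under the snapshot lock) and the hot path (updateInternal) through getUpdatedSnapshot, which keeps re-listing the log while the freshly built snapshot advertises a different commit store than the one used to build it. A self-contained sketch of that fixed-point loop (SnapshotLike and relist are simplified stand-ins, not Delta types):

final case class SnapshotLike(version: Long, commitStoreId: Option[String])

def resolveSnapshot(
    initial: SnapshotLike,
    initialStoreId: Option[String],
    relist: SnapshotLike => SnapshotLike): SnapshotLike = {
  var storeUsed = initialStoreId
  var snapshot = initial
  // Re-list until the snapshot was built through the same commit store that it
  // reports, mirroring the while loop in getUpdatedSnapshot above.
  while (snapshot.version >= 0 && snapshot.commitStoreId != storeUsed) {
    storeUsed = snapshot.commitStoreId
    snapshot = relist(snapshot)
  }
  snapshot
}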

@@ -43,7 +43,7 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging {
* Commit a given `commitFile` to the table represented by given `logPath` at the
* given `commitVersion`
*/
protected def commitImpl(
private[delta] def commitImpl(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
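
The only change in AbstractBatchBackfillingCommitStore is visibility: commitImpl goes from protected to private[delta], so it can be called from anywhere in the delta package rather than only from subclasses.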

@@ -71,7 +71,7 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingCommitStore
* @throws CommitFailedException if the commit version is not the expected next version,
* indicating a version conflict.
*/
protected def commitImpl(
private[delta] def commitImpl(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
@@ -126,6 +126,16 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingCommitStore
versionsToRemove.foreach(tableData.commitsMap.remove)
}
}

def registerTable(
logPath: Path,
maxCommitVersion: Long): Unit = {
val newPerTableData = new PerTableData()
newPerTableData.maxCommitVersion = maxCommitVersion
if (perTableMap.putIfAbsent(logPath, newPerTableData) != null) {
throw new IllegalStateException(s"Table $logPath already exists in the commit store.")
}
}
}

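The new registerTable sets up per-table state in the in-memory store and rejects double registration. A hypothetical test-style setup (the log path is a placeholder, and passing -1L as the maxCommitVersion of a fresh table is an assumption not shown in this diff):

import org.apache.hadoop.fs.Path

val store = new InMemoryCommitStore(batchSize = 5)
// Register the table's _delta_log path once, before any commit is routed
// through the store; a second registration throws IllegalStateException.
store.registerTable(new Path("/tmp/delta/example-table/_delta_log"), maxCommitVersion = -1L)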

@@ -936,13 +936,6 @@
assert(filterUsageRecords(usageRecords2, "delta.log.cleanup").size > 0)
}
}

protected def filterUsageRecords(
usageRecords: Seq[UsageRecord], opType: String): Seq[UsageRecord] = {
usageRecords.filter { r =>
r.tags.get("opType").contains(opType) || r.opType.map(_.typeName).contains(opType)
}
}
}


@@ -25,6 +25,7 @@ import scala.collection.concurrent
import scala.reflect.ClassTag
import scala.util.matching.Regex

import com.databricks.spark.util.UsageRecord
import org.apache.spark.sql.delta.DeltaTestUtils.Plans
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.cdc.CDCReader
@@ -155,6 +156,13 @@
jobs.values.count(_ > 0)
}

/** Filter `usageRecords` by the `opType` tag or field. */
def filterUsageRecords(usageRecords: Seq[UsageRecord], opType: String): Seq[UsageRecord] = {
usageRecords.filter { r =>
r.tags.get("opType").contains(opType) || r.opType.map(_.typeName).contains(opType)
}
}

protected def getfindTouchedFilesJobPlans(plans: Seq[Plans]): SparkPlan = {
// The expected plan for touched file computation is of the format below.
// The data column should be pruned from both leaves.
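
With filterUsageRecords promoted from CheckpointsSuite into the shared DeltaTestUtilsBase, any suite mixing in that trait can assert on emitted usage records. A sketch of such an assertion (it assumes the enclosing suite has already captured a Seq[UsageRecord] by whatever mechanism it uses):

// Inside a suite that mixes in DeltaTestUtilsBase:
def assertOpTypeEmitted(records: Seq[UsageRecord], opType: String): Unit =
  assert(
    filterUsageRecords(records, opType).nonEmpty,
    s"expected at least one usage record with opType '$opType'")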