Skip to content

Commit

Permalink
Parallel calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Dec 27, 2024
1 parent 82e940f commit b20ff20
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,9 @@ class DeltaHistoryManager(
start,
Some(end),
deltaLog.newDeltaHadoopConf())
if (commits.isEmpty) {
throw DeltaErrors.noHistoryFound(deltaLog.logPath)
}
lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head)
}
}
Expand Down Expand Up @@ -710,12 +713,19 @@ object DeltaHistoryManager extends DeltaLogging {
startVersion,
Some(math.min(startVersion + step, end)),
conf.value)
lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head)
if (commits.isEmpty) {
None
} else {
Some(lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head))
}
}
}.collect()

// Spark should return the commits in increasing order as well
val commitList = monotonizeCommitTimestamps(possibleCommits)
val commitList = monotonizeCommitTimestamps(possibleCommits.flatten)
if (commitList.isEmpty) {
throw DeltaErrors.noHistoryFound(new Path(logPath))
}
lastCommitBeforeTimestamp(commitList, time).getOrElse(commitList.head)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,9 @@ private[delta] trait DeltaEncoders {
implicit def fsPartitionSpecEncoder
: Encoder[(SerializableFileStatus, CatalogTypes.TablePartitionSpec)]
= _fsPartitionSpecEncoder.get

private lazy val _optionalHistoryCommitEncoder =
new DeltaEncoder[Option[DeltaHistoryManager.Commit]]
implicit def optionalHistoryCommitEncoder: Encoder[Option[DeltaHistoryManager.Commit]] =
_optionalHistoryCommitEncoder.get
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import scala.concurrent.duration._
import scala.language.implicitConversions

import com.databricks.spark.util.Log4jUsageLogger
import org.apache.spark.sql.delta.DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED
import org.apache.spark.sql.delta.DeltaHistoryManagerSuiteShims._
import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
import org.apache.spark.sql.delta.catalog.DeltaTableV2
Expand Down Expand Up @@ -660,6 +661,54 @@ abstract class DeltaHistoryManagerBase extends DeltaTimeTravelTests
testGetHistory(start = 2, endOpt = Some(1), versions = Seq.empty, expectedLogUpdates = 0)
}
}

test("getCommitFromNonICTRange should handle empty history by throwing proper error") {
val tblName = "delta_table"
withTable(tblName) {
val start = 1540415658000L
generateCommits(tblName, start)
val deltaLog = DeltaLog.forTable(spark, getTableLocation(tblName))

val deltaFile = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0).toUri)
assert(deltaFile.delete(), "Failed to delete delta log file")

val e = intercept[DeltaAnalysisException] {
deltaLog.history.getCommitFromNonICTRange(0, 1, start)
}

assert(e.getMessage.contains("DELTA_NO_COMMITS_FOUND"))
assert(e.getMessage.contains(deltaLog.logPath.toString))
}
}

test("parallel search handles empty commits in a partition correctly") {
if (coordinatedCommitsBackfillBatchSize.isDefined) {
cancel("This test is not compatible with coordinated commits backfill timestamps.")
}
val tblName = "delta_table"
withTable(tblName) {
// Small threshold to trigger parallel search
withSQLConf(
DeltaSQLConf.DELTA_HISTORY_PAR_SEARCH_THRESHOLD.key -> "3",
IN_COMMIT_TIMESTAMPS_ENABLED.key -> "false") {
val start = 1540415658000L
// Generate 10 commits which will be processed in parallel due to threshold=3
val timestamps = (0 to 9).map(i => start + (i * 20).minutes)
generateCommits(tblName, timestamps: _*)
val table = DeltaTableV2(spark, TableIdentifier(tblName))
val deltaLog = table.deltaLog

// Delete all files in first partition to simulate concurrent metadata cleanup
val deltaFiles = (0 to 4).map { version =>
new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
}
deltaFiles.foreach(f =>
assert(f.delete(), s"Failed to delete delta log file ${f.getPath}"))
assert(
deltaLog.history.getCommitFromNonICTRange(0, 9, start + (7 * 20).minutes).version == 7)
}
}
}
}

/** Uses V2 resolution code paths */
Expand Down

0 comments on commit b20ff20

Please sign in to comment.