Skip to content

Commit

Permalink
Parallel calls
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Oct 11, 2024
1 parent 09fb300 commit 29b48a2
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,17 +359,19 @@ public CommitResponse commit(
"Commit version 0 must go via filesystem.");
}
try {
FileSystem fs = logPath.getFileSystem(hadoopConf);
Path commitPath =
CoordinatedCommitsUtils.generateUnbackfilledDeltaFilePath(logPath, commitVersion);
logStore.write(commitPath, actions, true /* overwrite */, hadoopConf);
FileStatus commitFileStatus = fs.getFileStatus(commitPath);
FileStatus commitFileStatus = CoordinatedCommitsUtils.writeUnbackfilledCommitFile(
logStore,
hadoopConf,
logPath.toString(),
commitVersion,
actions,
UUID.randomUUID().toString());
long inCommitTimestamp = updatedActions.getCommitInfo().getCommitTimestamp();
boolean isCCtoFSConversion =
CoordinatedCommitsUtils.isCoordinatedCommitsToFSConversion(commitVersion, updatedActions);

LOG.info("Committing version {} with UUID delta file {} to DynamoDB.",
commitVersion, commitPath);
commitVersion, commitFileStatus.getPath());
CommitResponse res = commitToCoordinator(
logPath,
tableDesc.getTableConf(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.util.FileNames
import io.delta.storage.LogStore
import io.delta.storage.commit.{CommitCoordinatorClient, CommitFailedException => JCommitFailedException, CommitResponse, TableDescriptor, TableIdentifier, UpdatedActions}
import io.delta.storage.commit.{CommitCoordinatorClient, CommitFailedException => JCommitFailedException, CommitResponse, CoordinatedCommitsUtils => JCoordinatedCommitsUtils, TableDescriptor, TableIdentifier, UpdatedActions}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

Expand Down Expand Up @@ -70,7 +70,7 @@ trait AbstractBatchBackfillingCommitCoordinatorClient
updatedActions: UpdatedActions): CommitResponse = {
val logPath = tableDesc.getLogPath
val executionObserver = TransactionExecutionObserver.getObserver
val tablePath = CoordinatedCommitsUtils.getTablePath(logPath)
val tablePath = JCoordinatedCommitsUtils.getTablePath(logPath)
if (commitVersion == 0) {
throw new JCommitFailedException(false, false, "Commit version 0 must go via filesystem.")
}
Expand All @@ -92,8 +92,8 @@ trait AbstractBatchBackfillingCommitCoordinatorClient
}

// Write new commit file in _commits directory
val fileStatus = CoordinatedCommitsUtils.writeCommitFile(
logStore, hadoopConf, logPath, commitVersion, actions.asScala, generateUUID())
val fileStatus = JCoordinatedCommitsUtils.writeUnbackfilledCommitFile(
logStore, hadoopConf, logPath.toString, commitVersion, actions, generateUUID())

// Do the actual commit
val commitTimestamp = updatedActions.getCommitInfo.getCommitTimestamp
Expand Down Expand Up @@ -132,9 +132,9 @@ trait AbstractBatchBackfillingCommitCoordinatorClient
commitVersion: Long,
updatedActions: UpdatedActions): Boolean = {
val oldMetadataHasCoordinatedCommits =
CoordinatedCommitsUtils.getCommitCoordinatorName(updatedActions.getOldMetadata).nonEmpty
JCoordinatedCommitsUtils.getCoordinatorName(updatedActions.getOldMetadata).isPresent
val newMetadataHasCoordinatedCommits =
CoordinatedCommitsUtils.getCommitCoordinatorName(updatedActions.getNewMetadata).nonEmpty
JCoordinatedCommitsUtils.getCoordinatorName(updatedActions.getNewMetadata).isPresent
oldMetadataHasCoordinatedCommits && !newMetadataHasCoordinatedCommits && commitVersion > 0
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,27 +149,6 @@ object CoordinatedCommitsUtils extends DeltaLogging {
}
}

/**
* Write a UUID-based commit file for the specified version to the
* table at [[logPath]].
*/
def writeCommitFile(
logStore: LogStore,
hadoopConf: Configuration,
logPath: Path,
commitVersion: Long,
actions: Iterator[String],
uuid: String): FileStatus = {
val commitPath = FileNames.unbackfilledDeltaFile(logPath, commitVersion, Some(uuid))
logStore.write(commitPath, actions.asJava, true, hadoopConf)
commitPath.getFileSystem(hadoopConf).getFileStatus(commitPath)
}

/**
* Get the table path from the provided log path.
*/
def getTablePath(logPath: Path): Path = logPath.getParent

def getCommitCoordinatorClient(
spark: SparkSession,
deltaLog: DeltaLog, // Used for logging
Expand Down Expand Up @@ -236,44 +215,6 @@ object CoordinatedCommitsUtils extends DeltaLogging {
}
}

/**
* Helper method to recover the saved value of `deltaConfig` from `abstractMetadata`.
* If undefined, fall back to alternate keys, returning defaultValue if none match.
*/
private[delta] def fromAbstractMetadataAndDeltaConfig[T](
abstractMetadata: AbstractMetadata,
deltaConfig: DeltaConfig[T]): T = {
val conf = abstractMetadata.getConfiguration
for (key <- deltaConfig.key +: deltaConfig.alternateKeys) {
Option(conf.get(key)).map { value => return deltaConfig.fromString(value) }
}
deltaConfig.fromString(deltaConfig.defaultValue)
}

/**
* Get the commit coordinator name from the provided abstract metadata.
*/
def getCommitCoordinatorName(abstractMetadata: AbstractMetadata): Option[String] = {
fromAbstractMetadataAndDeltaConfig(
abstractMetadata, DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME)
}

/**
* Get the commit coordinator configuration from the provided abstract metadata.
*/
def getCommitCoordinatorConf(abstractMetadata: AbstractMetadata): Map[String, String] = {
fromAbstractMetadataAndDeltaConfig(
abstractMetadata, DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF)
}

/**
* Get the coordinated commits table configuration from the provided abstract metadata.
*/
def getCoordinatedCommitsTableConf(abstractMetadata: AbstractMetadata): Map[String, String] = {
fromAbstractMetadataAndDeltaConfig(
abstractMetadata, DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF)
}

val TABLE_PROPERTY_CONFS = Seq(
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME,
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import org.apache.spark.sql.delta.util.FileNames.{CompactedDeltaFile, DeltaFile, UnbackfilledDeltaFile}
import io.delta.storage.LogStore
import io.delta.storage.commit.{CommitCoordinatorClient, CommitResponse, GetCommitsResponse => JGetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions}
import io.delta.storage.commit.{CommitCoordinatorClient, CommitResponse, CoordinatedCommitsUtils => JCoordinatedCommitsUtils, GetCommitsResponse => JGetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions}
import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
Expand Down Expand Up @@ -79,27 +79,23 @@ class CoordinatedCommitsSuite
val m1 = Metadata(
configuration = Map(COORDINATED_COMMITS_COORDINATOR_NAME.key -> "string_value")
)
assert(CoordinatedCommitsUtils.fromAbstractMetadataAndDeltaConfig(
m1, COORDINATED_COMMITS_COORDINATOR_NAME) === Some("string_value"))
assert(JCoordinatedCommitsUtils.getCoordinatorName(m1) === Optional.of("string_value"))

val m2 = Metadata(
configuration = Map(COORDINATED_COMMITS_COORDINATOR_NAME.key -> "")
)
assert(CoordinatedCommitsUtils.fromAbstractMetadataAndDeltaConfig(
m2, COORDINATED_COMMITS_COORDINATOR_NAME) === Some(""))
assert(JCoordinatedCommitsUtils.getCoordinatorName(m2)=== Optional.of(""))

val m3 = Metadata(
configuration = Map(
COORDINATED_COMMITS_COORDINATOR_CONF.key ->
"""{"key1": "string_value", "key2Int": 2, "key3ComplexStr": "\"hello\""}""")
)
assert(CoordinatedCommitsUtils.fromAbstractMetadataAndDeltaConfig(
m3, COORDINATED_COMMITS_COORDINATOR_CONF) ===
Map("key1" -> "string_value", "key2Int" -> "2", "key3ComplexStr" -> "\"hello\""))
assert(JCoordinatedCommitsUtils.getCoordinatorConf(m3) ===
Map("key1" -> "string_value", "key2Int" -> "2", "key3ComplexStr" -> "\"hello\"").asJava)

val m4 = Metadata()
assert(CoordinatedCommitsUtils.fromAbstractMetadataAndDeltaConfig(
m4, COORDINATED_COMMITS_TABLE_CONF) === Map.empty)
assert(JCoordinatedCommitsUtils.getCoordinatorConf(m4) === Map.empty.asJava)
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static Path getUnbackfilledDeltaFile(
/**
* Write a UUID-based commit file for the specified version to the table at logPath.
*/
public static FileStatus writeCommitFile(
public static FileStatus writeUnbackfilledCommitFile(
LogStore logStore,
Configuration hadoopConf,
String logPath,
Expand All @@ -107,11 +107,10 @@ public static FileStatus writeCommitFile(
String uuid) throws IOException {
Path commitPath = new Path(getUnbackfilledDeltaFile(
new Path(logPath), commitVersion, Optional.of(uuid)).toString());
FileSystem fs = commitPath.getFileSystem(hadoopConf);
if (!fs.exists(commitPath.getParent())) {
fs.mkdirs(commitPath.getParent());
}
logStore.write(commitPath, actions, false, hadoopConf);
// Do not use Put-If-Absent for Unbackfilled Commits files since we assume that UUID-based
// commit files are globally unique, and so we will never have concurrent writers attempting
// to write the same commit file.
logStore.write(commitPath, actions, true /* overwrite */, hadoopConf);
return commitPath.getFileSystem(hadoopConf).getFileStatus(commitPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class InMemoryCommitCoordinator(val batchSize: Long) extends CommitCoordinatorCl
backfillToVersion(logStore, hadoopConf, tableDesc, commitVersion - 1, null)
}
// Write new commit file in _commits directory
val fileStatus = CoordinatedCommitsUtils.writeCommitFile(
val fileStatus = CoordinatedCommitsUtils.writeUnbackfilledCommitFile(
logStore, hadoopConf, logPath.toString, commitVersion, actions, generateUUID())
// Do the actual commit
val commitTimestamp = updatedActions.getCommitInfo.getCommitTimestamp
Expand Down

0 comments on commit 29b48a2

Please sign in to comment.