Skip to content

Commit

Permalink
Downgrade to the minimum supported protocol when a table is using onl…
Browse files Browse the repository at this point in the history
…y legacy features
  • Loading branch information
zsxwing committed Mar 13, 2024
1 parent 60914cd commit 34caa80
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,12 @@ trait TableFeatureSupport { this: Protocol =>
// remove a legacy feature in a table that only contains legacy features.
if (to.readerAndWriterFeatureNames.isEmpty) {
val featureNames = readerAndWriterFeatureNames - droppedFeatureName
val sameLegacyFeaturesSupported = featureNames == to.implicitlySupportedFeatures.map(_.name)
val toFeatureNames = to.implicitlySupportedFeatures.map(_.name)
val legacyFeaturesSupportedByDowngradedPotocol = featureNames.forall(toFeatureNames.contains)
val minRequiredVersions = TableFeatureProtocolUtils.minimumRequiredVersions(
featureNames.flatMap(TableFeature.featureNameToFeature).toSeq)

return sameLegacyFeaturesSupported &&
return legacyFeaturesSupportedByDowngradedPotocol &&
(to.minReaderVersion, to.minWriterVersion) == minRequiredVersions &&
readerAndWriterFeatures.filterNot(_.isLegacyFeature).size <= 1
}
Expand Down Expand Up @@ -378,9 +379,9 @@ trait TableFeatureSupport { this: Protocol =>
!newProtocol.supportsReaderFeatures && !newProtocol.supportsWriterFeatures,
s"Downgraded protocol should not support table features, but got $newProtocol.")

// Ensure the legacy protocol supports features exactly as the current protocol.
if (this.implicitlyAndExplicitlySupportedFeatures ==
newProtocol.implicitlyAndExplicitlySupportedFeatures) {
// Ensure the legacy protocol supports a superset of features of the current protocol.
if (this.implicitlyAndExplicitlySupportedFeatures.forall(
newProtocol.implicitlyAndExplicitlySupportedFeatures.contains)) {
newProtocol
} else {
this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3438,6 +3438,15 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
expectedDowngradedProtocol = protocolWithReaderFeature(TestRemovableReaderWriterFeature))
}

test("Downgrade protocol version (1, 4) for CDF tables") {
testProtocolVersionDowngrade(
initialMinReaderVersion = 3,
initialMinWriterVersion = 7,
featuresToAdd = Seq(TestRemovableWriterFeature, ChangeDataFeedTableFeature),
featuresToRemove = Seq(TestRemovableWriterFeature),
expectedDowngradedProtocol = Protocol(1, 4))
}

private def dropV2CheckpointsTableFeature(spark: SparkSession, log: DeltaLog): Unit = {
spark.sql(s"ALTER TABLE delta.`${log.dataPath}` DROP FEATURE " +
s"`${V2CheckpointTableFeature.name}`")
Expand Down

0 comments on commit 34caa80

Please sign in to comment.