
[Spark] Drop feature support in DeltaTable Scala/Python APIs #3952

Merged
40 changes: 37 additions & 3 deletions python/delta/tables.py
@@ -594,14 +594,43 @@ def addFeatureSupport(self, featureName: str) -> None:
DeltaTable._verify_type_str(featureName, "featureName")
self._jdt.addFeatureSupport(featureName)

@since(3.4) # type: ignore[arg-type]
def dropFeatureSupport(self, featureName: str, truncateHistory: Optional[bool] = None) -> None:
"""
Modify the protocol to drop a supported feature. The operation always normalizes the
Contributor: Suggested change: "Modify the protocol to drop a supported feature." -> "Modify the protocol to drop an existing supported feature." Nit: similar to the alterDeltaTableCommand's description.

Contributor (Author): I could not find where this is mentioned in alterDeltaTableCommand, but "existing" would be redundant in that case, since we already say "supported."

resulting protocol. Protocol normalization is the process of converting a table features
protocol to the weakest possible form. This primarily refers to converting a table features
protocol to a legacy protocol. A table features protocol can be represented with the legacy
representation only when the feature set of the former exactly matches a legacy protocol.
Normalization can also decrease the reader version of a table features protocol when it is
Contributor: Why don't we make the Scala text description and the Python text description identical?

Contributor (Author): That was my initial intention, but I diverged later on :D
higher than necessary. For example:

(1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3)
(3, 7, None, {RowTracking}) -> (1, 7, RowTracking)

The dropFeatureSupport method can be used as follows:
delta.tables.DeltaTable.dropFeatureSupport("rowTracking")

:param featureName: The name of the feature to drop.
:param truncateHistory: Optional; whether to truncate the history. If not specified,
the history is not truncated.
:return: None.
"""
DeltaTable._verify_type_str(featureName, "featureName")
if truncateHistory is None:
self._jdt.dropFeatureSupport(featureName)
else:
DeltaTable._verify_type_bool(truncateHistory, "truncateHistory")
self._jdt.dropFeatureSupport(featureName, truncateHistory)
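The protocol normalization described in the docstring can be sketched as a small standalone model. This is a hypothetical illustration, not the actual Delta implementation: the legacy-protocol table and the set of writer-only features below are assumptions chosen to reproduce the two examples from the docstring.

```python
# Hypothetical model of protocol normalization (assumed, not Delta's code).
# A legacy protocol (reader, writer) implies a fixed feature set; a table
# features protocol matching one of these sets exactly can collapse to it.
LEGACY_PROTOCOLS = {
    (1, 1): frozenset(),
    (1, 2): frozenset({"appendOnly", "invariants"}),
    (1, 3): frozenset({"appendOnly", "invariants", "checkConstraints"}),
}

# Assumed set of features that do not require a reader-version bump.
WRITER_ONLY_FEATURES = {"appendOnly", "invariants", "checkConstraints", "rowTracking"}

def normalize(reader, writer, features):
    features = frozenset(features)
    # Collapse to the weakest legacy protocol whose implied feature set
    # matches exactly.
    for (r, w), implied in sorted(LEGACY_PROTOCOLS.items()):
        if features == implied:
            return (r, w)
    # Otherwise stay in table-features form, lowering the reader version
    # when no remaining feature actually requires a higher one.
    min_reader = 1 if features <= WRITER_ONLY_FEATURES else reader
    return (min_reader, writer, features)

# The two examples from the docstring:
assert normalize(1, 7, {"appendOnly", "invariants", "checkConstraints"}) == (1, 3)
assert normalize(3, 7, {"rowTracking"}) == (1, 7, frozenset({"rowTracking"}))
```

The second assertion shows the reader-version decrease: dropping down from reader version 3 to 1 while the protocol stays in table-features form.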

@since(1.2) # type: ignore[arg-type]
def restoreToVersion(self, version: int) -> DataFrame:
"""
Restore the DeltaTable to an older version of the table specified by version number.

Example::

io.delta.tables.DeltaTable.restoreToVersion(1)
delta.tables.DeltaTable.restoreToVersion(1)

:param version: target version of restored table
:return: Dataframe with metrics of restore operation.
@@ -622,8 +651,8 @@ def restoreToTimestamp(self, timestamp: str) -> DataFrame:

Example::

io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01')
io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01')
delta.tables.DeltaTable.restoreToTimestamp('2021-01-01')
delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01')

:param timestamp: target timestamp of restored table
:return: Dataframe with metrics of restore operation.
@@ -658,6 +687,11 @@ def optimize(self) -> "DeltaOptimizeBuilder":
jbuilder = self._jdt.optimize()
return DeltaOptimizeBuilder(self._spark, jbuilder)

@staticmethod  # type: ignore[arg-type]
def _verify_type_bool(variable: bool, name: str) -> None:
    if not isinstance(variable, bool):
        raise ValueError("%s needs to be a boolean but got '%s'." % (name, type(variable)))
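A standalone sketch of this boolean check illustrates why a plain `isinstance(..., bool)` test is sufficient: `None` fails it, and Python ints like `0`/`1` are not instances of `bool` (the subclass relationship runs the other way), so near-miss values are rejected. The function name here is hypothetical, mirroring the helper above.

```python
# Hypothetical standalone version of the _verify_type_bool-style check.
def verify_type_bool(variable, name):
    # isinstance(None, bool) is False, so no separate None check is needed;
    # isinstance(1, bool) is also False, even though isinstance(True, int) is True.
    if not isinstance(variable, bool):
        raise ValueError("%s needs to be a boolean but got '%s'." % (name, type(variable)))

verify_type_bool(True, "truncateHistory")   # accepted
verify_type_bool(False, "truncateHistory")  # accepted
for bad in (1, 0, None, "true", []):
    try:
        verify_type_bool(bad, "truncateHistory")
        raise AssertionError("should have raised ValueError")
    except ValueError:
        pass
```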

@staticmethod # type: ignore[arg-type]
def _verify_type_str(variable: str, name: str) -> None:
if not isinstance(variable, str) or variable is None:
85 changes: 72 additions & 13 deletions python/delta/tests/test_deltatable.py
@@ -1151,16 +1151,19 @@ def test_delta_table_builder_with_bad_args(self) -> None:
with self.assertRaises(TypeError):
builder.property("1", 1) # type: ignore[arg-type]

def test_protocolUpgrade(self) -> None:
def __create_df_for_feature_tests(self) -> DeltaTable:
try:
self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
self.spark.conf.set('spark.databricks.delta.minReaderVersion', '1')
self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
dt = DeltaTable.forPath(self.spark, self.tempFile)
dt.upgradeTableProtocol(1, 3)
return DeltaTable.forPath(self.spark, self.tempFile)
finally:
self.spark.conf.unset('spark.databricks.delta.minWriterVersion')
self.spark.conf.unset('spark.databricks.delta.minReaderVersion')
self.spark.conf.unset('spark.databricks.delta.minWriterVersion')

def test_protocolUpgrade(self) -> None:
dt = self.__create_df_for_feature_tests()
dt.upgradeTableProtocol(1, 3)

# cannot downgrade once upgraded
dt.upgradeTableProtocol(1, 2)
@@ -1189,14 +1192,7 @@ def test_protocolUpgrade(self) -> None:
dt.upgradeTableProtocol(1, {}) # type: ignore[arg-type]

def test_addFeatureSupport(self) -> None:
try:
self.spark.conf.set('spark.databricks.delta.minReaderVersion', '1')
self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
dt = DeltaTable.forPath(self.spark, self.tempFile)
finally:
self.spark.conf.unset('spark.databricks.delta.minReaderVersion')
self.spark.conf.unset('spark.databricks.delta.minWriterVersion')
dt = self.__create_df_for_feature_tests()

# bad args
with self.assertRaisesRegex(Py4JJavaError, "DELTA_UNSUPPORTED_FEATURES_IN_CONFIG"):
@@ -1224,6 +1220,69 @@ def test_addFeatureSupport(self) -> None:
self.assertEqual(sorted(dt_details["tableFeatures"]),
["appendOnly", "deletionVectors", "invariants"])

def test_dropFeatureSupport(self) -> None:
dt = self.__create_df_for_feature_tests()

dt.addFeatureSupport("testRemovableWriter")
dt_details = dt.detail().collect()[0].asDict()
self.assertTrue(dt_details["minReaderVersion"] == 1)
self.assertTrue(dt_details["minWriterVersion"] == 7, "Should upgrade to table features")
self.assertEqual(sorted(dt_details["tableFeatures"]),
["appendOnly", "invariants", "testRemovableWriter"])

# Attempt truncating the history when dropping a feature that is not required.
# This verifies the truncateHistory option was correctly passed.
with self.assertRaisesRegex(Exception,
"DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED"):
dt.dropFeatureSupport("testRemovableWriter", True)

dt.dropFeatureSupport("testRemovableWriter")
dt_details = dt.detail().collect()[0].asDict()
self.assertTrue(dt_details["minReaderVersion"] == 1)
self.assertTrue(dt_details["minWriterVersion"] == 2, "Should return to legacy protocol")

dt.addFeatureSupport("testRemovableReaderWriter")
dt_details = dt.detail().collect()[0].asDict()
self.assertTrue(dt_details["minReaderVersion"] == 3, "Should upgrade to table features")
self.assertTrue(dt_details["minWriterVersion"] == 7, "Should upgrade to table features")
self.assertEqual(sorted(dt_details["tableFeatures"]),
["appendOnly", "invariants", "testRemovableReaderWriter"])

dt.dropFeatureSupport("testRemovableReaderWriter")
dt_details = dt.detail().collect()[0].asDict()
self.assertTrue(dt_details["minReaderVersion"] == 1, "Should return to legacy protocol")
self.assertTrue(dt_details["minWriterVersion"] == 2, "Should return to legacy protocol")

# Try to drop an unsupported feature.
with self.assertRaisesRegex(Exception, "DELTA_FEATURE_DROP_UNSUPPORTED_CLIENT_FEATURE"):
dt.dropFeatureSupport("__invalid_feature__")

# Try to drop a feature that is not present in the protocol.
with self.assertRaisesRegex(Exception, "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT"):
dt.dropFeatureSupport("testRemovableReaderWriter")

# Try to drop a non-removable feature.
with self.assertRaisesRegex(Exception, "DELTA_FEATURE_DROP_NONREMOVABLE_FEATURE"):
dt.dropFeatureSupport("testReaderWriter")

with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
dt.dropFeatureSupport(12345) # type: ignore[arg-type]
with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
dt.dropFeatureSupport([12345]) # type: ignore[arg-type]
with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
dt.dropFeatureSupport({}) # type: ignore[arg-type]
with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
dt.dropFeatureSupport([]) # type: ignore[arg-type]

with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
dt.dropFeatureSupport("testRemovableWriter", 12345) # type: ignore[arg-type]
with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
dt.dropFeatureSupport("testRemovableWriter", [12345]) # type: ignore[arg-type]
with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
dt.dropFeatureSupport("testRemovableWriter", {}) # type: ignore[arg-type]
with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
dt.dropFeatureSupport("testRemovableWriter", []) # type: ignore[arg-type]

def test_restore_to_version(self) -> None:
self.__writeDeltaTable([('a', 1), ('b', 2)])
self.__overwriteDeltaTable([('a', 3), ('b', 2)],
71 changes: 69 additions & 2 deletions spark/src/main/scala/io/delta/tables/DeltaTable.scala
@@ -20,9 +20,9 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaTableUtils.withActiveSession
import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.AlterTableSetPropertiesDeltaCommand
import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, AlterTableSetPropertiesDeltaCommand}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import io.delta.tables.execution._
import org.apache.hadoop.fs.Path
@@ -562,6 +562,73 @@ class DeltaTable private[tables](
toDataset(sparkSession, alterTableCmd)
}

private def executeDropFeature(featureName: String, truncateHistory: Option[Boolean]): Unit = {
val alterTableCmd = AlterTableDropFeatureDeltaCommand(
table = table,
featureName = featureName,
truncateHistory = truncateHistory.getOrElse(false))
toDataset(sparkSession, alterTableCmd)
}

/**
* Modify the protocol to drop a supported feature. The operation always normalizes the
* resulting protocol. Protocol normalization is the process of converting a table features
* protocol to the weakest possible form. This primarily refers to converting a table features
* protocol to a legacy protocol. A table features protocol can be represented with the legacy
* representation only when the feature set of the former exactly matches a legacy protocol.
* Normalization can also decrease the reader version of a table features protocol when it is
Contributor: "decrease the reader version" -- shouldn't this apply to the writer version as well, i.e. any version?

Contributor (Author): No, it only applies to the reader version.
* higher than necessary. For example:
*
* (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3)
* (3, 7, None, {RowTracking}) -> (1, 7, RowTracking)
*
* The dropFeatureSupport method can be used as follows:
* {{{
* io.delta.tables.DeltaTable.dropFeatureSupport("rowTracking")
* }}}
*
* See online documentation for more details.
*
* @param featureName The name of the feature to drop.
* @param truncateHistory Whether to truncate history before downgrading the protocol.
* @return None.
* @since 3.4.0
*/
def dropFeatureSupport(
featureName: String,
truncateHistory: Boolean): Unit = withActiveSession(sparkSession) {
executeDropFeature(featureName, Some(truncateHistory))
}

/**
* Modify the protocol to drop a supported feature. The operation always normalizes the
* resulting protocol. Protocol normalization is the process of converting a table features
* protocol to the weakest possible form. This primarily refers to converting a table features
* protocol to a legacy protocol. A table features protocol can be represented with the legacy
* representation only when the feature set of the former exactly matches a legacy protocol.
* Normalization can also decrease the reader version of a table features protocol when it is
* higher than necessary. For example:
*
* (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3)
* (3, 7, None, {RowTracking}) -> (1, 7, RowTracking)
*
* The dropFeatureSupport method can be used as follows:
* {{{
* io.delta.tables.DeltaTable.dropFeatureSupport("rowTracking")
* }}}
*
* Note, this command will not truncate history.
*
* See online documentation for more details.
*
* @param featureName The name of the feature to drop.
* @return None.
* @since 3.4.0
*/
def dropFeatureSupport(featureName: String): Unit = withActiveSession(sparkSession) {
executeDropFeature(featureName, None)
}

/**
* Clone a DeltaTable to a given destination to mirror the existing table's data and metadata.
*
@@ -1044,7 +1044,7 @@ object TestReaderWriterMetadataAutoUpdateFeature
}
}

private[sql] object TestRemovableWriterFeature
object TestRemovableWriterFeature
extends WriterFeature(name = "testRemovableWriter")
with FeatureAutomaticallyEnabledByMetadata
with RemovableFeature {
@@ -1093,7 +1093,7 @@ private[sql] object TestRemovableWriterFeatureWithDependency
Set(TestRemovableReaderWriterFeature, TestRemovableWriterFeature)
}

private[sql] object TestRemovableReaderWriterFeature
object TestRemovableReaderWriterFeature
extends ReaderWriterFeature(name = "testRemovableReaderWriter")
with FeatureAutomaticallyEnabledByMetadata
with RemovableFeature {
@@ -322,7 +322,7 @@ case class AlterTableDropFeatureDeltaCommand(
// Check whether the protocol contains the feature in either the writer features list or
// the reader+writer features list. Note, protocol needs to denormalized to allow dropping
// features from legacy protocols.
val protocol = table.initialSnapshot.protocol
val protocol = table.deltaLog.update().protocol
Contributor: I'm happy with changing this to deltaLog.update(), but I'm wondering whether there was any risk before with using initialSnapshot. I don't know why we haven't run into a problem with this before, since we should always get the latest protocol, not "the snapshot initially associated with this table."

Contributor (Author): Good question. Until now this command was primarily accessible through Spark SQL, and when using SQL the command gets a table with a fresh snapshot. With the DeltaTable API, however, the user directly controls which table instance is used, so unless the user takes care to refresh that table instance, we can get a stale snapshot.
val protocolContainsFeatureName =
protocol.implicitlyAndExplicitlySupportedFeatures.map(_.name).contains(featureName)
if (!protocolContainsFeatureName) {
70 changes: 67 additions & 3 deletions spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
@@ -22,7 +22,7 @@ import java.util.Locale
import scala.language.postfixOps

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestWriterFeature}
import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestRemovableReaderWriterFeature, TestRemovableWriterFeature, TestWriterFeature}
import org.apache.spark.sql.delta.actions.{ Metadata, Protocol }
import org.apache.spark.sql.delta.storage.LocalLogStore
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
@@ -247,6 +247,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
"cloneAtVersion",
"delete",
"detail",
"dropFeatureSupport",
"generate",
"history",
"merge",
@@ -631,8 +632,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
}
}

test(
"addFeatureSupport - with filesystem options.") {
test("addFeatureSupport - with filesystem options.") {
withTempDir { dir =>
val path = fakeFileSystemPath(dir)
val fsOptions = fakeFileSystemOptions
@@ -670,6 +670,70 @@
}
}

test("dropFeatureSupport - with filesystem options.") {
withTempDir { dir =>
val path = fakeFileSystemPath(dir)
val fsOptions = fakeFileSystemOptions

// create a table with a default Protocol.
val testSchema = spark.range(1).schema
val log = DeltaLog.forTable(spark, new Path(path), fsOptions)
log.createLogDirectoriesIfNotExists()
log.store.write(
FileNames.unsafeDeltaFile(log.logPath, 0),
Iterator(Metadata(schemaString = testSchema.json).json, Protocol(1, 2).json),
overwrite = false,
log.newDeltaHadoopConf())
log.update()

// update the protocol to support a writer feature.
val table = DeltaTable.forPath(spark, path, fsOptions)
table.addFeatureSupport(TestRemovableWriterFeature.name)
assert(log.update().protocol === Protocol(1, 7).withFeatures(Seq(
AppendOnlyTableFeature,
InvariantsTableFeature,
TestRemovableWriterFeature)))

// Attempt truncating the history when dropping a feature that is not required.
// This verifies the truncateHistory option was correctly passed.
assert(intercept[DeltaTableFeatureException] {
table.dropFeatureSupport("testRemovableWriter", truncateHistory = true)
}.getErrorClass === "DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED")

// Drop feature.
table.dropFeatureSupport(TestRemovableWriterFeature.name)
Contributor: Should we add a test where, after dropping a feature, the protocol doesn't change? For example, dropping one readerWriterFeature out of a set of two should leave the protocol at (3, 7).

Contributor (Author): All these cases (and more) are covered in DeltaProtocolVersionSuite. Here I tried to focus on whether the DeltaTable API works by testing some basic functionality.
// After dropping the feature we should return back to the original protocol.
assert(log.update().protocol === Protocol(1, 2))
Contributor: Is there a way to check explicitly that the list of features still remains here? Or that we implicitly have all the supported features of (1, 2) due to the merge function in TableFeatureSupport?

Contributor (Author): Protocol(1, 2) is in legacy form; that is, it only contains version numbers, and the versions are below (3, 7). In legacy protocols, features are implied rather than explicitly listed as in table features protocols. Therefore Protocol(1, 2) is sufficient: it implies the invariants and appendOnly features.


table.addFeatureSupport(TestRemovableReaderWriterFeature.name)
assert(
log.update().protocol === Protocol(3, 7).withFeatures(Seq(
AppendOnlyTableFeature,
InvariantsTableFeature,
TestRemovableReaderWriterFeature)))

// Drop feature.
table.dropFeatureSupport(TestRemovableReaderWriterFeature.name)
// After dropping the feature we should return back to the original protocol.
assert(log.update().protocol === Protocol(1, 2))
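The idea that a legacy protocol implies features without listing them can be sketched as follows. This is a hedged illustration under assumed version-to-feature mappings (only the writer-version entries needed for Protocol(1, 2) are modeled), not Delta's actual tables.

```python
# Assumed mapping: each legacy writer version implies a feature set, and
# higher writer versions accumulate the sets of all lower ones.
IMPLIED_BY_WRITER_VERSION = {
    1: set(),
    2: {"appendOnly", "invariants"},
}

def implied_features(reader_version, writer_version):
    # Union the feature sets of every legacy writer version at or below ours.
    features = set()
    for version, feats in IMPLIED_BY_WRITER_VERSION.items():
        if writer_version >= version:
            features |= feats
    return features

# Protocol(1, 2) carries no explicit feature list, yet still implies both
# appendOnly and invariants.
assert implied_features(1, 2) == {"appendOnly", "invariants"}
```

This is why the assertion above can compare against a bare `Protocol(1, 2)`: the dropped table-features protocol and the legacy protocol support the same features, just expressed differently.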

// Try to drop an unsupported feature.
assert(intercept[DeltaTableFeatureException] {
table.dropFeatureSupport("__invalid_feature__")
}.getErrorClass === "DELTA_FEATURE_DROP_UNSUPPORTED_CLIENT_FEATURE")

// Try to drop a feature that is not present in the protocol.
assert(intercept[DeltaTableFeatureException] {
table.dropFeatureSupport(TestRemovableReaderWriterFeature.name)
}.getErrorClass === "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT")

// Try to drop a non-removable feature.
assert(intercept[DeltaTableFeatureException] {
table.dropFeatureSupport(TestReaderWriterFeature.name)
}.getErrorClass === "DELTA_FEATURE_DROP_NONREMOVABLE_FEATURE")
}
}

test("details - with filesystem options.") {
withTempDir{ dir =>
val path = fakeFileSystemPath(dir)