From 4c89c8fd6e33599fe14984ddd75437269cb67868 Mon Sep 17 00:00:00 2001
From: Sabir
Date: Mon, 11 Mar 2024 12:14:24 +0100
Subject: [PATCH] .

---
 .../spark/sql/delta/DeltaOperations.scala     |  17 ++-
 .../commands/alterDeltaTableCommands.scala    |   8 ++
 .../RemoveColumnMappingCommand.scala          |  79 +++++++++++-
 .../RemoveColumnMappingSuite.scala            | 100 ++++++++++++++-
 .../RemoveColumnMappingSuiteUtils.scala       | 116 ++++++++++++++++++
 5 files changed, 313 insertions(+), 7 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
index de7ae6e6097..09fd1ae456c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.delta
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql.delta.DeltaOperationMetrics.MetricsTransformer
 import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
-import org.apache.spark.sql.delta.constraints.Constraint
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.JsonUtils
 
@@ -135,6 +134,14 @@ object DeltaOperations {
     }
     override def changesData: Boolean = true
   }
+
+  case class RemoveColumnMapping(
+      override val userMetadata: Option[String] = None) extends Operation("REMOVE COLUMN MAPPING") {
+    override def parameters: Map[String, Any] = Map()
+
+    override val operationMetrics: Set[String] = DeltaOperationMetrics.REMOVE_COLUMN_MAPPING
+  }
+
   /** Recorded during streaming inserts. */
   case class StreamingUpdate(
       outputMode: OutputMode,
@@ -612,6 +619,14 @@ private[delta] object DeltaOperationMetrics {
     "numOutputRows" // number of rows written
   )
 
+  val REMOVE_COLUMN_MAPPING: Set[String] = Set(
+    "numRewrittenFiles", // number of files rewritten
+    "numOutputBytes", // total size of files written
+    "numRemovedBytes", // total size of files removed
+    "numCopiedRows", // number of rows copied into the new files
+    "numDeletionVectorsRemoved" // number of deletion vectors removed
+  )
+
   val STREAMING_UPDATE = Set(
     "numAddedFiles", // number of files added
     "numRemovedFiles", // number of files removed
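The hunks above register both the new REMOVE COLUMN MAPPING operation name and the metric keys it reports, so after the removal runs they surface through DESCRIBE HISTORY. A minimal inspection sketch, assuming a local session with the Delta extension wired up and a placeholder table name my_table; the object name is illustrative:

import org.apache.spark.sql.SparkSession

object InspectRemovalHistory extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("inspect-removal-history")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
      "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()

  // operationMetrics is a string-to-string map, so the keys declared in
  // REMOVE_COLUMN_MAPPING (numRewrittenFiles, numCopiedRows, ...) come back
  // as strings in this column.
  spark.sql("DESCRIBE HISTORY my_table")
    .where("operation = 'REMOVE COLUMN MAPPING'")
    .select("version", "operationParameters", "operationMetrics")
    .show(truncate = false)
}
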
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
index 4f45d31f30e..f5c2b827869 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -128,6 +128,10 @@ case class AlterTableSetPropertiesDeltaCommand(
     if (disableColumnMapping && columnMappingRemovalAllowed) {
       RemoveColumnMappingCommand(deltaLog, table.catalogTable)
         .run(sparkSession, removeColumnMappingTableProperty = false)
+      // Not changing anything else, so we can return early.
+      if (configuration.size == 1) {
+        return Seq.empty[Row]
+      }
     }
     recordDeltaOperation(deltaLog, "delta.ddl.alter.setProperties") {
       val txn = startTransaction()
@@ -187,6 +191,10 @@ case class AlterTableUnsetPropertiesDeltaCommand(
     if (disableColumnMapping && columnMappingRemovalAllowed) {
       RemoveColumnMappingCommand(deltaLog, table.catalogTable)
         .run(sparkSession, removeColumnMappingTableProperty = true)
+      if (propKeys.size == 1) {
+        // Not unsetting anything else, so we can return early.
+        return Seq.empty[Row]
+      }
     }
     recordDeltaOperation(deltaLog, "delta.ddl.alter.unsetProperties") {
       val txn = startTransaction()
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/columnmapping/RemoveColumnMappingCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/columnmapping/RemoveColumnMappingCommand.scala
index d989f9cffde..1ec9104b087 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/columnmapping/RemoveColumnMappingCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/columnmapping/RemoveColumnMappingCommand.scala
@@ -16,10 +16,11 @@
 
 package org.apache.spark.sql.delta.commands.columnmapping
 
-import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog}
+import org.apache.spark.sql.delta._
+import org.apache.spark.sql.delta.actions.AddFile
 import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}
 
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.types.StructType
 
@@ -39,14 +40,29 @@ class RemoveColumnMappingCommand(
   *                                         the table instead of setting it to 'none'
   */
  def run(spark: SparkSession, removeColumnMappingTableProperty: Boolean): Unit = {
-    val schema = deltaLog.update().schema
-    verifySchemaFieldNames(schema)
+    deltaLog.withNewTransaction(catalogOpt) { txn =>
+      val originalFiles = txn.filterFiles()
+      val originalData = buildDataFrame(txn, originalFiles)
+      val originalSchema = txn.snapshot.schema
+      val newSchema = DeltaColumnMapping.dropColumnMappingMetadata(originalSchema)
+      verifySchemaFieldNames(newSchema)
+
+      updateMetadata(removeColumnMappingTableProperty, txn, newSchema)
+
+      val deltaOptions = getDeltaOptionsForWrite(spark)
+      val addedFiles = writeData(txn, originalData, deltaOptions)
+      val removeFileActions = originalFiles.map(_.removeWithTimestamp(dataChange = false))
+
+      txn.commit(removeFileActions ++ addedFiles,
+        DeltaOperations.RemoveColumnMapping()
+      )
+    }
  }

  /**
   * Verify none of the schema fields contain invalid column names.
   */
-  protected def verifySchemaFieldNames(schema: StructType) = {
+  def verifySchemaFieldNames(schema: StructType): Unit = {
    val invalidColumnNames =
      SchemaUtils.findInvalidColumnNamesInSchema(schema)
    if (invalidColumnNames.nonEmpty) {
@@ -54,6 +70,59 @@
        .foundInvalidColumnNamesWhenRemovingColumnMapping(invalidColumnNames)
    }
  }
+
+  /**
+   * Update the metadata to remove the column mapping table properties and
+   * update the schema to remove the column mapping metadata.
+   */
+  def updateMetadata(
+      removeColumnMappingTableProperty: Boolean,
+      txn: OptimisticTransaction,
+      newSchema: StructType): Unit = {
+    val newConfiguration =
+      getConfigurationWithoutColumnMapping(txn, removeColumnMappingTableProperty)
+    val newMetadata = txn.metadata.copy(
+      schemaString = newSchema.json,
+      configuration = newConfiguration)
+    txn.updateMetadata(newMetadata)
+  }
+
+  def getConfigurationWithoutColumnMapping(
+      txn: OptimisticTransaction,
+      removeColumnMappingTableProperty: Boolean): Map[String, String] = {
+    val columnMappingPropertyKey = DeltaConfigs.COLUMN_MAPPING_MODE.key
+    val columnMappingMaxIdPropertyKey = DeltaConfigs.COLUMN_MAPPING_MAX_ID.key
+    // Unset the column mapping mode or overwrite it to 'none', and remove the
+    // max id property, while keeping all other properties.
+    (if (removeColumnMappingTableProperty) {
+      txn.metadata.configuration - columnMappingPropertyKey
+    } else {
+      txn.metadata.configuration + (columnMappingPropertyKey -> "none")
+    }) - columnMappingMaxIdPropertyKey
+  }
+
+  def getDeltaOptionsForWrite(spark: SparkSession): DeltaOptions = {
+    new DeltaOptions(
+      // Prevent files from being split by writers.
+      Map(DeltaOptions.MAX_RECORDS_PER_FILE -> "0"),
+      spark.sessionState.conf)
+  }
+
+  def buildDataFrame(
+      txn: OptimisticTransaction,
+      originalFiles: Seq[AddFile]): DataFrame =
+    recordDeltaOperation(txn.deltaLog, "delta.removeColumnMapping.setupDataFrame") {
+      txn.deltaLog.createDataFrame(txn.snapshot, originalFiles)
+    }
+
+  def writeData(
+      txn: OptimisticTransaction,
+      data: DataFrame,
+      deltaOptions: DeltaOptions): Seq[AddFile] = {
+    txn.writeFiles(data, Some(deltaOptions), isOptimize = true, additionalConstraints = Seq.empty)
+      .asInstanceOf[Seq[AddFile]]
+  }
 }
 
 object RemoveColumnMappingCommand {
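The table-property rewrite in getConfigurationWithoutColumnMapping is small enough to restate as a pure function: either drop the mode key (UNSET) or pin it to 'none' (SET), and in both cases drop the max-id bookkeeping key. A self-contained sketch with the key strings written out; the strings are assumed to match what DeltaConfigs.COLUMN_MAPPING_MODE.key and DeltaConfigs.COLUMN_MAPPING_MAX_ID.key resolve to, and the object name is illustrative:

object ColumnMappingConfigRewrite {
  val ModeKey = "delta.columnMapping.mode"
  val MaxIdKey = "delta.columnMapping.maxColumnId"

  def withoutColumnMapping(
      config: Map[String, String],
      removeModeKey: Boolean): Map[String, String] = {
    val base =
      if (removeModeKey) config - ModeKey // UNSET TBLPROPERTIES path
      else config + (ModeKey -> "none") // SET TBLPROPERTIES ... = 'none' path
    base - MaxIdKey // the id counter has nothing left to track either way
  }

  def main(args: Array[String]): Unit = {
    val cfg = Map(ModeKey -> "name", MaxIdKey -> "5", "other" -> "kept")
    assert(withoutColumnMapping(cfg, removeModeKey = true) == Map("other" -> "kept"))
    assert(withoutColumnMapping(cfg, removeModeKey = false) ==
      Map(ModeKey -> "none", "other" -> "kept"))
    println("config rewrite behaves as described")
  }
}

Both branches drop the max-id key: once physical names equal logical names again, there are no mapped column ids left to count.
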
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala
index c5a2f23b5e0..3e5fd78421d 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala
@@ -24,7 +24,12 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf._
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 
-class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils {
+/**
+ * Test removing column mapping from a table.
+ */
+class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils {
 
   test("column mapping cannot be removed without the feature flag") {
     withSQLConf(ALLOW_COLUMN_MAPPING_REMOVAL.key -> "false") {
@@ -119,4 +124,97 @@ class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils {
     assert(deltaLog.update().metadata.configuration.contains(propertyToKeep))
     assert(deltaLog.update().metadata.configuration.contains(propertyToUnset))
   }
+
+  test("remove column mapping from a table") {
+    sql(
+      s"""CREATE TABLE $testTableName
+         |USING delta
+         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
+         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
+         | FROM RANGE(0, $totalRows, 1, $numFiles)
+         |""".stripMargin)
+    testRemovingColumnMapping()
+  }
+
+  test("remove column mapping using unset") {
+    sql(
+      s"""CREATE TABLE $testTableName
+         |USING delta
+         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
+         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
+         | FROM RANGE(0, $totalRows, 1, $numFiles)
+         |""".stripMargin)
+    testRemovingColumnMapping(unsetTableProperty = true)
+  }
+
+  test("remove column mapping from a partitioned table") {
+    sql(
+      s"""CREATE TABLE $testTableName
+         |USING delta
+         |PARTITIONED BY (part)
+         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
+         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn, id % 2 as part
+         | FROM RANGE(0, $totalRows, 1, $numFiles)
+         |""".stripMargin)
+    testRemovingColumnMapping()
+  }
+
+  test("remove column mapping from a partitioned table with two part columns") {
+    sql(
+      s"""CREATE TABLE $testTableName
+         |USING delta
+         |PARTITIONED BY (part1, part2)
+         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
+         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn, id % 2 as part1,
+         |id % 3 as part2
+         | FROM RANGE(0, $totalRows, 1, $numFiles)
+         |""".stripMargin)
+    testRemovingColumnMapping()
+  }
+
+  test("remove column mapping from a table with only logical names") {
+    sql(
+      s"""CREATE TABLE $testTableName
+         |USING delta
+         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none')
+         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
+         | FROM RANGE(0, $totalRows, 1, $numFiles)
+         |""".stripMargin)
+    // Add column mapping without renaming any columns.
+    // That is, the column names in the table should be the same as the logical column names.
+    sql(
+      s"""ALTER TABLE $testTableName
+         |SET TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
+         |  'delta.minReaderVersion' = '2',
+         |  'delta.minWriterVersion' = '5'
+         |)""".stripMargin)
+    testRemovingColumnMapping()
+  }
+
+  test("dropped column is added back") {
+    sql(
+      s"""CREATE TABLE $testTableName
+         |USING delta
+         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none')
+         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
+         | FROM RANGE(0, $totalRows, 1, $numFiles)
+         |""".stripMargin)
+    // Add column mapping without renaming any columns.
+    // That is, the column names in the table should be the same as the logical column names.
+    sql(
+      s"""ALTER TABLE $testTableName
+         |SET TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
+         |  'delta.minReaderVersion' = '2',
+         |  'delta.minWriterVersion' = '5'
+         |)""".stripMargin)
+    // Drop the second column.
+    sql(s"ALTER TABLE $testTableName DROP COLUMN $secondColumn")
+    // Remove column mapping; this rewrites the table and physically removes the dropped column.
+    testRemovingColumnMapping()
+    // Add the same column back.
+    sql(s"ALTER TABLE $testTableName ADD COLUMN $secondColumn BIGINT")
+    // Read from the table and ensure none of the original values of secondColumn are present.
+    assert(sql(s"SELECT $secondColumn FROM $testTableName WHERE $secondColumn IS NOT NULL").count()
+      == 0)
+  }
 }
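Every CTAS above generates its data with RANGE(0, $totalRows, 1, $numFiles); the fourth argument of the range table-valued function is the partition count, which is how the suite pins down how many data files the initial write produces. A quick standalone check of that behavior against plain Spark, no Delta required; the object name is illustrative:

import org.apache.spark.sql.SparkSession

object RangePartitioning extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("range-partitioning")
    .getOrCreate()

  // range(start, end, step, numPartitions): ten slices requested, so the
  // resulting plan carries ten partitions, each becoming roughly one file
  // on a partition-preserving write.
  val df = spark.sql("SELECT id FROM RANGE(0, 100, 1, 10)")
  assert(df.rdd.getNumPartitions == 10)
  println(s"partitions = ${df.rdd.getNumPartitions}")
}

Combined with the MAX_RECORDS_PER_FILE -> "0" write option in the command itself, this keeps the file counts in the tests deterministic.
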
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala
index 731d777961c..ff8cc2f3f61 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala
@@ -17,9 +17,15 @@
 package org.apache.spark.sql.delta.columnmapping
 
 import org.apache.spark.sql.delta._
+import org.apache.spark.sql.delta.DeltaOperations.RemoveColumnMapping
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.schema.SchemaMergingUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf._
+import com.fasterxml.jackson.databind.ObjectMapper
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.functions.col
 
 /**
  * A base trait for testing removing column mapping.
@@ -37,5 +43,115 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui
     super.afterEach()
   }
 
+  protected val numFiles = 10
+  protected val totalRows = 100
+  protected val rowsPerFile = totalRows / numFiles
+  protected val logicalColumnName = "logical_column_name"
+  protected val secondColumn = "second_column_name"
   protected val testTableName: String = "test_table_" + this.getClass.getSimpleName
+
+  import testImplicits._
+
+  protected def testRemovingColumnMapping(
+      unsetTableProperty: Boolean = false): Any = {
+    // Verify the input data is as expected.
+    checkAnswer(
+      spark.table(tableName = testTableName).select(logicalColumnName),
+      spark.range(totalRows).select(col("id").as(logicalColumnName)))
+    // Add a schema comment and verify it is preserved after the rewrite.
+    val comment = "test comment"
+    sql(s"ALTER TABLE $testTableName ALTER COLUMN $logicalColumnName COMMENT '$comment'")
+
+    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName))
+    val originalSnapshot = deltaLog.update()
+
+    assert(originalSnapshot.schema.head.getComment().get == comment,
+      "Should preserve the column comment.")
+    val originalFiles = getFiles(originalSnapshot)
+    val startingVersion = deltaLog.update().version
+
+    unsetColumnMappingProperty(useUnset = unsetTableProperty)
+
+    verifyRewrite(
+      unsetTableProperty,
+      deltaLog,
+      originalFiles,
+      startingVersion)
+    // Verify the schema comment is preserved after the rewrite.
+    assert(deltaLog.update().schema.head.getComment().get == comment,
+      "Should preserve the schema comment.")
+  }
+
+  /**
+   * Verify the table still contains the same data after the rewrite, column mapping is removed
+   * from the table properties, and the operation is recorded properly.
+   */
+  protected def verifyRewrite(
+      unsetTableProperty: Boolean,
+      deltaLog: DeltaLog,
+      originalFiles: Array[AddFile],
+      startingVersion: Long): Unit = {
+    checkAnswer(
+      spark.table(tableName = testTableName).select(logicalColumnName),
+      spark.range(totalRows).select(col("id").as(logicalColumnName)))
+
+    val newSnapshot = deltaLog.update()
+    assert(newSnapshot.version - startingVersion == 1, "Should rewrite the table in one commit.")
+
+    val history = deltaLog.history.getHistory(deltaLog.update().version)
+    verifyColumnMappingOperationIsRecordedInHistory(history)
+
+    assert(newSnapshot.schema.head.name == logicalColumnName,
+      "The first column should keep its logical name.")
+
+    verifyColumnMappingSchemaMetadataIsRemoved(newSnapshot)
+
+    verifyColumnMappingTablePropertiesAbsent(newSnapshot, unsetTableProperty)
+    assert(originalFiles.map(_.numLogicalRecords.get).sum ==
+      newSnapshot.allFiles.map(_.numLogicalRecords.get).collect().sum,
+      "Should have the same number of records.")
+  }
+
+  protected def unsetColumnMappingProperty(useUnset: Boolean): Unit = {
+    val unsetStr = if (useUnset) {
+      s"UNSET TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}')"
+    } else {
+      s"SET TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none')"
+    }
+    sql(
+      s"""
+         |ALTER TABLE $testTableName $unsetStr
+         |""".stripMargin)
+  }
+
+  /**
+   * Get all files in the snapshot.
+ */ + protected def getFiles(snapshot: Snapshot): Array[AddFile] = snapshot.allFiles.collect() + + protected def verifyColumnMappingOperationIsRecordedInHistory(history: Seq[DeltaHistory]) = { + val op = RemoveColumnMapping() + assert(history.head.operation === op.name) + assert(history.head.operationParameters === op.parameters.mapValues(_.toString).toMap) + } + + protected def verifyColumnMappingSchemaMetadataIsRemoved(newSnapshot: Snapshot) = { + SchemaMergingUtils.explode(newSnapshot.schema).foreach { case(_, col) => + assert(!DeltaColumnMapping.hasPhysicalName(col)) + assert(!DeltaColumnMapping.hasColumnId(col)) + } + } + + protected def verifyColumnMappingTablePropertiesAbsent( + newSnapshot: Snapshot, + unsetTablePropertyUsed: Boolean) = { + val columnMappingPropertyKey = DeltaConfigs.COLUMN_MAPPING_MODE.key + val columnMappingMaxIdPropertyKey = DeltaConfigs.COLUMN_MAPPING_MAX_ID.key + val newColumnMappingModeOpt = newSnapshot.metadata.configuration.get(columnMappingPropertyKey) + if (unsetTablePropertyUsed) { + assert(newColumnMappingModeOpt.isEmpty) + } else { + assert(newColumnMappingModeOpt.contains("none")) + } + assert(!newSnapshot.metadata.configuration.contains(columnMappingMaxIdPropertyKey)) + } }