Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Column mapping removal basic rewrite operation #2741

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.sql.delta
// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.DeltaOperationMetrics.MetricsTransformer
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.constraints.Constraint
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils

Expand Down Expand Up @@ -135,6 +134,14 @@ object DeltaOperations {
}
override def changesData: Boolean = true
}

/**
 * Operation recorded under the name "REMOVE COLUMN MAPPING".
 *
 * @param userMetadata optional metadata string carried by the operation; defaults to `None`.
 */
case class RemoveColumnMapping(
    override val userMetadata: Option[String] = None)
  extends Operation("REMOVE COLUMN MAPPING") {

  /** This operation has no parameters, so the map is always empty. */
  override def parameters: Map[String, Any] = Map.empty

  /** Metric names for this operation, as declared in `DeltaOperationMetrics`. */
  override val operationMetrics: Set[String] = DeltaOperationMetrics.REMOVE_COLUMN_MAPPING
}

/** Recorded during streaming inserts. */
case class StreamingUpdate(
outputMode: OutputMode,
Expand Down Expand Up @@ -612,6 +619,14 @@ private[delta] object DeltaOperationMetrics {
"numOutputRows" // number of rows written
)

/** Names of the metrics associated with the REMOVE COLUMN MAPPING operation. */
val REMOVE_COLUMN_MAPPING: Set[String] =
  Set(
    "numRewrittenFiles",
    "numOutputBytes",
    "numRemovedBytes",
    "numCopiedRows",
    "numDeletionVectorsRemoved"
  )

val STREAMING_UPDATE = Set(
"numAddedFiles", // number of files added
"numRemovedFiles", // number of files removed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ case class AlterTableSetPropertiesDeltaCommand(
if (disableColumnMapping && columnMappingRemovalAllowed) {
RemoveColumnMappingCommand(deltaLog, table.catalogTable)
.run(sparkSession, removeColumnMappingTableProperty = false)
// Not changing anything else, so we can return early.
if (configuration.size == 1) {
return Seq.empty[Row]
}
}
recordDeltaOperation(deltaLog, "delta.ddl.alter.setProperties") {
val txn = startTransaction()
Expand Down Expand Up @@ -187,6 +191,10 @@ case class AlterTableUnsetPropertiesDeltaCommand(
if (disableColumnMapping && columnMappingRemovalAllowed) {
RemoveColumnMappingCommand(deltaLog, table.catalogTable)
.run(sparkSession, removeColumnMappingTableProperty = true)
if (propKeys.size == 1) {
// Not unsetting anything else, so we can return early.
return Seq.empty[Row]
}
}
recordDeltaOperation(deltaLog, "delta.ddl.alter.unsetProperties") {
val txn = startTransaction()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

package org.apache.spark.sql.delta.commands.columnmapping

import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog}
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.types.StructType

Expand All @@ -39,21 +40,89 @@ class RemoveColumnMappingCommand(
* the table instead of setting it to 'none'
*/
def run(spark: SparkSession, removeColumnMappingTableProperty: Boolean): Unit = {
// NOTE(review): this check validates the schema read via deltaLog.update() before the
// transaction starts; a second validation runs inside the transaction below. It may be a
// leftover from an earlier revision -- confirm whether both checks are intended.
val schema = deltaLog.update().schema
verifySchemaFieldNames(schema)
// Everything below happens inside a single transaction on deltaLog.
deltaLog.withNewTransaction(catalogOpt) { txn =>
// Capture the current files and a DataFrame over them before the metadata is updated.
val originalFiles = txn.filterFiles()
val originalData = buildDataFrame(txn, originalFiles)
val originalSchema = txn.snapshot.schema
// New schema is whatever DeltaColumnMapping.dropColumnMappingMetadata returns for the
// snapshot schema; its field names are validated before anything is written.
val newSchema = DeltaColumnMapping.dropColumnMappingMetadata(originalSchema)
verifySchemaFieldNames(newSchema)

// Install the new schema and the configuration without column mapping on the transaction.
updateMetadata(removeColumnMappingTableProperty, txn, newSchema)

// Rewrite the captured data and build a remove action (dataChange = false) for every
// original file.
val deltaOptions = getDeltaOptionsForWrite(spark)
val addedFiles = writeData(txn, originalData, deltaOptions)
val removeFileActions = originalFiles.map(_.removeWithTimestamp(dataChange = false))

// Commit the removals together with the newly written files as one RemoveColumnMapping
// operation.
txn.commit(removeFileActions ++ addedFiles,
DeltaOperations.RemoveColumnMapping()
)
}
}

/**
 * Verify none of the schema fields contain invalid column names.
 *
 * Throws the error built by
 * `DeltaErrors.foundInvalidColumnNamesWhenRemovingColumnMapping` when
 * `SchemaUtils.findInvalidColumnNamesInSchema` reports at least one name.
 *
 * @param schema the schema whose field names are checked.
 */
def verifySchemaFieldNames(schema: StructType): Unit = {
  val invalidColumnNames = SchemaUtils.findInvalidColumnNamesInSchema(schema)
  if (invalidColumnNames.nonEmpty) {
    throw DeltaErrors
      .foundInvalidColumnNamesWhenRemovingColumnMapping(invalidColumnNames)
  }
}

/**
 * Replace the transaction's metadata with a copy that carries `newSchema` as its schema
 * string and the configuration returned by [[getConfigurationWithoutColumnMapping]].
 *
 * @param removeColumnMappingTableProperty forwarded to
 *                                         [[getConfigurationWithoutColumnMapping]].
 * @param txn the transaction whose metadata is updated.
 * @param newSchema the schema to store, serialized with `newSchema.json`.
 */
def updateMetadata(
    removeColumnMappingTableProperty: Boolean,
    txn: OptimisticTransaction,
    newSchema: StructType): Unit = {
  val configurationWithoutMapping =
    getConfigurationWithoutColumnMapping(txn, removeColumnMappingTableProperty)
  txn.updateMetadata(
    txn.metadata.copy(
      schemaString = newSchema.json,
      configuration = configurationWithoutMapping))
}

/**
 * Build the table configuration with the column mapping properties taken out.
 *
 * The column mapping mode property is either dropped or overwritten with "none", and the
 * column mapping max id property is always dropped. All other properties are kept.
 *
 * @param txn the transaction whose current metadata configuration is the starting point.
 * @param removeColumnMappingTableProperty when true the mode property is removed; otherwise
 *                                         it is set to "none".
 * @return the adjusted configuration map.
 */
def getConfigurationWithoutColumnMapping(
    txn: OptimisticTransaction,
    removeColumnMappingTableProperty: Boolean): Map[String, String] = {
  val modeKey = DeltaConfigs.COLUMN_MAPPING_MODE.key
  val maxIdKey = DeltaConfigs.COLUMN_MAPPING_MAX_ID.key
  val currentConfiguration = txn.metadata.configuration
  val withModeHandled =
    if (removeColumnMappingTableProperty) {
      currentConfiguration - modeKey
    } else {
      currentConfiguration + (modeKey -> "none")
    }
  withModeHandled - maxIdKey
}

/**
 * Build the [[DeltaOptions]] used when writing the rewritten data.
 *
 * @param spark the session whose SQL conf is handed to the options.
 * @return options with `DeltaOptions.MAX_RECORDS_PER_FILE` set to "0".
 */
def getDeltaOptionsForWrite(spark: SparkSession): DeltaOptions = {
  // Prevent files from being split by writers.
  val writeOptions = Map(DeltaOptions.MAX_RECORDS_PER_FILE -> "0")
  new DeltaOptions(writeOptions, spark.sessionState.conf)
}

/**
 * Create a DataFrame over `originalFiles` using the transaction's snapshot, wrapped in the
 * "delta.removeColumnMapping.setupDataFrame" recorded operation.
 *
 * @param txn the transaction providing the Delta log and snapshot.
 * @param originalFiles the files the DataFrame is created from.
 */
def buildDataFrame(
    txn: OptimisticTransaction,
    originalFiles: Seq[AddFile]): DataFrame = {
  val log = txn.deltaLog
  recordDeltaOperation(log, "delta.removeColumnMapping.setupDataFrame") {
    log.createDataFrame(txn.snapshot, originalFiles)
  }
}

/**
 * Write `data` through the transaction and return the files that were added.
 *
 * The write is issued with `isOptimize = true` and no additional constraints.
 *
 * @param txn the transaction used for the write.
 * @param data the DataFrame to write.
 * @param deltaOptions the write options passed to `txn.writeFiles`.
 * @return the [[AddFile]] actions produced by the write.
 * @throws IllegalStateException if the write returns an action that is not an [[AddFile]].
 */
def writeData(
    txn: OptimisticTransaction,
    data: DataFrame,
    deltaOptions: DeltaOptions): Seq[AddFile] = {
  txn
    .writeFiles(data, Some(deltaOptions), isOptimize = true, additionalConstraints = Seq.empty)
    .map {
      // A cast to Seq[AddFile] is erased at runtime and would let other action types slip
      // through unnoticed, so every element is checked explicitly instead.
      case addFile: AddFile => addFile
      case other =>
        throw new IllegalStateException(
          s"Unexpected action $other with type ${other.getClass}. " +
            "The column mapping removal rewrite should only produce AddFiles.")
    }
}
}

object RemoveColumnMappingCommand {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf._

import org.apache.spark.sql.catalyst.TableIdentifier

class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils {
/**
* Test removing column mapping from a table.
*/
class RemoveColumnMappingSuite
extends RemoveColumnMappingSuiteUtils
{

test("column mapping cannot be removed without the feature flag") {
withSQLConf(ALLOW_COLUMN_MAPPING_REMOVAL.key -> "false") {
Expand Down Expand Up @@ -119,4 +124,97 @@ class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils {
assert(deltaLog.update().metadata.configuration.contains(propertyToKeep))
assert(deltaLog.update().metadata.configuration.contains(propertyToUnset))
}

// Creates an unpartitioned table with column mapping mode 'name', then runs the shared
// removal check.
test("remove column mapping from a table") {
  val createTableSql =
    s"""CREATE TABLE $testTableName
       |USING delta
       |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
       |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
       | FROM RANGE(0, $totalRows, 1, $numFiles)
       |""".stripMargin
  sql(createTableSql)
  testRemovingColumnMapping()
}

// Same table as the basic case, but the shared removal check is run with
// unsetTableProperty = true.
test("remove column mapping using unset") {
  val createTableSql =
    s"""CREATE TABLE $testTableName
       |USING delta
       |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
       |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
       | FROM RANGE(0, $totalRows, 1, $numFiles)
       |""".stripMargin
  sql(createTableSql)
  testRemovingColumnMapping(unsetTableProperty = true)
}

// Creates a table partitioned by a single column (`part`) with column mapping mode 'name',
// then runs the shared removal check.
test("remove column mapping from a partitioned table") {
  val createTableSql =
    s"""CREATE TABLE $testTableName
       |USING delta
       |PARTITIONED BY (part)
       |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
       |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn, id % 2 as part
       | FROM RANGE(0, $totalRows, 1, $numFiles)
       |""".stripMargin
  sql(createTableSql)
  testRemovingColumnMapping()
}

// Creates a table partitioned by two columns (`part1`, `part2`) with column mapping mode
// 'name', then runs the shared removal check.
test("remove column mapping from a partitioned table with two part columns") {
  val createTableSql =
    s"""CREATE TABLE $testTableName
       |USING delta
       |PARTITIONED BY (part1, part2)
       |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
       |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn, id % 2 as part1,
       |id % 3 as part2
       | FROM RANGE(0, $totalRows, 1, $numFiles)
       |""".stripMargin
  sql(createTableSql)
  testRemovingColumnMapping()
}

// Creates the table with column mapping mode 'none', enables mode 'name' through
// ALTER TABLE, and then runs the shared removal check.
test("remove column mapping from a table with only logical names") {
sql(
s"""CREATE TABLE $testTableName
|USING delta
|TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none')
|AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
| FROM RANGE(0, $totalRows, 1, $numFiles)
|""".stripMargin)
// Add column mapping without renaming any columns.
// That is, the column names in the table should be the same as the logical column names.
// NOTE: the two protocol-version lines in the statement below have no '|' margin, so
// stripMargin leaves them (including any leading whitespace) unchanged.
sql(
s"""ALTER TABLE $testTableName
|SET TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5'
|)""".stripMargin)
testRemovingColumnMapping()
}

// Drops a column while column mapping is enabled, removes column mapping, re-adds a column
// with the same name, and asserts that no non-null values are read back for it.
test("dropped column is added back") {
sql(
s"""CREATE TABLE $testTableName
|USING delta
|TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none')
|AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
| FROM RANGE(0, $totalRows, 1, $numFiles)
|""".stripMargin)
// Add column mapping without renaming any columns.
// That is, the column names in the table should be the same as the logical column names.
// NOTE: the two protocol-version lines in the statement below have no '|' margin, so
// stripMargin leaves them (including any leading whitespace) unchanged.
sql(
s"""ALTER TABLE $testTableName
|SET TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5'
|)""".stripMargin)
// Drop the second column.
sql(s"ALTER TABLE $testTableName DROP COLUMN $secondColumn")
// Remove column mapping, this should rewrite the table to physically remove the dropped column.
testRemovingColumnMapping()
// Add the same column back.
sql(s"ALTER TABLE $testTableName ADD COLUMN $secondColumn BIGINT")
// Read from the table, ensure none of the original values of secondColumn are present.
// The comparison continues on the next line; inside the parentheses the line break does not
// end the expression.
assert(sql(s"SELECT $secondColumn FROM $testTableName WHERE $secondColumn IS NOT NULL").count()
== 0)
}
}
Loading
Loading