[Spark] Reject unsupported type changes with Uniform #3947

Open · wants to merge 6 commits into base: master

Changes from 4 commits
@@ -0,0 +1,21 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.uniform

import org.apache.spark.sql.delta.typewidening.TypeWideningUniformTests

class TypeWideningUniformSuite extends TypeWideningUniformTests with WriteDeltaHMSReadIceberg
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -1221,6 +1221,12 @@
"<schema>"
]
},
"UNSUPPORTED_TYPE_WIDENING" : {
"message" : [
"IcebergCompatV<version> is incompatible with a type change applied to this table:",
"Field <fieldPath> was changed from <prevType> to <newType>."
]
},
"VERSION_MUTUAL_EXCLUSIVE" : {
"message" : [
"Only one IcebergCompat version can be enabled, please explicitly disable all other IcebergCompat versions that are not needed."
152 changes: 87 additions & 65 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -3402,6 +3402,23 @@ trait DeltaErrorsBase
)
}

def icebergCompatUnsupportedTypeWideningException(
version: Int,
fieldPath: Seq[String],
oldType: DataType,
newType: DataType): Throwable = {
new DeltaUnsupportedOperationException(
errorClass = "DELTA_ICEBERG_COMPAT_VIOLATION.UNSUPPORTED_TYPE_WIDENING",
messageParameters = Array(
version.toString,
version.toString,
Comment on lines +3413 to +3414

Contributor: double version?

Collaborator (Author): That got me at first too: version is used twice in the error message: once in the main error class DELTA_ICEBERG_COMPAT_VIOLATION and once in the sub error class UNSUPPORTED_TYPE_WIDENING.

SchemaUtils.prettyFieldName(fieldPath),
toSQLType(oldType),
toSQLType(newType)
)
)
}
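
To make the double use of `version` concrete, here is a hypothetical call and the message it should produce (field path and types are made up for illustration; the `<version>` placeholder in the parent error class is filled by the first array element, whose exact wording is not shown in this diff):

```scala
// Illustration only; assumes the usual imports from org.apache.spark.sql.types.
throw DeltaErrors.icebergCompatUnsupportedTypeWideningException(
  version = 2,                                // fills <version> in both the parent and the sub class
  fieldPath = Seq("measurements", "value"),
  oldType = IntegerType,
  newType = DoubleType)
// Expected UNSUPPORTED_TYPE_WIDENING part of the message, roughly:
//   IcebergCompatV2 is incompatible with a type change applied to this table:
//   Field measurements.value was changed from "INT" to "DOUBLE".
```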

def universalFormatConversionFailedException(
failedOnCommitVersion: Long,
format: String,
@@ -21,8 +21,10 @@ import org.apache.spark.sql.delta.commands.DeletionVectorUtils
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

/**
@@ -47,7 +49,8 @@ object IcebergCompatV1 extends IcebergCompat(
CheckAddFileHasStats,
CheckNoPartitionEvolution,
CheckNoListMapNullType,
CheckDeletionVectorDisabled
CheckDeletionVectorDisabled,
CheckTypeWideningSupported
)
)

@@ -62,7 +65,8 @@ object IcebergCompatV2 extends IcebergCompat(
CheckTypeInV2AllowList,
CheckPartitionDataTypeInV2AllowList,
CheckNoPartitionEvolution,
CheckDeletionVectorDisabled
CheckDeletionVectorDisabled,
CheckTypeWideningSupported
)
)

@@ -104,6 +108,7 @@ case class IcebergCompat(
* updates need to be applied, will return None.
*/
def enforceInvariantsAndDependencies(
spark: SparkSession,
prevSnapshot: Snapshot,
newestProtocol: Protocol,
newestMetadata: Metadata,
@@ -190,6 +195,7 @@

// Apply additional checks
val context = IcebergCompatContext(
spark,
prevSnapshot,
protocolResult.getOrElse(newestProtocol),
metadataResult.getOrElse(newestMetadata),
@@ -301,6 +307,7 @@ object RequireColumnMapping extends RequiredDeltaTableProperty(
}

case class IcebergCompatContext(
spark: SparkSession,
prevSnapshot: Snapshot,
newestProtocol: Protocol,
newestMetadata: Metadata,
@@ -457,3 +464,31 @@ object CheckDeletionVectorDisabled extends IcebergCompatCheck {
}
}
}

/**
* Checks that the table didn't go through any type changes that Iceberg doesn't support. See
* `TypeWidening.isTypeChangeSupportedByIceberg()` for supported type changes.
* Note that this check covers both:
* - When the table had an unsupported type change applied in the past and Uniform is being enabled.
* - When Uniform is enabled and a new, unsupported type change is being applied.
*/
object CheckTypeWideningSupported extends IcebergCompatCheck {
override def apply(context: IcebergCompatContext): Unit = {
val skipCheck = context.spark.sessionState.conf
.getConf(DeltaSQLConf.DELTA_TYPE_WIDENING_ALLOW_UNSUPPORTED_ICEBERG_TYPE_CHANGES)

if (skipCheck || !TypeWidening.isSupported(context.newestProtocol)) return

TypeWideningMetadata.getAllTypeChanges(context.newestMetadata.schema).foreach {
case (fieldPath, TypeChange(_, fromType: AtomicType, toType: AtomicType, _))
// We ignore type changes that are not generally supported with type widening to reduce the
// risk of this check misfiring. These are handled by `TypeWidening.assertTableReadable()`.
// The error here only captures type changes that are supported in Delta but not Iceberg.
if TypeWidening.isTypeChangeSupported(fromType, toType) &&
!TypeWidening.isTypeChangeSupportedByIceberg(fromType, toType) =>
throw DeltaErrors.icebergCompatUnsupportedTypeWideningException(
context.version, fieldPath, fromType, toType)
case _ => () // ignore
}
}
}
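
For illustration, a sequence this check is meant to reject (a rough sketch, not a test from this PR; exact SQL syntax and the full set of table properties required to enable Uniform may differ):

```scala
// DATE -> TIMESTAMP_NTZ is widenable in Delta but not an Iceberg V1/V2 promotion.
spark.sql("""CREATE TABLE t (c DATE) USING delta
             TBLPROPERTIES ('delta.enableTypeWidening' = 'true')""")
spark.sql("ALTER TABLE t ALTER COLUMN c TYPE TIMESTAMP_NTZ")  // accepted by Delta alone

// Turning on Uniform/IcebergCompat afterwards should now fail with
// DELTA_ICEBERG_COMPAT_VIOLATION.UNSUPPORTED_TYPE_WIDENING, unless the escape hatch
// DELTA_TYPE_WIDENING_ALLOW_UNSUPPORTED_ICEBERG_TYPE_CHANGES is set.
spark.sql("""ALTER TABLE t SET TBLPROPERTIES (
               'delta.columnMapping.mode' = 'name',
               'delta.enableIcebergCompatV2' = 'true',
               'delta.universalFormat.enabledFormats' = 'iceberg')""")
```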
@@ -1970,6 +1970,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite

val (protocolUpdate1, metadataUpdate1) =
UniversalFormat.enforceInvariantsAndDependencies(
spark,
// Note: if this txn has no protocol or metadata updates, then `prev` will equal `newest`.
snapshot,
newestProtocol = protocol, // Note: this will try to use `newProtocol`
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.types.{AtomicType, StructField, StructType}

/**
* Implements logic to resolve conditions and actions in MERGE clauses and handles schema evolution.
@@ -292,11 +292,13 @@ object ResolveDeltaMergeInto {
})

val migrationSchema = filterSchema(source.schema, Seq.empty)
val allowTypeWidening = target.exists {
case DeltaTable(fileIndex) =>
TypeWidening.isEnabled(fileIndex.protocol, fileIndex.metadata)
case _ => false
}

val typeWideningMode =
target.collectFirst {
case DeltaTable(index) if TypeWidening.isEnabled(index.protocol, index.metadata) =>
TypeWideningMode.TypeEvolution(
uniformIcebergEnabled = UniversalFormat.icebergEnabled(index.metadata))
}.getOrElse(TypeWideningMode.NoTypeWidening)

// The implicit conversions flag allows any type to be merged from source to target if Spark
// SQL considers the source type implicitly castable to the target. Normally, mergeSchemas
@@ -306,7 +308,7 @@
target.schema,
migrationSchema,
allowImplicitConversions = true,
allowTypeWidening = allowTypeWidening
typeWideningMode = typeWideningMode
)
} else {
target.schema
33 changes: 30 additions & 3 deletions spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -68,14 +68,41 @@ object TypeWidening {
* It is the responsibility of the caller to recurse into structs, maps and arrays.
*/
def isTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean =
TypeWideningShims.isTypeChangeSupported(fromType, toType)
TypeWideningShims.isTypeChangeSupported(fromType = fromType, toType = toType)

/**
* Returns whether the given type change can be applied during schema evolution. Only a
* subset of supported type changes are considered for schema evolution.
*/
def isTypeChangeSupportedForSchemaEvolution(fromType: AtomicType, toType: AtomicType): Boolean =
TypeWideningShims.isTypeChangeSupportedForSchemaEvolution(fromType, toType)
def isTypeChangeSupportedForSchemaEvolution(
fromType: AtomicType,
toType: AtomicType,
uniformIcebergEnabled: Boolean): Boolean =
TypeWideningShims.isTypeChangeSupportedForSchemaEvolution(
fromType = fromType,
toType = toType
) && (
!uniformIcebergEnabled ||
isTypeChangeSupportedByIceberg(fromType = fromType, toType = toType)
)

/**
* Returns whether the given type change is supported by Iceberg, and by extension can be read
* using Uniform. See https://iceberg.apache.org/spec/#schema-evolution.
* Note that these are type promotions supported by Iceberg V1 & V2 (both support the same type
* promotions). Iceberg V3 will add support for date -> timestamp_ntz and void -> any but Uniform
* doesn't currently support Iceberg V3.
*/
def isTypeChangeSupportedByIceberg(fromType: AtomicType, toType: AtomicType): Boolean =
(fromType, toType) match {
case (from, to) if from == to => true
case (from, to) if !isTypeChangeSupported(from, to) => false
case (from: IntegralType, to: IntegralType) => from.defaultSize <= to.defaultSize
case (FloatType, DoubleType) => true
case (from: DecimalType, to: DecimalType)
if from.scale == to.scale && from.precision <= to.precision => true
case _ => false
}
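
Expected behaviour of the predicate on a few representative changes (REPL-style sketch, not a test from this PR; assumes the corresponding changes are supported by Delta's own `isTypeChangeSupported` shims):

```scala
import org.apache.spark.sql.types._

TypeWidening.isTypeChangeSupportedByIceberg(ByteType, LongType)          // true: integral widening
TypeWidening.isTypeChangeSupportedByIceberg(FloatType, DoubleType)       // true
TypeWidening.isTypeChangeSupportedByIceberg(
  DecimalType(10, 2), DecimalType(12, 2))                                // true: wider precision, same scale
TypeWidening.isTypeChangeSupportedByIceberg(DateType, TimestampNTZType)  // false: only Iceberg V3
TypeWidening.isTypeChangeSupportedByIceberg(IntegerType, DoubleType)     // false: not an Iceberg promotion
TypeWidening.isTypeChangeSupportedByIceberg(
  DecimalType(10, 2), DecimalType(12, 4))                                // false: scale changes
```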

/**
* Asserts that the given table doesn't contain any unsupported type changes. This should never
@@ -0,0 +1,50 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.delta

import org.apache.spark.sql.types.AtomicType

/**
* A type widening mode captures a specific set of type changes that are allowed to be applied.
* Currently:
* - NoTypeWidening: No type change is allowed.
* - TypeEvolution(uniformIcebergEnabled = true): Type changes that are eligible to be applied
* automatically during schema evolution and that are supported by Iceberg are allowed.
* - TypeEvolution(uniformIcebergEnabled = false): Type changes that are eligible to be applied
* automatically during schema evolution are allowed, even if they are not supported by Iceberg.
*/
trait TypeWideningMode {
Contributor:
Suggested change: trait TypeWideningMode { → sealed trait TypeWideningMode {
I think it makes sense to enforce exhaustiveness here. There shouldn't be other places that need to specify custom rules, I think.

Collaborator (Author): 👍
def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean
}

object TypeWideningMode {
/**
* No type change allowed. Typically because type widening and/or schema evolution isn't enabled.
*/
object NoTypeWidening extends TypeWideningMode {
Contributor:
Suggested change: object NoTypeWidening extends TypeWideningMode { → case object NoTypeWidening extends TypeWideningMode {
Just so it looks like a nice enum-like thingy ;)

Collaborator (Author): 👍

override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean = false
}

/**
* Type changes that are eligible to be applied automatically during schema evolution are allowed.
* Can be restricted to only type changes supported by Iceberg.
*/
case class TypeEvolution(uniformIcebergEnabled: Boolean) extends TypeWideningMode {
Contributor: Why not call this uniformIcebergCompatibleOnly instead? That way we can also override it later in case people want this anyway, without the weird mental mismatch of uniformIcebergEnabled = false on tables where it actually is enabled.

Collaborator (Author): Agree, that fits better.

override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean =
TypeWidening.isTypeChangeSupportedForSchemaEvolution(
fromType = fromType, toType = toType, uniformIcebergEnabled)
}
}
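
A rough usage sketch of the two modes (the flags below are placeholders for the real protocol/metadata checks done at the call sites):

```scala
import org.apache.spark.sql.types._

val typeWideningEnabled = true   // placeholder for TypeWidening.isEnabled(protocol, metadata)
val icebergEnabled = true        // placeholder for UniversalFormat.icebergEnabled(metadata)

val mode: TypeWideningMode =
  if (typeWideningEnabled) TypeWideningMode.TypeEvolution(uniformIcebergEnabled = icebergEnabled)
  else TypeWideningMode.NoTypeWidening

mode.shouldWidenType(fromType = ShortType, toType = IntegerType)      // true under TypeEvolution
mode.shouldWidenType(fromType = DateType, toType = TimestampNTZType)  // false while uniformIcebergEnabled = true
```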
@@ -109,14 +109,15 @@ object UniversalFormat extends DeltaLogging {
* updates need to be applied, will return None.
*/
def enforceInvariantsAndDependencies(
spark: SparkSession,
snapshot: Snapshot,
newestProtocol: Protocol,
newestMetadata: Metadata,
operation: Option[DeltaOperations.Operation],
actions: Seq[Action]): (Option[Protocol], Option[Metadata]) = {
enforceHudiDependencies(newestMetadata, snapshot)
enforceIcebergInvariantsAndDependencies(
snapshot, newestProtocol, newestMetadata, operation, actions)
spark, snapshot, newestProtocol, newestMetadata, operation, actions)
}

/**
@@ -151,6 +152,7 @@
* updates need to be applied, will return None.
*/
def enforceIcebergInvariantsAndDependencies(
spark: SparkSession,
snapshot: Snapshot,
newestProtocol: Protocol,
newestMetadata: Metadata,
@@ -200,6 +202,7 @@
changed = uniformProtocol.nonEmpty || uniformMetadata.nonEmpty

val (v1protocolUpdate, v1metadataUpdate) = IcebergCompatV1.enforceInvariantsAndDependencies(
spark,
snapshot,
newestProtocol = protocolToCheck,
newestMetadata = metadataToCheck,
@@ -211,6 +214,7 @@
changed ||= v1protocolUpdate.nonEmpty || v1metadataUpdate.nonEmpty

val (v2protocolUpdate, v2metadataUpdate) = IcebergCompatV2.enforceInvariantsAndDependencies(
spark,
snapshot,
newestProtocol = protocolToCheck,
newestMetadata = metadataToCheck,
@@ -238,12 +242,14 @@
* otherwise the original configuration.
*/
def enforceDependenciesInConfiguration(
spark: SparkSession,
configuration: Map[String, String],
snapshot: Snapshot): Map[String, String] = {
var metadata = snapshot.metadata.copy(configuration = configuration)

// Check UniversalFormat related property dependencies
val (_, universalMetadata) = UniversalFormat.enforceInvariantsAndDependencies(
spark,
snapshot,
newestProtocol = snapshot.protocol,
newestMetadata = metadata,