From 51e83c756fc1df7811b1902c98e0b0f1a69f2e14 Mon Sep 17 00:00:00 2001
From: Johan Lasperas
Date: Fri, 13 Dec 2024 12:58:43 +0100
Subject: [PATCH] Minor updates: comments, naming

---
 .../apache/spark/sql/delta/DeltaAnalysis.scala | 11 ++++++-----
 .../sql/delta/ResolveDeltaMergeInto.scala      |  2 +-
 .../apache/spark/sql/delta/TypeWidening.scala  |  4 ++--
 .../spark/sql/delta/TypeWideningMode.scala     | 18 ++++++++++--------
 .../schema/ImplicitMetadataOperation.scala     |  2 +-
 .../spark/sql/delta/sources/DeltaSink.scala    |  2 +-
 .../sql/delta/schema/SchemaUtilsSuite.scala    |  2 +-
 7 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
index 66c638c371..f2a2e1168e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -949,10 +949,11 @@ class DeltaAnalysis(session: SparkSession)
   }
 
   /**
-   * Returns a mapping of (fromType, toType) to Boolean indicating whether `fromType` is eligible to
-   * be automatically widened to `toType` when ingesting data. If it is, the table schema is updated
-   * to `toType` before ingestion and values are written using their origin `toType` type.
-   * Otherwise, the table type `fromType` is retained and values are downcasted on write.
+   * Returns the type widening mode to use for the given delta table. A type widening mode indicates
+   * for (fromType, toType) tuples whether `fromType` is eligible to be automatically widened to
+   * `toType` when ingesting data. If it is, the table schema is updated to `toType` before
+   * ingestion and values are written using their original `toType` type. Otherwise, the table type
+   * `fromType` is retained and values are downcasted on write.
    */
   private def getTypeWideningMode(
       deltaTable: DeltaTableV2,
@@ -964,7 +965,7 @@ class DeltaAnalysis(session: SparkSession)
 
     if (typeWideningEnabled && schemaEvolutionEnabled) {
       TypeWideningMode.TypeEvolution(
-        uniformIcebergEnabled = UniversalFormat.icebergEnabled(snapshot.metadata))
+        uniformIcebergCompatibleOnly = UniversalFormat.icebergEnabled(snapshot.metadata))
     } else {
       TypeWideningMode.NoTypeWidening
     }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala
index 947d662b4b..d891d74a1c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala
@@ -297,7 +297,7 @@ object ResolveDeltaMergeInto {
       target.collectFirst {
         case DeltaTable(index) if TypeWidening.isEnabled(index.protocol, index.metadata) =>
           TypeWideningMode.TypeEvolution(
-            uniformIcebergEnabled = UniversalFormat.icebergEnabled(index.metadata))
+            uniformIcebergCompatibleOnly = UniversalFormat.icebergEnabled(index.metadata))
       }.getOrElse(TypeWideningMode.NoTypeWidening)
 
     // The implicit conversions flag allows any type to be merged from source to target if Spark
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
index bf9980182d..49ddaa800e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -77,12 +77,12 @@ object TypeWidening {
   def isTypeChangeSupportedForSchemaEvolution(
       fromType: AtomicType,
       toType: AtomicType,
-      uniformIcebergEnabled: Boolean): Boolean =
+      uniformIcebergCompatibleOnly: Boolean): Boolean =
     TypeWideningShims.isTypeChangeSupportedForSchemaEvolution(
       fromType = fromType,
       toType = toType
     ) && (
-      !uniformIcebergEnabled ||
+      !uniformIcebergCompatibleOnly ||
       isTypeChangeSupportedByIceberg(fromType = fromType, toType = toType)
     )
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMode.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMode.scala
index dd226b90c4..4d3a15dbf9 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMode.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMode.scala
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.delta
 
 import org.apache.spark.sql.types.AtomicType
@@ -21,12 +22,13 @@ import org.apache.spark.sql.types.AtomicType
  * A type widening mode captures a specific set of type changes that are allowed to be applied.
  * Currently:
  * - NoTypeWidening: No type change is allowed.
- * - TypeEvolution(uniformIcebergEnabled = true): Type changes that are eligible to be applied
- *   automatically during schema evolution and that are supported by Iceberg are allowed.
- * - TypeEvolution(uniformIcebergEnabled = false): Type changes that are eligible to be applied
- *   automatically during schema evolution are allowed, even if they are not supported by Iceberg.
+ * - TypeEvolution(uniformIcebergCompatibleOnly = true): Type changes that are eligible to be
+ *   applied automatically during schema evolution and that are supported by Iceberg are allowed.
+ * - TypeEvolution(uniformIcebergCompatibleOnly = false): Type changes that are eligible to be
+ *   applied automatically during schema evolution are allowed, even if they are not supported by
+ *   Iceberg.
  */
-trait TypeWideningMode {
+sealed trait TypeWideningMode {
   def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean
 }
 
@@ -34,7 +36,7 @@ object TypeWideningMode {
   /**
    * No type change allowed. Typically because type widening and/or schema evolution isn't enabled.
    */
-  object NoTypeWidening extends TypeWideningMode {
+  case object NoTypeWidening extends TypeWideningMode {
     override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean = false
   }
 
@@ -42,9 +44,9 @@ object TypeWideningMode {
    * Type changes that are eligible to be applied automatically during schema evolution are allowed.
    * Can be restricted to only type changes supported by Iceberg.
    */
-  case class TypeEvolution(uniformIcebergEnabled: Boolean) extends TypeWideningMode {
+  case class TypeEvolution(uniformIcebergCompatibleOnly: Boolean) extends TypeWideningMode {
     override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean =
       TypeWidening.isTypeChangeSupportedForSchemaEvolution(
-        fromType = fromType, toType = toType, uniformIcebergEnabled)
+        fromType = fromType, toType = toType, uniformIcebergCompatibleOnly)
   }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
index 65cf763d05..e66e68d81f 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala
@@ -222,7 +222,7 @@ object ImplicitMetadataOperation {
 
     val typeWideningMode = if (TypeWidening.isEnabled(txn.protocol, txn.metadata)) {
      TypeWideningMode.TypeEvolution(
-        uniformIcebergEnabled = UniversalFormat.icebergEnabled(txn.metadata))
+        uniformIcebergCompatibleOnly = UniversalFormat.icebergEnabled(txn.metadata))
     } else {
       TypeWideningMode.NoTypeWidening
     }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala
index e032b212c8..a412aa47c2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala
@@ -178,7 +178,7 @@ case class DeltaSink(
 
     val typeWideningMode = if (canMergeSchema && TypeWidening.isEnabled(protocol, metadata)) {
       TypeWideningMode.TypeEvolution(
-        uniformIcebergEnabled = UniversalFormat.icebergEnabled(metadata))
+        uniformIcebergCompatibleOnly = UniversalFormat.icebergEnabled(metadata))
     } else {
       TypeWideningMode.NoTypeWidening
     }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala
index c3141291a0..c6939677d7 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala
@@ -2533,7 +2533,7 @@ class SchemaUtilsSuite extends QueryTest
       mergeSchemas(
         base,
         update,
-        typeWideningMode = TypeWideningMode.TypeEvolution(uniformIcebergEnabled = false),
+        typeWideningMode = TypeWideningMode.TypeEvolution(uniformIcebergCompatibleOnly = false),
         keepExistingType = true
       )
     assert(mergedSchema === expected)
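
Note (not part of the patch): a minimal sketch of how the renamed flag is meant to be exercised,
assuming a build with the delta-spark classes touched above on the classpath. Whether a given
(fromType, toType) pair is accepted by TypeEvolution depends on the Spark/Delta version's
TypeWideningShims and on the set of widenings Iceberg supports, so no specific results are asserted
for those modes here.

  import org.apache.spark.sql.delta.TypeWideningMode
  import org.apache.spark.sql.types.{IntegerType, LongType}

  // With Uniform Iceberg enabled, callers pass uniformIcebergCompatibleOnly = true so that only
  // type changes Iceberg can also read are applied automatically during schema evolution.
  val icebergCompatible = TypeWideningMode.TypeEvolution(uniformIcebergCompatibleOnly = true)
  // Without Uniform Iceberg, any type change eligible for schema evolution is allowed.
  val unrestricted = TypeWideningMode.TypeEvolution(uniformIcebergCompatibleOnly = false)

  // The mode is queried per (fromType, toType) pair during schema merging.
  icebergCompatible.shouldWidenType(fromType = IntegerType, toType = LongType)
  unrestricted.shouldWidenType(fromType = IntegerType, toType = LongType)

  // NoTypeWidening always answers false: the existing table type is kept and incoming values
  // are downcast on write.
  TypeWideningMode.NoTypeWidening.shouldWidenType(fromType = IntegerType, toType = LongType)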