Skip to content

Commit

Permalink
Minor updates: comments, naming
Browse files Browse the repository at this point in the history
  • Loading branch information
johanl-db committed Dec 13, 2024
1 parent 783af1b commit 51e83c7
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -949,10 +949,11 @@ class DeltaAnalysis(session: SparkSession)
}

/**
* Returns a mapping of (fromType, toType) to Boolean indicating whether `fromType` is eligible to
* be automatically widened to `toType` when ingesting data. If it is, the table schema is updated
* to `toType` before ingestion and values are written using their origin `toType` type.
* Otherwise, the table type `fromType` is retained and values are downcasted on write.
* Returns the type widening mode to use for the given delta table. A type widening mode indicates
* for (fromType, toType) tuples whether `fromType` is eligible to be automatically widened to
* `toType` when ingesting data. If it is, the table schema is updated to `toType` before
* ingestion and values are written using their original `toType` type. Otherwise, the table type
* `fromType` is retained and values are downcasted on write.
*/
private def getTypeWideningMode(
deltaTable: DeltaTableV2,
Expand All @@ -964,7 +965,7 @@ class DeltaAnalysis(session: SparkSession)

if (typeWideningEnabled && schemaEvolutionEnabled) {
TypeWideningMode.TypeEvolution(
uniformIcebergEnabled = UniversalFormat.icebergEnabled(snapshot.metadata))
uniformIcebergCompatibleOnly = UniversalFormat.icebergEnabled(snapshot.metadata))
} else {
TypeWideningMode.NoTypeWidening
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ object ResolveDeltaMergeInto {
target.collectFirst {
case DeltaTable(index) if TypeWidening.isEnabled(index.protocol, index.metadata) =>
TypeWideningMode.TypeEvolution(
uniformIcebergEnabled = UniversalFormat.icebergEnabled(index.metadata))
uniformIcebergCompatibleOnly = UniversalFormat.icebergEnabled(index.metadata))
}.getOrElse(TypeWideningMode.NoTypeWidening)

// The implicit conversions flag allows any type to be merged from source to target if Spark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ object TypeWidening {
def isTypeChangeSupportedForSchemaEvolution(
fromType: AtomicType,
toType: AtomicType,
uniformIcebergEnabled: Boolean): Boolean =
uniformIcebergCompatibleOnly: Boolean): Boolean =
TypeWideningShims.isTypeChangeSupportedForSchemaEvolution(
fromType = fromType,
toType = toType
) && (
!uniformIcebergEnabled ||
!uniformIcebergCompatibleOnly ||
isTypeChangeSupportedByIceberg(fromType = fromType, toType = toType)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.spark.sql.types.AtomicType
Expand All @@ -21,30 +22,31 @@ import org.apache.spark.sql.types.AtomicType
* A type widening mode captures a specific set of type changes that are allowed to be applied.
* Currently:
* - NoTypeWidening: No type change is allowed.
* - TypeEvolution(uniformIcebergEnabled = true): Type changes that are eligible to be applied
* automatically during schema evolution and that are supported by Iceberg are allowed.
* - TypeEvolution(uniformIcebergEnabled = false): Type changes that are eligible to be applied
* automatically during schema evolution are allowed, even if they are not supported by Iceberg.
* - TypeEvolution(uniformIcebergCompatibleOnly = true): Type changes that are eligible to be
* applied automatically during schema evolution and that are supported by Iceberg are allowed.
* - TypeEvolution(uniformIcebergCompatibleOnly = false): Type changes that are eligible to be
* applied automatically during schema evolution are allowed, even if they are not supported by
* Iceberg.
*/
trait TypeWideningMode {
sealed trait TypeWideningMode {
def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean
}

object TypeWideningMode {
/**
* No type change allowed. Typically because type widening and/or schema evolution isn't enabled.
*/
object NoTypeWidening extends TypeWideningMode {
case object NoTypeWidening extends TypeWideningMode {
override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean = false
}

/**
* Type changes that are eligible to be applied automatically during schema evolution are allowed.
* Can be restricted to only type changes supported by Iceberg.
*/
case class TypeEvolution(uniformIcebergEnabled: Boolean) extends TypeWideningMode {
case class TypeEvolution(uniformIcebergCompatibleOnly: Boolean) extends TypeWideningMode {
override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean =
TypeWidening.isTypeChangeSupportedForSchemaEvolution(
fromType = fromType, toType = toType, uniformIcebergEnabled)
fromType = fromType, toType = toType, uniformIcebergCompatibleOnly)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ object ImplicitMetadataOperation {

val typeWideningMode = if (TypeWidening.isEnabled(txn.protocol, txn.metadata)) {
TypeWideningMode.TypeEvolution(
uniformIcebergEnabled = UniversalFormat.icebergEnabled(txn.metadata))
uniformIcebergCompatibleOnly = UniversalFormat.icebergEnabled(txn.metadata))
} else {
TypeWideningMode.NoTypeWidening
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ case class DeltaSink(

val typeWideningMode = if (canMergeSchema && TypeWidening.isEnabled(protocol, metadata)) {
TypeWideningMode.TypeEvolution(
uniformIcebergEnabled = UniversalFormat.icebergEnabled(metadata))
uniformIcebergCompatibleOnly = UniversalFormat.icebergEnabled(metadata))
} else {
TypeWideningMode.NoTypeWidening
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2533,7 +2533,7 @@ class SchemaUtilsSuite extends QueryTest
mergeSchemas(
base,
update,
typeWideningMode = TypeWideningMode.TypeEvolution(uniformIcebergEnabled = false),
typeWideningMode = TypeWideningMode.TypeEvolution(uniformIcebergCompatibleOnly = false),
keepExistingType = true
)
assert(mergedSchema === expected)
Expand Down

0 comments on commit 51e83c7

Please sign in to comment.