diff --git a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala
index 98812427326..d0c687d7728 100644
--- a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala
+++ b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala
@@ -16,19 +16,29 @@
 
 package org.apache.spark.sql.delta.hudi
 
-import org.apache.avro.Schema
+import java.io.{IOException, UncheckedIOException}
+import java.time.{Instant, LocalDateTime, ZoneId}
+import java.time.format.{DateTimeFormatterBuilder, DateTimeParseException}
+import java.time.temporal.{ChronoField, ChronoUnit}
+import java.util
+import java.util.{Collections, Properties}
+import java.util.stream.Collectors
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable._
 import scala.util.control.NonFatal
+
 import org.apache.spark.sql.delta.Snapshot
 import org.apache.spark.sql.delta.actions.Action
 import org.apache.spark.sql.delta.hudi.HudiSchemaUtils._
 import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._
 import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.avro.Schema
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hudi.avro.model.HoodieActionInstant
-import org.apache.hudi.avro.model.HoodieCleanFileInfo
 import org.apache.hudi.avro.model.HoodieCleanerPlan
+import org.apache.hudi.avro.model.HoodieCleanFileInfo
 import org.apache.hudi.client.HoodieJavaWriteClient
 import org.apache.hudi.client.HoodieTimelineArchiver
 import org.apache.hudi.client.WriteStatus
@@ -40,9 +50,9 @@ import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieBaseFile, HoodieCl
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieInstantTimeGenerator, HoodieTimeline, TimelineMetadataUtils}
 import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.{MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH, SECS_INSTANT_ID_LENGTH, SECS_INSTANT_TIMESTAMP_FORMAT}
+import org.apache.hudi.common.util.{Option => HudiOption}
 import org.apache.hudi.common.util.CleanerUtils
 import org.apache.hudi.common.util.ExternalFilePathUtil
-import org.apache.hudi.common.util.{Option => HudiOption}
 import org.apache.hudi.common.util.collection.Pair
 import org.apache.hudi.config.HoodieArchivalConfig
 import org.apache.hudi.config.HoodieCleanConfig
@@ -53,16 +63,6 @@ import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
 import org.apache.hudi.table.HoodieJavaTable
 import org.apache.hudi.table.action.clean.CleanPlanner
 
-import java.io.{IOException, UncheckedIOException}
-import java.time.{Instant, LocalDateTime, ZoneId}
-import java.time.format.{DateTimeFormatterBuilder, DateTimeParseException}
-import java.time.temporal.{ChronoField, ChronoUnit}
-import java.util
-import java.util.stream.Collectors
-import java.util.{Collections, Properties}
-import collection.mutable._
-import scala.collection.JavaConverters._
-
 /**
  * Used to prepare (convert) and then commit a set of Delta actions into the Hudi table located
  * at the same path as [[postCommitSnapshot]]
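Aside, not part of the patch: the java.time classes and the HoodieInstantTimeGenerator constants moved above are what HudiConversionTransaction uses to render Delta commit timestamps as Hudi timeline instant times. A minimal sketch of that rendering, assuming the standard Hudi millisecond instant layout yyyyMMddHHmmssSSS and a UTC timeline; the object and method names are illustrative, not taken from the patch:

    import java.time.{Instant, LocalDateTime, ZoneId}
    import java.time.format.DateTimeFormatter

    object InstantTimeSketch {
      // Hudi's millisecond-precision instant layout, e.g. 20240516103000123.
      // (Assumption: the real code also handles the seconds layout and
      // DateTimeParseException, per the imports above.)
      private val MillisInstantFormat = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS")

      // Render an epoch-millis Delta commit timestamp as a Hudi instant time string.
      def toHudiInstantTime(epochMillis: Long): String =
        LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneId.of("UTC"))
          .format(MillisInstantFormat)
    }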
diff --git a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConverter.scala b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConverter.scala
index 68eb75b1048..e67c8ba3c18 100644
--- a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConverter.scala
+++ b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConverter.scala
@@ -16,25 +16,30 @@
 
 package org.apache.spark.sql.delta.hudi
 
+import java.io.{IOException, UncheckedIOException}
+import java.util.concurrent.atomic.AtomicReference
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.apache.spark.sql.delta._
+import org.apache.spark.sql.delta.OptimisticTransactionImpl
+import org.apache.spark.sql.delta.Snapshot
+import org.apache.spark.sql.delta.UniversalFormatConverter
+import org.apache.spark.sql.delta.actions.Action
+import org.apache.spark.sql.delta.hooks.HudiConverterHook
+import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
+
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.delta.actions.Action
-import org.apache.spark.sql.delta.hooks.HudiConverterHook
-import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._
-import org.apache.spark.sql.delta.metering.DeltaLogging
-import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta._
-
-import java.io.{IOException, UncheckedIOException}
-import java.util.concurrent.atomic.AtomicReference
-import javax.annotation.concurrent.GuardedBy
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
 
 object HudiConverter {
   /**
diff --git a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala
index d1ca23ba494..bdd23e83cd9 100644
--- a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala
+++ b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala
@@ -16,13 +16,15 @@
 
 package org.apache.spark.sql.delta.hudi
 
-import org.apache.avro.{LogicalTypes, Schema}
-import org.apache.spark.sql.delta.metering.DeltaLogging
-import org.apache.spark.sql.types._
-
 import java.util
+
 import scala.collection.JavaConverters._
 
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.avro.{LogicalTypes, Schema}
+
+import org.apache.spark.sql.types._
+
 object HudiSchemaUtils extends DeltaLogging {
 
   /////////////////
@@ -73,7 +75,8 @@ object HudiSchemaUtils extends DeltaLogging {
   private def convertAtomic[E <: DataType](elem: E, isNullable: Boolean) = elem match {
     case StringType => finalizeSchema(Schema.create(Schema.Type.STRING), isNullable)
     case LongType => finalizeSchema(Schema.create(Schema.Type.LONG), isNullable)
-    case IntegerType | ShortType => finalizeSchema(Schema.create(Schema.Type.INT), isNullable)
+    case IntegerType => finalizeSchema(
+      Schema.create(Schema.Type.INT), isNullable)
     case FloatType => finalizeSchema(Schema.create(Schema.Type.FLOAT), isNullable)
     case DoubleType => finalizeSchema(Schema.create(Schema.Type.DOUBLE), isNullable)
     case d: DecimalType => finalizeSchema(LogicalTypes.decimal(d.precision, d.scale)
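Aside, not part of the patch: the hunk above is the heart of the type change. IntegerType now maps to Avro INT on its own, and ShortType no longer piggybacks on that case, so byte/short schemas are rejected before conversion instead of being silently widened. A self-contained sketch of the convertAtomic/finalizeSchema pattern, assuming a union-with-null encoding for nullable fields; the helper names mirror the patch, but this is an illustration, not the full implementation:

    import java.util.Arrays

    import org.apache.avro.Schema
    import org.apache.spark.sql.types._

    object AtomicConversionSketch {
      // Nullable fields become a union of NULL and the base schema (assumed
      // to match what the real finalizeSchema does).
      private def finalizeSchema(schema: Schema, isNullable: Boolean): Schema =
        if (isNullable) {
          Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), schema))
        } else {
          schema
        }

      def convertAtomic(elem: DataType, isNullable: Boolean): Schema = elem match {
        case StringType => finalizeSchema(Schema.create(Schema.Type.STRING), isNullable)
        case IntegerType => finalizeSchema(Schema.create(Schema.Type.INT), isNullable)
        case LongType => finalizeSchema(Schema.create(Schema.Type.LONG), isNullable)
        // ShortType/ByteType intentionally have no case: Avro has no 8/16-bit ints.
        case other => throw new UnsupportedOperationException(
          s"Could not convert atomic type $other")
      }
    }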
@@ -84,8 +87,6 @@
       LogicalTypes.date.addToSchema(Schema.create(Schema.Type.INT)), isNullable)
     case TimestampType => finalizeSchema(
       LogicalTypes.timestampMicros.addToSchema(Schema.create(Schema.Type.LONG)), isNullable)
-    case TimestampNTZType => finalizeSchema(
-      LogicalTypes.localTimestampMicros.addToSchema(Schema.create(Schema.Type.LONG)), isNullable)
     case _ => throw new UnsupportedOperationException(s"Could not convert atomic type $elem")
   }
 }
diff --git a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiTransactionUtils.scala b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiTransactionUtils.scala
index bd7788b9f46..a99a73a0686 100644
--- a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiTransactionUtils.scala
+++ b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiTransactionUtils.scala
@@ -16,16 +16,15 @@
 
 package org.apache.spark.sql.delta.hudi
 
-import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.WriteStatus
-import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieTableType, HoodieTimelineTimeZone, HoodieDeltaWriteStat}
+import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieDeltaWriteStat, HoodieTableType, HoodieTimelineTimeZone}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.ExternalFilePathUtil
 import org.apache.hudi.exception.TableNotFoundException
 import org.apache.hudi.storage.StorageConfiguration
-import org.apache.spark.sql.delta.actions.AddFile
-import org.apache.spark.sql.delta.metering.DeltaLogging
 
 object HudiTransactionUtils extends DeltaLogging {
 
diff --git a/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala b/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala
index b96915b7021..92abeeb3579 100644
--- a/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala
+++ b/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala
@@ -16,8 +16,21 @@
 
 package org.apache.spark.sql.delta.hudi
 
+import java.io.File
+import java.time.Instant
+import java.util.UUID
+import java.util.stream.Collectors
+
+import scala.collection.JavaConverters
+
+import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaUnsupportedOperationException, OptimisticTransaction}
+import org.apache.spark.sql.delta.DeltaOperations.Truncate
+import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.fs.FSUtils
@@ -26,23 +39,12 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.metadata.HoodieMetadataFileSystemView
 import org.apache.hudi.storage.StorageConfiguration
 import org.apache.hudi.storage.hadoop.{HadoopStorageConfiguration, HoodieHadoopStorage}
+
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.{QueryTest, SparkSession}
 import org.apache.spark.sql.avro.SchemaConverters
-import org.apache.spark.sql.delta.DeltaOperations.Truncate
-import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaUnsupportedOperationException, OptimisticTransaction}
-import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, RemoveFile}
-import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{ByteType, IntegerType, ShortType, StructField, StructType}
 import org.apache.spark.util.{ManualClock, Utils}
-import org.scalatest.concurrent.Eventually
-import org.scalatest.time.SpanSugar._
-
-import java.io.File
-import java.time.Instant
-import java.util.UUID
-import java.util.stream.Collectors
-import scala.collection.JavaConverters
 
 class ConvertToHudiSuite extends QueryTest with Eventually {
 
@@ -150,6 +152,21 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
     }
   }
+  test("Enabling Delete Vector After Hudi Enabled Already Throws Exception") {
+    intercept[DeltaUnsupportedOperationException] {
+      _sparkSession.sql(
+        s"""CREATE TABLE `$testTableName` (col1 INT, col2 STRING) USING DELTA
+           |LOCATION '$testTablePath'
+           |TBLPROPERTIES (
+           |  'delta.universalFormat.enabledFormats' = 'hudi'
+           |)""".stripMargin)
+      _sparkSession.sql(
+        s"""ALTER TABLE `$testTableName` SET TBLPROPERTIES (
+           |  'delta.enableDeletionVectors' = true
+           |)""".stripMargin)
+    }
+  }
+
   test(s"Conversion behavior for lists") {
     _sparkSession.sql(
       s"""CREATE TABLE `$testTableName` (col1 ARRAY<INT>) USING DELTA
         |LOCATION '$testTablePath'
@@ -201,6 +218,22 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
     verifyFilesAndSchemaMatch()
   }
 
+  test(s"Conversion behavior for nested structs") {
+    _sparkSession.sql(
+      s"""CREATE TABLE `$testTableName` (col1 STRUCT<field1: INT, field2: STRING,
+         | field3: STRUCT<field4: INT, field5: INT, field6: STRING>>)
+         |USING DELTA
+         |LOCATION '$testTablePath'
+         |TBLPROPERTIES (
+         |  'delta.universalFormat.enabledFormats' = 'hudi'
+         |)""".stripMargin)
+    _sparkSession.sql(
+      s"INSERT INTO `$testTableName` VALUES (named_struct('field1', 1, 'field2', 'hello', " +
+        "'field3', named_struct('field4', 2, 'field5', 3, 'field6', 'world')))"
+    )
+    verifyFilesAndSchemaMatch()
+  }
+
   test("validate Hudi timeline archival and cleaning") {
     val testOp = Truncate()
     withDefaultTablePropsInSQLConf(true, {
@@ -213,7 +246,8 @@
       val file = AddFile(i.toString + ".parquet", Map.empty, 1, 1, true) :: Nil
       val delete: Seq[Action] = if (i > 1) {
         val timestamp = startTime + (System.currentTimeMillis() - actualTestStartTime)
-        RemoveFile((i - 1).toString + ".parquet", Some(timestamp), true) :: Nil
+        val prevFile = AddFile((i - 1).toString + ".parquet", Map.empty, 1, 1, true)
+        prevFile.removeWithTimestamp(timestamp) :: Nil
       } else {
         Nil
       }
@@ -241,7 +275,9 @@
     _sparkSession.sql(
       s"""CREATE TABLE `$testTableName` (col1 BIGINT, col2 BOOLEAN, col3 DATE,
         | col4 DOUBLE, col5 FLOAT, col6 INT, col7 STRING, col8 TIMESTAMP,
-        | col9 STRUCT<field1: INT, field2: STRING>)
+        | col9 BINARY, col10 DECIMAL(5, 2),
+        | col11 STRUCT<field1: INT, field2: STRING,
+        | field3: STRUCT<field4: INT, field5: INT, field6: STRING>>)
         | USING DELTA
         |LOCATION '$testTablePath'
         |TBLPROPERTIES (
@@ -250,11 +286,25 @@
     val nowSeconds = Instant.now().getEpochSecond
     _sparkSession.sql(s"INSERT INTO `$testTableName` VALUES (123, true, " +
       s"date(from_unixtime($nowSeconds)), 32.1, 1.23, 456, 'hello world', "
-      + s"timestamp(from_unixtime($nowSeconds)), "
-      + s"named_struct('field1', 789, 'field2', 'hello'))")
+      + s"timestamp(from_unixtime($nowSeconds)), X'1ABF', -999.99,"
+      + s"STRUCT(1, 'hello', STRUCT(2, 3, 'world')))")
     verifyFilesAndSchemaMatch()
   }
 
+
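Aside, not part of the patch: the four types rejected by the loop below all lack a faithful Avro/Hudi representation. Avro has no 8- or 16-bit integer primitives, so SMALLINT and TINYINT could only be stored widened to INT and would read back as IntegerType, which is exactly the normalization verifyFilesAndSchemaMatch applies further down. A small sketch of that round trip using spark-avro's SchemaConverters (assumed to be on the test classpath, as the suite already imports it):

    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.IntegerType

    object AvroWidthSketch extends App {
      // An Avro INT always maps back to Spark IntegerType, never Short/Byte.
      val roundTripped = SchemaConverters.toSqlType(Schema.create(Schema.Type.INT)).dataType
      assert(roundTripped == IntegerType)
    }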
+  for (invalidType <- Seq("SMALLINT", "TINYINT", "TIMESTAMP_NTZ", "VOID")) {
+    test(s"Unsupported Type $invalidType Throws Exception") {
+      intercept[DeltaUnsupportedOperationException] {
+        _sparkSession.sql(
+          s"""CREATE TABLE `$testTableName` (col1 $invalidType) USING DELTA
+             |LOCATION '$testTablePath'
+             |TBLPROPERTIES (
+             |  'delta.universalFormat.enabledFormats' = 'hudi'
+             |)""".stripMargin)
+      }
+    }
+  }
+
   test("all batches of actions are converted") {
     withSQLConf(
       DeltaSQLConf.HUDI_MAX_COMMITS_TO_CONVERT.key -> "3"
@@ -321,7 +371,14 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
     assert(paths.equals(expectedFiles),
       s"Files do not match.\nExpected: $expectedFiles\nActual: $paths")
     // Assert schemas are equal
-    val expectedSchema = deltaDF.schema
+    val expectedSchema = StructType(deltaDF.schema.map {
+      case StructField(name, ShortType, nullable, _) =>
+        StructField(name, IntegerType, nullable)
+      case StructField(name, ByteType, nullable, _) =>
+        StructField(name, IntegerType, nullable)
+      case other => other
+    })
+
     assert(hudiSchemaAsStruct.equals(expectedSchema),
       s"Schemas do not match.\nExpected: $expectedSchema\nActual: $hudiSchemaAsStruct")
   }
@@ -352,7 +409,6 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
       .appName("UniformSession")
       .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
       .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
-      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .getOrCreate()
   }
 }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala
index acad136e499..d31ecd192d2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.types.NullType
+import org.apache.spark.sql.types.{ByteType, CalendarIntervalType, NullType, ShortType, TimestampNTZType, VariantType}
 
 /**
  * Utils to validate the Universal Format (UniForm) Delta feature (NOT a table feature).
@@ -103,7 +103,8 @@ object UniversalFormat extends DeltaLogging {
         throw DeltaErrors.uniFormHudiDeleteVectorCompat()
       }
       SchemaUtils.findAnyTypeRecursively(newestMetadata.schema) { f =>
-        f.isInstanceOf[NullType]
+        f.isInstanceOf[NullType] || f.isInstanceOf[ByteType] || f.isInstanceOf[ShortType] ||
+          f.isInstanceOf[TimestampNTZType] || f.isInstanceOf[VariantType]
       } match {
         case Some(unsupportedType) =>
           throw DeltaErrors.uniFormHudiSchemaCompat(unsupportedType)
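Aside, not part of the patch: the UniversalFormat hunk above is where unsupported Hudi types are now caught. findAnyTypeRecursively walks the schema and the predicate flags NULL, BYTE, SHORT, TIMESTAMP_NTZ and VARIANT fields, so enablement fails fast with uniFormHudiSchemaCompat instead of failing later during conversion. A simplified standalone analogue of that recursive scan; the traversal below is an illustration under assumed semantics, since Delta's actual SchemaUtils.findAnyTypeRecursively has its own signature:

    import org.apache.spark.sql.types._

    object HudiCompatSketch {
      // Types the patch treats as unconvertible (VariantType is omitted here so
      // the sketch also compiles on Spark versions that predate it).
      private def isUnsupported(dt: DataType): Boolean = dt match {
        case NullType | ByteType | ShortType | TimestampNTZType => true
        case _ => false
      }

      // Depth-first search through structs, arrays and maps for the first hit.
      def findUnsupported(dt: DataType): Option[DataType] = dt match {
        case t if isUnsupported(t) => Some(t)
        case s: StructType =>
          s.fields.foldLeft(Option.empty[DataType]) { (acc, f) =>
            acc.orElse(findUnsupported(f.dataType))
          }
        case a: ArrayType => findUnsupported(a.elementType)
        case m: MapType => findUnsupported(m.keyType).orElse(findUnsupported(m.valueType))
        case _ => None
      }
    }

For example, findUnsupported(new StructType().add("a", ShortType)) yields Some(ShortType), mirroring the DeltaUnsupportedOperationException the new tests expect when SMALLINT appears in a UniForm-Hudi table.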