Commit affa389

init

anniewang-db committed Jul 5, 2024
1 parent 9743983 commit affa389

Showing 6 changed files with 119 additions and 57 deletions.
HudiConversionTransaction.scala
@@ -16,19 +16,29 @@

package org.apache.spark.sql.delta.hudi

import org.apache.avro.Schema
import java.io.{IOException, UncheckedIOException}
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.{DateTimeFormatterBuilder, DateTimeParseException}
import java.time.temporal.{ChronoField, ChronoUnit}
import java.util
import java.util.{Collections, Properties}
import java.util.stream.Collectors

import scala.collection.JavaConverters._
import scala.collection.mutable._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.hudi.HudiSchemaUtils._
import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.avro.Schema
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.avro.model.HoodieActionInstant
import org.apache.hudi.avro.model.HoodieCleanFileInfo
import org.apache.hudi.avro.model.HoodieCleanerPlan
import org.apache.hudi.avro.model.HoodieCleanFileInfo
import org.apache.hudi.client.HoodieJavaWriteClient
import org.apache.hudi.client.HoodieTimelineArchiver
import org.apache.hudi.client.WriteStatus
@@ -40,9 +50,9 @@ import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieBaseFile, HoodieCl
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieInstantTimeGenerator, HoodieTimeline, TimelineMetadataUtils}
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.{MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH, SECS_INSTANT_ID_LENGTH, SECS_INSTANT_TIMESTAMP_FORMAT}
import org.apache.hudi.common.util.{Option => HudiOption}
import org.apache.hudi.common.util.CleanerUtils
import org.apache.hudi.common.util.ExternalFilePathUtil
import org.apache.hudi.common.util.{Option => HudiOption}
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.config.HoodieArchivalConfig
import org.apache.hudi.config.HoodieCleanConfig
@@ -53,16 +63,6 @@ import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
import org.apache.hudi.table.HoodieJavaTable
import org.apache.hudi.table.action.clean.CleanPlanner

import java.io.{IOException, UncheckedIOException}
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.{DateTimeFormatterBuilder, DateTimeParseException}
import java.time.temporal.{ChronoField, ChronoUnit}
import java.util
import java.util.stream.Collectors
import java.util.{Collections, Properties}
import collection.mutable._
import scala.collection.JavaConverters._

/**
* Used to prepare (convert) and then commit a set of Delta actions into the Hudi table located
* at the same path as [[postCommitSnapshot]]
HudiConverter.scala
@@ -16,25 +16,30 @@

package org.apache.spark.sql.delta.hudi

import java.io.{IOException, UncheckedIOException}
import java.util.concurrent.atomic.AtomicReference
import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.OptimisticTransactionImpl
import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.UniversalFormatConverter
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.hooks.HudiConverterHook
import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.hooks.HudiConverterHook
import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta._

import java.io.{IOException, UncheckedIOException}
import java.util.concurrent.atomic.AtomicReference
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

object HudiConverter {
/**
HudiSchemaUtils.scala
@@ -16,13 +16,15 @@

package org.apache.spark.sql.delta.hudi

import org.apache.avro.{LogicalTypes, Schema}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.types._

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.avro.{LogicalTypes, Schema}

import org.apache.spark.sql.types._

object HudiSchemaUtils extends DeltaLogging {

/////////////////
@@ -73,7 +75,8 @@ object HudiSchemaUtils extends DeltaLogging {
private def convertAtomic[E <: DataType](elem: E, isNullable: Boolean) = elem match {
case StringType => finalizeSchema(Schema.create(Schema.Type.STRING), isNullable)
case LongType => finalizeSchema(Schema.create(Schema.Type.LONG), isNullable)
case IntegerType | ShortType => finalizeSchema(Schema.create(Schema.Type.INT), isNullable)
case IntegerType => finalizeSchema(
Schema.create(Schema.Type.INT), isNullable)
case FloatType => finalizeSchema(Schema.create(Schema.Type.FLOAT), isNullable)
case DoubleType => finalizeSchema(Schema.create(Schema.Type.DOUBLE), isNullable)
case d: DecimalType => finalizeSchema(LogicalTypes.decimal(d.precision, d.scale)
@@ -84,8 +87,6 @@
LogicalTypes.date.addToSchema(Schema.create(Schema.Type.INT)), isNullable)
case TimestampType => finalizeSchema(
LogicalTypes.timestampMicros.addToSchema(Schema.create(Schema.Type.LONG)), isNullable)
case TimestampNTZType => finalizeSchema(
LogicalTypes.localTimestampMicros.addToSchema(Schema.create(Schema.Type.LONG)), isNullable)
case _ => throw new UnsupportedOperationException(s"Could not convert atomic type $elem")
}
}
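
Every branch of convertAtomic above funnels through finalizeSchema, which by the usual Avro convention wraps the schema in a union with null for nullable columns. A minimal sketch of that convention — finalizeSchemaSketch is a hypothetical stand-in for the private helper, not its actual implementation:

import java.util.Arrays

import org.apache.avro.Schema

// Sketch only: assumes the helper unions the schema with NULL when the
// Delta column is nullable, and returns it unchanged otherwise.
def finalizeSchemaSketch(targetSchema: Schema, isNullable: Boolean): Schema =
  if (isNullable) {
    Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), targetSchema))
  } else {
    targetSchema
  }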
HudiTransactionUtils.scala
@@ -16,16 +16,15 @@

package org.apache.spark.sql.delta.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.hadoop.fs.Path
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieTableType, HoodieTimelineTimeZone, HoodieDeltaWriteStat}
import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieDeltaWriteStat, HoodieTableType, HoodieTimelineTimeZone}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ExternalFilePathUtil
import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.storage.StorageConfiguration
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.metering.DeltaLogging

object HudiTransactionUtils extends DeltaLogging {

ConvertToHudiSuite.scala
@@ -16,8 +16,21 @@

package org.apache.spark.sql.delta.hudi

import java.io.File
import java.time.Instant
import java.util.UUID
import java.util.stream.Collectors

import scala.collection.JavaConverters

import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaUnsupportedOperationException, OptimisticTransaction}
import org.apache.spark.sql.delta.DeltaOperations.Truncate
import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.fs.FSUtils
@@ -26,23 +39,12 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.metadata.HoodieMetadataFileSystemView
import org.apache.hudi.storage.StorageConfiguration
import org.apache.hudi.storage.hadoop.{HadoopStorageConfiguration, HoodieHadoopStorage}

import org.apache.spark.SparkContext
import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.delta.DeltaOperations.Truncate
import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaUnsupportedOperationException, OptimisticTransaction}
import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, RemoveFile}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{ByteType, IntegerType, ShortType, StructField, StructType}
import org.apache.spark.util.{ManualClock, Utils}
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

import java.io.File
import java.time.Instant
import java.util.UUID
import java.util.stream.Collectors
import scala.collection.JavaConverters

class ConvertToHudiSuite extends QueryTest with Eventually {

@@ -150,6 +152,21 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
}
}

test("Enabling Delete Vector After Hudi Enabled Already Throws Exception") {
intercept[DeltaUnsupportedOperationException] {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 INT, col2 STRING) USING DELTA
|LOCATION '$testTablePath'
|TBLPROPERTIES (
| 'delta.universalFormat.enabledFormats' = 'hudi'
|)""".stripMargin)
_sparkSession.sql(
s"""ALTER TABLE `$testTableName` SET TBLPROPERTIES (
| 'delta.enableDeletionVectors' = true
|)""".stripMargin)
}
}

test(s"Conversion behavior for lists") {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 ARRAY<INT>) USING DELTA
@@ -201,6 +218,22 @@
verifyFilesAndSchemaMatch()
}

test(s"Conversion behavior for nested structs") {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 STRUCT<field1: INT, field2: STRING,
|field3: STRUCT<field4: INT, field5: INT, field6: STRING>>)
|USING DELTA
|LOCATION '$testTablePath'
|TBLPROPERTIES (
| 'delta.universalFormat.enabledFormats' = 'hudi'
|)""".stripMargin)
_sparkSession.sql(
s"INSERT INTO `$testTableName` VALUES (named_struct('field1', 1, 'field2', 'hello', " +
"'field3', named_struct('field4', 2, 'field5', 3, 'field6', 'world')))"
)
verifyFilesAndSchemaMatch()
}

test("validate Hudi timeline archival and cleaning") {
val testOp = Truncate()
withDefaultTablePropsInSQLConf(true, {
@@ -213,7 +246,8 @@
val file = AddFile(i.toString + ".parquet", Map.empty, 1, 1, true) :: Nil
val delete: Seq[Action] = if (i > 1) {
val timestamp = startTime + (System.currentTimeMillis() - actualTestStartTime)
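// Remove the file added by the previous commit so earlier instants become
// eligible for cleaning and archival.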
RemoveFile((i - 1).toString + ".parquet", Some(timestamp), true) :: Nil
val prevFile = AddFile((i - 1).toString + ".parquet", Map.empty, 1, 1, true)
prevFile.removeWithTimestamp(timestamp) :: Nil
} else {
Nil
}
@@ -241,7 +275,9 @@
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 BIGINT, col2 BOOLEAN, col3 DATE,
| col4 DOUBLE, col5 FLOAT, col6 INT, col7 STRING, col8 TIMESTAMP,
| col9 STRUCT<field1: INT, field2: STRING>)
| col9 BINARY, col10 DECIMAL(5, 2),
| col11 STRUCT<field1: INT, field2: STRING,
| field3: STRUCT<field4: INT, field5: INT, field6: STRING>>)
| USING DELTA
|LOCATION '$testTablePath'
|TBLPROPERTIES (
Expand All @@ -250,11 +286,25 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
val nowSeconds = Instant.now().getEpochSecond
_sparkSession.sql(s"INSERT INTO `$testTableName` VALUES (123, true, "
+ s"date(from_unixtime($nowSeconds)), 32.1, 1.23, 456, 'hello world', "
+ s"timestamp(from_unixtime($nowSeconds)), "
+ s"named_struct('field1', 789, 'field2', 'hello'))")
+ s"timestamp(from_unixtime($nowSeconds)), X'1ABF', -999.99,"
+ s"STRUCT(1, 'hello', STRUCT(2, 3, 'world')))")
verifyFilesAndSchemaMatch()
}

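// SMALLINT, TINYINT, TIMESTAMP_NTZ, and VOID correspond to ShortType, ByteType,
// TimestampNTZType, and NullType, all rejected by the UniversalFormat schema check.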
for (invalidType <- Seq("SMALLINT", "TINYINT", "TIMESTAMP_NTZ", "VOID")) {
test(s"Unsupported Type $invalidType Throws Exception") {
intercept[DeltaUnsupportedOperationException] {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 $invalidType) USING DELTA
|LOCATION '$testTablePath'
|TBLPROPERTIES (
| 'delta.universalFormat.enabledFormats' = 'hudi'
|)""".stripMargin)
}
}
}


test("all batches of actions are converted") {
withSQLConf(
DeltaSQLConf.HUDI_MAX_COMMITS_TO_CONVERT.key -> "3"
@@ -321,7 +371,14 @@
assert(paths.equals(expectedFiles),
s"Files do not match.\nExpected: $expectedFiles\nActual: $paths")
// Assert schemas are equal
val expectedSchema = deltaDF.schema
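// Hudi reports its schema in Avro, which has no 8- or 16-bit integer types,
// so short and byte columns are widened to int before comparing.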
val expectedSchema = StructType(deltaDF.schema.map {
case StructField(name, ShortType, nullable, _) =>
StructField(name, IntegerType, nullable)
case StructField(name, ByteType, nullable, _) =>
StructField(name, IntegerType, nullable)
case other => other
})

assert(hudiSchemaAsStruct.equals(expectedSchema),
s"Schemas do not match.\nExpected: $expectedSchema\nActual: $hudiSchemaAsStruct")
}
@@ -352,7 +409,6 @@
.appName("UniformSession")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
}
}
UniversalFormat.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.types.NullType
import org.apache.spark.sql.types.{ByteType, CalendarIntervalType, NullType, ShortType, TimestampNTZType, VariantType}

/**
* Utils to validate the Universal Format (UniForm) Delta feature (NOT a table feature).
@@ -103,7 +103,8 @@ object UniversalFormat extends DeltaLogging {
throw DeltaErrors.uniFormHudiDeleteVectorCompat()
}
SchemaUtils.findAnyTypeRecursively(newestMetadata.schema) { f =>
f.isInstanceOf[NullType]
f.isInstanceOf[NullType] || f.isInstanceOf[ByteType] || f.isInstanceOf[ShortType] ||
  f.isInstanceOf[TimestampNTZType] || f.isInstanceOf[VariantType]
} match {
case Some(unsupportedType) =>
throw DeltaErrors.uniFormHudiSchemaCompat(unsupportedType)
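
The compatibility check above relies on SchemaUtils.findAnyTypeRecursively to walk nested fields. A self-contained sketch of the same idea — findAnyTypeRecursivelySketch is a hypothetical stand-in, not delta's actual helper:

import org.apache.spark.sql.types._

// Depth-first search for the first DataType matching the predicate,
// descending into structs, arrays, and maps.
def findAnyTypeRecursivelySketch(dt: DataType)(f: DataType => Boolean): Option[DataType] =
  dt match {
    case t if f(t) => Some(t)
    case s: StructType =>
      s.fields.iterator
        .map(field => findAnyTypeRecursivelySketch(field.dataType)(f))
        .collectFirst { case Some(t) => t }
    case a: ArrayType => findAnyTypeRecursivelySketch(a.elementType)(f)
    case m: MapType =>
      findAnyTypeRecursivelySketch(m.keyType)(f)
        .orElse(findAnyTypeRecursivelySketch(m.valueType)(f))
    case _ => None
  }

// A SMALLINT (ShortType) anywhere in the schema blocks Hudi conversion.
val schema = StructType(Seq(
  StructField("id", LongType),
  StructField("nested", StructType(Seq(StructField("s", ShortType))))))
assert(findAnyTypeRecursivelySketch(schema) { t =>
  t.isInstanceOf[NullType] || t.isInstanceOf[ByteType] ||
  t.isInstanceOf[ShortType] || t.isInstanceOf[TimestampNTZType]
}.contains(ShortType))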
