From 74d09e9343ec14b63000874757eaffe680e4f140 Mon Sep 17 00:00:00 2001
From: Annie Wang <170372889+anniewang-db@users.noreply.github.com>
Date: Mon, 8 Jul 2024 10:13:07 -0700
Subject: [PATCH] [Hudi] Add integration tests for Hudi (#3338)

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [x] Other (Hudi)

## Description

This PR adds integration tests for Hudi. Previously, the integration test was not able to actually read the table from a Hudi engine due to Spark version incompatibility. Since the new Hudi 0.15.0 release supports Spark 3.5, we can now add verifications that actually read the tables from Hudi.

## How was this patch tested?

Added integration tests.

## Does this PR introduce _any_ user-facing changes?

No

---
 hudi/integration_tests/write_uniform_hudi.py | 158 +++++++++++++++++--
 run-integration-tests.py                     |  19 ++-
 2 files changed, 160 insertions(+), 17 deletions(-)

diff --git a/hudi/integration_tests/write_uniform_hudi.py b/hudi/integration_tests/write_uniform_hudi.py
index 5fb40324e6f..03aab11e03e 100644
--- a/hudi/integration_tests/write_uniform_hudi.py
+++ b/hudi/integration_tests/write_uniform_hudi.py
@@ -1,35 +1,165 @@
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import col
+from pyspark.sql.functions import current_date, current_timestamp
+from pyspark.testing import assertDataFrameEqual
 from delta.tables import DeltaTable
 import shutil
 import random
 import os
 import time
 
-testRoot = "/tmp/delta-uniform-hudi/"
-warehousePath = testRoot + "uniform_tables"
-shutil.rmtree(testRoot, ignore_errors=True)
+###################### Setup ######################
+
+test_root = "/tmp/delta-uniform-hudi/"
+warehouse_path = test_root + "uniform_tables"
+shutil.rmtree(test_root, ignore_errors=True)
+hudi_table_base_name = "delta_table_with_hudi"
 
 # we need to set the following configs
-spark = SparkSession.builder \
-    .appName("delta-uniform-hudi") \
+spark_delta = SparkSession.builder \
+    .appName("delta-uniform-hudi-writer") \
     .master("local[*]") \
     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
-    .config("spark.sql.warehouse.dir", warehousePath) \
+    .config("spark.sql.warehouse.dir", warehouse_path) \
     .getOrCreate()
 
-spark.sql("""CREATE TABLE `delta_table_with_hudi` (col1 INT) USING DELTA
-  TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi') """)
+###################### Helper functions ######################
+
+def get_delta_df(spark, table_name):
+    hudi_table_path = os.path.join(warehouse_path, table_name)
+    print('hudi_table_path:', hudi_table_path)
+    df_delta = spark.read.format("delta").load(hudi_table_path)
+    return df_delta
+
+def get_hudi_df(spark, table_name):
+    hudi_table_path = os.path.join(warehouse_path, table_name)
+    df_hudi = (spark.read.format("hudi")
+               .option("hoodie.metadata.enable", "true")
+               .option("hoodie.datasource.write.hive_style_partitioning", "true")
+               .load(hudi_table_path))
+    return df_hudi
+
+###################### Create tables in Delta ######################
+print('Delta tables:')
+
+# validate various data types
+spark_delta.sql(f"""CREATE TABLE `{hudi_table_base_name}_0` (col1 BIGINT, col2 BOOLEAN, col3 DATE,
+    col4 DOUBLE, col5 FLOAT, col6 INT, col7 STRING, col8 TIMESTAMP,
+    col9 BINARY, col10 DECIMAL(5, 2),
+    col11 STRUCT<field1: INT, field2: STRING, field3: STRUCT<field4: INT, field5: INT, field6: STRING>>,
+    col12 ARRAY<STRUCT<field1: INT, field2: STRING>>,
+    col13 MAP<STRING, STRUCT<field1: INT, field2: STRING>>) USING DELTA
+    TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi') """)
+
+spark_delta.sql(f"""INSERT INTO `{hudi_table_base_name}_0` VALUES + (123, true, date(current_timestamp()), 32.1, 1.23, 456, 'hello world', + current_timestamp(), X'1ABF', -999.99, + STRUCT(1, 'hello', STRUCT(2, 3, 'world')), + ARRAY( + STRUCT(1, 'first'), + STRUCT(2, 'second') + ), + MAP( + 'key1', STRUCT(1, 'delta'), + 'key2', STRUCT(1, 'lake') + )); """) + +df_delta_0 = get_delta_df(spark_delta, f"{hudi_table_base_name}_0") +df_delta_0.show() + +# conversion happens correctly when enabling property after table creation +spark_delta.sql(f"CREATE TABLE {hudi_table_base_name}_1 (col1 INT, col2 STRING) USING DELTA") +spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_1 VALUES (1, 'a'), (2, 'b')") +spark_delta.sql(f"ALTER TABLE {hudi_table_base_name}_1 SET TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')") + +df_delta_1 = get_delta_df(spark_delta, f"{hudi_table_base_name}_1") +df_delta_1.show() + +# validate deletes +spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_2 (col1 INT, col2 STRING) USING DELTA + TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')""") +spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_2 VALUES (1, 'a'), (2, 'b')") +spark_delta.sql(f"DELETE FROM {hudi_table_base_name}_2 WHERE col1 = 1") + +df_delta_2 = get_delta_df(spark_delta, f"{hudi_table_base_name}_2") +df_delta_2.show() + +# basic schema evolution +spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_3 (col1 INT, col2 STRING) USING DELTA + TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')""") +spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_3 VALUES (1, 'a'), (2, 'b')") +spark_delta.sql(f"ALTER TABLE {hudi_table_base_name}_3 ADD COLUMN col3 INT FIRST") +spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_3 VALUES (3, 4, 'c')") -spark.sql("""INSERT INTO `delta_table_with_hudi` VALUES (1); """) +df_delta_3 = get_delta_df(spark_delta, f"{hudi_table_base_name}_3") +df_delta_3.show() + +# schema evolution for nested fields +spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_4 (col1 STRUCT) + USING DELTA + TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi')""") +spark_delta.sql(f"""INSERT INTO {hudi_table_base_name}_4 VALUES + (named_struct('field1', 1, 'field2', 'hello')) + """) +spark_delta.sql(f"ALTER TABLE {hudi_table_base_name}_4 ADD COLUMN col1.field3 INT AFTER field1") +spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_4 VALUES (named_struct('field1', 3, 'field3', 4, 'field2', 'delta'))") + +df_delta_4 = get_delta_df(spark_delta, f"{hudi_table_base_name}_4") +df_delta_4.show() + +# time travel +spark_delta.sql(f"""CREATE TABLE {hudi_table_base_name}_5 (col1 INT, col2 STRING) USING DELTA + TBLPROPERTIES('delta.universalFormat.enabledFormats' = 'hudi', + 'delta.columnMapping.mode' = 'name')""") +spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_5 VALUES (1, 'a')") +spark_delta.sql(f"INSERT INTO {hudi_table_base_name}_5 VALUES (2, 'b')") + +df_history_5 = spark_delta.sql(f"DESCRIBE HISTORY {hudi_table_base_name}_5") +timestamp = df_history_5.collect()[0]['timestamp'] # get the timestamp of the first commit +df_delta_5 = spark_delta.sql(f""" + SELECT * FROM {hudi_table_base_name}_5 + TIMESTAMP AS OF '{timestamp}'""") +df_delta_5.show() time.sleep(5) -hudiTablePath = warehousePath + "/" + "delta_table_with_hudi" +###################### Read tables from Hudi engine ###################### +print('Hudi tables:') + +spark_hudi = SparkSession.builder \ + .appName("delta-uniform-hudi-reader") \ + .master("local[*]") \ + 
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \ + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \ + .config("spark.sql.warehouse.dir", warehouse_path) \ + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \ + .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar") \ + .getOrCreate() + +df_hudi_0 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_0") +df_hudi_0.show() +assertDataFrameEqual(df_delta_0, df_hudi_0) + +df_hudi_1 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_1") +df_hudi_1.show() +assertDataFrameEqual(df_delta_1, df_hudi_1) + +df_hudi_2 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_2") +df_hudi_2.show() +assertDataFrameEqual(df_delta_2, df_hudi_2) + +df_hudi_3 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_3") +df_hudi_3.show() +assertDataFrameEqual(df_delta_3, df_hudi_3) -hudiMetadataPath = hudiTablePath + "/.hoodie/metadata/files" +df_hudi_4 = get_hudi_df(spark_hudi, f"{hudi_table_base_name}_4") +df_hudi_4.show() +assertDataFrameEqual(df_delta_4, df_hudi_4) -assert len(os.listdir(hudiMetadataPath)) > 0 +df_hudi_5 = spark_hudi.sql(f""" + SELECT * FROM {hudi_table_base_name}_5 + TIMESTAMP AS OF '{timestamp}'""") +df_hudi_5.show() +assertDataFrameEqual(df_delta_5, df_hudi_5) -# TODO: read with Hudi Spark to verify table content after Hudi supports Spark 3.5+ +print('UniForm Hudi integration test passed!') \ No newline at end of file diff --git a/run-integration-tests.py b/run-integration-tests.py index 2ff0a4b4c50..b846e86fbb0 100755 --- a/run-integration-tests.py +++ b/run-integration-tests.py @@ -239,7 +239,7 @@ def run_iceberg_integration_tests(root_dir, version, spark_version, iceberg_vers print("Failed Iceberg tests in %s" % (test_file)) raise -def run_uniform_hudi_integration_tests(root_dir, version, extra_maven_repo, use_local): +def run_uniform_hudi_integration_tests(root_dir, version, spark_version, hudi_version, extra_maven_repo, use_local): print("\n\n##### Running Uniform hudi tests on version %s #####" % str(version)) # clear_artifact_cache() if use_local: @@ -256,7 +256,9 @@ def run_uniform_hudi_integration_tests(root_dir, version, extra_maven_repo, use_ python_root_dir = path.join(root_dir, "python") extra_class_path = path.join(python_root_dir, path.join("delta", "testing")) package = ','.join([ - "io.delta:delta-%s_2.12:%s" % (get_artifact_name(version), version)]) + "io.delta:delta-%s_2.12:%s" % (get_artifact_name(version), version), + "org.apache.hudi:hudi-spark%s-bundle_2.12:%s" % (spark_version, hudi_version) + ]) jars = path.join(root_dir, "hudi/target/scala-2.12/delta-hudi-assembly_2.12-%s.jar" % (version)) repo = extra_maven_repo if extra_maven_repo else "" @@ -483,6 +485,17 @@ def __exit__(self, tpe, value, traceback): required=False, default="1.4.0", help="Iceberg Spark Runtime library version") + parser.add_argument( + "--hudi-spark-version", + required=False, + default="3.5", + help="Spark version for the Hudi library") + parser.add_argument( + "--hudi-version", + required=False, + default="0.15.0", + help="Hudi library version" + ) args = parser.parse_args() @@ -508,7 +521,7 @@ def __exit__(self, tpe, value, traceback): if args.run_uniform_hudi_integration_tests: run_uniform_hudi_integration_tests( - root_dir, args.version, args.maven_repo, args.use_local) + root_dir, args.version, args.hudi_spark_version, args.hudi_version, args.maven_repo, args.use_local) quit() if 
args.run_storage_s3_dynamodb_integration_tests:
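
For context on the packaging change above: the sketch below (not part of the patch) shows how the two new flags are combined into the comma-separated Maven coordinates built by `run_uniform_hudi_integration_tests` (the `package` variable). The format strings are taken from the diff; the Delta version and the `spark` artifact name are assumed values used only for illustration.

```python
# Minimal sketch of how --hudi-spark-version / --hudi-version feed the Hudi
# bundle coordinate added to the package list. Format strings match the patch;
# the Delta version and artifact name are hypothetical, for illustration only.
hudi_spark_version = "3.5"    # default of --hudi-spark-version
hudi_version = "0.15.0"       # default of --hudi-version
delta_artifact = "spark"      # assumed result of get_artifact_name(version)
delta_version = "3.2.0"       # hypothetical Delta version

package = ','.join([
    "io.delta:delta-%s_2.12:%s" % (delta_artifact, delta_version),
    "org.apache.hudi:hudi-spark%s-bundle_2.12:%s" % (hudi_spark_version, hudi_version)
])
print(package)
# io.delta:delta-spark_2.12:3.2.0,org.apache.hudi:hudi-spark3.5-bundle_2.12:0.15.0
```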