From d1d009cd86d92e3249b31edb6296d0b88b8b1486 Mon Sep 17 00:00:00 2001 From: Zhipeng Mao Date: Thu, 12 Dec 2024 20:09:46 +0100 Subject: [PATCH] [3.3][SPARK] Add test for Identity Column merge metadata conflict (#3970) #### Which Delta project/connector is this regarding? - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description It adds a test for identity column to verify merge will be aborted if high water mark is changed after analysis and before execution. ## How was this patch tested? Test-only. ## Does this PR introduce _any_ user-facing changes? No. --- .../delta/IdentityColumnConflictSuite.scala | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnConflictSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnConflictSuite.scala index e56b57aa21b..04c06020661 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnConflictSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnConflictSuite.scala @@ -276,6 +276,30 @@ trait IdentityColumnConflictSuiteBase tblIsoLevel = Some(Serializable) ) } + + test("high watermark changes after analysis but before execution of merge") { + val tblName = getRandomTableName + withIdentityColumnTable(GeneratedAsIdentityType.GeneratedAlways, tblName) { + // Create a QueryExecution object for a MERGE statement, and it forces the command to be + // analyzed, but does not execute the command yet. + val parsedMerge = spark.sessionState.sqlParser.parsePlan( + s"""MERGE INTO $tblName t + |USING (SELECT * FROM range(1000)) s + |ON t.id = s.id + |WHEN NOT MATCHED THEN INSERT (value) VALUES (s.id)""".stripMargin) + val qeMerge = new QueryExecution(spark, parsedMerge) + qeMerge.analyzed + + // Insert a row, forcing the high watermark to be updated. + sql(s"INSERT INTO $tblName (value) VALUES (0)") + + // Force merge to be executed. This should fail, as MERGE is still using the old high + // watermark in its insert action. + intercept[MetadataChangedException] { + qeMerge.commandExecuted + } + } + } } class IdentityColumnConflictScalaSuite