Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
xupefei committed Mar 19, 2024
1 parent 72fad38 commit 8448787
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ class MergeIntoSQLSuite extends MergeIntoSuiteBase

import testImplicits._

override def excluded: Seq[String] = super.excluded ++ Seq(
// Schema evolution SQL syntax is not yet supported
"schema evolution enabled for the current command"
)

test("CTE as a source in MERGE") {
withTable("source") {
Seq((1, 1), (0, 3)).toDF("key1", "value").write.saveAsTable("source")
Expand Down Expand Up @@ -386,10 +391,8 @@ class MergeIntoSQLSuite extends MergeIntoSuiteBase

trait MergeIntoSQLColumnMappingSuiteBase extends DeltaColumnMappingSelectedTestMixin {
override protected def runOnlyTests: Seq[String] =
Seq(
"schema evolution - new nested column with update non-* and insert * - " +
"array of struct - longer target - on via DeltaSQLConf"
)
Seq("schema evolution - new nested column with update non-* and insert * - " +
"array of struct - longer target")
}

class MergeIntoSQLIdColumnMappingSuite extends MergeIntoSQLSuite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ trait MergeIntoSchemaEvolutionMixin {
}
}

test(s"schema evolution - $name - on via DeltaSQLConf") {
test(s"schema evolution - $name") {
withSQLConf((confs :+ (DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true")): _*) {
executeMergeAndAssert(expected, expectErrorContains)
}
Expand Down Expand Up @@ -1003,6 +1003,39 @@ trait MergeIntoSchemaEvolutionBaseTests {
.toDF("key", "value"),
expectErrorWithoutEvolutionContains = "cannot resolve s.value in UPDATE clause")

test("schema evolution enabled for the current command") {
withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "false") {
withTable("target", "source") {
Seq((0, 0), (1, 10), (3, 30)).toDF("key", "value")
.write.format("delta").saveAsTable("target")
Seq((1, 1, 1), (2, 2, 2)).toDF("key", "value", "extra")
.write.format("delta").saveAsTable("source")

// Should fail without schema evolution
val e = intercept[org.apache.spark.sql.AnalysisException] {
executeMerge(
"target",
"source",
"target.key = source.key",
update("extra = -1"), insert("*"))
}
assert(e.getErrorClass === "DELTA_MERGE_UNRESOLVED_EXPRESSION")
assert(e.getMessage.contains("resolve extra in UPDATE clause"))

// Should succeed with schema evolution
executeMergeWithSchemaEvolution(
"target",
"source",
"target.key = source.key",
update("extra = -1"), insert("*"))
checkAnswer(
spark.table("target"),
Seq[(Integer, Integer, Integer)]((0, 0, null), (1, 10, -1), (2, 2, 2), (3, 30, null))
.toDF("key", "value", "extra"))
}
}
}

testNestedStructsEvolution("nested field assignment qualified with source alias")(
target = Seq("""{ "a": 1, "t": { "a": 2 } }"""),
source = Seq("""{ "a": 3, "t": { "a": 5 } }"""),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ trait MergeIntoTestUtils extends DeltaDMLTestUtils with MergeHelpers {
cond: String,
clauses: MergeClause*): Unit

protected def executeMergeWithSchemaEvolution(
tgt: String,
src: String,
cond: String,
clauses: MergeClause*): Unit

protected def withCrossJoinEnabled(body: => Unit): Unit = {
withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { body }
}
Expand Down Expand Up @@ -104,6 +110,15 @@ trait MergeIntoSQLTestUtils extends DeltaSQLTestUtils with MergeIntoTestUtils {
val clausesStr = clauses.map(_.sql).mkString("\n")
sql(s"MERGE INTO $tgt USING $src ON $cond\n" + clausesStr)
}

override protected def executeMergeWithSchemaEvolution(
tgt: String,
src: String,
cond: String,
clauses: MergeClause*): Unit = {
throw new UnsupportedOperationException(
"The SQL syntax [WITH SCHEMA EVOLUTION] is not yet supported.")
}
}

trait MergeIntoScalaTestUtils extends MergeIntoTestUtils {
Expand All @@ -130,6 +145,13 @@ trait MergeIntoScalaTestUtils extends MergeIntoTestUtils {
clauses: MergeClause*): Unit =
getMergeBuilder(tgt, src, cond, clauses: _*).execute()

override protected def executeMergeWithSchemaEvolution(
tgt: String,
src: String,
cond: String,
clauses: MergeClause*): Unit =
getMergeBuilder(tgt, src, cond, clauses: _*).withSchemaEvolution().execute()

private def getMergeBuilder(
tgt: String,
src: String,
Expand Down

0 comments on commit 8448787

Please sign in to comment.