diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java index 501e763e3c4..d0467e53cb7 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java @@ -32,6 +32,7 @@ import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java index b1e734c31ef..6dfaddca00a 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java @@ -23,7 +23,11 @@ import java.util.List; public interface SQLEngine { - void init(String inputTableName, SeaTunnelRowType inputRowType, String sql); + void init( + String inputTableName, + String catalogTableName, + SeaTunnelRowType inputRowType, + String sql); SeaTunnelRowType typeMapping(List inputColumnsMapping); diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java index 20a07dcee02..9b21c4b6f5c 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java @@ -115,7 +115,11 @@ protected void setConfig(Config pluginConfig) { @Override public void open() { sqlEngine = SQLEngineFactory.getSQLEngine(engineType); - sqlEngine.init(inputTableName, inputRowType, query); + sqlEngine.init( + inputTableName, + inputCatalogTable != null ? inputCatalogTable.getTableId().getTableName() : null, + inputRowType, + query); } private void tryOpen() { diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java index 55fbe04cf13..2f01fe3af98 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java @@ -37,6 +37,8 @@ import net.sf.jsqlparser.statement.select.SelectExpressionItem; import net.sf.jsqlparser.statement.select.SelectItem; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -45,6 +47,7 @@ public class ZetaSQLEngine implements SQLEngine { private String inputTableName; + @Nullable private String catalogTableName; private SeaTunnelRowType inputRowType; private String sql; @@ -59,8 +62,13 @@ public class ZetaSQLEngine implements SQLEngine { public ZetaSQLEngine() {} @Override - public void init(String inputTableName, SeaTunnelRowType inputRowType, String sql) { + public void init( + String inputTableName, + String catalogTableName, + SeaTunnelRowType inputRowType, + String sql) { this.inputTableName = inputTableName; + this.catalogTableName = catalogTableName; this.inputRowType = inputRowType; this.sql = sql; @@ -109,7 +117,8 @@ private void validateSQL(Statement statement) { throw new IllegalArgumentException("Unsupported table alias name syntax"); } String tableName = table.getName(); - if (!inputTableName.equalsIgnoreCase(tableName)) { + if (!inputTableName.equalsIgnoreCase(tableName) + && !tableName.equalsIgnoreCase(catalogTableName)) { throw new IllegalArgumentException( String.format("Table name: %s not found", tableName)); } diff --git a/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java new file mode 100644 index 00000000000..94e1060af85 --- /dev/null +++ b/seatunnel-transforms-v2/src/test/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngineTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.transform.sql.zeta; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.transform.exception.TransformException; +import org.apache.seatunnel.transform.sql.SQLEngine; +import org.apache.seatunnel.transform.sql.SQLEngineFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ZetaSQLEngineTest { + + @Test + public void testCatalogNameAndSourceTableNameBothSupport() { + + SQLEngine sqlEngine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA); + + SeaTunnelRowType rowType = + new SeaTunnelRowType( + new String[] {"id", "name", "age"}, + new SeaTunnelDataType[] { + BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE + }); + sqlEngine.init("test", null, rowType, "select * from test"); + sqlEngine.init("test", "nameFromCatalog", rowType, "select * from test"); + sqlEngine.init("test", "nameFromCatalog", rowType, "select * from nameFromCatalog"); + + Assertions.assertThrows( + TransformException.class, + () -> sqlEngine.init("test", "nameFromCatalog", rowType, "select * from unknown")); + Assertions.assertThrows( + TransformException.class, + () -> sqlEngine.init("test", null, rowType, "select * from unknown")); + } +}