Skip to content

Commit

Permalink
[Improve][SQL] Support use catalogTableName as SQL expression (apache…
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Aug 11, 2023
1 parent 837003c commit fa11abb
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@
import java.util.List;

public interface SQLEngine {
void init(String inputTableName, SeaTunnelRowType inputRowType, String sql);
void init(
String inputTableName,
String catalogTableName,
SeaTunnelRowType inputRowType,
String sql);

SeaTunnelRowType typeMapping(List<String> inputColumnsMapping);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ protected void setConfig(Config pluginConfig) {
@Override
public void open() {
sqlEngine = SQLEngineFactory.getSQLEngine(engineType);
sqlEngine.init(inputTableName, inputRowType, query);
sqlEngine.init(
inputTableName,
inputCatalogTable != null ? inputCatalogTable.getTableId().getTableName() : null,
inputRowType,
query);
}

private void tryOpen() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -45,6 +47,7 @@

public class ZetaSQLEngine implements SQLEngine {
private String inputTableName;
@Nullable private String catalogTableName;
private SeaTunnelRowType inputRowType;

private String sql;
Expand All @@ -59,8 +62,13 @@ public class ZetaSQLEngine implements SQLEngine {
public ZetaSQLEngine() {}

@Override
public void init(String inputTableName, SeaTunnelRowType inputRowType, String sql) {
public void init(
String inputTableName,
String catalogTableName,
SeaTunnelRowType inputRowType,
String sql) {
this.inputTableName = inputTableName;
this.catalogTableName = catalogTableName;
this.inputRowType = inputRowType;
this.sql = sql;

Expand Down Expand Up @@ -109,7 +117,8 @@ private void validateSQL(Statement statement) {
throw new IllegalArgumentException("Unsupported table alias name syntax");
}
String tableName = table.getName();
if (!inputTableName.equalsIgnoreCase(tableName)) {
if (!inputTableName.equalsIgnoreCase(tableName)
&& !tableName.equalsIgnoreCase(catalogTableName)) {
throw new IllegalArgumentException(
String.format("Table name: %s not found", tableName));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.transform.sql.zeta;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.seatunnel.transform.sql.SQLEngine;
import org.apache.seatunnel.transform.sql.SQLEngineFactory;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class ZetaSQLEngineTest {

@Test
public void testCatalogNameAndSourceTableNameBothSupport() {

SQLEngine sqlEngine = SQLEngineFactory.getSQLEngine(SQLEngineFactory.EngineType.ZETA);

SeaTunnelRowType rowType =
new SeaTunnelRowType(
new String[] {"id", "name", "age"},
new SeaTunnelDataType[] {
BasicType.INT_TYPE, BasicType.STRING_TYPE, BasicType.INT_TYPE
});
sqlEngine.init("test", null, rowType, "select * from test");
sqlEngine.init("test", "nameFromCatalog", rowType, "select * from test");
sqlEngine.init("test", "nameFromCatalog", rowType, "select * from nameFromCatalog");

Assertions.assertThrows(
TransformException.class,
() -> sqlEngine.init("test", "nameFromCatalog", rowType, "select * from unknown"));
Assertions.assertThrows(
TransformException.class,
() -> sqlEngine.init("test", null, rowType, "select * from unknown"));
}
}

0 comments on commit fa11abb

Please sign in to comment.