From 851aa88ebbfc6aa4e54b335263f928069c6e0f07 Mon Sep 17 00:00:00 2001 From: PeterZh6 Date: Fri, 4 Oct 2024 00:54:11 +0800 Subject: [PATCH] initial commit --- .../sort-end-to-end-tests-v1.15/pom.xml | 8 + .../inlong/sort/tests/Postgres2HBaseTest.java | 168 ++++++++++++++++++ .../sort/tests/utils/HBaseContainer.java | 119 +++++++++++++ .../inlong/sort/tests/utils/HBaseManager.java | 78 ++++++++ .../flinkSql/postgres2hbase_test.sql | 36 ++++ 5 files changed, 409 insertions(+) create mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2HBaseTest.java create mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseContainer.java create mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseManager.java create mode 100644 inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres2hbase_test.sql diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml index cfaaee3f651..380587691cc 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml @@ -246,6 +246,14 @@ jar ${project.build.directory}/dependencies + + org.apache.inlong + sort-connector-hbase-v1.15 + ${project.version} + sort-connector-hbase.jar + jar + ${project.build.directory}/dependencies + diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2HBaseTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2HBaseTest.java new file mode 100644 index 00000000000..d76795e6761 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Postgres2HBaseTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests; + +import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE11; +import org.apache.inlong.sort.tests.utils.HBaseContainer; +import org.apache.inlong.sort.tests.utils.JdbcProxy; +import org.apache.inlong.sort.tests.utils.TestUtils; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +import static org.apache.inlong.sort.tests.utils.HBaseManager.*; + +/** + * End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar. + * Test flink sql Postgres cdc to HBase + */ +public class Postgres2HBaseTest extends FlinkContainerTestEnvJRE11 { + + private static final Logger PG_LOG = LoggerFactory.getLogger(PostgreSQLContainer.class); + private static final Logger LOG = LoggerFactory.getLogger(Postgres2HBaseTest.class); + private static final Path postgresJar = TestUtils.getResource("sort-connector-postgres-cdc.jar"); + private static final Path hbaseJar = TestUtils.getResource("sort-connector-hbase.jar"); + private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar"); + private static final String sqlFile; + + static { + try { + sqlFile = Paths.get(Postgres2HBaseTest.class.getResource("/flinkSql/postgres2hbase_test.sql").toURI()) + .toString(); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + + @ClassRule + public static final PostgreSQLContainer POSTGRES_CONTAINER = (PostgreSQLContainer) new PostgreSQLContainer( + DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres")) + .withUsername("flinkuser") + .withPassword("flinkpw") + .withDatabaseName("test") + .withNetwork(NETWORK) + .withNetworkAliases("postgres") + .withLogConsumer(new Slf4jLogConsumer(PG_LOG)); + + @ClassRule + public static final HBaseContainer HBASE_CONTAINER = + new HBaseContainer(getNewHBaseImageName()) + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_HBASE_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(HBASE_LOG)); + + @Before + public void setup() { + waitUntilJobRunning(Duration.ofSeconds(30)); + initializePostgresTable(); + initializeHBaseTable(HBASE_CONTAINER); + } + + private void initializePostgresTable() { + try { + Class.forName(POSTGRES_CONTAINER.getDriverClassName()); + Connection conn = DriverManager + .getConnection(POSTGRES_CONTAINER.getJdbcUrl(), POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword()); + Statement stat = conn.createStatement(); + stat.execute( + "CREATE TABLE test_input1 (\n" + + " id SERIAL,\n" + + " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n" + + " description VARCHAR(512),\n" + + " PRIMARY KEY(id)\n" + + ");"); + stat.execute( + "ALTER TABLE test_input1 REPLICA IDENTITY FULL; "); + stat.close(); + conn.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterClass + public static void teardown() { + if (POSTGRES_CONTAINER != null) { + POSTGRES_CONTAINER.stop(); + } + if (HBASE_CONTAINER != null) { + HBASE_CONTAINER.stop(); + } + } + + /** + * Test flink sql postgresql cdc to HBase + * + * @throws Exception The exception may throws when execute the case + */ + @Test + public void testPostgresUpdateAndDelete() throws Exception { + submitSQLJob(sqlFile, hbaseJar, postgresJar, mysqlJdbcJar); + waitUntilJobRunning(Duration.ofSeconds(10)); + + // generate input + try (Connection conn = + DriverManager.getConnection(POSTGRES_CONTAINER.getJdbcUrl(), POSTGRES_CONTAINER.getUsername(), + POSTGRES_CONTAINER.getPassword()); + Statement stat = conn.createStatement()) { + stat.execute( + "INSERT INTO test_input1 " + + "VALUES (1,'jacket','water resistent white wind breaker');"); + stat.execute( + "INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel scooter ');"); + stat.execute( + "update test_input1 set name = 'tom' where id = 2;"); + stat.execute( + "delete from test_input1 where id = 1;"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + JdbcProxy proxy = + new JdbcProxy(HBASE_CONTAINER.getJdbcUrl(), null, + null, + HBASE_CONTAINER.getDriverClassName()); + List expectResult = + Arrays.asList("2,tom,Big 2-wheel scooter "); + proxy.checkResultWithTimeout( + expectResult, + "test_output1", + 3, + 60000L); + } +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseContainer.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseContainer.java new file mode 100644 index 00000000000..5572e29bab9 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseContainer.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests.utils; + +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +/** + * Docker container for HBase. + */ +@SuppressWarnings("rawtypes") +public class HBaseContainer extends GenericContainer { + + public static final String IMAGE = "harisekhon/hbase"; + public static final Integer HBASE_MASTER_PORT = 16000; + public static final Integer HBASE_THRIFT_PORT = 9090; + public static final Integer HBASE_REST_PORT = 8080; + + private String databaseName = "test"; + private String username = "hbaseuser"; + private String password = "hbasepw"; + + public HBaseContainer() { + this(HBaseVersion.V2_4); + } + + public HBaseContainer(HBaseVersion version) { + super(DockerImageName.parse(IMAGE + ":" + version.getVersion()).asCompatibleSubstituteFor("hbase")); + addExposedPort(HBASE_MASTER_PORT); + addExposedPort(HBASE_THRIFT_PORT); + addExposedPort(HBASE_REST_PORT); + } + + public HBaseContainer(String imageName) { + super(DockerImageName.parse(imageName).asCompatibleSubstituteFor("hbase")); + addExposedPort(HBASE_MASTER_PORT); + addExposedPort(HBASE_THRIFT_PORT); + addExposedPort(HBASE_REST_PORT); + } + + public String getDriverClassName() { + try { + Class.forName("org.apache.hbase.jdbc.Driver"); + return "org.apache.hbase.jdbc.Driver"; + } catch (ClassNotFoundException e) { + throw new RuntimeException("HBase JDBC Driver not found!", e); + } + } + + public String getJdbcUrl(String databaseName) { + return "jdbc:hbase://" + + getHost() + + ":" + + getMappedPort(HBASE_THRIFT_PORT) + + "/" + + databaseName; + } + + public String getJdbcUrl() { + return getJdbcUrl(databaseName); + } + + public int getDatabasePort() { + return getMappedPort(HBASE_THRIFT_PORT); + } + + public String getDatabaseName() { + return databaseName; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + protected String getTestQueryString() { + return "status 'simple'"; + } + + /** HBase version enum. */ + public enum HBaseVersion { + + V2_2("2.2.7"), + V2_4("2.4.11"); + + private String version; + + HBaseVersion(String version) { + this.version = version; + } + + public String getVersion() { + return version; + } + + @Override + public String toString() { + return "HBaseVersion{" + "version='" + version + '\'' + '}'; + } + } +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseManager.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseManager.java new file mode 100644 index 00000000000..074a381d8ba --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/utils/HBaseManager.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.tests.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.MountableFile; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.stream.Stream; + +public class HBaseManager { + + // ---------------------------------------------------------------------------------------- + // HBase Variables + // ---------------------------------------------------------------------------------------- + public static final String INTER_CONTAINER_HBASE_ALIAS = "hbase"; + private static final String NEW_HBASE_REPOSITORY = "inlong-hbase"; + private static final String NEW_HBASE_TAG = "latest"; + private static final String HBASE_IMAGE_NAME = "harisekhon/hbase:2.4.11"; + private static final String DEFAULT_COLUMN_FAMILY = "cf1"; + public static final Logger HBASE_LOG = LoggerFactory.getLogger(HBaseContainer.class); + + static { + GenericContainer oldHBase = new GenericContainer(HBASE_IMAGE_NAME); + Startables.deepStart(Stream.of(oldHBase)).join(); + oldHBase.copyFileToContainer(MountableFile.forClasspathResource("/docker/hbase/start_hbase.sh"), + "/data/hbase/"); + try { + oldHBase.execInContainer("chmod", "+x", "/data/hbase/start_hbase.sh"); + } catch (Exception e) { + e.printStackTrace(); + } + oldHBase.getDockerClient() + .commitCmd(oldHBase.getContainerId()) + .withRepository(NEW_HBASE_REPOSITORY) + .withTag(NEW_HBASE_TAG).exec(); + oldHBase.stop(); + } + + public static String getNewHBaseImageName() { + return NEW_HBASE_REPOSITORY + ":" + NEW_HBASE_TAG; + } + + public static void initializeHBaseTable(HBaseContainer HBASE_CONTAINER) { + initializeHBaseTable(HBASE_CONTAINER, DEFAULT_COLUMN_FAMILY); + } + + public static void initializeHBaseTable(HBaseContainer HBASE_CONTAINER, String columnFamily) { + try (Connection conn = DriverManager.getConnection(HBASE_CONTAINER.getJdbcUrl(), HBASE_CONTAINER.getUsername(), + HBASE_CONTAINER.getPassword()); + Statement stat = conn.createStatement()) { + stat.execute("CREATE 'test_output1', '" + columnFamily + "'"); + } catch (SQLException e) { + throw new RuntimeException("Failed to initialize HBase table", e); + } + } +} diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres2hbase_test.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres2hbase_test.sql new file mode 100644 index 00000000000..8b50c2807db --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/resources/flinkSql/postgres2hbase_test.sql @@ -0,0 +1,36 @@ +CREATE TABLE test_input1 +( + `id` INT primary key, + name STRING, + description STRING +) +WITH ( 'connector' = 'postgres-cdc-inlong', + 'hostname' = 'postgres', + 'port' = '5432', + 'username' = 'flinkuser', + 'password' = 'flinkpw', + 'database-name' = 'test', + 'table-name' = 'test_input1', + 'schema-name' = 'public', + 'decoding.plugin.name' = 'pgoutput', + 'slot.name' = 'inlong_slot', + 'debezium.slot.name' = 'inlong_slot'); + +CREATE TABLE test_output1 +( + `id` INT primary key, + name STRING, + description STRING +) +WITH ( 'connector' = 'hbase-2.2-inlong', + 'hbase.zookeeper.quorum' = 'hbase-zk', + 'hbase.zookeeper.znode.parent' = '/hbase', + 'table-name' = 'test_output1', + 'sink.buffer-flush.max-rows' = '1000', + 'sink.buffer-flush.interval-ms' = '1000', + 'sink.parallelism' = '2' +); + +INSERT INTO test_output1 +SELECT * +FROM test_input1;