Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZh6 committed Oct 3, 2024
1 parent b709fc2 commit 851aa88
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,14 @@
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-hbase-v1.15</artifactId>
<version>${project.version}</version>
<destFileName>sort-connector-hbase.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
<executions>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.tests;

import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE11;
import org.apache.inlong.sort.tests.utils.HBaseContainer;
import org.apache.inlong.sort.tests.utils.JdbcProxy;
import org.apache.inlong.sort.tests.utils.TestUtils;

import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;

import static org.apache.inlong.sort.tests.utils.HBaseManager.*;

/**
* End-to-end tests for sort-connector-postgres-cdc-v1.15 uber jar.
* Test flink sql Postgres cdc to HBase
*/
public class Postgres2HBaseTest extends FlinkContainerTestEnvJRE11 {

private static final Logger PG_LOG = LoggerFactory.getLogger(PostgreSQLContainer.class);
private static final Logger LOG = LoggerFactory.getLogger(Postgres2HBaseTest.class);
private static final Path postgresJar = TestUtils.getResource("sort-connector-postgres-cdc.jar");
private static final Path hbaseJar = TestUtils.getResource("sort-connector-hbase.jar");
private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
private static final String sqlFile;

static {
try {
sqlFile = Paths.get(Postgres2HBaseTest.class.getResource("/flinkSql/postgres2hbase_test.sql").toURI())
.toString();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}

@ClassRule
public static final PostgreSQLContainer POSTGRES_CONTAINER = (PostgreSQLContainer) new PostgreSQLContainer(
DockerImageName.parse("debezium/postgres:13").asCompatibleSubstituteFor("postgres"))
.withUsername("flinkuser")
.withPassword("flinkpw")
.withDatabaseName("test")
.withNetwork(NETWORK)
.withNetworkAliases("postgres")
.withLogConsumer(new Slf4jLogConsumer(PG_LOG));

@ClassRule
public static final HBaseContainer HBASE_CONTAINER =
new HBaseContainer(getNewHBaseImageName())
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_HBASE_ALIAS)
.withLogConsumer(new Slf4jLogConsumer(HBASE_LOG));

@Before
public void setup() {
waitUntilJobRunning(Duration.ofSeconds(30));
initializePostgresTable();
initializeHBaseTable(HBASE_CONTAINER);
}

private void initializePostgresTable() {
try {
Class.forName(POSTGRES_CONTAINER.getDriverClassName());
Connection conn = DriverManager
.getConnection(POSTGRES_CONTAINER.getJdbcUrl(), POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());
Statement stat = conn.createStatement();
stat.execute(
"CREATE TABLE test_input1 (\n"
+ " id SERIAL,\n"
+ " name VARCHAR(255) NOT NULL DEFAULT 'flink',\n"
+ " description VARCHAR(512),\n"
+ " PRIMARY KEY(id)\n"
+ ");");
stat.execute(
"ALTER TABLE test_input1 REPLICA IDENTITY FULL; ");
stat.close();
conn.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@AfterClass
public static void teardown() {
if (POSTGRES_CONTAINER != null) {
POSTGRES_CONTAINER.stop();
}
if (HBASE_CONTAINER != null) {
HBASE_CONTAINER.stop();
}
}

/**
* Test flink sql postgresql cdc to HBase
*
* @throws Exception The exception may throws when execute the case
*/
@Test
public void testPostgresUpdateAndDelete() throws Exception {
submitSQLJob(sqlFile, hbaseJar, postgresJar, mysqlJdbcJar);
waitUntilJobRunning(Duration.ofSeconds(10));

// generate input
try (Connection conn =
DriverManager.getConnection(POSTGRES_CONTAINER.getJdbcUrl(), POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());
Statement stat = conn.createStatement()) {
stat.execute(
"INSERT INTO test_input1 "
+ "VALUES (1,'jacket','water resistent white wind breaker');");
stat.execute(
"INSERT INTO test_input1 VALUES (2,'scooter','Big 2-wheel scooter ');");
stat.execute(
"update test_input1 set name = 'tom' where id = 2;");
stat.execute(
"delete from test_input1 where id = 1;");
} catch (SQLException e) {
LOG.error("Update table for CDC failed.", e);
throw e;
}

JdbcProxy proxy =
new JdbcProxy(HBASE_CONTAINER.getJdbcUrl(), null,
null,
HBASE_CONTAINER.getDriverClassName());
List<String> expectResult =
Arrays.asList("2,tom,Big 2-wheel scooter ");
proxy.checkResultWithTimeout(
expectResult,
"test_output1",
3,
60000L);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.tests.utils;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.DockerImageName;

/**
* Docker container for HBase.
*/
@SuppressWarnings("rawtypes")
public class HBaseContainer extends GenericContainer<HBaseContainer> {

public static final String IMAGE = "harisekhon/hbase";
public static final Integer HBASE_MASTER_PORT = 16000;
public static final Integer HBASE_THRIFT_PORT = 9090;
public static final Integer HBASE_REST_PORT = 8080;

private String databaseName = "test";
private String username = "hbaseuser";
private String password = "hbasepw";

public HBaseContainer() {
this(HBaseVersion.V2_4);
}

public HBaseContainer(HBaseVersion version) {
super(DockerImageName.parse(IMAGE + ":" + version.getVersion()).asCompatibleSubstituteFor("hbase"));
addExposedPort(HBASE_MASTER_PORT);
addExposedPort(HBASE_THRIFT_PORT);
addExposedPort(HBASE_REST_PORT);
}

public HBaseContainer(String imageName) {
super(DockerImageName.parse(imageName).asCompatibleSubstituteFor("hbase"));
addExposedPort(HBASE_MASTER_PORT);
addExposedPort(HBASE_THRIFT_PORT);
addExposedPort(HBASE_REST_PORT);
}

public String getDriverClassName() {
try {
Class.forName("org.apache.hbase.jdbc.Driver");
return "org.apache.hbase.jdbc.Driver";
} catch (ClassNotFoundException e) {
throw new RuntimeException("HBase JDBC Driver not found!", e);
}
}

public String getJdbcUrl(String databaseName) {
return "jdbc:hbase://"
+ getHost()
+ ":"
+ getMappedPort(HBASE_THRIFT_PORT)
+ "/"
+ databaseName;
}

public String getJdbcUrl() {
return getJdbcUrl(databaseName);
}

public int getDatabasePort() {
return getMappedPort(HBASE_THRIFT_PORT);
}

public String getDatabaseName() {
return databaseName;
}

public String getUsername() {
return username;
}

public String getPassword() {
return password;
}

protected String getTestQueryString() {
return "status 'simple'";
}

/** HBase version enum. */
public enum HBaseVersion {

V2_2("2.2.7"),
V2_4("2.4.11");

private String version;

HBaseVersion(String version) {
this.version = version;
}

public String getVersion() {
return version;
}

@Override
public String toString() {
return "HBaseVersion{" + "version='" + version + '\'' + '}';
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.sort.tests.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.MountableFile;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.stream.Stream;

public class HBaseManager {

// ----------------------------------------------------------------------------------------
// HBase Variables
// ----------------------------------------------------------------------------------------
public static final String INTER_CONTAINER_HBASE_ALIAS = "hbase";
private static final String NEW_HBASE_REPOSITORY = "inlong-hbase";
private static final String NEW_HBASE_TAG = "latest";
private static final String HBASE_IMAGE_NAME = "harisekhon/hbase:2.4.11";
private static final String DEFAULT_COLUMN_FAMILY = "cf1";
public static final Logger HBASE_LOG = LoggerFactory.getLogger(HBaseContainer.class);

static {
GenericContainer oldHBase = new GenericContainer(HBASE_IMAGE_NAME);
Startables.deepStart(Stream.of(oldHBase)).join();
oldHBase.copyFileToContainer(MountableFile.forClasspathResource("/docker/hbase/start_hbase.sh"),
"/data/hbase/");
try {
oldHBase.execInContainer("chmod", "+x", "/data/hbase/start_hbase.sh");
} catch (Exception e) {
e.printStackTrace();
}
oldHBase.getDockerClient()
.commitCmd(oldHBase.getContainerId())
.withRepository(NEW_HBASE_REPOSITORY)
.withTag(NEW_HBASE_TAG).exec();
oldHBase.stop();
}

public static String getNewHBaseImageName() {
return NEW_HBASE_REPOSITORY + ":" + NEW_HBASE_TAG;
}

public static void initializeHBaseTable(HBaseContainer HBASE_CONTAINER) {
initializeHBaseTable(HBASE_CONTAINER, DEFAULT_COLUMN_FAMILY);
}

public static void initializeHBaseTable(HBaseContainer HBASE_CONTAINER, String columnFamily) {
try (Connection conn = DriverManager.getConnection(HBASE_CONTAINER.getJdbcUrl(), HBASE_CONTAINER.getUsername(),
HBASE_CONTAINER.getPassword());
Statement stat = conn.createStatement()) {
stat.execute("CREATE 'test_output1', '" + columnFamily + "'");
} catch (SQLException e) {
throw new RuntimeException("Failed to initialize HBase table", e);
}
}
}
Loading

0 comments on commit 851aa88

Please sign in to comment.