diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java index dca7b62d104..d33d061b6f7 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/java/org/apache/inlong/sort/tests/Kafka2Elasticsearch7Test.java @@ -18,6 +18,7 @@ package org.apache.inlong.sort.tests; import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; +import org.apache.inlong.sort.tests.utils.PlaceholderResolver; import org.apache.inlong.sort.tests.utils.TestUtils; import co.elastic.clients.elasticsearch.ElasticsearchClient; @@ -34,20 +35,30 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testcontainers.containers.Container; +import org.testcontainers.containers.Container.ExecResult; import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testcontainers.utility.DockerImageName; import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Duration; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Objects; public class Kafka2Elasticsearch7Test extends FlinkContainerTestEnvJRE8 { private static final Logger LOG = LoggerFactory.getLogger(Kafka2Elasticsearch7Test.class); + public static final Logger KAFKA_LOG = LoggerFactory.getLogger(KafkaContainer.class); + public static final Logger ELASTICSEARCH_LOGGER = LoggerFactory.getLogger(ElasticsearchContainer.class); private static final Path kafkaJar = TestUtils.getResource("sort-connector-kafka.jar"); private static final Path elasticsearchJar = TestUtils.getResource("sort-connector-elasticsearch7.jar"); @@ -68,14 +79,17 @@ public class Kafka2Elasticsearch7Test extends FlinkContainerTestEnvJRE8 { public static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1")) .withNetwork(NETWORK) - .withNetworkAliases("kafka"); + .withNetworkAliases("kafka") + .withEmbeddedZookeeper() + .withLogConsumer(new Slf4jLogConsumer(KAFKA_LOG)); @ClassRule public static final ElasticsearchContainer ELASTICSEARCH = new ElasticsearchContainer(DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch:7.17.24")) .withExposedPorts(9200) .withNetwork(NETWORK) - .withNetworkAliases("elasticsearch"); + .withNetworkAliases("elasticsearch") + .withLogConsumer(new Slf4jLogConsumer(ELASTICSEARCH_LOGGER)); @Before public void setup() { @@ -85,12 +99,35 @@ public void setup() { } private void initializeKafkaTopic(String topic) { + String fileName = "kafka_test_kafka_init.txt"; + int port = KafkaContainer.ZOOKEEPER_PORT; + + Map properties = new HashMap<>(); + properties.put("TOPIC", topic); + properties.put("ZOOKEEPER_PORT", port); + try { - Container.ExecResult result = KAFKA.execInContainer("kafka-topics", "--create", "--topic", topic, - "--bootstrap-server", "localhost:9093", "--partitions", "1", "--replication-factor", "1"); - LOG.info("Kafka topic created: {}", result.getStdout()); - } catch (Exception e) { - throw new RuntimeException("Failed to initialize Kafka topic", e); + String createKafkaStatement = getCreateStatement(fileName, properties); + ExecResult result = KAFKA.execInContainer("bash", "-c", createKafkaStatement); + LOG.info("Create kafka topic: {}, std: {}", createKafkaStatement, result.getStdout()); + if (result.getExitCode() != 0) { + throw new RuntimeException("Init kafka topic failed. Exit code:" + result.getExitCode()); + } + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private String getCreateStatement(String fileName, Map properties) { + URL url = Objects.requireNonNull(Kafka2Elasticsearch7Test.class.getResource("/env/" + fileName)); + + try { + Path file = Paths.get(url.toURI()); + return PlaceholderResolver.getDefaultResolver().resolveByMap( + new String(Files.readAllBytes(file), StandardCharsets.UTF_8), + properties); + } catch (IOException | URISyntaxException e) { + throw new RuntimeException(e); } } @@ -150,7 +187,8 @@ public void testKafkaToElasticsearch() throws Exception { private java.util.Properties getKafkaProducerConfig() { java.util.Properties props = new java.util.Properties(); - props.put("bootstrap.servers", "localhost:9093"); + String bootstrapServers = KAFKA.getBootstrapServers(); + props.put("bootstrap.servers", bootstrapServers); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/env/kafka_test_kafka_init.txt b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/env/kafka_test_kafka_init.txt new file mode 100644 index 00000000000..b2f31d78fa4 --- /dev/null +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/env/kafka_test_kafka_init.txt @@ -0,0 +1 @@ +kafka-topics --create --topic ${TOPIC} --replication-factor 1 --partitions 1 --zookeeper localhost:${ZOOKEEPER_PORT} \ No newline at end of file diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql index 4e227a2751a..548e2d3f580 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.18/src/test/resources/flinkSql/kafka_to_elasticsearch.sql @@ -3,10 +3,10 @@ CREATE TABLE kafka_source ( ) WITH ( 'connector' = 'kafka-inlong', 'topic' = 'test-topic', - 'bootstrap.servers' = 'localhost:9093', - 'group.id' = 'flink-group', - 'format' = 'json', - 'scan.startup.mode' = 'earliest' + 'properties.bootstrap.servers' = 'kafka:9092', + 'properties.group.id' = 'flink-group', + 'scan.startup.mode' = 'earliest-offset', + 'format' = 'json' ); @@ -16,7 +16,6 @@ CREATE TABLE elasticsearch_sink ( 'connector' = 'elasticsearch7-inlong', 'hosts' = 'http://localhost:9200', 'index' = 'test-index', - 'document-type' = '_doc', 'format' = 'json' );