From 2c3a607d3648e59109938509d4851fd038b60c31 Mon Sep 17 00:00:00 2001 From: PeterZh6 Date: Sun, 29 Sep 2024 20:00:32 +0800 Subject: [PATCH] fix: new es test --- .../sort-end-to-end-tests-v1.15/pom.xml | 11 +---------- .../sort/tests/Pulsar2ElasticsearchTest.java | 17 +++++++++-------- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml index 800afe0186..0bfae3530c 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml @@ -183,16 +183,7 @@ elasticsearch-rest-client 7.10.2 - - org.apache.httpcomponents - httpclient - 4.5.13 - - - org.apache.httpcomponents - httpcore - 4.4.14 - + diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2ElasticsearchTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2ElasticsearchTest.java index 075de94448..6dd7eb1af0 100644 --- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2ElasticsearchTest.java +++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2ElasticsearchTest.java @@ -20,6 +20,7 @@ import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8; import org.apache.inlong.sort.tests.utils.TestUtils; +import org.apache.http.HttpHost; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; @@ -28,6 +29,7 @@ import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.index.query.QueryBuilders; import org.junit.AfterClass; import org.junit.Before; @@ -46,6 +48,7 @@ import java.net.URISyntaxException; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.Objects; public class Pulsar2ElasticsearchTest extends FlinkContainerTestEnvJRE8 { @@ -78,8 +81,8 @@ public class Pulsar2ElasticsearchTest extends FlinkContainerTestEnvJRE8 { @ClassRule public static final ElasticsearchContainer ELASTICSEARCH = - new ElasticsearchContainer(DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch:7.10.1")) - .withExposedPorts(9200) + new ElasticsearchContainer(DockerImageName.parse("docker.elastic.co/elasticsearch/elasticsearch:7.9.3")) + .withExposedPorts(9200, 9300) .withNetwork(NETWORK) .withNetworkAliases("elasticsearch") .withLogConsumer(new Slf4jLogConsumer(ELASTICSEARCH_LOG)); @@ -106,9 +109,8 @@ private void initializePulsarTopic() { private void initializeElasticsearchIndex() { // 使用 Elasticsearch 客户端创建索引 - try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder( - new org.elasticsearch.client.RestClientBuilder.HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), - "http")))) { + try (RestHighLevelClient client = new RestHighLevelClient( + RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")))) { client.indices().create(new CreateIndexRequest("test-index"), RequestOptions.DEFAULT); LOG.info("Created Elasticsearch index: test-index"); } catch (IOException e) { @@ -142,9 +144,8 @@ public void testPulsarToElasticsearch() throws Exception { } // 查询 Elasticsearch 数据 - try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder( - new org.elasticsearch.client.RestClientBuilder.HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), - "http")))) { + try (RestHighLevelClient client = new RestHighLevelClient( + RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")))) { SearchRequest searchRequest = new SearchRequest("test-index"); searchRequest.source().query(QueryBuilders.matchAllQuery()); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);