diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
index 0bfae3530c..54c31a7500 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/pom.xml
@@ -183,6 +183,12 @@
elasticsearch-rest-client
7.10.2
+
+ co.elastic.clients
+ elasticsearch-java
+ 8.9.0
+
+
diff --git a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2ElasticsearchTest.java b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2ElasticsearchTest.java
index 6dd7eb1af0..6b9af9506e 100644
--- a/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2ElasticsearchTest.java
+++ b/inlong-sort/sort-end-to-end-tests/sort-end-to-end-tests-v1.15/src/test/java/org/apache/inlong/sort/tests/Pulsar2ElasticsearchTest.java
@@ -17,20 +17,19 @@
package org.apache.inlong.sort.tests;
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.SearchRequest;
+import co.elastic.clients.elasticsearch.core.SearchResponse;
+import co.elastic.clients.elasticsearch.core.search.Hit;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import org.apache.http.HttpHost;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.TestUtils;
-
-import org.apache.http.HttpHost;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.index.query.QueryBuilders;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
@@ -49,6 +48,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
+import java.util.List;
import java.util.Objects;
public class Pulsar2ElasticsearchTest extends FlinkContainerTestEnvJRE8 {
@@ -109,9 +109,12 @@ private void initializePulsarTopic() {
private void initializeElasticsearchIndex() {
// 使用 Elasticsearch 客户端创建索引
- try (RestHighLevelClient client = new RestHighLevelClient(
- RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")))) {
- client.indices().create(new CreateIndexRequest("test-index"), RequestOptions.DEFAULT);
+ try (RestClient restClient = RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")).build()) {
+ RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
+ ElasticsearchClient client = new ElasticsearchClient(transport);
+
+ // Create Index
+ client.indices().create(c -> c.index("test-index"));
LOG.info("Created Elasticsearch index: test-index");
} catch (IOException e) {
throw new RuntimeException("Failed to create Elasticsearch index", e);
@@ -144,12 +147,16 @@ public void testPulsarToElasticsearch() throws Exception {
}
// 查询 Elasticsearch 数据
- try (RestHighLevelClient client = new RestHighLevelClient(
- RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")))) {
- SearchRequest searchRequest = new SearchRequest("test-index");
- searchRequest.source().query(QueryBuilders.matchAllQuery());
- SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
- LOG.info("Elasticsearch response: {}", searchResponse.getHits().getHits());
+ try (RestClient restClient = RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")).build()) {
+ RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
+ ElasticsearchClient client = new ElasticsearchClient(transport);
+
+ // Search Request
+ SearchRequest searchRequest = new SearchRequest.Builder().index("test-index").query(q -> q.matchAll(m -> m)).build();
+ SearchResponse