Skip to content

Commit

Permalink
fix: new es impl
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterZh6 committed Sep 29, 2024
1 parent 2c3a607 commit eb4c0d9
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@
<artifactId>elasticsearch-rest-client</artifactId>
<version>7.10.2</version>
</dependency>
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.9.0</version> <!-- 使用最新版本或与你的 Elasticsearch 版本兼容的版本 -->
</dependency>


</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@

package org.apache.inlong.sort.tests;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnvJRE8;
import org.apache.inlong.sort.tests.utils.TestUtils;

import org.apache.http.HttpHost;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.ClassRule;
Expand All @@ -49,6 +48,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.List;
import java.util.Objects;

public class Pulsar2ElasticsearchTest extends FlinkContainerTestEnvJRE8 {
Expand Down Expand Up @@ -109,9 +109,12 @@ private void initializePulsarTopic() {

private void initializeElasticsearchIndex() {
// 使用 Elasticsearch 客户端创建索引
try (RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")))) {
client.indices().create(new CreateIndexRequest("test-index"), RequestOptions.DEFAULT);
try (RestClient restClient = RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")).build()) {
RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);

// Create Index
client.indices().create(c -> c.index("test-index"));
LOG.info("Created Elasticsearch index: test-index");
} catch (IOException e) {
throw new RuntimeException("Failed to create Elasticsearch index", e);
Expand Down Expand Up @@ -144,12 +147,16 @@ public void testPulsarToElasticsearch() throws Exception {
}

// 查询 Elasticsearch 数据
try (RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")))) {
SearchRequest searchRequest = new SearchRequest("test-index");
searchRequest.source().query(QueryBuilders.matchAllQuery());
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
LOG.info("Elasticsearch response: {}", searchResponse.getHits().getHits());
try (RestClient restClient = RestClient.builder(new HttpHost("localhost", ELASTICSEARCH.getMappedPort(9200), "http")).build()) {
RestClientTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
ElasticsearchClient client = new ElasticsearchClient(transport);

// Search Request
SearchRequest searchRequest = new SearchRequest.Builder().index("test-index").query(q -> q.matchAll(m -> m)).build();
SearchResponse<Object> searchResponse = client.search(searchRequest, Object.class);

List<Hit<Object>> hits = searchResponse.hits().hits();
LOG.info("Elasticsearch response: {}", hits);
} catch (IOException e) {
LOG.error("Failed to query Elasticsearch", e);
}
Expand Down

0 comments on commit eb4c0d9

Please sign in to comment.