From c307713ce4640eb9e0ce775d62d6af999e05ce3f Mon Sep 17 00:00:00 2001 From: Emma Zhong Date: Sat, 19 Oct 2024 16:52:09 -0700 Subject: [PATCH] more comments addressing --- crates/sui-config/src/local_ip_utils.rs | 11 ++- crates/test-cluster/src/lib.rs | 6 +- .../test-cluster/src/test_indexer_handle.rs | 86 +++++++++++++++++++ 3 files changed, 96 insertions(+), 7 deletions(-) create mode 100644 crates/test-cluster/src/test_indexer_handle.rs diff --git a/crates/sui-config/src/local_ip_utils.rs b/crates/sui-config/src/local_ip_utils.rs index 5e7d1298f3629..e8fb02c7f145f 100644 --- a/crates/sui-config/src/local_ip_utils.rs +++ b/crates/sui-config/src/local_ip_utils.rs @@ -122,15 +122,18 @@ pub fn new_udp_address_for_testing(host: &str) -> Multiaddr { .unwrap() } -/// Returns a new unique TCP address (SocketAddr) for localhost, by finding a new available port on localhost. -pub fn new_local_tcp_socket_for_testing() -> SocketAddr { +/// Returns a new unique TCP address in String format for localhost, by finding a new available port on localhost. +pub fn new_local_tcp_socket_for_testing_string() -> String { format!( "{}:{}", localhost_for_testing(), get_available_port(&localhost_for_testing()) ) - .parse() - .unwrap() +} + +/// Returns a new unique TCP address (SocketAddr) for localhost, by finding a new available port on localhost. +pub fn new_local_tcp_socket_for_testing() -> SocketAddr { + new_local_tcp_socket_for_testing_string().parse().unwrap() } /// Returns a new unique TCP address (Multiaddr) for localhost, by finding a new available port on localhost. diff --git a/crates/test-cluster/src/lib.rs b/crates/test-cluster/src/lib.rs index b4f4b7f279976..5cfaf7d2b2ad8 100644 --- a/crates/test-cluster/src/lib.rs +++ b/crates/test-cluster/src/lib.rs @@ -61,7 +61,7 @@ use tokio::time::{timeout, Instant}; use tokio::{task::JoinHandle, time::sleep}; use tracing::{error, info}; -mod indexer_util; +mod test_indexer_handle; const NUM_VALIDATOR: usize = 4; @@ -92,7 +92,7 @@ pub struct TestCluster { pub swarm: Swarm, pub wallet: WalletContext, pub fullnode_handle: FullNodeHandle, - indexer_handle: Option, + indexer_handle: Option, } impl TestCluster { @@ -1135,7 +1135,7 @@ impl TestClusterBuilder { FullNodeHandle::new(fullnode.get_node_handle().unwrap(), json_rpc_address).await; let (rpc_url, indexer_handle) = if self.indexer_backed_rpc { - let handle = indexer_util::setup_indexer_backed_rpc( + let handle = test_indexer_handle::IndexerHandle::new( fullnode_handle.rpc_url.clone(), temp_data_ingestion_dir, data_ingestion_path.unwrap(), diff --git a/crates/test-cluster/src/test_indexer_handle.rs b/crates/test-cluster/src/test_indexer_handle.rs new file mode 100644 index 0000000000000..6e0595fe567a9 --- /dev/null +++ b/crates/test-cluster/src/test_indexer_handle.rs @@ -0,0 +1,86 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; +use std::path::PathBuf; +use std::time::Duration; +use sui_config::local_ip_utils::new_local_tcp_socket_for_testing_string; +use sui_indexer::tempdb::TempDb; +use sui_indexer::test_utils::{ + start_indexer_jsonrpc_for_testing, start_indexer_writer_for_testing, +}; +use sui_json_rpc_api::ReadApiClient; +use sui_sdk::{SuiClient, SuiClientBuilder}; +use tempfile::TempDir; +use tokio::time::sleep; + +pub(crate) struct IndexerHandle { + pub(crate) rpc_client: HttpClient, + pub(crate) sui_client: SuiClient, + pub(crate) rpc_url: String, + #[allow(unused)] + cancellation_tokens: Vec, + #[allow(unused)] + data_ingestion_dir: Option, + #[allow(unused)] + database: TempDb, +} + +impl IndexerHandle { + // TODO: this only starts indexer writer and reader (jsonrpc server) today. + // Consider adding graphql server here as well. + pub async fn new( + fullnode_rpc_url: String, + data_ingestion_dir: Option, + data_ingestion_path: PathBuf, + ) -> IndexerHandle { + let mut cancellation_tokens = vec![]; + let database = TempDb::new().unwrap(); + let pg_address = database.database().url().as_str().to_owned(); + let indexer_jsonrpc_address = new_local_tcp_socket_for_testing_string(); + + // Start indexer writer + let (_, _, writer_token) = start_indexer_writer_for_testing( + pg_address.clone(), + None, + None, + Some(data_ingestion_path.clone()), + None, + ) + .await; + cancellation_tokens.push(writer_token.drop_guard()); + + // Start indexer jsonrpc service + let (_, reader_token) = start_indexer_jsonrpc_for_testing( + pg_address.clone(), + fullnode_rpc_url, + indexer_jsonrpc_address.clone(), + None, + ) + .await; + cancellation_tokens.push(reader_token.drop_guard()); + + let rpc_address = format!("http://{}", indexer_jsonrpc_address); + + let rpc_client = HttpClientBuilder::default().build(&rpc_address).unwrap(); + + // Wait for the rpc client to be ready + while rpc_client.get_chain_identifier().await.is_err() { + sleep(Duration::from_millis(100)).await; + } + + let sui_client = SuiClientBuilder::default() + .build(&rpc_address) + .await + .unwrap(); + + IndexerHandle { + rpc_client, + sui_client, + rpc_url: rpc_address.clone(), + database, + data_ingestion_dir, + cancellation_tokens, + } + } +}