diff --git a/crates/sui-indexer-alt/src/ingestion/client.rs b/crates/sui-indexer-alt/src/ingestion/client.rs index b9b2f69d1e5fba..b99be33728c862 100644 --- a/crates/sui-indexer-alt/src/ingestion/client.rs +++ b/crates/sui-indexer-alt/src/ingestion/client.rs @@ -23,18 +23,35 @@ const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60); #[async_trait::async_trait] pub(crate) trait IngestionClientTrait: Send + Sync { - async fn fetch(&self, checkpoint: u64) -> Result>; + async fn fetch(&self, checkpoint: u64) -> FetchResult; } +#[derive(thiserror::Error, Debug)] +pub enum FetchError { + #[error("Checkpoint not found")] + NotFound, + #[error("Failed to fetch checkpoint due to permanent error: {0}")] + Permanent(#[from] anyhow::Error), + #[error("Failed to fetch checkpoint due to {reason}: {error}")] + Transient { + reason: &'static str, + #[source] + error: anyhow::Error, + }, +} + +pub type FetchResult = Result; + #[derive(Clone)] pub(crate) struct IngestionClient { client: Arc, + /// Wrap the metrics in an `Arc` to keep copies of the client cheap. metrics: Arc, } impl IngestionClient { pub(crate) fn new_remote(url: Url, metrics: Arc) -> IngestionResult { - let client = Arc::new(RemoteIngestionClient::new(url, metrics.clone())?); + let client = Arc::new(RemoteIngestionClient::new(url)?); Ok(IngestionClient { client, metrics }) } @@ -61,7 +78,17 @@ impl IngestionClient { return Err(BE::permanent(IngestionError::Cancelled)); } - let bytes = client.fetch(checkpoint).await?; + let bytes = client.fetch(checkpoint).await.map_err(|err| match err { + FetchError::NotFound => BE::permanent(IngestionError::NotFound(checkpoint)), + FetchError::Permanent(error) => { + BE::permanent(IngestionError::FetchError(checkpoint, error)) + } + FetchError::Transient { reason, error } => self.metrics.inc_retry( + checkpoint, + reason, + IngestionError::FetchError(checkpoint, error), + ), + })?; self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64); let data: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| { diff --git a/crates/sui-indexer-alt/src/ingestion/error.rs b/crates/sui-indexer-alt/src/ingestion/error.rs index 35469b00f1bfac..0d9c5ba4612fe2 100644 --- a/crates/sui-indexer-alt/src/ingestion/error.rs +++ b/crates/sui-indexer-alt/src/ingestion/error.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use reqwest::StatusCode; - pub type Result = std::result::Result; #[derive(thiserror::Error, Debug)] @@ -13,6 +12,9 @@ pub enum Error { #[error("Failed to deserialize checkpoint {0}: {1}")] DeserializationError(u64, #[source] anyhow::Error), + #[error("Failed to fetch checkpoint {0}: {1}")] + FetchError(u64, #[source] anyhow::Error), + #[error("Failed to fetch checkpoint {0}: {1}")] HttpError(u64, StatusCode), diff --git a/crates/sui-indexer-alt/src/ingestion/local_client.rs b/crates/sui-indexer-alt/src/ingestion/local_client.rs index 4ed38a7c78eb7c..25f58cf66364a4 100644 --- a/crates/sui-indexer-alt/src/ingestion/local_client.rs +++ b/crates/sui-indexer-alt/src/ingestion/local_client.rs @@ -1,8 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::ingestion::client::IngestionClientTrait; -use crate::ingestion::Error as IngestionError; +use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait}; use axum::body::Bytes; use std::path::PathBuf; @@ -18,13 +17,16 @@ impl LocalIngestionClient { #[async_trait::async_trait] impl IngestionClientTrait for LocalIngestionClient { - async fn fetch(&self, checkpoint: u64) -> Result> { + async fn fetch(&self, checkpoint: u64) -> FetchResult { let path = self.path.join(format!("{}.chk", checkpoint)); let bytes = tokio::fs::read(path).await.map_err(|e| { if e.kind() == std::io::ErrorKind::NotFound { - backoff::Error::permanent(IngestionError::NotFound(checkpoint)) + FetchError::NotFound } else { - backoff::Error::transient(IngestionError::IoError(e)) + FetchError::Transient { + reason: "IO error", + error: e.into(), + } } })?; Ok(Bytes::from(bytes)) diff --git a/crates/sui-indexer-alt/src/ingestion/remote_client.rs b/crates/sui-indexer-alt/src/ingestion/remote_client.rs index 990785134c7261..3e06137591a194 100644 --- a/crates/sui-indexer-alt/src/ingestion/remote_client.rs +++ b/crates/sui-indexer-alt/src/ingestion/remote_client.rs @@ -1,30 +1,23 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::ingestion::client::IngestionClientTrait; -use crate::ingestion::Error; +use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait}; +use crate::ingestion::remote_client::HttpError::Http; use crate::ingestion::Result as IngestionResult; -use crate::metrics::IndexerMetrics; -use axum::body::Bytes; -use backoff::Error as BE; use reqwest::{Client, StatusCode}; -use std::sync::Arc; use tracing::{debug, error}; use url::Url; pub(crate) struct RemoteIngestionClient { url: Url, client: Client, - /// Wrap the metrics in an `Arc` to keep copies of the client cheap. - metrics: Arc, } impl RemoteIngestionClient { - pub(crate) fn new(url: Url, metrics: Arc) -> IngestionResult { + pub(crate) fn new(url: Url) -> IngestionResult { Ok(Self { url, client: Client::builder().build()?, - metrics, }) } } @@ -40,17 +33,22 @@ impl IngestionClientTrait for RemoteIngestionClient { /// - rate limiting, /// - server errors (5xx), /// - issues getting a full response and deserializing it as [CheckpointData]. - async fn fetch(&self, checkpoint: u64) -> Result> { + async fn fetch(&self, checkpoint: u64) -> FetchResult { // SAFETY: The path being joined is statically known to be valid. let url = self .url .join(&format!("/{checkpoint}.chk")) .expect("Unexpected invalid URL"); - let response = self.client.get(url).send().await.map_err(|e| { - self.metrics - .inc_retry(checkpoint, "request", Error::ReqwestError(e)) - })?; + let response = self + .client + .get(url) + .send() + .await + .map_err(|e| FetchError::Transient { + reason: "request", + error: e.into(), + })?; match response.status() { code if code.is_success() => { @@ -58,57 +56,65 @@ impl IngestionClientTrait for RemoteIngestionClient { // checkpoint from them is considered a transient error -- the store being // fetched from needs to be corrected, and ingestion will keep retrying it // until it is. - response.bytes().await.map_err(|e| { - self.metrics - .inc_retry(checkpoint, "bytes", Error::ReqwestError(e)) + response.bytes().await.map_err(|e| FetchError::Transient { + reason: "bytes", + error: e.into(), }) } // Treat 404s as a special case so we can match on this error type. code @ StatusCode::NOT_FOUND => { debug!(checkpoint, %code, "Checkpoint not found"); - Err(BE::permanent(Error::NotFound(checkpoint))) + Err(FetchError::NotFound) } // Timeouts are a client error but they are usually transient. - code @ StatusCode::REQUEST_TIMEOUT => Err(self.metrics.inc_retry( - checkpoint, - "timeout", - Error::HttpError(checkpoint, code), - )), + code @ StatusCode::REQUEST_TIMEOUT => Err(FetchError::Transient { + reason: "timeout", + error: status_code_to_error(code), + }), // Rate limiting is also a client error, but the backoff will eventually widen the // interval appropriately. - code @ StatusCode::TOO_MANY_REQUESTS => Err(self.metrics.inc_retry( - checkpoint, - "too_many_requests", - Error::HttpError(checkpoint, code), - )), + code @ StatusCode::TOO_MANY_REQUESTS => Err(FetchError::Transient { + reason: "too_many_requests", + error: status_code_to_error(code), + }), // Assume that if the server is facing difficulties, it will recover eventually. - code if code.is_server_error() => Err(self.metrics.inc_retry( - checkpoint, - "server_error", - Error::HttpError(checkpoint, code), - )), + code if code.is_server_error() => Err(FetchError::Transient { + reason: "server_error", + error: status_code_to_error(code), + }), // For everything else, assume it's a permanent error and don't retry. code => { error!(checkpoint, %code, "Permanent error, giving up!"); - Err(BE::permanent(Error::HttpError(checkpoint, code))) + Err(FetchError::Permanent(status_code_to_error(code))) } } } } +#[derive(thiserror::Error, Debug, Eq, PartialEq)] +pub enum HttpError { + #[error("HTTP error with status code: {0}")] + Http(StatusCode), +} + +fn status_code_to_error(code: StatusCode) -> anyhow::Error { + Http(code).into() +} + #[cfg(test)] pub(crate) mod tests { use super::*; use crate::ingestion::client::IngestionClient; + use crate::ingestion::error::Error; use crate::ingestion::test_utils::test_checkpoint_data; use crate::metrics::tests::test_metrics; use axum::http::StatusCode; - use std::sync::Mutex; + use std::sync::{Arc, Mutex}; use tokio_util::sync::CancellationToken; use wiremock::{ matchers::{method, path_regex}, @@ -131,6 +137,17 @@ pub(crate) mod tests { IngestionClient::new_remote(Url::parse(&uri).unwrap(), Arc::new(test_metrics())).unwrap() } + fn assert_http_error(error: Error, checkpoint: u64, code: StatusCode) { + let Error::FetchError(c, inner) = error else { + panic!("Expected FetchError, got: {:?}", error); + }; + assert_eq!(c, checkpoint); + let Some(http_error) = inner.downcast_ref::() else { + panic!("Expected HttpError, got: {:?}", inner); + }; + assert_eq!(http_error, &Http(code)); + } + #[tokio::test] async fn fail_on_not_found() { let server = MockServer::start().await; @@ -156,10 +173,7 @@ pub(crate) mod tests { .await .unwrap_err(); - assert!(matches!( - error, - Error::HttpError(42, StatusCode::IM_A_TEAPOT) - )); + assert_http_error(error, 42, StatusCode::IM_A_TEAPOT); } /// Even if the server is repeatedly returning transient errors, it is possible to cancel the @@ -224,10 +238,7 @@ pub(crate) mod tests { .await .unwrap_err(); - assert!( - matches!(error, Error::HttpError(42, StatusCode::IM_A_TEAPOT),), - "{error}" - ); + assert_http_error(error, 42, StatusCode::IM_A_TEAPOT); } /// Assume that certain errors will recover by themselves, and keep retrying with an @@ -255,10 +266,7 @@ pub(crate) mod tests { .await .unwrap_err(); - assert!(matches!( - error, - Error::HttpError(42, StatusCode::IM_A_TEAPOT) - )); + assert_http_error(error, 42, StatusCode::IM_A_TEAPOT); } /// Treat deserialization failure as another kind of transient error -- all checkpoint data