Clean up errors
lxfind committed Oct 19, 2024
1 parent bfb69c8 commit a2d7663
Showing 4 changed files with 94 additions and 62 deletions.
33 changes: 30 additions & 3 deletions crates/sui-indexer-alt/src/ingestion/client.rs
@@ -23,18 +23,35 @@ const MAX_TRANSIENT_RETRY_INTERVAL: Duration = Duration::from_secs(60);

#[async_trait::async_trait]
pub(crate) trait IngestionClientTrait: Send + Sync {
async fn fetch(&self, checkpoint: u64) -> Result<Bytes, BE<IngestionError>>;
async fn fetch(&self, checkpoint: u64) -> FetchResult;
}

#[derive(thiserror::Error, Debug)]
pub enum FetchError {
#[error("Checkpoint not found")]
NotFound,
#[error("Failed to fetch checkpoint due to permanent error: {0}")]
Permanent(#[from] anyhow::Error),
#[error("Failed to fetch checkpoint due to {reason}: {error}")]
Transient {
reason: &'static str,
#[source]
error: anyhow::Error,
},
}

pub type FetchResult = Result<Bytes, FetchError>;
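
Aside: a minimal sketch (not part of this commit) of what the `#[error]`, `#[source]`, and `#[from]` attributes above give callers, assuming only the `FetchError` definition here plus the `anyhow` crate; the function name and messages are illustrative.

fn demo_fetch_error(transient: bool) -> FetchError {
    use anyhow::anyhow;
    if transient {
        // Displays as "Failed to fetch checkpoint due to timeout: deadline exceeded",
        // and `#[source]` keeps the wrapped error reachable via `Error::source()`.
        FetchError::Transient {
            reason: "timeout",
            error: anyhow!("deadline exceeded"),
        }
    } else {
        // `#[from]` promotes any `anyhow::Error` into a permanent failure.
        anyhow!("malformed response").into()
    }
}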

#[derive(Clone)]
pub(crate) struct IngestionClient {
client: Arc<dyn IngestionClientTrait>,
/// Wrap the metrics in an `Arc` to keep copies of the client cheap.
metrics: Arc<IndexerMetrics>,
}

impl IngestionClient {
pub(crate) fn new_remote(url: Url, metrics: Arc<IndexerMetrics>) -> IngestionResult<Self> {
let client = Arc::new(RemoteIngestionClient::new(url, metrics.clone())?);
let client = Arc::new(RemoteIngestionClient::new(url)?);
Ok(IngestionClient { client, metrics })
}

@@ -61,7 +78,17 @@ impl IngestionClient {
return Err(BE::permanent(IngestionError::Cancelled));
}

let bytes = client.fetch(checkpoint).await?;
let bytes = client.fetch(checkpoint).await.map_err(|err| match err {
FetchError::NotFound => BE::permanent(IngestionError::NotFound(checkpoint)),
FetchError::Permanent(error) => {
BE::permanent(IngestionError::FetchError(checkpoint, error))
}
FetchError::Transient { reason, error } => self.metrics.inc_retry(
checkpoint,
reason,
IngestionError::FetchError(checkpoint, error),
),
})?;

self.metrics.total_ingested_bytes.inc_by(bytes.len() as u64);
let data: CheckpointData = Blob::from_bytes(&bytes).map_err(|e| {
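
Aside: a minimal sketch (not part of this commit) of how the `backoff::Error` classification above drives the retry loop; it assumes the `backoff` crate with its `tokio` feature enabled, and `try_fetch` is a hypothetical stand-in for the ingestion call.

use backoff::{future::retry, Error as BE, ExponentialBackoff};

async fn try_fetch(checkpoint: u64) -> Result<Vec<u8>, BE<String>> {
    if checkpoint == 0 {
        // Permanent errors abort the retry loop immediately.
        return Err(BE::permanent("checkpoint 0 never exists".to_string()));
    }
    // A `BE::transient(..)` error here (what the Transient arm above produces via
    // `inc_retry`) would instead be retried on an exponentially widening interval.
    Ok(format!("checkpoint {checkpoint}").into_bytes())
}

#[tokio::main]
async fn main() {
    assert!(retry(ExponentialBackoff::default(), || try_fetch(42)).await.is_ok());
    assert!(retry(ExponentialBackoff::default(), || try_fetch(0)).await.is_err());
}
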
7 changes: 1 addition & 6 deletions crates/sui-indexer-alt/src/ingestion/error.rs
@@ -1,8 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use reqwest::StatusCode;

pub type Result<T> = std::result::Result<T, Error>;

#[derive(thiserror::Error, Debug)]
@@ -14,7 +12,7 @@ pub enum Error {
DeserializationError(u64, #[source] anyhow::Error),

#[error("Failed to fetch checkpoint {0}: {1}")]
HttpError(u64, StatusCode),
FetchError(u64, #[source] anyhow::Error),

#[error(transparent)]
ReqwestError(#[from] reqwest::Error),
@@ -24,7 +22,4 @@

#[error("Shutdown signal received, stopping ingestion service")]
Cancelled,

#[error(transparent)]
IoError(#[from] std::io::Error),
}
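
Aside: a minimal sketch (not part of this commit) of why replacing the typed `StatusCode` field with an `anyhow::Error` source loses no information: the concrete cause can still be recovered with `downcast_ref`, which is how the new test helper in remote_client.rs checks HTTP statuses. The toy `Cause` type is hypothetical.

#[derive(thiserror::Error, Debug, PartialEq)]
#[error("underlying cause: {0}")]
struct Cause(u16);

fn main() {
    // Erase the concrete type into anyhow::Error, as Error::FetchError now does...
    let erased: anyhow::Error = Cause(418).into();
    // ...and recover it later when a caller needs to branch on the cause.
    assert_eq!(erased.downcast_ref::<Cause>(), Some(&Cause(418)));
}
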
12 changes: 7 additions & 5 deletions crates/sui-indexer-alt/src/ingestion/local_client.rs
@@ -1,8 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::ingestion::client::IngestionClientTrait;
use crate::ingestion::Error as IngestionError;
use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait};
use axum::body::Bytes;
use std::path::PathBuf;

@@ -18,13 +17,16 @@ impl LocalIngestionClient {

#[async_trait::async_trait]
impl IngestionClientTrait for LocalIngestionClient {
async fn fetch(&self, checkpoint: u64) -> Result<Bytes, backoff::Error<IngestionError>> {
async fn fetch(&self, checkpoint: u64) -> FetchResult {
let path = self.path.join(format!("{}.chk", checkpoint));
let bytes = tokio::fs::read(path).await.map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
backoff::Error::permanent(IngestionError::NotFound(checkpoint))
FetchError::NotFound
} else {
backoff::Error::transient(IngestionError::IoError(e))
FetchError::Transient {
reason: "IO error",
error: e.into(),
}
}
})?;
Ok(Bytes::from(bytes))
104 changes: 56 additions & 48 deletions crates/sui-indexer-alt/src/ingestion/remote_client.rs
@@ -1,30 +1,23 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::ingestion::client::IngestionClientTrait;
use crate::ingestion::Error;
use crate::ingestion::client::{FetchError, FetchResult, IngestionClientTrait};
use crate::ingestion::remote_client::HttpError::Http;
use crate::ingestion::Result as IngestionResult;
use crate::metrics::IndexerMetrics;
use axum::body::Bytes;
use backoff::Error as BE;
use reqwest::{Client, StatusCode};
use std::sync::Arc;
use tracing::{debug, error};
use url::Url;

pub(crate) struct RemoteIngestionClient {
url: Url,
client: Client,
/// Wrap the metrics in an `Arc` to keep copies of the client cheap.
metrics: Arc<IndexerMetrics>,
}

impl RemoteIngestionClient {
pub(crate) fn new(url: Url, metrics: Arc<IndexerMetrics>) -> IngestionResult<Self> {
pub(crate) fn new(url: Url) -> IngestionResult<Self> {
Ok(Self {
url,
client: Client::builder().build()?,
metrics,
})
}
}
@@ -40,75 +33,88 @@ impl IngestionClientTrait for RemoteIngestionClient {
/// - rate limiting,
/// - server errors (5xx),
/// - issues getting a full response and deserializing it as [CheckpointData].
async fn fetch(&self, checkpoint: u64) -> Result<Bytes, BE<Error>> {
async fn fetch(&self, checkpoint: u64) -> FetchResult {
// SAFETY: The path being joined is statically known to be valid.
let url = self
.url
.join(&format!("/{checkpoint}.chk"))
.expect("Unexpected invalid URL");

let response = self.client.get(url).send().await.map_err(|e| {
self.metrics
.inc_retry(checkpoint, "request", Error::ReqwestError(e))
})?;
let response = self
.client
.get(url)
.send()
.await
.map_err(|e| FetchError::Transient {
reason: "request",
error: e.into(),
})?;

match response.status() {
code if code.is_success() => {
// Failure to extract all the bytes from the payload, or to deserialize the
// checkpoint from them is considered a transient error -- the store being
// fetched from needs to be corrected, and ingestion will keep retrying it
// until it is.
response.bytes().await.map_err(|e| {
self.metrics
.inc_retry(checkpoint, "bytes", Error::ReqwestError(e))
response.bytes().await.map_err(|e| FetchError::Transient {
reason: "bytes",
error: e.into(),
})
}

// Treat 404s as a special case so we can match on this error type.
code @ StatusCode::NOT_FOUND => {
debug!(checkpoint, %code, "Checkpoint not found");
Err(BE::permanent(Error::NotFound(checkpoint)))
Err(FetchError::NotFound)
}

// Timeouts are a client error but they are usually transient.
code @ StatusCode::REQUEST_TIMEOUT => Err(self.metrics.inc_retry(
checkpoint,
"timeout",
Error::HttpError(checkpoint, code),
)),
code @ StatusCode::REQUEST_TIMEOUT => Err(FetchError::Transient {
reason: "timeout",
error: status_code_to_error(code),
}),

// Rate limiting is also a client error, but the backoff will eventually widen the
// interval appropriately.
code @ StatusCode::TOO_MANY_REQUESTS => Err(self.metrics.inc_retry(
checkpoint,
"too_many_requests",
Error::HttpError(checkpoint, code),
)),
code @ StatusCode::TOO_MANY_REQUESTS => Err(FetchError::Transient {
reason: "too_many_requests",
error: status_code_to_error(code),
}),

// Assume that if the server is facing difficulties, it will recover eventually.
code if code.is_server_error() => Err(self.metrics.inc_retry(
checkpoint,
"server_error",
Error::HttpError(checkpoint, code),
)),
code if code.is_server_error() => Err(FetchError::Transient {
reason: "server_error",
error: status_code_to_error(code),
}),

// For everything else, assume it's a permanent error and don't retry.
code => {
error!(checkpoint, %code, "Permanent error, giving up!");
Err(BE::permanent(Error::HttpError(checkpoint, code)))
Err(FetchError::Permanent(status_code_to_error(code)))
}
}
}
}

#[derive(thiserror::Error, Debug, Eq, PartialEq)]
pub enum HttpError {
#[error("HTTP error with status code: {0}")]
Http(StatusCode),
}

fn status_code_to_error(code: StatusCode) -> anyhow::Error {
Http(code).into()
}
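
Aside: a minimal sketch (not part of this commit) of what this helper hands back to callers, assuming the `HttpError` type and imports above; the function name is illustrative.

fn demo_status_code_to_error() {
    let err = status_code_to_error(StatusCode::INTERNAL_SERVER_ERROR);
    // Display comes from the `HttpError` variant, e.g.
    // "HTTP error with status code: 500 Internal Server Error".
    println!("{err}");
    // The typed status survives the `anyhow::Error` erasure, which is what the
    // tests below rely on via `downcast_ref`.
    assert_eq!(
        err.downcast_ref::<HttpError>(),
        Some(&Http(StatusCode::INTERNAL_SERVER_ERROR))
    );
}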

#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::ingestion::client::IngestionClient;
use crate::ingestion::error::Error;
use crate::ingestion::test_utils::test_checkpoint_data;
use crate::metrics::tests::test_metrics;
use axum::http::StatusCode;
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use tokio_util::sync::CancellationToken;
use wiremock::{
matchers::{method, path_regex},
@@ -131,6 +137,17 @@ pub(crate) mod tests {
IngestionClient::new_remote(Url::parse(&uri).unwrap(), Arc::new(test_metrics())).unwrap()
}

fn assert_http_error(error: Error, checkpoint: u64, code: StatusCode) {
let Error::FetchError(c, inner) = error else {
panic!("Expected FetchError, got: {:?}", error);
};
assert_eq!(c, checkpoint);
let Some(http_error) = inner.downcast_ref::<HttpError>() else {
panic!("Expected HttpError, got: {:?}", inner);
};
assert_eq!(http_error, &Http(code));
}

#[tokio::test]
async fn fail_on_not_found() {
let server = MockServer::start().await;
@@ -156,10 +173,7 @@
.await
.unwrap_err();

assert!(matches!(
error,
Error::HttpError(42, StatusCode::IM_A_TEAPOT)
));
assert_http_error(error, 42, StatusCode::IM_A_TEAPOT);
}

/// Even if the server is repeatedly returning transient errors, it is possible to cancel the
@@ -224,10 +238,7 @@
.await
.unwrap_err();

assert!(
matches!(error, Error::HttpError(42, StatusCode::IM_A_TEAPOT),),
"{error}"
);
assert_http_error(error, 42, StatusCode::IM_A_TEAPOT);
}

/// Assume that certain errors will recover by themselves, and keep retrying with an
@@ -255,10 +266,7 @@
.await
.unwrap_err();

assert!(matches!(
error,
Error::HttpError(42, StatusCode::IM_A_TEAPOT)
));
assert_http_error(error, 42, StatusCode::IM_A_TEAPOT);
}

/// Treat deserialization failure as another kind of transient error -- all checkpoint data
