Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

object-store: support real S3's If-Match semantics #6801

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ pub(crate) struct Request<'a> {
payload: Option<PutPayload>,
use_session_creds: bool,
idempotent: bool,
retry_on_conflict: bool,
retry_error_body: bool,
}

Expand Down Expand Up @@ -317,6 +318,13 @@ impl<'a> Request<'a> {
Self { idempotent, ..self }
}

pub(crate) fn retry_on_conflict(self, retry_on_conflict: bool) -> Self {
Self {
retry_on_conflict,
..self
}
}

pub(crate) fn retry_error_body(self, retry_error_body: bool) -> Self {
Self {
retry_error_body,
Expand Down Expand Up @@ -412,6 +420,7 @@ impl<'a> Request<'a> {
self.builder
.with_aws_sigv4(credential.authorizer(), sha)
.retryable(&self.config.retry_config)
.retry_on_conflict(self.retry_on_conflict)
.idempotent(self.idempotent)
.retry_error_body(self.retry_error_body)
.payload(self.payload)
Expand Down Expand Up @@ -448,6 +457,7 @@ impl S3Client {
config: &self.config,
use_session_creds: true,
idempotent: false,
retry_on_conflict: false,
retry_error_body: false,
}
}
Expand Down
20 changes: 19 additions & 1 deletion object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,25 @@ impl ObjectStore for AmazonS3 {
match put {
S3ConditionalPut::ETagPutIfNotExists => Err(Error::NotImplemented),
S3ConditionalPut::ETagMatch => {
request.header(&IF_MATCH, etag.as_str()).do_put().await
match request
.header(&IF_MATCH, etag.as_str())
// Real S3 will occasionally report 409 Conflict
// if there are concurrent `If-Match` requests
// in flight, so we need to be prepared to retry
// 409 responses.
.retry_on_conflict(true)
.do_put()
.await
{
// Real S3 reports NotFound rather than PreconditionFailed when the
// object doesn't exist. Convert to PreconditionFailed for
// consistency with R2. This also matches what the HTTP spec
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😅

// says the behavior should be.
Err(Error::NotFound { path, source }) => {
Err(Error::Precondition { path, source })
}
r => r,
}
}
S3ConditionalPut::Dynamo(d) => {
d.conditional_op(&self.client, location, Some(&etag), move || {
Expand Down
14 changes: 13 additions & 1 deletion object_store/src/client/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ pub(crate) struct RetryableRequest {

sensitive: bool,
idempotent: Option<bool>,
retry_on_conflict: bool,
payload: Option<PutPayload>,

retry_error_body: bool,
Expand All @@ -217,6 +218,15 @@ impl RetryableRequest {
}
}

/// Set whether this request should be retried on a 409 Conflict response.
#[cfg(feature = "aws")]
pub(crate) fn retry_on_conflict(self, retry_on_conflict: bool) -> Self {
Self {
retry_on_conflict,
..self
}
}

/// Set whether this request contains sensitive data
///
/// This will avoid printing out the URL in error messages
Expand Down Expand Up @@ -340,7 +350,8 @@ impl RetryableRequest {
let status = r.status();
if retries == max_retries
|| now.elapsed() > retry_timeout
|| !status.is_server_error()
|| !(status.is_server_error()
|| (self.retry_on_conflict && status == StatusCode::CONFLICT))
{
return Err(match status.is_client_error() {
true => match r.text().await {
Expand Down Expand Up @@ -467,6 +478,7 @@ impl RetryExt for reqwest::RequestBuilder {
idempotent: None,
payload: None,
sensitive: false,
retry_on_conflict: false,
retry_error_body: false,
}
}
Expand Down
Loading