Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
benesch committed Nov 6, 2024
1 parent 3206e8c commit dc2a634
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 26 deletions.
12 changes: 11 additions & 1 deletion object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ impl S3Client {
PutPartPayload::Part(payload) => request.with_payload(payload),
PutPartPayload::Copy(path) => request.header(
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, path),
&format!("{}/{}", self.config.bucket, encode_path(path)),
),
};

Expand Down Expand Up @@ -671,6 +671,16 @@ impl S3Client {
Ok(PartId { content_id })
}

pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> {
self.request(Method::DELETE, location)
.query(&[("uploadId", upload_id)])
.with_encryption_headers()
.send()
.await?;

Ok(())
}

pub(crate) async fn complete_multipart(
&self,
location: &Path,
Expand Down
55 changes: 34 additions & 21 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,27 +302,40 @@ impl ObjectStore for AmazonS3 {
.client
.create_multipart(to, PutMultipartOpts::default())
.await?;
let part_id = self
.client
.put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
.await?;
let res = match self
.client
.complete_multipart(
to,
&upload_id,
vec![part_id],
CompleteMultipartMode::Create,
)
.await
{
Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
path: to.to_string(),
source: Box::new(e),
}),
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
};

let res = async {
let part_id = self
.client
.put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
.await?;
match self
.client
.complete_multipart(
to,
&upload_id,
vec![part_id],
CompleteMultipartMode::Create,
)
.await
{
Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
path: to.to_string(),
source: Box::new(e),
}),
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}
.await;

// If the multipart upload failed, make a best effort attempt to
// clean it up. It's the caller's responsibility to add a
// lifecycle rule if guaranteed cleanup is required, as we
// cannot protect against an ill-timed process crash.
if res.is_err() {
let _ = self.client.abort_multipart(to, &upload_id).await;
}

return res;
}
Some(S3CopyIfNotExists::Dynamo(lock)) => {
Expand Down
14 changes: 10 additions & 4 deletions object_store/src/aws/precondition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,17 @@ pub enum S3CopyIfNotExists {
/// Encoded as `header-with-status:<HEADER_NAME>:<HEADER_VALUE>:<STATUS>` ignoring whitespace
HeaderWithStatus(String, String, reqwest::StatusCode),
/// Native Amazon S3 supports copy if not exists through a multipart upload
/// where the upload copies an existing object and is completed only if
/// the new object does not already exist.
/// where the upload copies an existing object and is completed only if the
/// new object does not already exist.
///
/// WARNING: When using this mode, `copy_if_not_exists` does not copy
/// tags or attributes from the source object.
/// WARNING: When using this mode, `copy_if_not_exists` does not copy tags
/// or attributes from the source object.
///
/// WARNING: When using this mode, `copy_if_not_exists` makes only a best
/// effort attempt to clean up the multipart upload if the copy operation
/// fails. Consider using a lifecycle rule to automatically clean up
/// abandoned multipart uploads. See [the module
/// docs](super#multipart-uploads) for details.
///
/// Encoded as `multipart` ignoring whitespace.
Multipart,
Expand Down

0 comments on commit dc2a634

Please sign in to comment.