From fc51b101b6dd859b07210abc97416b774ceea9a8 Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Tue, 3 Dec 2024 09:44:36 -0700 Subject: [PATCH] wtf --- object_store/src/aws/client.rs | 27 +++++++++++++++++++-------- object_store/src/aws/mod.rs | 21 +++++++++++++++++---- object_store/src/buffered.rs | 2 ++ object_store/src/client/s3.rs | 4 ++-- object_store/src/integration.rs | 2 ++ object_store/src/lib.rs | 1 + 6 files changed, 43 insertions(+), 14 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index af54b8a0d4ce..a1cc049f48dc 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -638,9 +638,12 @@ impl S3Client { location: &Path, opts: PutMultipartOpts, ) -> Result { - let response = self - .request(Method::POST, location) - .header("x-amz-checksum-algorithm", "SHA256") + let mut req = self + .request(Method::POST, location); + if opts.checksum { + req = req.header("x-amz-checksum-algorithm", "SHA256"); + } + let response = req .query(&[("uploads", "")]) .with_encryption_headers() .with_attributes(opts.attributes) @@ -678,11 +681,17 @@ impl S3Client { .idempotent(true); request = match data { - PutPartPayload::Part(payload) => request.with_payload(payload), - PutPartPayload::Copy(path) => request.header( - "x-amz-copy-source", - &format!("{}/{}", self.config.bucket, encode_path(path)), - ), + PutPartPayload::Part(payload) => { + println!("put_part payload={}", payload.content_length()); + request.with_payload(payload) + }, + PutPartPayload::Copy(path) => { + println!("put_part path={path:?}"); + request.header( + "x-amz-copy-source", + &format!("{}/{}", self.config.bucket, encode_path(path)), + ) + }, }; if self @@ -721,11 +730,13 @@ impl S3Client { } pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> { + println!("Aborting"); self.request(Method::DELETE, location) .query(&[("uploadId", upload_id)]) .with_encryption_headers() .send() .await?; + println!("Aborted"); Ok(()) } diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index 9716c103b9ef..040e81028803 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -309,13 +309,19 @@ impl ObjectStore for AmazonS3 { } async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + println!("copy_if_not_exists from={from:?} to={to:?}"); let (k, v, status) = match &self.client.config.copy_if_not_exists { Some(S3CopyIfNotExists::Header(k, v)) => (k, v, StatusCode::PRECONDITION_FAILED), Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v, *status), Some(S3CopyIfNotExists::Multipart) => { + let opts = PutMultipartOpts { + tags: Default::default(), + attributes: Default::default(), + checksum: false, + }; let upload_id = self .client - .create_multipart(to, PutMultipartOpts::default()) + .create_multipart(to, opts) .await?; let res = async { @@ -323,7 +329,8 @@ impl ObjectStore for AmazonS3 { .client .put_part(to, &upload_id, 0, PutPartPayload::Copy(from)) .await?; - match self + println!("copy_if_not_exists part={part:#?}"); + let res = self .client .complete_multipart( to, @@ -331,7 +338,8 @@ impl ObjectStore for AmazonS3 { vec![part], CompleteMultipartMode::Create, ) - .await + .await; + match res { Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists { path: to.to_string(), @@ -442,8 +450,13 @@ impl MultipartUpload for S3MultiPartUpload { #[async_trait] impl MultipartStore for AmazonS3 { async fn create_multipart(&self, path: &Path) -> Result { + let opts = PutMultipartOpts { + tags: Default::default(), + attributes: Default::default(), + checksum: true, + }; self.client - .create_multipart(path, PutMultipartOpts::default()) + .create_multipart(path, opts) .await } diff --git a/object_store/src/buffered.rs b/object_store/src/buffered.rs index fcd7e064e7c1..036fd9cbe0ed 100644 --- a/object_store/src/buffered.rs +++ b/object_store/src/buffered.rs @@ -325,6 +325,7 @@ impl BufWriter { let opts = PutMultipartOpts { attributes: self.attributes.take().unwrap_or_default(), tags: self.tags.take().unwrap_or_default(), + checksum: true, }; let upload = self.store.put_multipart_opts(&path, opts).await?; let mut chunked = @@ -384,6 +385,7 @@ impl AsyncWrite for BufWriter { let opts = PutMultipartOpts { attributes: self.attributes.take().unwrap_or_default(), tags: self.tags.take().unwrap_or_default(), + checksum: true, }; let store = Arc::clone(&self.store); self.state = BufWriterState::Prepare(Box::pin(async move { diff --git a/object_store/src/client/s3.rs b/object_store/src/client/s3.rs index f131adb532ad..d15d042dc47d 100644 --- a/object_store/src/client/s3.rs +++ b/object_store/src/client/s3.rs @@ -111,9 +111,9 @@ impl From> for CompleteMultipartUpload { let part = value .into_iter() .enumerate() - .map(|(part_number, part)| MultipartPart { + .map(|(idx, part)| MultipartPart { e_tag: part.content_id, - part_number: part_number + 1, + part_number: idx + 1, checksum_sha256: None, }) .collect(); diff --git a/object_store/src/integration.rs b/object_store/src/integration.rs index 30177878306f..74e4d8e94929 100644 --- a/object_store/src/integration.rs +++ b/object_store/src/integration.rs @@ -1020,7 +1020,9 @@ pub async fn copy_if_not_exists(storage: &DynObjectStore) { // copy_if_not_exists() copies contents and allows deleting original storage.delete(&path2).await.unwrap(); + println!("copy_if_not_exists"); storage.copy_if_not_exists(&path1, &path2).await.unwrap(); + println!("copied_if_not_exists"); storage.delete(&path1).await.unwrap(); let new_contents = storage.get(&path2).await.unwrap().bytes().await.unwrap(); assert_eq!(&new_contents, &contents1); diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 4d8d8f02a0bc..409009ed9b8c 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -1189,6 +1189,7 @@ pub struct PutMultipartOpts { /// /// Implementations that don't support an attribute should return an error pub attributes: Attributes, + pub checksum: bool, } impl From for PutMultipartOpts {