Skip to content

Commit

Permalink
Fix multipart uploads with checksums on object locked buckets (#6794)
Browse files Browse the repository at this point in the history
Fix multipart uploads with checksums on object locked buckets (#6794)
  • Loading branch information
avantgardnerio authored Dec 9, 2024
1 parent 9bbbb28 commit fcf4aa4
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 11 deletions.
1 change: 1 addition & 0 deletions .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ jobs:
echo "LOCALSTACK_CONTAINER=$(docker run -d -p 4566:4566 localstack/localstack:4.0.3)" >> $GITHUB_ENV
echo "EC2_METADATA_CONTAINER=$(docker run -d -p 1338:1338 amazon/amazon-ec2-metadata-mock:v1.9.2 --imdsv2)" >> $GITHUB_ENV
aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket
aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket
aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key --description "test key")
Expand Down
39 changes: 31 additions & 8 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::client::list::ListClient;
use crate::client::retry::RetryExt;
use crate::client::s3::{
CompleteMultipartUpload, CompleteMultipartUploadResult, CopyPartResult,
InitiateMultipartUploadResult, ListResponse,
InitiateMultipartUploadResult, ListResponse, PartMetadata,
};
use crate::client::GetOptionsExt;
use crate::multipart::PartId;
Expand Down Expand Up @@ -62,6 +62,7 @@ use std::sync::Arc;
const VERSION_HEADER: &str = "x-amz-version-id";
const SHA256_CHECKSUM: &str = "x-amz-checksum-sha256";
const USER_DEFINED_METADATA_HEADER_PREFIX: &str = "x-amz-meta-";
const ALGORITHM: &str = "x-amz-checksum-algorithm";

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
Expand Down Expand Up @@ -390,10 +391,9 @@ impl<'a> Request<'a> {
let payload_sha256 = sha256.finish();

if let Some(Checksum::SHA256) = self.config.checksum {
self.builder = self.builder.header(
"x-amz-checksum-sha256",
BASE64_STANDARD.encode(payload_sha256),
);
self.builder = self
.builder
.header(SHA256_CHECKSUM, BASE64_STANDARD.encode(payload_sha256));
}
self.payload_sha256 = Some(payload_sha256);
}
Expand Down Expand Up @@ -617,8 +617,15 @@ impl S3Client {
location: &Path,
opts: PutMultipartOpts,
) -> Result<MultipartId> {
let response = self
.request(Method::POST, location)
let mut request = self.request(Method::POST, location);
if let Some(algorithm) = self.config.checksum {
match algorithm {
Checksum::SHA256 => {
request = request.header(ALGORITHM, "SHA256");
}
}
}
let response = request
.query(&[("uploads", "")])
.with_encryption_headers()
.with_attributes(opts.attributes)
Expand Down Expand Up @@ -669,8 +676,13 @@ impl S3Client {
request = request.with_encryption_headers();
}
let response = request.send().await?;
let checksum_sha256 = response
.headers()
.get(SHA256_CHECKSUM)
.and_then(|v| v.to_str().ok())
.map(|v| v.to_string());

let content_id = match is_copy {
let e_tag = match is_copy {
false => get_etag(response.headers()).context(MetadataSnafu)?,
true => {
let response = response
Expand All @@ -682,6 +694,17 @@ impl S3Client {
response.e_tag
}
};

let content_id = if self.config.checksum == Some(Checksum::SHA256) {
let meta = PartMetadata {
e_tag,
checksum_sha256,
};
quick_xml::se::to_string(&meta).unwrap()
} else {
e_tag
};

Ok(PartId { content_id })
}

Expand Down
60 changes: 60 additions & 0 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,66 @@ mod tests {

const NON_EXISTENT_NAME: &str = "nonexistentname";

#[tokio::test]
async fn write_multipart_file_with_signature() {
maybe_skip_integration!();

let store = AmazonS3Builder::from_env()
.with_checksum_algorithm(Checksum::SHA256)
.build()
.unwrap();

let str = "test.bin";
let path = Path::parse(str).unwrap();
let opts = PutMultipartOpts::default();
let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();

upload
.put_part(PutPayload::from(vec![0u8; 10_000_000]))
.await
.unwrap();
upload
.put_part(PutPayload::from(vec![0u8; 5_000_000]))
.await
.unwrap();

let res = upload.complete().await.unwrap();
assert!(res.e_tag.is_some(), "Should have valid etag");

store.delete(&path).await.unwrap();
}

#[tokio::test]
async fn write_multipart_file_with_signature_object_lock() {
maybe_skip_integration!();

let bucket = "test-object-lock";
let store = AmazonS3Builder::from_env()
.with_bucket_name(bucket)
.with_checksum_algorithm(Checksum::SHA256)
.build()
.unwrap();

let str = "test.bin";
let path = Path::parse(str).unwrap();
let opts = PutMultipartOpts::default();
let mut upload = store.put_multipart_opts(&path, opts).await.unwrap();

upload
.put_part(PutPayload::from(vec![0u8; 10_000_000]))
.await
.unwrap();
upload
.put_part(PutPayload::from(vec![0u8; 5_000_000]))
.await
.unwrap();

let res = upload.complete().await.unwrap();
assert!(res.e_tag.is_some(), "Should have valid etag");

store.delete(&path).await.unwrap();
}

#[tokio::test]
async fn s3_test() {
maybe_skip_integration!();
Expand Down
27 changes: 24 additions & 3 deletions object_store/src/client/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,32 @@ pub(crate) struct CompleteMultipartUpload {
pub part: Vec<MultipartPart>,
}

#[derive(Serialize, Deserialize)]
pub(crate) struct PartMetadata {
pub e_tag: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub checksum_sha256: Option<String>,
}

impl From<Vec<PartId>> for CompleteMultipartUpload {
fn from(value: Vec<PartId>) -> Self {
let part = value
.into_iter()
.enumerate()
.map(|(part_number, part)| MultipartPart {
e_tag: part.content_id,
part_number: part_number + 1,
.map(|(part_idx, part)| {
let md = match quick_xml::de::from_str::<PartMetadata>(&part.content_id) {
Ok(md) => md,
// fallback to old way
Err(_) => PartMetadata {
e_tag: part.content_id.clone(),
checksum_sha256: None,
},
};
MultipartPart {
e_tag: md.e_tag,
part_number: part_idx + 1,
checksum_sha256: md.checksum_sha256,
}
})
.collect();
Self { part }
Expand All @@ -126,6 +144,9 @@ pub(crate) struct MultipartPart {
pub e_tag: String,
#[serde(rename = "PartNumber")]
pub part_number: usize,
#[serde(rename = "ChecksumSHA256")]
#[serde(skip_serializing_if = "Option::is_none")]
pub checksum_sha256: Option<String>,
}

#[derive(Debug, Deserialize)]
Expand Down

0 comments on commit fcf4aa4

Please sign in to comment.