diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs index 7ac5315c1..cfd229847 100644 --- a/crates/worker/src/partition/snapshots/repository.rs +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -18,7 +18,7 @@ use aws_config::BehaviorVersion; use aws_credential_types::provider::ProvideCredentials; use bytes::BytesMut; use object_store::aws::AmazonS3Builder; -use object_store::{MultipartUpload, ObjectStore, PutPayload}; +use object_store::{MultipartUpload, ObjectStore, PutMode, PutOptions, PutPayload}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use tokio::io::AsyncReadExt; @@ -230,11 +230,10 @@ impl SnapshotRepository { partition_id = snapshot.partition_id, )); - // We can not do atomic CAS, but we can try to prevent the pointer from moving backwards! - // This does not help with different nodes updating the pointer concurrently but that is acceptable. - // We expect this path to not be contended, this check just serves as a correctness backstop. + // By performing a CAS on the latest snapshot pointer, we can ensure strictly monotonic updates. let maybe_stored = match self.object_store.get(&latest_path).await { Ok(result) => { + let attributes = result.attributes.clone(); let parse_result: serde_json::Result = serde_json::from_slice(result.bytes().await?.iter().as_slice()); parse_result @@ -247,6 +246,7 @@ impl SnapshotRepository { ) }) .ok() + .map(|metadata| (attributes, metadata)) } Err(object_store::Error::NotFound { .. }) => { debug!( @@ -263,9 +263,9 @@ impl SnapshotRepository { if maybe_stored .as_ref() - .is_some_and(|stored| stored.min_applied_lsn >= snapshot.min_applied_lsn) + .is_some_and(|(_, stored)| stored.min_applied_lsn >= snapshot.min_applied_lsn) { - let repository_latest_lsn = maybe_stored.expect("is some").min_applied_lsn; + let repository_latest_lsn = maybe_stored.expect("is some").1.min_applied_lsn; info!( ?repository_latest_lsn, new_snapshot_lsn = ?snapshot.min_applied_lsn, @@ -277,10 +277,20 @@ impl SnapshotRepository { )); } - let latest_json_payload = PutPayload::from(serde_json::to_string_pretty(&latest)?); + let latest_json = PutPayload::from(serde_json::to_string_pretty(&latest)?); + let conditions = maybe_stored + .map(|(attributes, _)| PutOptions { + mode: PutMode::Overwrite, + attributes, + ..PutOptions::default() + }) + .unwrap_or_else(|| PutOptions { + mode: PutMode::Create, + ..PutOptions::default() + }); let put_result = self .object_store - .put(&latest_path, latest_json_payload) + .put_opts(&latest_path, latest_json, conditions) .await?; debug!( etag = put_result.e_tag.unwrap_or_default(),