diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs index 344a9c4fa..61939df0f 100644 --- a/crates/worker/src/partition/snapshots/repository.rs +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -50,6 +50,35 @@ pub struct SnapshotRepository { prefix: String, } +#[serde_as] +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct LatestSnapshot { + pub version: SnapshotFormatVersion, + + /// Restate cluster name which produced the snapshot. + pub lsn: Lsn, + + /// Restate partition id. + pub partition_id: PartitionId, + + /// Node that produced this snapshot. + pub node_name: String, + + /// Local node time when the snapshot was created. + #[serde(with = "serde_with::As::")] + pub created_at: humantime::Timestamp, + + /// Snapshot id. + pub snapshot_id: SnapshotId, + + /// The minimum LSN guaranteed to be applied in this snapshot. The actual + /// LSN may be >= [minimum_lsn]. + pub min_applied_lsn: Lsn, + + /// The relative path within the snapshot repository where the snapshot data is stored. + pub path: String, +} + impl SnapshotRepository { pub async fn create( base_dir: PathBuf, @@ -127,10 +156,10 @@ impl SnapshotRepository { self.destination, ); + let relative_snapshot_path = format!("lsn_{lsn}", lsn = snapshot.min_applied_lsn); let snapshot_prefix = format!( - "{prefix}{partition_id}/lsn_{lsn}", + "{prefix}{partition_id}/{relative_snapshot_path}", prefix = self.prefix, - lsn = snapshot.min_applied_lsn, ); debug!( @@ -151,7 +180,7 @@ impl SnapshotRepository { &key, &self.object_store, ) - .await?; + .await?; debug!( etag = put_result.e_tag.unwrap_or_default(), ?key, @@ -174,7 +203,28 @@ impl SnapshotRepository { "Successfully published snapshot metadata", ); - // todo(pavel): (re)write latest.json pointer object + let latest = LatestSnapshot { + version: snapshot.version, + lsn: snapshot.min_applied_lsn, + partition_id, + node_name: snapshot.node_name.clone(), + created_at: snapshot.created_at.clone(), + snapshot_id: snapshot.snapshot_id, + min_applied_lsn: snapshot.min_applied_lsn, + path: relative_snapshot_path, + }; + let latest_path = object_store::path::Path::from(format!( + "{prefix}{partition_id}/latest.json", + prefix = self.prefix, + partition_id = partition_id, + )); + let latest_json_payload = PutPayload::from(serde_json::to_string_pretty(&latest)?); + let put_result = self.object_store.put(&latest_path, latest_json_payload).await?; + debug!( + etag = put_result.e_tag.unwrap_or_default(), + key = ?latest_path, + "Successfully updated latest snapshot pointer", + ); tokio::fs::remove_dir_all(local_snapshot_path.as_path()).await?; trace!(