Introduce SnapshotRepository and object store integration #2310
Conversation
Thank you @pcholakov for creating this PR. It looks good to me! I left two very minor comments.
.into_string()
.map(|path| format!("file://{path}"))
})
.map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
Suggested change:
-    .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?;
+    .context("Unable to convert path to string")?;
This will still include the 'inner' error in the output string when printed.
That approach doesn't work here because OsString::into_string() returns Result<String, OsString>, which doesn't meet anyhow's trait bounds :-)
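For context, a minimal standalone sketch of the conversion in question (the function name is illustrative; "pp-snapshots" comes from the diff further down). OsString is not an std::error::Error, so .context() isn't available and map_err with an explicit message is the workaround:

```rust
use anyhow::anyhow;
use std::path::PathBuf;

// Sketch: OsString::into_string() yields Result<String, OsString>; map the Ok
// side into a file:// URL and turn the Err(OsString) into an anyhow error.
fn default_snapshot_destination(base_dir: PathBuf) -> anyhow::Result<String> {
    base_dir
        .join("pp-snapshots")
        .into_os_string()
        .into_string()
        .map(|path| format!("file://{path}"))
        .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))
}
```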
Thanks for creating this PR @pcholakov. The changes look really good. The one question I have is whether there is a way to avoid materializing the tarball and re-reading it into memory. It would be awesome if we could stream the tarballing into the object-store upload.
/// Write a partition snapshot to the snapshot repository.
pub(crate) async fn put(
    &self,
    partition_id: PartitionId,
Isn't partition_id already part of PartitionSnapshotMetadata?
Removed :-)
// todo(pavel): don't buffer the entire snapshot in memory!
let payload = PutPayload::from(tokio::fs::read(tarball.path()).await?);
That would indeed be great. Especially once we have larger snapshots.
ObjectStore already supports multipart upload; you can use that to upload the tar in chunks instead.
Implemented in the latest revision! 🎉
// the latest snapshot is always first.
let inverted_sort_key = format!("{:016x}", u64::MAX - lsn.as_u64());

// The snapshot data / metadata key format is: [<base_prefix>/]<partition_id>/<sort_key>_<lsn>_<snapshot_id>.tar
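As a rough illustration of that key scheme (plain u64/u16 stand in for the crate's Lsn and PartitionId newtypes, and the function name is illustrative):

```rust
// Inverting the LSN means a lexicographically sorted listing returns the
// newest snapshot first.
fn snapshot_key(base_prefix: &str, partition_id: u16, lsn: u64, snapshot_id: &str) -> String {
    let inverted_sort_key = format!("{:016x}", u64::MAX - lsn);
    format!("{base_prefix}/{partition_id}/{inverted_sort_key}_{lsn}_{snapshot_id}.tar")
}

// e.g. snapshot_key("snapshots", 0, 42, "snap_abc")
//   => "snapshots/0/ffffffffffffffd5_42_snap_abc.tar"
```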
What's the idea for distinguishing full from incremental snapshots in the future? Would the latter have a completely different path or contain a marker file that denotes them as incremental?
I'm about to introduce this to the PR shortly - the key idea is to upload the tar archives and metadata JSON files separately, so that interested nodes can easily query just the metadata. We can gradually introduce additional attributes to the metadata JSON schema to support referencing the constituent parts of an incremental snapshot. The snapshot format version field within the metadata blob will allow nodes to know how to interpret it - or fail loudly if the Restate server is an older version that doesn't understand it.
The paths will be something like:
[<prefix>/]metadata/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.json
[<prefix>/]snapshot/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.tar
I imagine that at some point we'll add incremental snapshots and the repository format will then look something along the lines of:
[<prefix>/]metadata/<partition_id>/<sort_key>-<snapshot_id>-{lsn}.json
(V2)[<prefix>/]files/<partition_id>/<snapshot_id>-{filename}.sst
In this world, there will no longer be 1:1 metadata-to-snapshot correspondence but rather a 1:n relationship. Additionally, we may want to write some sort of index metadata to make it cheaper to garbage collect disused SSTs - but I haven't thought too much about that yet.
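To make the versioned-metadata idea concrete, a hedged sketch of what such a metadata blob might deserialize into; apart from min_applied_lsn, the cluster name, and the files list mentioned in this PR, the field names are purely illustrative, not the actual schema:

```rust
use serde::{Deserialize, Serialize};

// Illustrative only - not the PR's actual schema.
#[derive(Serialize, Deserialize)]
struct SnapshotMetadataSketch {
    /// Snapshot format version; lets readers fail loudly on formats they don't understand.
    version: u32,
    /// Restate cluster name which produced the snapshot.
    cluster_name: String,
    partition_id: u16,
    snapshot_id: String,
    /// LSN the partition had applied when the snapshot was taken.
    min_applied_lsn: u64,
    /// Constituent files; an incremental format could reference parts of earlier snapshots here.
    files: Vec<String>,
}
```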
Thanks @tillrohrmann and @muhamadazmy for your early input, it was really valuable! I've pushed a new revision but I still want to remove tar archiving before I mark it ready for review.
let mut tarball = tar::Builder::new(NamedTempFile::new_in(&staging_path)?);
debug!(
    "Creating snapshot tarball of {:?} in: {:?}...",
    &staging_path,
I've renamed staging_path to local_snapshot_path for clarity - that's the raw RocksDB column family export directory with the SSTs plus our own metadata JSON blob. We then tar that directory up into an archive at the path snapshot_archive_path.
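A self-contained sketch of that archiving step, assuming the tar and tempfile crates used in the diff (the function name and the separate staging directory are illustrative):

```rust
use std::path::Path;
use tempfile::NamedTempFile;

// Pack the column-family export directory (local_snapshot_path) into a tar
// archive staged as a temp file in staging_dir.
fn archive_snapshot(local_snapshot_path: &Path, staging_dir: &Path) -> anyhow::Result<NamedTempFile> {
    let mut tarball = tar::Builder::new(NamedTempFile::new_in(staging_dir)?);
    // Store entries relative to the archive root rather than under the full local path.
    tarball.append_dir_all(".", local_snapshot_path)?;
    // into_inner() finishes the archive and hands back the temp file.
    Ok(tarball.into_inner()?)
}
```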
Substantial changes since initial revision
Thanks for creating this PR @pcholakov. The changes look really nice. I left a few minor comments. The one question I had was whether concurrent modifications of a snapshot metadata.json or the latest.json can be a problem (e.g. if an old and new leader upload a snapshot at the same time)?
/// Restate cluster name which produced the snapshot.
pub lsn: Lsn,
Comment seems to be a bit off.
The entire field is redundant! (We have min_applied_lsn below.)
debug!(
    %lsn,
    "Publishing partition snapshot to: {}",
    self.destination,
);
You can instrument put via #[instrument()] and include the lsn, snapshot id, etc.
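A minimal sketch of that suggestion, assuming the tracing crate; plain types stand in for the real ones:

```rust
use tracing::{debug, instrument};

// With #[instrument], the lsn and snapshot id become span fields, so events
// inside put() no longer need to repeat them explicitly.
#[instrument(level = "debug", skip_all, fields(%lsn, %snapshot_id))]
async fn put(lsn: u64, snapshot_id: &str, destination: &str) -> anyhow::Result<()> {
    debug!("Publishing partition snapshot to: {destination}");
    Ok(())
}
```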
let put_result = self
    .object_store
    .put(&metadata_key, metadata_json_payload)
    .await?;
Is there a possibility for two processes taking a snapshot for the same lsn (e.g. an old leader and a new one) which aren't exactly the same because the effective lsn is different? If this is possible, is this a problem?
Definitely! This is partly why I'm still on the fence about the exact snapshot naming scheme. One simple solution is to use snapshot IDs to disambiguate snapshots for the same LSN as they must be (modulo ULID collision) unique across nodes. I'd combine that with conditional put (only succeed if file does not exist) and complain loudly if it ever fails.
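A sketch of that "only succeed if the object does not exist" idea using object_store's PutMode::Create (the helper name is illustrative):

```rust
use object_store::{path::Path, ObjectStore, PutMode, PutOptions, PutPayload};

// A collision (e.g. two nodes writing the same key for the same LSN) surfaces
// as AlreadyExists, which we treat as a loud failure.
async fn put_if_absent(
    store: &dyn ObjectStore,
    key: &Path,
    payload: PutPayload,
) -> anyhow::Result<()> {
    let opts = PutOptions {
        mode: PutMode::Create,
        ..Default::default()
    };
    match store.put_opts(key, payload, opts).await {
        Ok(_) => Ok(()),
        Err(object_store::Error::AlreadyExists { path, .. }) => {
            anyhow::bail!("snapshot object already exists: {path}")
        }
        Err(e) => Err(e.into()),
    }
}
```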
let put_result = self
    .object_store
    .put(&latest_path, latest_json_payload)
    .await?;
Same question here, but for different lsns. How are we gonna use the latest.json? I could imagine how a slow old leader completes a snapshot after a new snapshot has been completed.
I have an idea here that hadn't made it into the PR just yet: just download the previous pointer and check that we aren't moving backwards. This should be enough to prevent the worst case of some node going to sleep mid-snapshot and wreaking havoc.
Since I wrote that comment, we have been blessed with proper S3 conditional put, so I rewrote the update logic to perform a CAS 🎉 I'm not doing this preemptively since this path should be uncontended, but the check is there as a defensive measure against going backwards and overwriting something we didn't mean to.
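A hedged sketch of that CAS flow (the helper name and the LSN-parsing closure are illustrative, and it skips the first-write case where latest.json doesn't exist yet):

```rust
use object_store::{path::Path, ObjectStore, PutMode, PutOptions, PutPayload, UpdateVersion};

async fn cas_update_latest(
    store: &dyn ObjectStore,
    latest_path: &Path,
    new_latest_json: PutPayload,
    new_lsn: u64,
    lsn_of: impl Fn(&[u8]) -> u64, // parse the LSN out of an existing latest.json
) -> anyhow::Result<()> {
    // Read the current pointer and remember its etag/version for the conditional write.
    let current = store.get(latest_path).await?;
    let seen_version = UpdateVersion {
        e_tag: current.meta.e_tag.clone(),
        version: current.meta.version.clone(),
    };
    let body = current.bytes().await?;

    // Defensive check: never move the latest pointer backwards.
    anyhow::ensure!(lsn_of(&body) <= new_lsn, "refusing to move latest.json backwards");

    let opts = PutOptions {
        mode: PutMode::Update(seen_version),
        ..Default::default()
    };
    // Fails with Error::Precondition if someone updated latest.json since we read it.
    store.put_opts(latest_path, new_latest_json, opts).await?;
    Ok(())
}
```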
Sounds good :-)
for file in &snapshot.files {
    let filename = file.name.trim_start_matches("/");
    let key = object_store::path::Path::from(format!(
        "{}/{}",
        snapshot_prefix.as_str(),
        filename
    ));
    let put_result = put_snapshot_object(
        local_snapshot_path.join(filename).as_path(),
        &key,
        &self.object_store,
    )
    .await?;
    debug!(
        etag = put_result.e_tag.unwrap_or_default(),
        ?key,
        "Put snapshot data file completed",
    );
}
Uploading multiple files concurrently will probably only cause higher and less predictable resource utilization. And we aren't in a rush, I guess.
Agreed! It was easier to be more predictable with a single upload stream. The impact I'm most concerned about is the memory overhead. S3 advises using fairly large chunks - on the order of 100 MB - for maximum throughput, so maybe it's worth looking into memory-mapped IO down the line.
@tillrohrmann if you could take another look please, that would be great! I think I've covered all the comments.
The partition snapshot prefix looks like this in S3 with the latest changes: (screenshot omitted)
Thanks a lot for updating the PR @pcholakov. It looks really good to me. I think we are very close to merging it.
The last remaining questions I had were around resource management in case of failures when uploading snapshots. In particular, who is cleaning up partial snapshot artifacts (SSTs), and when are we deleting the local snapshot files?
When conditionally updating the latest.json, should we retry in case there was a concurrent modification?
I was also wondering whether we shouldn't configure a local snapshot directory if no destination was specified. That way users can control the snapshotting by configuring a valid destination.
crates/types/src/config/worker.rs (outdated diff)
#[serde(flatten, skip_serializing_if = "HashMap::is_empty")]
pub additional_options: HashMap<String, String>,
With serde(flatten), won't this act a bit as a catch-all for fields specified under SnapshotOptions in the toml that aren't specifically defined in SnapshotOptions?
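A small, self-contained illustration of that behavior, assuming serde and the toml crate; the struct here is a stand-in, not the real SnapshotsOptions:

```rust
use serde::Deserialize;
use std::collections::HashMap;

#[derive(Deserialize)]
struct OptionsSketch {
    destination: Option<String>,
    // Any key not matched by a named field above lands in this map instead of
    // being rejected as an unknown option.
    #[serde(flatten)]
    additional_options: HashMap<String, String>,
}

fn main() {
    let toml = r#"
        destination = "s3://bucket/prefix"
        some-unknown-key = "oops"
    "#;
    let opts: OptionsSketch = toml::from_str(toml).unwrap();
    assert_eq!(opts.additional_options["some-unknown-key"], "oops");
}
```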
Ah, totally didn't see that because I haven't tested this option at all! I'm not sure how to handle it with serde but I'll find a way to make it work.
Did you test it and it works?
We don't seem to use this field anymore.
I clearly did all the precursor work but then forgot to delete the field - sorry about that! This is now removed.
base_dir
    .join("pp-snapshots")
    .into_os_string()
    .into_string()
    .map(|path| format!("file://{path}"))
    .map_err(|e| anyhow!("Unable to convert path to string: {:?}", e))?
If we snapshot to local disk by default, which is not accessible by every PP process, then we have to make the trim logic conditional on this fact. It would also require us to implement a snapshot exchange mechanism, because once we trim and then start new PPs, they need some way to obtain the snapshots. Instead, I would suggest only creating the SnapshotRepository if the destination is configured, and making it the responsibility of the user to ensure that the destination is accessible by all nodes.
This is sensible - it became quite apparent when I started working on the auto-trim-by-archived-LSN PR earlier. Partly this was motivated by wanting the CreateSnapshot RPC to just work out of the box, but it's not worth the potential confusion. I still think I would prefer automated trimming to be opt-in regardless of the existence of a valid SnapshotRepository, but we can have that discussion in a separate PR.
// locations just work. This makes object_store behave similarly to the Lambda invoker.
let object_store: Arc<dyn ObjectStore> = if destination.scheme() == "s3"
    && destination.query().is_none()
    && snapshots_options.additional_options.is_empty()
What if additional_options contains some other settings but not the credentials and is therefore not empty?
I can make that work; there are a handful of config keys (region + access key) that are in conflict with the AWS config provider. It's a reasonable expectation to merge the configs.
// SDK credentials provider so that the conventional environment variables and config
// locations just work. This makes object_store behave similarly to the Lambda invoker.
let object_store: Arc<dyn ObjectStore> = if destination.scheme() == "s3"
    && destination.query().is_none()
Why isn't it a good idea to rely on default credentials if the destination contains a query part?
The query part is how object_store configuration typically works - you can pass API keys and other bits of config as URL parameters. I explicitly didn't want to deal with merging config from two completely different config providers; I'm not even sure it's possible to do it in a sane way. The logic is that if you wish to override the config, then you own setting up all of it. I think that's reasonable behavior, except I see that I've completely neglected to mention that in the SnapshotsOptions docs - I'll fix that.
    &key,
    &self.object_store,
)
.await?;
What happens with already uploaded files if we fail at this point? Will this leave an incomplete snapshot behind?
Correct; my thinking was that we have to deal with pruning the repository separately anyway and I would handle it there - but I can make a best-effort attempt at cleanup on upload. In general, I've altogether skipped hardening the snapshot path with things like retries. I was planning to do that as a follow-up but can certainly address it now if the PR is not getting too big.
    None
}
Err(e) => {
    bail!("Failed to get latest snapshot pointer: {}", e);
Wondering whether warn-level logging might be good enough. Technically we did complete the snapshot; it just might be unused because latest.json isn't updated. How would the caller handle an error compared to an Ok(())?
Alternatively, we might wanna clean up the snapshot, because it wouldn't be used given that latest.json hasn't been updated.
I probably would rather make a best-effort attempt at cleaning up partially uploaded keys, and return an error. As far as response semantics go, what I hope to achieve is that the caller has an unambiguous confirmation that a snapshot exists at a given LSN (I know technically we don't yet return the LSN in CreateSnapshotResponse, but still). I think this will become important in the future when the cluster controller might want to orchestrate snapshot/restore sequences across nodes.
I have one more follow-up to make here - currently we return an error if the LSN is unchanged from the latest archived-LSN snapshot in the repository. That should be a no-op and return success with the ID of the existing latest snapshot, basically making it an idempotent operation that can be called repeatedly, even if the log is not moving.
let put_result = self
    .object_store
    .put_opts(&latest_path, latest_json, conditions)
    .await?;
What's the contract of the put method with respect to updating the latest.json? I've left a few comments regarding this. Is your intention that a successfully uploaded snapshot must update the latest.json, and if that fails, then the whole put method fails?
If there was a concurrent update, shouldn't we retry until we know that there is a newer snapshot?
Up till your comment, I was thinking of the put contract as returning the status of whether the specific create-snapshot request succeeded, or not. But if you zoom out a bit, the caller really only cares that a snapshot exists, at some LSN, in the shared repository.
Partly also, in the current world we don't expect any contention on latest.json, so this is mainly a paranoid defensive line against a node experiencing a really long pause between starting a snapshot and trying to bump the latest pointer - long enough that another processor has become the leader and taken over snapshotting.
I think a perfectly reasonable fallback here is that, if there is a concurrent update, we just read the latest value and return that to the caller. I'll update the code to behave like that.
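A sketch of that fallback, assuming object_store's Precondition error is what a failed conditional put surfaces as (the helper name is illustrative):

```rust
use bytes::Bytes;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore, PutOptions, PutPayload};

// Returns None if our conditional update won, or the contents of the winning
// latest.json if a concurrent writer got there first.
async fn update_or_read_latest(
    store: &dyn ObjectStore,
    latest_path: &Path,
    latest_json: PutPayload,
    conditions: PutOptions,
) -> anyhow::Result<Option<Bytes>> {
    match store.put_opts(latest_path, latest_json, conditions).await {
        Ok(_) => Ok(None),
        Err(ObjectStoreError::Precondition { .. }) => {
            tracing::warn!("Concurrent update to latest.json; returning the newer pointer");
            Ok(Some(store.get(latest_path).await?.bytes().await?))
        }
        Err(e) => Err(e.into()),
    }
}
```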
debug!("Performing multipart upload for {key}"); | ||
let mut upload = object_store.put_multipart(key).await?; | ||
|
||
let mut buf = BytesMut::new(); |
If you pass this buffer into this method, then you can reuse it across uploading multiple files and don't have to reallocate it for every file.
Good call! Initially, I was thinking of doing multiple puts in parallel so deliberately did not reuse this.
Subsequently, I changed my thinking around concurrency: I think we should optimise the restore path for maximum throughput as we want a cold Partition Processor to get up to speed ASAP, but on the create snapshot path we should rather optimise for minimal disruption to the ongoing Restate request processing. Let me know if you disagree with the thinking here.
loop {
    let mut len = 0;
    buf.reserve(MULTIPART_UPLOAD_CHUNK_SIZE_BYTES);

    // Ensure full buffer unless at EOF
    while buf.len() < MULTIPART_UPLOAD_CHUNK_SIZE_BYTES {
        len = snapshot.read_buf(&mut buf).await?;
        if len == 0 {
            break;
        }
    }

    if !buf.is_empty() {
        upload
            .put_part(PutPayload::from_bytes(buf.split().freeze()))
            .await?;
    }

    if len == 0 {
        break;
    }
}
The logic looks sound to me :-)
Great! I had some extra logging in place to make sure we are definitely not over-allocating the buffer, but it's a new API I haven't worked with before 😅
I was answering something slightly different, my apologies! I think this is the relevant answer: #2310 (comment) :-)
… conditional updates)
The SnapshotRepository retry policy is set to 60s total timeout.
…est S3 etag conditional update support
Ok, I really think this is everything covered now 😅
);
snapshot2.min_applied_lsn = snapshot1.min_applied_lsn.next();

repository.put(&snapshot2, source_dir).await?;
No words. 🤦♂️
I added coverage and made the LatestSnapshot struct construction a lot nicer in the process. Thank you for flagging this!
@@ -139,6 +140,7 @@ metrics-exporter-prometheus = { version = "0.15", default-features = false, feat
    "async-runtime",
] }
moka = "0.12.5"
object_store = { version = "0.11.1", features = ["aws"] }
Since we are not officially supporting anything other than S3, it was easier to just not compile the other providers. The file provider is always enabled, it seems.
@@ -23,7 +23,7 @@ arc-swap = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
http = { workspace = true }
pprof = { version = "0.13", features = ["criterion", "flamegraph"] }
pprof = { version = "0.14", features = ["criterion", "flamegraph"] }
Unrelated RUSTSEC update.
Thanks for fixing this :-)
Great work @pcholakov. The changes look good to me. +1 for merging :-)
This change introduces a SnapshotRepository responsible for uploading snapshots to a remote object store.
Sample usage
Configuration:
Currently only s3:// and file:// URLs are supported and work just as expected.

Snapshot creation:
Future work:
Closes: #2197