diff --git a/Cargo.lock b/Cargo.lock index 9b68c7441..6d9f95ca9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -69,6 +69,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "aligned-vec" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e0966165eaf052580bd70eb1b32cb3d6245774c0104d1b2793e9650bf83b52a" +dependencies = [ + "equator", +] + [[package]] name = "allocator-api2" version = "0.2.16" @@ -543,9 +552,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "aws-config" -version = "1.5.4" +version = "1.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "caf6cfe2881cb1fcbba9ae946fb9a6480d3b7a714ca84c74925014a89ef3387a" +checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924" dependencies = [ "aws-credential-types", "aws-runtime", @@ -563,7 +572,6 @@ dependencies = [ "fastrand", "hex", "http 0.2.12", - "hyper 0.14.30", "ring", "time", "tokio", @@ -574,9 +582,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "1.2.0" +version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e16838e6c9e12125face1c1eff1343c75e3ff540de98ff7ebd61874a89bcfeb9" +checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da" dependencies = [ "aws-smithy-async", "aws-smithy-runtime-api", @@ -586,15 +594,16 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.3.1" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87c5f920ffd1e0526ec9e70e50bf444db50b204395a0fa7016bbf9e31ea1698f" +checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" dependencies = [ "aws-credential-types", "aws-sigv4", "aws-smithy-async", "aws-smithy-eventstream", "aws-smithy-http", + "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", @@ -602,6 +611,7 @@ dependencies = [ "fastrand", "http 
0.2.12", "http-body 0.4.6", + "once_cell", "percent-encoding", "pin-project-lite", "tracing", @@ -633,9 +643,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.35.0" +version = "1.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc3ef4ee9cdd19ec6e8b10d963b79637844bbf41c31177b77a188eaa941e69f7" +checksum = "09677244a9da92172c8dc60109b4a9658597d4d298b188dd0018b6a66b410ca4" dependencies = [ "aws-credential-types", "aws-runtime", @@ -655,9 +665,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.36.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "527f3da450ea1f09f95155dba6153bd0d83fe0923344a12e1944dfa5d0b32064" +checksum = "81fea2f3a8bb3bd10932ae7ad59cc59f65f270fc9183a7e91f501dc5efbef7ee" dependencies = [ "aws-credential-types", "aws-runtime", @@ -677,9 +687,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.35.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94316606a4aa2cb7a302388411b8776b3fbd254e8506e2dc43918286d8212e9b" +checksum = "6ada54e5f26ac246dc79727def52f7f8ed38915cb47781e2a72213957dc3a7d5" dependencies = [ "aws-credential-types", "aws-runtime", @@ -700,9 +710,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.3" +version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5df1b0fa6be58efe9d4ccc257df0a53b89cd8909e86591a13ca54817c87517be" +checksum = "5619742a0d8f253be760bfbb8e8e8368c69e3587e4637af5754e488a611499b1" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -735,9 +745,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.4" +version = "0.60.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858" +checksum = 
"cef7d0a272725f87e51ba2bf89f8c21e4df61b9e49ae1ac367a6d69916ef7c90" dependencies = [ "aws-smithy-types", "bytes", @@ -746,9 +756,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.9" +version = "0.60.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9cd0ae3d97daa0a2bf377a4d8e8e1362cae590c4a1aad0d40058ebca18eb91e" +checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6" dependencies = [ "aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -786,9 +796,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.6.2" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce87155eba55e11768b8c1afa607f3e864ae82f03caf63258b37455b0ad02537" +checksum = "be28bd063fa91fd871d131fc8b68d7cd4c5fa0869bea68daca50dcb1cbd76be2" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -796,22 +806,26 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "http-body 1.0.1", "httparse", + "hyper 0.14.30", + "hyper-rustls 0.24.2", "once_cell", "pin-project-lite", "pin-utils", + "rustls 0.21.12", "tokio", "tracing", ] [[package]] name = "aws-smithy-runtime-api" -version = "1.7.1" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30819352ed0a04ecf6a2f3477e344d2d1ba33d43e0f09ad9047c12e0d923616f" +checksum = "92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -826,9 +840,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.0" +version = "1.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cfe321a6b21f5d8eabd0ade9c55d3d0335f3c3157fc2b3e87f05f34b539e4df5" +checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510" dependencies = [ "base64-simd", "bytes", @@ -852,9 +866,9 @@ dependencies = [ [[package]] name 
= "aws-smithy-xml" -version = "0.60.8" +version = "0.60.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d123fbc2a4adc3c301652ba8e149bf4bc1d1725affb9784eb20c953ace06bf55" +checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc" dependencies = [ "xmlparser", ] @@ -2667,6 +2681,26 @@ dependencies = [ "syn 2.0.85", ] +[[package]] +name = "equator" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c35da53b5a021d2484a7cc49b2ac7f2d840f8236a286f84202369bd338d761ea" +dependencies = [ + "equator-macro", +] + +[[package]] +name = "equator-macro" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf679796c0322556351f287a51b49e48f7c4986e727b5dd78c972d30e2e16cc" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.85", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -3059,6 +3093,25 @@ dependencies = [ "syn 2.0.85", ] +[[package]] +name = "h2" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap 2.2.6", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.5" @@ -3266,6 +3319,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", + "h2 0.3.26", "http 0.2.12", "http-body 0.4.6", "httparse", @@ -3288,7 +3342,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.1", "httparse", @@ -5189,10 +5243,11 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "pprof" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"ef5c97c51bd34c7e742402e216abdeb44d415fbe6ae41d56b114723e953711cb" +checksum = "ebbe2f8898beba44815fdc9e5a4ae9c929e21c5dc29b0c774a15555f7f58d6d0" dependencies = [ + "aligned-vec", "backtrace", "cfg-if", "criterion", @@ -5811,7 +5866,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.1", "http-body-util", @@ -6682,7 +6737,7 @@ dependencies = [ "bytestring", "derive_builder", "futures", - "h2", + "h2 0.4.5", "http 1.1.0", "http-body-util", "http-serde", @@ -7021,6 +7076,9 @@ dependencies = [ "anyhow", "assert2", "async-channel", + "async-trait", + "aws-config", + "aws-credential-types", "bytes", "bytestring", "codederror", @@ -7031,6 +7089,7 @@ dependencies = [ "humantime", "itertools 0.13.0", "metrics", + "object_store", "opentelemetry", "parking_lot", "pin-project", @@ -7058,6 +7117,7 @@ dependencies = [ "restate-types", "restate-wal-protocol", "rstest", + "rust-rocksdb", "schemars", "serde", "serde_json", @@ -7073,6 +7133,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber", "ulid", + "url", ] [[package]] @@ -8450,7 +8511,7 @@ dependencies = [ "base64 0.22.0", "bytes", "flate2", - "h2", + "h2 0.4.5", "http 1.1.0", "http-body 1.0.1", "http-body-util", diff --git a/Cargo.toml b/Cargo.toml index dfc87b142..41e12c1fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,8 @@ assert2 = "0.3.11" async-channel = "2.1.1" async-trait = "0.1.73" axum = { version = "0.7.5", default-features = false } +aws-config = "1.5.10" +aws-credential-types = "1.2.1" base64 = "0.22" bitflags = { version = "2.6.0" } bytes = { version = "1.7", features = ["serde"] } @@ -102,7 +104,6 @@ datafusion = { version = "42.0.0", default-features = false, features = [ "regex_expressions", "unicode_expressions", ] } -object_store = { version = "0.11.1"} datafusion-expr = { version = "42.0.0" } derive_builder = "0.20.0" derive_more = { version = "1", features = ["full"] } @@ -139,6 +140,7 @@ 
metrics-exporter-prometheus = { version = "0.15", default-features = false, feat "async-runtime", ] } moka = "0.12.5" +object_store = { version = "0.11.1", features = ["aws"] } once_cell = "1.18" opentelemetry = { version = "0.24.0" } opentelemetry-http = { version = "0.13.0" } diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 4b7936807..6c17a869e 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -23,7 +23,7 @@ arc-swap = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } http = { workspace = true } -pprof = { version = "0.13", features = ["criterion", "flamegraph"] } +pprof = { version = "0.14", features = ["criterion", "flamegraph"] } reqwest = { workspace = true } rlimit = { workspace = true } serde_json = { workspace = true } diff --git a/crates/admin/src/cluster_controller/service.rs b/crates/admin/src/cluster_controller/service.rs index 18123ceab..d9560fd68 100644 --- a/crates/admin/src/cluster_controller/service.rs +++ b/crates/admin/src/cluster_controller/service.rs @@ -449,7 +449,7 @@ where ) -> anyhow::Result { // todo(pavel): make snapshot RPC timeout configurable, especially if this includes remote upload in the future let response = tokio::time::timeout( - Duration::from_secs(30), + Duration::from_secs(90), self.create_snapshot_router.call( &self.network_sender, node_id, diff --git a/crates/core/src/task_center/task_kind.rs b/crates/core/src/task_center/task_kind.rs index 072f94f9b..fe3de49ff 100644 --- a/crates/core/src/task_center/task_kind.rs +++ b/crates/core/src/task_center/task_kind.rs @@ -98,8 +98,7 @@ pub enum TaskKind { /// Kafka ingestion related task Kafka, PartitionProcessor, - /// Longer-running, low-priority tasks that is responsible for the export, and potentially - /// upload to remote storage, of partition store snapshots. + /// Low-priority tasks responsible for partition snapshot-related I/O. 
#[strum(props(OnCancel = "abort", OnError = "log"))] PartitionSnapshotProducer, #[strum(props(OnError = "log"))] diff --git a/crates/core/src/worker_api/partition_processor_manager.rs b/crates/core/src/worker_api/partition_processor_manager.rs index a1eb2ba9e..10fec1904 100644 --- a/crates/core/src/worker_api/partition_processor_manager.rs +++ b/crates/core/src/worker_api/partition_processor_manager.rs @@ -75,15 +75,21 @@ pub struct SnapshotCreated { pub enum SnapshotError { #[error("Partition {0} not found")] PartitionNotFound(PartitionId), - #[error("Snapshot creation already in progress for partition {0}")] + #[error("Snapshot creation already in progress")] SnapshotInProgress(PartitionId), - #[error("Partition processor state does not allow snapshot export {0}")] + /// Partition Processor is not fully caught up. + #[error("Partition processor state does not permit snapshotting")] InvalidState(PartitionId), - #[error("Snapshot failed for partition {0}: {1}")] - SnapshotExportError(PartitionId, #[source] anyhow::Error), - #[error("Snapshot failed for partition {0}: {1}")] - SnapshotMetadataHeaderError(PartitionId, #[source] io::Error), - #[error("Internal error creating snapshot for partition {0}: {1}")] + #[error("Snapshot destination is not configured")] + RepositoryNotConfigured(PartitionId), + /// Database snapshot export error. 
+ #[error("Snapshot export failed: {1}")] + SnapshotExport(PartitionId, #[source] anyhow::Error), + #[error("Snapshot IO error: {1}")] + SnapshotIo(PartitionId, #[source] io::Error), + #[error("Snapshot repository IO error: {1}")] + RepositoryIo(PartitionId, #[source] anyhow::Error), + #[error("Internal error creating snapshot: {1}")] Internal(PartitionId, String), } @@ -93,8 +99,10 @@ impl SnapshotError { SnapshotError::PartitionNotFound(partition_id) => *partition_id, SnapshotError::SnapshotInProgress(partition_id) => *partition_id, SnapshotError::InvalidState(partition_id) => *partition_id, - SnapshotError::SnapshotExportError(partition_id, _) => *partition_id, - SnapshotError::SnapshotMetadataHeaderError(partition_id, _) => *partition_id, + SnapshotError::RepositoryNotConfigured(partition_id) => *partition_id, + SnapshotError::SnapshotExport(partition_id, _) => *partition_id, + SnapshotError::SnapshotIo(partition_id, _) => *partition_id, + SnapshotError::RepositoryIo(partition_id, _) => *partition_id, SnapshotError::Internal(partition_id, _) => *partition_id, } } diff --git a/crates/partition-store/src/partition_store_manager.rs b/crates/partition-store/src/partition_store_manager.rs index 72f39ab4e..053423ad4 100644 --- a/crates/partition-store/src/partition_store_manager.rs +++ b/crates/partition-store/src/partition_store_manager.rs @@ -212,13 +212,13 @@ impl PartitionStoreManager { // RocksDB will create the snapshot directory but the parent must exist first: tokio::fs::create_dir_all(snapshot_base_path) .await - .map_err(|e| SnapshotError::SnapshotExportError(partition_id, e.into()))?; + .map_err(|e| SnapshotError::SnapshotIo(partition_id, e))?; let snapshot_dir = snapshot_base_path.join(snapshot_id.to_string()); partition_store .create_snapshot(snapshot_dir) .await - .map_err(|e| SnapshotError::SnapshotExportError(partition_id, e.into())) + .map_err(|e| SnapshotError::SnapshotExport(partition_id, e.into())) } pub async fn drop_partition(&self, 
partition_id: PartitionId) { diff --git a/crates/service-client/Cargo.toml b/crates/service-client/Cargo.toml index ca85eb334..2a2a612f7 100644 --- a/crates/service-client/Cargo.toml +++ b/crates/service-client/Cargo.toml @@ -13,8 +13,8 @@ options_schema = ["dep:schemars", "restate-types/schemars"] [dependencies] arc-swap = { workspace = true } -aws-config = { version = "1.5.4", default-features = false, features = ["rt-tokio", "sso"] } -aws-credential-types = {version = "1.2.0", default-features = false} +aws-config = { workspace = true } +aws-credential-types = { workspace = true } aws-sdk-lambda = {version = "1.36.0", default-features = false, features = ["rt-tokio"]} aws-sdk-sts = {version = "1.35.0", default-features = false, features = ["rt-tokio"]} base64 = { workspace = true } diff --git a/crates/types/protobuf/restate/cluster.proto b/crates/types/protobuf/restate/cluster.proto index 972b26fb1..5dde6bc31 100644 --- a/crates/types/protobuf/restate/cluster.proto +++ b/crates/types/protobuf/restate/cluster.proto @@ -1,4 +1,4 @@ -// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. // All rights reserved. // // Use of this software is governed by the Business Source License diff --git a/crates/types/protobuf/restate/log_server_common.proto b/crates/types/protobuf/restate/log_server_common.proto index 5679a45f5..e2073f32d 100644 --- a/crates/types/protobuf/restate/log_server_common.proto +++ b/crates/types/protobuf/restate/log_server_common.proto @@ -1,5 +1,5 @@ -// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH. +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. // All rights reserved. 
// // Use of this software is governed by the Business Source License diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 59776fe61..04b88076f 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -8,18 +8,18 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use serde::{Deserialize, Serialize}; -use serde_with::serde_as; use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize}; use std::path::PathBuf; use std::time::Duration; -use tracing::warn; -use restate_serde_util::NonZeroByteCount; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; +use tracing::warn; use super::{CommonOptions, RocksDbOptions, RocksDbOptionsBuilder}; use crate::identifiers::PartitionId; use crate::retries::RetryPolicy; +use restate_serde_util::NonZeroByteCount; /// # Worker options #[serde_as] @@ -359,6 +359,17 @@ impl Default for StorageOptions { #[serde(rename_all = "kebab-case")] #[builder(default)] pub struct SnapshotsOptions { + /// # Destination + /// + /// Where snapshots are moved after they get created. This property supports URLs with either + /// `s3://` or `file://` protocol scheme. The URL is parsed with the `url` crate. + /// + /// Example: `s3://snapshots-bucket/restate/cluster` will send snapshots to the specified + /// bucket, prefixing keys with the URL path. + /// + /// Default: `None` + pub destination: Option, + /// # Automatic snapshot creation frequency /// /// Number of log records that trigger a snapshot to be created.
diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 214383ac3..89c85ee4d 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -45,6 +45,9 @@ restate-wal-protocol = { workspace = true } anyhow = { workspace = true } assert2 = { workspace = true } async-channel = { workspace = true } +async-trait = { workspace = true } +aws-config = { workspace = true } +aws-credential-types = { workspace = true } bytes = { workspace = true } bytestring = { workspace = true } codederror = { workspace = true } @@ -54,6 +57,7 @@ futures = { workspace = true } humantime = { workspace = true } itertools = { workspace = true } metrics = { workspace = true } +object_store = { workspace = true } opentelemetry = { workspace = true } parking_lot = { workspace = true } pin-project = { workspace = true } @@ -70,6 +74,7 @@ tokio-util = { workspace = true } tracing = { workspace = true } tracing-opentelemetry = { workspace = true } ulid = { workspace = true } +url = { workspace = true } [dev-dependencies] restate-bifrost = { workspace = true, features = ["test-util"] } @@ -80,10 +85,11 @@ restate-service-protocol = { workspace = true, features = ["test-util"] } restate-storage-api = { workspace = true, features = ["test-util"] } restate-test-util = { workspace = true, features = ["prost"] } restate-types = { workspace = true, features = ["test-util"] } -prost = { workspace = true } -rstest = { workspace = true } googletest = { workspace = true } +prost = { workspace = true } +rocksdb = { workspace = true } +rstest = { workspace = true } tempfile = { workspace = true } test-log = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/crates/worker/src/lib.rs b/crates/worker/src/lib.rs index 06498669a..e4592dbdb 100644 --- a/crates/worker/src/lib.rs +++ b/crates/worker/src/lib.rs @@ -47,6 +47,7 @@ use restate_types::live::Live; use restate_types::protobuf::common::WorkerStatus; use 
crate::partition::invoker_storage_reader::InvokerStorageReader; +use crate::partition::snapshots::SnapshotRepository; use crate::partition_processor_manager::PartitionProcessorManager; pub use self::error::*; @@ -61,7 +62,7 @@ type PartitionProcessorBuilder = partition::PartitionProcessorBuilder< #[derive(Debug, thiserror::Error, CodedError)] #[error("failed creating worker: {0}")] pub enum BuildError { - Datafusion( + DataFusion( #[from] #[code] restate_storage_query_datafusion::BuildError, @@ -80,6 +81,9 @@ pub enum BuildError { ), #[code(unknown)] Invoker(#[from] restate_invoker_impl::BuildError), + #[error("failed constructing partition snapshot repository: {0}")] + #[code(unknown)] + SnapshotRepository(#[from] anyhow::Error), } #[derive(Debug, thiserror::Error, CodedError)] @@ -136,6 +140,15 @@ impl Worker { ) .await?; + let snapshots_options = &config.worker.snapshots; + if snapshots_options.snapshot_interval_num_records.is_some() + && snapshots_options.destination.is_none() + { + return Err(BuildError::SnapshotRepository(anyhow::anyhow!( + "Periodic snapshot interval set without a specified snapshot destination" + ))); + } + let partition_processor_manager = PartitionProcessorManager::new( health_status, updateable_config.clone(), @@ -143,6 +156,9 @@ impl Worker { partition_store_manager.clone(), router_builder, bifrost, + SnapshotRepository::create_if_configured(snapshots_options) + .await + .map_err(BuildError::SnapshotRepository)?, ); // handle RPCs diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 825b23b19..e9327bb03 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -74,6 +74,7 @@ mod cleaner; pub mod invoker_storage_reader; mod leadership; pub mod shuffle; +pub mod snapshots; mod state_machine; pub mod types; diff --git a/crates/worker/src/partition/snapshots/mod.rs b/crates/worker/src/partition/snapshots/mod.rs new file mode 100644 index 000000000..e92f2cfc7 --- 
/dev/null +++ b/crates/worker/src/partition/snapshots/mod.rs @@ -0,0 +1,15 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +mod repository; +mod snapshot_task; + +pub use repository::SnapshotRepository; +pub use snapshot_task::*; diff --git a/crates/worker/src/partition/snapshots/repository.rs b/crates/worker/src/partition/snapshots/repository.rs new file mode 100644 index 000000000..f30b1adb6 --- /dev/null +++ b/crates/worker/src/partition/snapshots/repository.rs @@ -0,0 +1,750 @@ +// Copyright (c) 2024 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. 
+ +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::{anyhow, bail, Context}; +use async_trait::async_trait; +use aws_config::default_provider::credentials::DefaultCredentialsChain; +use aws_config::BehaviorVersion; +use aws_credential_types::provider::ProvideCredentials; +use bytes::BytesMut; +use object_store::aws::{AmazonS3Builder, S3ConditionalPut}; +use object_store::{MultipartUpload, ObjectStore, PutMode, PutOptions, PutPayload, UpdateVersion}; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; +use tokio::io::AsyncReadExt; +use tracing::{debug, info, instrument, warn}; +use url::Url; + +use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion}; +use restate_types::config::SnapshotsOptions; +use restate_types::identifiers::{PartitionId, SnapshotId}; +use restate_types::logs::Lsn; + +/// Provides read and write access to the long-term partition snapshot storage destination. +/// +/// The repository wraps access to an object store "bucket" that contains snapshot metadata and data +/// optimized for efficient retrieval. The bucket layout is split into two top-level prefixes for +/// snapshot metadata and data respectively. While full snapshot archives contain all relevant +/// metadata, this split layout allows for efficient retrieval of only the metadata upfront. It also +/// enables us to evolve the data storage layout independently in the future. +/// +/// A single top-level `latest.json` file is the only key which is repeatedly overwritten; all other +/// data is immutable until the pruning policy allows for deletion. 
+/// +/// - `[<prefix>/]<partition_id>/latest.json` - latest snapshot metadata for the partition +/// - `[<prefix>/]<partition_id>/{lsn}_{snapshot_id}/metadata.json` - snapshot descriptor +/// - `[<prefix>/]<partition_id>/{lsn}_{snapshot_id}/*.sst` - data files (explicitly named in `metadata.json`) +#[derive(Clone)] +pub struct SnapshotRepository { + object_store: Arc, + destination: Url, + prefix: String, +} + +#[serde_as] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct LatestSnapshot { + pub version: SnapshotFormatVersion, + + pub partition_id: PartitionId, + + /// Restate cluster name which produced the snapshot. + pub cluster_name: String, + + /// Node that produced this snapshot. + pub node_name: String, + + /// Local node time when the snapshot was created. + #[serde(with = "serde_with::As::")] + pub created_at: humantime::Timestamp, + + /// Unique snapshot id. + pub snapshot_id: SnapshotId, + + /// The minimum LSN guaranteed to be applied in this snapshot. The actual + /// LSN may be >= [Self::min_applied_lsn]. + pub min_applied_lsn: Lsn, + + /// The relative path within the snapshot repository where the snapshot data is stored. + pub path: String, +} + +impl LatestSnapshot { + pub fn from_snapshot(snapshot: &PartitionSnapshotMetadata, path: String) -> Self { + LatestSnapshot { + version: snapshot.version, + cluster_name: snapshot.cluster_name.clone(), + node_name: snapshot.node_name.clone(), + partition_id: snapshot.partition_id, + snapshot_id: snapshot.snapshot_id, + created_at: snapshot.created_at.clone(), + min_applied_lsn: snapshot.min_applied_lsn, + path, + } + } +} + +impl SnapshotRepository { + /// Creates an instance of the repository if a snapshots destination is configured. + pub async fn create_if_configured( + snapshots_options: &SnapshotsOptions, + ) -> anyhow::Result> { + let mut destination = if let Some(ref destination) = snapshots_options.destination { + Url::parse(destination).context("Failed parsing snapshot repository URL")?
+ } else { + return Ok(None); + }; + // Prevent passing configuration options to object_store via the destination URL. + destination + .query() + .inspect(|params| info!("Snapshot destination parameters ignored: {params}")); + destination.set_query(None); + + let object_store = create_object_store_client(destination.clone()).await?; + + // The prefix must be stripped of any leading slash and, unless it is empty, must end in a + // single "/" character. + let prefix: String = destination.path().into(); + let prefix = match prefix.as_str() { + "" | "/" => "".to_string(), + prefix => format!("{}/", prefix.trim_start_matches('/').trim_end_matches('/')), + }; + + Ok(Some(SnapshotRepository { + object_store, + destination, + prefix, + })) + } + + /// Write a partition snapshot to the snapshot repository. + #[instrument( + level = "debug", + skip_all, + err, + fields( + snapshot_id = ?snapshot.snapshot_id, + partition_id = ?snapshot.partition_id, + ) + )] + pub(crate) async fn put( + &self, + snapshot: &PartitionSnapshotMetadata, + local_snapshot_path: PathBuf, + ) -> anyhow::Result<()> { + debug!("Publishing partition snapshot to: {}", self.destination); + + let put_result = self + .put_snapshot_inner(snapshot, local_snapshot_path.as_path()) + .await; + + // We only log the error here since (a) it's relatively unlikely for rmdir to fail, and (b) + // if we've uploaded the snapshot, we should get the response back to the caller. Logging at + // WARN level as repeated failures could compromise the cluster. 
+ let _ = tokio::fs::remove_dir_all(local_snapshot_path.as_path()) + .await + .inspect_err(|e| warn!("Failed to delete local snapshot files: {}", e)); + + match put_result { + Ok(_) => Ok(()), + Err(put_error) => { + for filename in put_error.uploaded_files { + let path = object_store::path::Path::from(format!( + "{}{}", + put_error.full_snapshot_path, filename + )); + + // We disregard errors at this point; the snapshot repository pruning mechanism + // should catch these eventually. + let _ = self.object_store.delete(&path).await.inspect_err(|e| { + info!( + "Failed to delete file from partially uploaded snapshot: {}", + e + ) + }); + } + Err(put_error.error) + } + } + } + + // It is the outer put method's responsibility to clean up partial progress. + async fn put_snapshot_inner( + &self, + snapshot: &PartitionSnapshotMetadata, + local_snapshot_path: &Path, + ) -> Result<(), PutSnapshotError> { + // A unique snapshot path within the partition prefix. We pad the LSN to ensure correct + // lexicographic sorting. 
+ let snapshot_prefix = Self::get_snapshot_prefix(snapshot); + let full_snapshot_path = format!( + "{prefix}{partition_id}/{snapshot_prefix}", + prefix = self.prefix, + partition_id = snapshot.partition_id, + ); + + debug!( + "Uploading snapshot from {:?} to {}", + local_snapshot_path, full_snapshot_path + ); + + let mut progress = SnapshotUploadProgress::with_snapshot_path(full_snapshot_path.clone()); + let mut buf = BytesMut::new(); + for file in &snapshot.files { + let filename = file.name.trim_start_matches("/"); + let key = object_store::path::Path::from(format!( + "{}/{}", + full_snapshot_path.as_str(), + filename + )); + + let put_result = put_snapshot_object( + local_snapshot_path.join(filename).as_path(), + &key, + &self.object_store, + &mut buf, + ) + .await + .map_err(|e| PutSnapshotError::from(e, progress.clone()))?; + + debug!( + etag = put_result.e_tag.unwrap_or_default(), + ?key, + "Put snapshot data file completed", + ); + progress.push(file.name.clone()); + } + + let metadata_key = object_store::path::Path::from(format!( + "{}/metadata.json", + full_snapshot_path.as_str() + )); + let metadata_json_payload = PutPayload::from( + serde_json::to_string_pretty(snapshot).expect("Can always serialize JSON"), + ); + + let put_result = self + .object_store + .put(&metadata_key, metadata_json_payload) + .await + .map_err(|e| PutSnapshotError::from(e, progress.clone()))?; + progress.push("/metadata.json".to_owned()); + + debug!( + etag = put_result.e_tag.unwrap_or_default(), + key = ?metadata_key, + "Successfully published snapshot metadata", + ); + + let latest_path = object_store::path::Path::from(format!( + "{prefix}{partition_id}/latest.json", + prefix = self.prefix, + partition_id = snapshot.partition_id, + )); + + // By performing a CAS on the latest snapshot pointer, we can ensure strictly monotonic updates. 
+ let maybe_stored = self + .get_latest_snapshot_metadata_for_update(snapshot, &latest_path) + .await + .map_err(|e| PutSnapshotError::from(e, progress.clone()))?; + if maybe_stored.as_ref().is_some_and(|(latest_stored, _)| { + latest_stored.min_applied_lsn >= snapshot.min_applied_lsn + }) { + let repository_latest_lsn = maybe_stored.expect("is some").0.min_applied_lsn; + info!( + ?repository_latest_lsn, + new_snapshot_lsn = ?snapshot.min_applied_lsn, + "The newly uploaded snapshot is no newer than the already-stored latest snapshot, will not update latest pointer" + ); + return Ok(()); + } + + let latest = LatestSnapshot::from_snapshot(snapshot, snapshot_prefix); + let latest = PutPayload::from( + serde_json::to_string_pretty(&latest) + .map_err(|e| PutSnapshotError::from(e, progress.clone()))?, + ); + + // The object_store file provider supports create-if-not-exists but not update-version on put + let use_conditional_update = !matches!(self.destination.scheme(), "file"); + let conditions = maybe_stored + .map(|(_, version)| PutOptions { + mode: match use_conditional_update { + true => PutMode::Update(version), + false => PutMode::Overwrite, + }, + ..PutOptions::default() + }) + .unwrap_or_else(|| PutOptions::from(PutMode::Create)); + + // Note: this call may return an error on concurrent modification. Since we don't expect any + // contention (here), and this doesn't cause any correctness issues, we don't bother with + // retrying the entire put_snapshot attempt on object_store::Error::Precondition. 
+ let put_result = self + .object_store + .put_opts(&latest_path, latest, conditions) + .await + .map_err(|e| PutSnapshotError::from(e, progress.clone()))?; + + debug!( + etag = put_result.e_tag.unwrap_or_default(), + key = ?latest_path, + "Successfully updated latest snapshot pointer", + ); + + Ok(()) + } + + fn get_snapshot_prefix(snapshot: &PartitionSnapshotMetadata) -> String { + format!( + "lsn_{lsn:020}-{snapshot_id}", + lsn = snapshot.min_applied_lsn, + snapshot_id = snapshot.snapshot_id + ) + } + + async fn get_latest_snapshot_metadata_for_update( + &self, + snapshot: &PartitionSnapshotMetadata, + path: &object_store::path::Path, + ) -> anyhow::Result<Option<(LatestSnapshot, UpdateVersion)>> { + match self.object_store.get(path).await { + Ok(result) => { + let version = UpdateVersion { + e_tag: result.meta.e_tag.clone(), + version: result.meta.version.clone(), + }; + let latest: LatestSnapshot = serde_json::from_slice( + result.bytes().await?.iter().as_slice(), + ) + .inspect_err(|e| { + debug!( + repository_latest_lsn = "unknown", + new_snapshot_lsn = ?snapshot.min_applied_lsn, + "Failed to parse stored latest snapshot pointer, refusing to overwrite: {}", + e + ) + }) + .map_err(|e| anyhow!("Failed to parse latest snapshot metadata: {}", e))?; + if snapshot.cluster_name != latest.cluster_name { + // This indicates a serious misconfiguration and we should complain loudly + bail!("Snapshot does not match the cluster name of latest snapshot at destination!"); + } + Ok(Some((latest, version))) + } + Err(object_store::Error::NotFound { ..
}) => { + debug!( + repository_latest_lsn = "none", + new_snapshot_lsn = ?snapshot.min_applied_lsn, + "No latest snapshot pointer found, will create one" + ); + Ok(None) + } + Err(e) => { + bail!("Failed to get latest snapshot pointer: {}", e); + } + } + } +} + +async fn create_object_store_client(destination: Url) -> anyhow::Result<Arc<dyn ObjectStore>> { + // We use the AWS SDK configuration and credentials provider so that the conventional AWS + // environment variables and config files work as expected. The object_store crate has its + // own configuration mechanism which doesn't support many of the AWS conventions. This + // differs quite a lot from the Lambda invoker which uses the AWS SDK, and that would be a + // very surprising inconsistency for customers. This mechanism allows us to infer the region + // and securely obtain session credentials without any hard-coded configuration. + let object_store: Arc<dyn ObjectStore> = if destination.scheme() == "s3" { + debug!("Using AWS SDK credentials provider"); + let aws_region = aws_config::load_defaults(BehaviorVersion::v2024_03_28()) + .await + .region() + .context("Unable to determine AWS region to use with S3")?
+ .clone(); + + let store = AmazonS3Builder::new() + .with_url(destination) + .with_region(aws_region.to_string()) + .with_conditional_put(S3ConditionalPut::ETagMatch) + .with_credentials(Arc::new(AwsSdkCredentialsProvider { + credentials_provider: DefaultCredentialsChain::builder().build().await, + })) + .with_retry(object_store::RetryConfig { + max_retries: 8, + retry_timeout: Duration::from_secs(60), + backoff: object_store::BackoffConfig { + init_backoff: Duration::from_millis(100), + max_backoff: Duration::from_secs(5), + base: 2., + }, + }) + .build()?; + + Arc::new(store) + } else { + object_store::parse_url(&destination)?.0.into() + }; + Ok(object_store) +} + +#[derive(Clone, Debug)] +struct SnapshotUploadProgress { + pub full_snapshot_path: String, + pub uploaded_files: Vec<String>, +} + +impl SnapshotUploadProgress { + fn with_snapshot_path(full_snapshot_path: String) -> Self { + SnapshotUploadProgress { + full_snapshot_path, + uploaded_files: vec![], + } + } + + fn push(&mut self, filename: String) { + self.uploaded_files.push(filename); + } +} + +struct PutSnapshotError { + pub full_snapshot_path: String, + pub uploaded_files: Vec<String>, + pub error: anyhow::Error, +} + +impl PutSnapshotError { + fn from<E>(error: E, progress: SnapshotUploadProgress) -> Self + where + E: Into<anyhow::Error>, + { + PutSnapshotError { + error: error.into(), + full_snapshot_path: progress.full_snapshot_path, + uploaded_files: progress.uploaded_files, + } + } +} + +/// S3 and other stores require a certain minimum size for the parts of a multipart upload. It is an +/// API error to attempt a multipart put below this size, apart from the final segment. +const MULTIPART_UPLOAD_CHUNK_SIZE_BYTES: usize = 5 * 1024 * 1024; + +// The object_store `put_multipart` method does not currently support PutMode, so we don't pass this +// at all; however since we upload snapshots to a unique path on every attempt, we don't expect any +// conflicts to arise.
+async fn put_snapshot_object( + file_path: &Path, + key: &object_store::path::Path, + object_store: &Arc<dyn ObjectStore>, + buf: &mut BytesMut, +) -> anyhow::Result<PutResult> { + debug!(path = ?file_path, "Putting snapshot object from local file"); + let mut snapshot = tokio::fs::File::open(file_path).await?; + + if snapshot.metadata().await?.len() < MULTIPART_UPLOAD_CHUNK_SIZE_BYTES as u64 { + let payload = PutPayload::from(tokio::fs::read(file_path).await?); + return object_store.put(key, payload).await.map_err(|e| e.into()); + } + + debug!("Performing multipart upload for {key}"); + let mut upload = object_store.put_multipart(key).await?; + + let result: anyhow::Result<_> = async { + loop { + let mut len = 0; + buf.reserve(MULTIPART_UPLOAD_CHUNK_SIZE_BYTES); + + // Ensure full buffer unless at EOF + while buf.len() < MULTIPART_UPLOAD_CHUNK_SIZE_BYTES { + len = snapshot.read_buf(buf).await?; + if len == 0 { + break; + } + } + + if !buf.is_empty() { + upload + .put_part(PutPayload::from_bytes(buf.split().freeze())) + .await?; + } + + if len == 0 { + break; + } + } + upload.complete().await.map_err(|e| anyhow!(e)) + } + .await; + + match result { + Ok(r) => Ok(r), + Err(e) => { + debug!("Aborting failed multipart upload"); + upload.abort().await?; + Err(e) + } + } +} + +#[derive(Debug)] +struct AwsSdkCredentialsProvider { + credentials_provider: DefaultCredentialsChain, +} + +#[async_trait] +impl object_store::CredentialProvider for AwsSdkCredentialsProvider { + type Credential = object_store::aws::AwsCredential; + + async fn get_credential(&self) -> object_store::Result<Arc<Self::Credential>> { + let creds = self + .credentials_provider + .provide_credentials() + .await + .map_err(|e| { + // object_store's error detail rendering is not great but aws_config logs the + // detailed underlying cause at WARN level so we don't need to do it again here + object_store::Error::Unauthenticated { + path: "".to_string(), + source: e.into(), + } + })?; + + Ok(Arc::new(object_store::aws::AwsCredential { + key_id:
creds.access_key_id().to_string(), + secret_key: creds.secret_access_key().to_string(), + token: creds.session_token().map(|t| t.to_string()), + })) + } +} + +#[cfg(test)] +mod tests { + use bytes::Bytes; + use object_store::path::Path; + use object_store::ObjectStore; + use std::time::SystemTime; + use tempfile::TempDir; + use tokio::io::AsyncWriteExt; + use tracing::info; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + use tracing_subscriber::{fmt, EnvFilter}; + use url::Url; + + use super::{LatestSnapshot, SnapshotRepository}; + use restate_partition_store::snapshots::{PartitionSnapshotMetadata, SnapshotFormatVersion}; + use restate_types::config::SnapshotsOptions; + use restate_types::identifiers::{PartitionId, PartitionKey, SnapshotId}; + use restate_types::logs::{Lsn, SequenceNumber}; + + #[tokio::test] + async fn test_overwrite_unparsable_latest() -> anyhow::Result<()> { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let snapshot_source = TempDir::new()?; + let source_dir = snapshot_source.path().to_path_buf(); + + let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?; + data.write_all(b"snapshot-data").await?; + + let snapshot = mock_snapshot_metadata( + "/data.sst".to_owned(), + source_dir.to_string_lossy().to_string(), + ); + + let snapshots_destination: TempDir = TempDir::new()?; + let destination_dir = snapshots_destination.path().to_owned(); + let opts = SnapshotsOptions { + destination: Some( + Url::from_file_path(snapshots_destination.path()) + .unwrap() + .to_string(), + ), + ..SnapshotsOptions::default() + }; + let repository = SnapshotRepository::create_if_configured(&opts) + .await? 
+ .unwrap(); + + // Write invalid JSON to latest.json + let latest_path = destination_dir.join(format!("{}/latest.json", PartitionId::MIN)); + tokio::fs::create_dir_all(latest_path.parent().unwrap()).await?; + info!("Creating file: {:?}", latest_path); + let mut latest = tokio::fs::File::create(&latest_path).await?; + latest.write_all(b"not valid json").await?; + + assert!(repository.put(&snapshot, source_dir).await.is_err()); + + Ok(()) + } + + #[tokio::test] + async fn test_put_snapshot_local_filesystem() -> anyhow::Result<()> { + let snapshots_destination = TempDir::new()?; + test_put_snapshot( + Url::from_file_path(snapshots_destination.path()) + .unwrap() + .to_string(), + ) + .await + } + + /// For this test to run, set RESTATE_S3_INTEGRATION_TEST_BUCKET_NAME to a writable S3 bucket name + #[tokio::test] + async fn test_put_snapshot_s3() -> anyhow::Result<()> { + let Ok(bucket_name) = std::env::var("RESTATE_S3_INTEGRATION_TEST_BUCKET_NAME") else { + return Ok(()); + }; + test_put_snapshot(format!("s3://{bucket_name}/integration-test")).await + } + + async fn test_put_snapshot(destination: String) -> anyhow::Result<()> { + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .init(); + + let snapshot_source = TempDir::new()?; + let source_dir = snapshot_source.path().to_path_buf(); + + let destination_url = Url::parse(destination.as_str())?; + let path = destination_url.path().to_string(); + let object_store = super::create_object_store_client(destination_url).await?; + + let latest = object_store + .get(&Path::from(format!( + "{}/{}/latest.json", + path, + PartitionId::MIN, + ))) + .await; + assert!(matches!(latest, Err(object_store::Error::NotFound { .. 
}))); + + let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?; + data.write_all(b"snapshot-data").await?; + + let mut snapshot1 = mock_snapshot_metadata( + "/data.sst".to_owned(), + source_dir.to_string_lossy().to_string(), + ); + snapshot1.min_applied_lsn = Lsn::new( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)? + .as_millis() as u64, + ); + + let opts = SnapshotsOptions { + destination: Some(destination), + ..SnapshotsOptions::default() + }; + + let repository = SnapshotRepository::create_if_configured(&opts) + .await? + .unwrap(); + + repository.put(&snapshot1, source_dir.clone()).await?; + + let snapshot_prefix = SnapshotRepository::get_snapshot_prefix(&snapshot1); + let data = object_store + .get(&Path::from(format!( + "{}/{}/{}/data.sst", + path, snapshot1.partition_id, snapshot_prefix, + ))) + .await?; + assert_eq!(data.bytes().await?, Bytes::from_static(b"snapshot-data")); + + let metadata = object_store + .get(&Path::from(format!( + "{}/{}/{}/metadata.json", + path, snapshot1.partition_id, snapshot_prefix, + ))) + .await?; + let metadata: PartitionSnapshotMetadata = serde_json::from_slice(&metadata.bytes().await?)?; + assert_eq!(snapshot1.snapshot_id, metadata.snapshot_id); + + let latest = object_store + .get(&Path::from(format!( + "{}/{}/latest.json", + path, snapshot1.partition_id, + ))) + .await?; + let latest: LatestSnapshot = serde_json::from_slice(&latest.bytes().await?)?; + assert_eq!( + LatestSnapshot::from_snapshot(&snapshot1, snapshot_prefix), + latest + ); + + let snapshot_source = TempDir::new()?; + let source_dir = snapshot_source.path().to_path_buf(); + + let mut data = tokio::fs::File::create(source_dir.join("data.sst")).await?; + data.write_all(b"snapshot-data").await?; + + let mut snapshot2 = mock_snapshot_metadata( + "/data.sst".to_owned(), + source_dir.to_string_lossy().to_string(), + ); + snapshot2.min_applied_lsn = snapshot1.min_applied_lsn.next(); + + repository.put(&snapshot2, 
source_dir).await?; + + let latest = object_store + .get(&Path::from(format!( + "{}/{}/latest.json", + path, snapshot2.partition_id, + ))) + .await?; + let latest: LatestSnapshot = serde_json::from_slice(&latest.bytes().await?)?; + assert_eq!( + LatestSnapshot::from_snapshot( + &snapshot2, + SnapshotRepository::get_snapshot_prefix(&snapshot2) + ), + latest + ); + + Ok(()) + } + + fn mock_snapshot_metadata(file_name: String, directory: String) -> PartitionSnapshotMetadata { + PartitionSnapshotMetadata { + version: SnapshotFormatVersion::V1, + cluster_name: "test-cluster".to_string(), + node_name: "node".to_string(), + partition_id: PartitionId::MIN, + created_at: humantime::Timestamp::from(SystemTime::now()), + snapshot_id: SnapshotId::new(), + key_range: PartitionKey::MIN..=PartitionKey::MAX, + min_applied_lsn: Lsn::new(1), + db_comparator_name: "leveldb.BytewiseComparator".to_string(), + // this is totally bogus, but it doesn't matter since we won't be importing it into RocksDB + files: vec![rocksdb::LiveFile { + column_family_name: "data-0".to_owned(), + name: file_name, + directory, + size: 0, + level: 0, + start_key: Some(vec![0]), + end_key: Some(vec![0xff, 0xff]), + num_entries: 0, + num_deletions: 0, + smallest_seqno: 0, + largest_seqno: 0, + }], + } + } +} diff --git a/crates/worker/src/partition_processor_manager/snapshot_task.rs b/crates/worker/src/partition/snapshots/snapshot_task.rs similarity index 71% rename from crates/worker/src/partition_processor_manager/snapshot_task.rs rename to crates/worker/src/partition/snapshots/snapshot_task.rs index e41217a01..3546dd2e1 100644 --- a/crates/worker/src/partition_processor_manager/snapshot_task.rs +++ b/crates/worker/src/partition/snapshots/snapshot_task.rs @@ -20,6 +20,8 @@ use restate_partition_store::snapshots::{ use restate_partition_store::PartitionStoreManager; use restate_types::identifiers::{PartitionId, SnapshotId}; +use crate::partition::snapshots::SnapshotRepository; + /// Creates a partition store 
snapshot along with Restate snapshot metadata. pub struct SnapshotPartitionTask { pub snapshot_id: SnapshotId, @@ -28,6 +30,7 @@ pub struct SnapshotPartitionTask { pub partition_store_manager: PartitionStoreManager, pub cluster_name: String, pub node_name: String, + pub snapshot_repository: SnapshotRepository, } impl SnapshotPartitionTask { @@ -59,43 +62,32 @@ impl SnapshotPartitionTask { ) .await?; - let metadata = self.write_snapshot_metadata_header(snapshot).await?; + let metadata = self.metadata(&snapshot, SystemTime::now()); - // todo(pavel): SnapshotRepository integration will go in here in a future PR + self.snapshot_repository + .put(&metadata, snapshot.base_dir) + .await + .map_err(|e| SnapshotError::RepositoryIo(self.partition_id, e))?; Ok(metadata) } - async fn write_snapshot_metadata_header( + fn metadata( &self, - snapshot: LocalPartitionSnapshot, - ) -> Result { - let snapshot_meta = PartitionSnapshotMetadata { + snapshot: &LocalPartitionSnapshot, + created_at: SystemTime, + ) -> PartitionSnapshotMetadata { + PartitionSnapshotMetadata { version: SnapshotFormatVersion::V1, cluster_name: self.cluster_name.clone(), node_name: self.node_name.clone(), partition_id: self.partition_id, - created_at: humantime::Timestamp::from(SystemTime::now()), + created_at: humantime::Timestamp::from(created_at), snapshot_id: self.snapshot_id, - key_range: snapshot.key_range, + key_range: snapshot.key_range.clone(), min_applied_lsn: snapshot.min_applied_lsn, db_comparator_name: snapshot.db_comparator_name.clone(), files: snapshot.files.clone(), - }; - let metadata_json = - serde_json::to_string_pretty(&snapshot_meta).expect("Can always serialize JSON"); - - let metadata_path = snapshot.base_dir.join("metadata.json"); - tokio::fs::write(metadata_path.clone(), metadata_json) - .await - .map_err(|e| SnapshotError::SnapshotMetadataHeaderError(self.partition_id, e))?; - - debug!( - lsn = %snapshot.min_applied_lsn, - "Partition snapshot metadata written to {:?}", - metadata_path 
- ); - - Ok(snapshot_meta) + } } } diff --git a/crates/worker/src/partition_processor_manager.rs b/crates/worker/src/partition_processor_manager.rs index a67598d07..7027b1f9e 100644 --- a/crates/worker/src/partition_processor_manager.rs +++ b/crates/worker/src/partition_processor_manager.rs @@ -11,9 +11,9 @@ mod message_handler; mod persisted_lsn_watchdog; mod processor_state; -mod snapshot_task; mod spawn_processor_task; +use restate_types::identifiers::SnapshotId; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; use std::ops::{Add, RangeInclusive}; @@ -49,7 +49,7 @@ use restate_types::cluster::cluster_state::{PartitionProcessorStatus, RunMode}; use restate_types::config::Configuration; use restate_types::epoch::EpochMetadata; use restate_types::health::HealthStatus; -use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, SnapshotId}; +use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey}; use restate_types::live::Live; use restate_types::logs::{Lsn, SequenceNumber}; use restate_types::metadata_store::keys::partition_processor_epoch_key; @@ -72,12 +72,12 @@ use crate::metric_definitions::PARTITION_LAST_APPLIED_LOG_LSN; use crate::metric_definitions::PARTITION_LAST_PERSISTED_LOG_LSN; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_RECORD; use crate::metric_definitions::PARTITION_TIME_SINCE_LAST_STATUS_UPDATE; +use crate::partition::snapshots::{SnapshotPartitionTask, SnapshotRepository}; use crate::partition_processor_manager::message_handler::PartitionProcessorManagerMessageHandler; use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnWatchdog; use crate::partition_processor_manager::processor_state::{ LeaderEpochToken, ProcessorState, StartedProcessor, }; -use crate::partition_processor_manager::snapshot_task::SnapshotPartitionTask; use crate::partition_processor_manager::spawn_processor_task::SpawnPartitionProcessorTask; pub struct PartitionProcessorManager { 
@@ -103,6 +103,7 @@ pub struct PartitionProcessorManager { pending_snapshots: HashMap, snapshot_export_tasks: FuturesUnordered>, + snapshot_repository: Option, } struct PendingSnapshotTask { @@ -174,6 +175,7 @@ impl PartitionProcessorManager { partition_store_manager: PartitionStoreManager, router_builder: &mut MessageRouterBuilder, bifrost: Bifrost, + snapshot_repository: Option, ) -> Self { let incoming_update_processors = router_builder.subscribe_to_stream(2); let incoming_partition_processor_rpc = router_builder.subscribe_to_stream(128); @@ -198,6 +200,7 @@ impl PartitionProcessorManager { asynchronous_operations: JoinSet::default(), snapshot_export_tasks: FuturesUnordered::default(), pending_snapshots: HashMap::default(), + snapshot_repository, } } @@ -699,15 +702,24 @@ impl PartitionProcessorManager { } }; + let snapshot_repository = self.snapshot_repository.clone(); + let Some(snapshot_repository) = snapshot_repository else { + let _ = sender.send(Err(SnapshotError::RepositoryNotConfigured(partition_id))); + return; + }; + if !processor_state.should_publish_snapshots() { let _ = sender.send(Err(SnapshotError::InvalidState(partition_id))); return; } - self.spawn_create_snapshot_task(partition_id, Some(sender)); + self.spawn_create_snapshot_task(partition_id, snapshot_repository, Some(sender)); } - fn on_create_snapshot_task_completed(&mut self, result: SnapshotResultInternal) { + fn on_create_snapshot_task_completed( + &mut self, + result: Result, + ) { let (partition_id, response) = match result { Ok(metadata) => { self.archived_lsns @@ -773,7 +785,11 @@ impl PartitionProcessorManager { last_applied_lsn = %status.last_applied_log_lsn.unwrap_or(SequenceNumber::INVALID), "Requesting partition snapshot", ); - self.spawn_create_snapshot_task(partition_id, None); + self.spawn_create_snapshot_task( + partition_id, + self.snapshot_repository.clone().expect("is some"), // validated on startup + None, + ); } } @@ -782,6 +798,7 @@ impl PartitionProcessorManager { fn 
spawn_create_snapshot_task( &mut self, partition_id: PartitionId, + snapshot_repository: SnapshotRepository, sender: Option>, ) { match self.pending_snapshots.entry(partition_id) { @@ -798,6 +815,7 @@ impl PartitionProcessorManager { partition_store_manager: self.partition_store_manager.clone(), cluster_name: config.common.cluster_name().into(), node_name: config.common.node_name().into(), + snapshot_repository, }; let spawn_task_result = TaskCenter::spawn_unmanaged( @@ -973,6 +991,7 @@ mod tests { partition_store_manager, &mut env_builder.router_builder, bifrost, + None, ); let env = env_builder.build().await; diff --git a/crates/worker/src/partition_processor_manager/message_handler.rs b/crates/worker/src/partition_processor_manager/message_handler.rs index 394135b29..4a6d075f2 100644 --- a/crates/worker/src/partition_processor_manager/message_handler.rs +++ b/crates/worker/src/partition_processor_manager/message_handler.rs @@ -14,7 +14,7 @@ use restate_core::{TaskCenter, TaskKind}; use restate_types::net::partition_processor_manager::{ CreateSnapshotRequest, CreateSnapshotResponse, SnapshotError, }; -use tracing::{debug, warn}; +use tracing::warn; /// RPC message handler for Partition Processor management operations. 
pub struct PartitionProcessorManagerMessageHandler { @@ -45,26 +45,12 @@ impl MessageHandler for PartitionProcessorManagerMessageHandler { .await; match create_snapshot_result.as_ref() { - Ok(snapshot) => { - debug!( - partition_id = %msg.body().partition_id, - %snapshot, - "Create snapshot successfully completed", - ); - msg.to_rpc_response(CreateSnapshotResponse { - result: Ok(snapshot.snapshot_id), - }) - } - Err(err) => { - warn!( - partition_id = %msg.body().partition_id, - "Create snapshot failed: {}", - err - ); - msg.to_rpc_response(CreateSnapshotResponse { - result: Err(SnapshotError::SnapshotCreationFailed(err.to_string())), - }) - } + Ok(snapshot) => msg.to_rpc_response(CreateSnapshotResponse { + result: Ok(snapshot.snapshot_id), + }), + Err(err) => msg.to_rpc_response(CreateSnapshotResponse { + result: Err(SnapshotError::SnapshotCreationFailed(err.to_string())), + }), } .send() .await diff --git a/deny.toml b/deny.toml index 3d2515f0b..501ec3559 100644 --- a/deny.toml +++ b/deny.toml @@ -170,6 +170,7 @@ unknown-git = "deny" allow-registry = ["https://github.com/rust-lang/crates.io-index"] allow-git = [ "https://github.com/restatedev/rust-rocksdb.git", + "https://github.com/apache/arrow-rs.git", ] [sources.allow-org]