Split snapshot export task from responding to sender
pcholakov committed Nov 18, 2024
Parent 877ea86 · commit 693c174
Showing 4 changed files with 146 additions and 77 deletions.
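The change in a nutshell: previously the oneshot result sender was moved into the spawned snapshot task, so a failed spawn left no way to notify the caller (the old "lost track of the sender" warning visible in the diff below). After this commit the PartitionProcessorManager keeps each sender in a pending_snapshots map and drains the export tasks through a FuturesUnordered in its event loop, replying only when a task result comes back. What follows is a minimal, runnable sketch of that shape, not the crate's actual code: u16 stands in for PartitionId, String results stand in for SnapshotCreated/SnapshotError, plain tokio::spawn replaces TaskCenter/TaskHandle, and the Manager and handler names are illustrative; it assumes the tokio and futures crates.

use std::collections::HashMap;

use futures::stream::{FuturesUnordered, StreamExt};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;

type PartitionId = u16;
type SnapshotResult = Result<String, String>; // stand-in for Result<SnapshotCreated, SnapshotError>

struct Manager {
    // Callers still waiting for a snapshot result, keyed by partition.
    pending: HashMap<PartitionId, oneshot::Sender<SnapshotResult>>,
    // Detached export tasks; their outputs are drained in the event loop below.
    export_tasks: FuturesUnordered<JoinHandle<(PartitionId, SnapshotResult)>>,
}

impl Manager {
    fn handle_create_snapshot(
        &mut self,
        partition_id: PartitionId,
        sender: oneshot::Sender<SnapshotResult>,
    ) {
        if self.pending.contains_key(&partition_id) {
            let _ = sender.send(Err("snapshot already in progress".to_owned()));
            return;
        }
        // The sender stays with the manager; only the export work is spawned.
        self.pending.insert(partition_id, sender);
        self.export_tasks.push(tokio::spawn(async move {
            // ... export the partition snapshot here ...
            (partition_id, Ok(format!("snap-{partition_id}")))
        }));
    }

    async fn run(
        mut self,
        mut commands: mpsc::Receiver<(PartitionId, oneshot::Sender<SnapshotResult>)>,
    ) {
        loop {
            tokio::select! {
                Some((partition_id, sender)) = commands.recv() => {
                    self.handle_create_snapshot(partition_id, sender);
                }
                // A finished export is matched back to the waiting sender, if any.
                Some(joined) = self.export_tasks.next() => {
                    let (partition_id, result) = joined.expect("export task panicked");
                    if let Some(sender) = self.pending.remove(&partition_id) {
                        let _ = sender.send(result);
                    }
                }
                else => break, // command channel closed and no exports left
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(8);
    let manager = Manager {
        pending: HashMap::new(),
        export_tasks: FuturesUnordered::new(),
    };
    let event_loop = tokio::spawn(manager.run(rx));

    let (reply_tx, reply_rx) = oneshot::channel();
    tx.send((1, reply_tx)).await.unwrap();
    println!("snapshot result: {:?}", reply_rx.await.unwrap());

    drop(tx); // closing the command channel lets the event loop exit
    event_loop.await.unwrap();
}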
crates/core/src/worker_api/partition_processor_manager.rs (58 changes: 52 additions & 6 deletions)
@@ -9,7 +9,7 @@
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use std::io;
use tokio::sync::{mpsc, oneshot};

use restate_types::{
@@ -21,7 +21,7 @@ use crate::ShutdownError;

#[derive(Debug)]
pub enum ProcessorsManagerCommand {
CreateSnapshot(PartitionId, oneshot::Sender<anyhow::Result<SnapshotId>>),
CreateSnapshot(PartitionId, oneshot::Sender<SnapshotResult>),
GetState(oneshot::Sender<BTreeMap<PartitionId, PartitionProcessorStatus>>),
}

@@ -33,12 +33,20 @@ impl ProcessorsManagerHandle {
Self(sender)
}

pub async fn create_snapshot(&self, partition_id: PartitionId) -> anyhow::Result<SnapshotId> {
pub async fn create_snapshot(&self, partition_id: PartitionId) -> SnapshotResult {
let (tx, rx) = oneshot::channel();
self.0
.send(ProcessorsManagerCommand::CreateSnapshot(partition_id, tx))
.await?;
rx.await?
.await
.map_err(|_| {
SnapshotError::Internal(
partition_id,
"Unable to send command to PartitionProcessorManager".to_string(),
)
})?;
rx.await.map_err(|_| {
SnapshotError::Internal(partition_id, "Unable to receive response".to_string())
})?
}

pub async fn get_state(
@@ -48,7 +56,45 @@ impl ProcessorsManagerHandle {
self.0
.send(ProcessorsManagerCommand::GetState(tx))
.await
.unwrap();
.map_err(|_| ShutdownError)?;
rx.await.map_err(|_| ShutdownError)
}
}

pub type SnapshotResult = Result<SnapshotCreated, SnapshotError>;

#[derive(Debug, Clone, derive_more::Display)]
#[display("{}", snapshot_id)]
pub struct SnapshotCreated {
pub snapshot_id: SnapshotId,
pub partition_id: PartitionId,
}

#[derive(Debug, thiserror::Error)]
pub enum SnapshotError {
#[error("Partition {0} not found")]
PartitionNotFound(PartitionId),
#[error("Snapshot creation already in progress for partition {0}")]
SnapshotInProgress(PartitionId),
#[error("Partition processor state does not allow snapshot export {0}")]
InvalidState(PartitionId),
#[error("Snapshot failed for partition {0}: {1}")]
SnapshotExportError(PartitionId, #[source] anyhow::Error),
#[error("Snapshot failed for partition {0}: {1}")]
SnapshotMetadataHeaderError(PartitionId, #[source] io::Error),
#[error("Internal error creating snapshot for partition {0}: {1}")]
Internal(PartitionId, String),
}

impl SnapshotError {
pub fn partition_id(&self) -> PartitionId {
match self {
SnapshotError::PartitionNotFound(partition_id) => *partition_id,
SnapshotError::SnapshotInProgress(partition_id) => *partition_id,
SnapshotError::InvalidState(partition_id) => *partition_id,
SnapshotError::SnapshotExportError(partition_id, _) => *partition_id,
SnapshotError::SnapshotMetadataHeaderError(partition_id, _) => *partition_id,
SnapshotError::Internal(partition_id, _) => *partition_id,
}
}
}
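For reference, a caller-side sketch of the reshaped handle API above. The imports mirror the paths used elsewhere in this diff; trigger_snapshot is an illustrative function name rather than something added by this commit, and the snippet only compiles inside the restate workspace.

use restate_core::worker_api::{ProcessorsManagerHandle, SnapshotError};
use restate_types::identifiers::PartitionId;

async fn trigger_snapshot(handle: &ProcessorsManagerHandle, partition_id: PartitionId) {
    match handle.create_snapshot(partition_id).await {
        Ok(created) => {
            // SnapshotCreated carries both the snapshot id and the partition it belongs to.
            println!(
                "created snapshot {} for partition {}",
                created.snapshot_id, created.partition_id
            );
        }
        Err(SnapshotError::SnapshotInProgress(p)) => {
            println!("a snapshot for partition {p} is already being exported");
        }
        Err(err) => {
            // Every SnapshotError variant can report its partition via partition_id().
            eprintln!("snapshot failed for partition {}: {err}", err.partition_id());
        }
    }
}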
@@ -47,14 +47,14 @@ impl MessageHandler for PartitionProcessorManagerMessageHandler {
.await;

match create_snapshot_result.as_ref() {
Ok(snapshot_id) => {
Ok(snapshot) => {
debug!(
partition_id = %msg.body().partition_id,
%snapshot_id,
%snapshot,
"Create snapshot successfully completed",
);
msg.to_rpc_response(CreateSnapshotResponse {
result: Ok(*snapshot_id),
result: Ok(snapshot.snapshot_id),
})
}
Err(err) => {
crates/worker/src/partition_processor_manager/mod.rs (113 changes: 66 additions & 47 deletions)
@@ -14,13 +14,13 @@ mod processor_state;
mod snapshot_task;
mod spawn_processor_task;

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::ops::{Add, RangeInclusive};
use std::sync::Arc;
use std::time::Duration;

use anyhow::anyhow;
use futures::stream::StreamExt;
use futures::stream::{FuturesUnordered, StreamExt};
use metrics::gauge;
use tokio::sync::oneshot;
use tokio::sync::{mpsc, watch};
@@ -41,23 +41,29 @@ use crate::partition_processor_manager::persisted_lsn_watchdog::PersistedLogLsnW
use crate::partition_processor_manager::processor_state::{
LeaderEpochToken, ProcessorState, StartedProcessor,
};
use crate::partition_processor_manager::snapshot_task::SnapshotPartitionTask;
use crate::partition_processor_manager::snapshot_task::{
PendingSnapshotTask, SnapshotPartitionTask,
};
use crate::partition_processor_manager::spawn_processor_task::SpawnPartitionProcessorTask;
use restate_bifrost::Bifrost;
use restate_core::network::{Incoming, MessageRouterBuilder, MessageStream};
use restate_core::worker_api::{ProcessorsManagerCommand, ProcessorsManagerHandle};
use restate_core::worker_api::{
ProcessorsManagerCommand, ProcessorsManagerHandle, SnapshotCreated, SnapshotError,
SnapshotResult,
};
use restate_core::{cancellation_watcher, Metadata, ShutdownError, TaskHandle, TaskKind};
use restate_core::{RuntimeRootTaskHandle, TaskCenter};
use restate_invoker_api::StatusHandle;
use restate_invoker_impl::{BuildError, ChannelStatusReader};
use restate_metadata_store::{MetadataStoreClient, ReadModifyWriteError};
use restate_partition_store::snapshots::PartitionSnapshotMetadata;
use restate_partition_store::PartitionStoreManager;
use restate_types::cluster::cluster_state::ReplayStatus;
use restate_types::cluster::cluster_state::{PartitionProcessorStatus, RunMode};
use restate_types::config::Configuration;
use restate_types::epoch::EpochMetadata;
use restate_types::health::HealthStatus;
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey, SnapshotId};
use restate_types::identifiers::{LeaderEpoch, PartitionId, PartitionKey};
use restate_types::live::Live;
use restate_types::logs::{Lsn, SequenceNumber};
use restate_types::metadata_store::keys::partition_processor_epoch_key;
@@ -97,7 +103,9 @@ pub struct PartitionProcessorManager {

asynchronous_operations: JoinSet<AsynchronousEvent>,

pending_snapshot_export_tasks: BTreeMap<PartitionId, TaskHandle<anyhow::Result<SnapshotId>>>,
pending_snapshots: HashMap<PartitionId, PendingSnapshotTask>,
snapshot_export_tasks:
FuturesUnordered<TaskHandle<Result<PartitionSnapshotMetadata, SnapshotError>>>,
}

#[derive(Debug, thiserror::Error)]
@@ -188,7 +196,8 @@ impl PartitionProcessorManager {
invokers_status_reader: MultiplexedInvokerStatusReader::default(),
pending_control_processors: None,
asynchronous_operations: JoinSet::default(),
pending_snapshot_export_tasks: BTreeMap::default(),
snapshot_export_tasks: FuturesUnordered::default(),
pending_snapshots: HashMap::default(),
}
}

@@ -262,6 +271,9 @@ impl PartitionProcessorManager {
Some(partition_processor_rpc) = self.incoming_partition_processor_rpc.next() => {
self.on_partition_processor_rpc(partition_processor_rpc);
}
Some(result) = self.snapshot_export_tasks.next() => {
self.on_snapshot_task_result(result.expect("export task ok??"));
}
_ = &mut shutdown => {
self.health_status.update(WorkerStatus::Unknown);
return Ok(());
@@ -555,12 +567,11 @@
}

fn on_command(&mut self, command: ProcessorsManagerCommand) {
use ProcessorsManagerCommand::*;
match command {
CreateSnapshot(partition_id, sender) => {
ProcessorsManagerCommand::CreateSnapshot(partition_id, sender) => {
self.handle_create_snapshot(partition_id, sender);
}
GetState(sender) => {
ProcessorsManagerCommand::GetState(sender) => {
let _ = sender.send(self.get_state());
}
}
@@ -569,33 +580,27 @@
fn handle_create_snapshot(
&mut self,
partition_id: PartitionId,
sender: oneshot::Sender<anyhow::Result<SnapshotId>>,
sender: oneshot::Sender<SnapshotResult>,
) {
let processor_state = match self.processor_states.get(&partition_id) {
Some(state) => state,
None => {
let _ = sender.send(Err(anyhow::anyhow!(
"Partition processor '{}' not found",
partition_id
)));
let _ = sender.send(Err(SnapshotError::PartitionNotFound(partition_id)));
return;
}
};

if !processor_state.can_create_snapshot() {
let _ = sender.send(Err(anyhow::anyhow!(
"Partition processor '{}' is not in a state that allows snapshot creation",
partition_id
)));
let _ = sender.send(Err(SnapshotError::InvalidState(partition_id)));
return;
}

let archived_lsn_sender = match self.archived_lsn_channels.get(&partition_id).cloned() {
Some(watch) => watch.0.clone(),
None => {
let _ = sender.send(Err(anyhow::anyhow!(
"No archived LSNs channel found for partition: {}",
partition_id
let _ = sender.send(Err(SnapshotError::Internal(
partition_id,
"No archived LSNs channel found!".to_string(),
)));
return;
}
@@ -703,6 +708,30 @@
}
}

fn on_snapshot_task_result(
&mut self,
result: Result<PartitionSnapshotMetadata, SnapshotError>,
) {
match result {
Ok(metadata) => {
if let Some(pending) = self.pending_snapshots.remove(&metadata.partition_id) {
let _ = pending.sender.send(Ok(SnapshotCreated {
snapshot_id: metadata.snapshot_id,
partition_id: metadata.partition_id,
}));
}
}
Err(snapshot_error) => {
if let Some(pending) = self
.pending_snapshots
.remove(&snapshot_error.partition_id())
{
let _ = pending.sender.send(Err(snapshot_error));
}
}
}
}

fn trigger_periodic_partition_snapshots(&mut self) {
let Some(records_per_snapshot) = self
.updateable_config
@@ -753,21 +782,10 @@
fn spawn_create_snapshot_task(
&mut self,
partition_id: PartitionId,
result_sender: oneshot::Sender<anyhow::Result<SnapshotId>>,
result_sender: oneshot::Sender<SnapshotResult>,
archived_lsn_sender: watch::Sender<Option<Lsn>>,
) {
debug!("In spawn create snapshot task...");

if self
.pending_snapshot_export_tasks
.get(&partition_id)
.is_some_and(|task| !task.is_finished())
{
warn!(%partition_id, "Snapshot creation already in progress, rejecting request");
result_sender
.send(Err(anyhow!("Snapshot creation already in progress")))
.ok();
} else {
if let Entry::Vacant(entry) = self.pending_snapshots.entry(partition_id) {
let config = self.updateable_config.live_load();

let snapshot_base_path = config.worker.snapshots.snapshots_dir(partition_id);
@@ -778,11 +796,10 @@
partition_id,
partition_store_manager: self.partition_store_manager.clone(),
snapshot_base_path,
result_sender,
archived_lsn_sender,
};
let snapshot_span = tracing::info_span!("create-snapshot");

let snapshot_span = tracing::info_span!("create-snapshot");
let spawn_task_result = restate_core::task_center().spawn_unmanaged(
TaskKind::PartitionSnapshotProducer,
"create-snapshot",
@@ -793,19 +810,21 @@

match spawn_task_result {
Ok(task) => {
_ = self
.pending_snapshot_export_tasks
.insert(partition_id, task);
self.snapshot_export_tasks.push(task);
entry.insert(PendingSnapshotTask {
sender: result_sender,
});
}
Err(err) => {
// todo(pavel): how do we solve the ownership of sender moving to the (now failed) task?
// sender.send(Err(anyhow!("Shutting down"))).ok();
warn!(
"Failed to spawn task but lost track of the sender :-( - {}",
err
);
Err(_) => {
result_sender
.send(Err(SnapshotError::InvalidState(partition_id)))
.ok();
}
}
} else {
result_sender
.send(Err(SnapshotError::SnapshotInProgress(partition_id)))
.ok();
}
}
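The duplicate-request guard above replaces the earlier is_finished() probe on a stored task handle with a single HashMap::entry lookup, so the check and the reservation happen in one step. A stand-alone illustration of that guard, with u16 standing in for PartitionId and a &str placeholder instead of the real PendingSnapshotTask:

use std::collections::hash_map::Entry;
use std::collections::HashMap;

fn try_reserve(pending: &mut HashMap<u16, &'static str>, partition_id: u16) -> Result<(), String> {
    match pending.entry(partition_id) {
        // Vacant entry: reserve the slot and go on to spawn the export task.
        Entry::Vacant(slot) => {
            slot.insert("pending snapshot task");
            Ok(())
        }
        // Occupied entry: a snapshot for this partition is already in flight.
        Entry::Occupied(_) => {
            Err(format!("snapshot already in progress for partition {partition_id}"))
        }
    }
}

fn main() {
    let mut pending = HashMap::new();
    assert!(try_reserve(&mut pending, 7).is_ok());
    assert!(try_reserve(&mut pending, 7).is_err()); // second request for the same partition is rejected
}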
