feat(meta): support reload runtime info of single database (#19597)
wenym1 authored Dec 2, 2024
1 parent e2a140e commit 3960994
Showing 8 changed files with 339 additions and 112 deletions.
src/meta/src/barrier/context/context_impl.rs (10 changes: 9 additions & 1 deletion)
@@ -28,7 +28,8 @@ use crate::barrier::context::{GlobalBarrierWorkerContext, GlobalBarrierWorkerCon
use crate::barrier::progress::TrackingJob;
use crate::barrier::{
BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, Command, CreateStreamingJobCommandInfo,
- CreateStreamingJobType, RecoveryReason, ReplaceStreamJobPlan, Scheduled,
+ CreateStreamingJobType, DatabaseRuntimeInfoSnapshot, RecoveryReason, ReplaceStreamJobPlan,
+ Scheduled,
};
use crate::hummock::CommitEpochInfo;
use crate::{MetaError, MetaResult};
@@ -83,6 +84,13 @@ impl GlobalBarrierWorkerContext for GlobalBarrierWorkerContextImpl {
async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
self.reload_runtime_info_impl().await
}

async fn reload_database_runtime_info(
&self,
database_id: DatabaseId,
) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
self.reload_database_runtime_info_impl(database_id).await
}
}

impl GlobalBarrierWorkerContextImpl {
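
The impl above simply delegates to `reload_database_runtime_info_impl` (defined in recovery.rs below). As a rough illustration of how a caller might consume the new per-database API, here is a sketch with simplified stand-in types; the real `DatabaseId`, `MetaResult`, and snapshot types differ, and the real method is async:

use std::collections::HashMap;

// Hypothetical stand-ins for the meta-service types.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct DatabaseId(u32);

#[derive(Debug, Default)]
struct DatabaseRuntimeInfoSnapshot {
    // In the real snapshot: fragment info, committed epochs,
    // subscriptions, stream actors, and source splits.
    background_jobs: HashMap<u32, String>,
}

type MetaResult<T> = Result<T, String>;

trait GlobalBarrierWorkerContext {
    // `Ok(None)` means the database no longer exists after cleanup,
    // so there is nothing to recover for it.
    fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>>;
}

fn recover_database(
    ctx: &impl GlobalBarrierWorkerContext,
    database_id: DatabaseId,
) -> MetaResult<()> {
    match ctx.reload_database_runtime_info(database_id)? {
        Some(snapshot) => {
            // Re-inject the per-database state into the barrier worker.
            println!("recovered {database_id:?}: {snapshot:?}");
        }
        None => {
            // The database was dropped concurrently; nothing to recover.
        }
    }
    Ok(())
}

struct StubContext;

impl GlobalBarrierWorkerContext for StubContext {
    fn reload_database_runtime_info(
        &self,
        _database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        Ok(Some(DatabaseRuntimeInfoSnapshot::default()))
    }
}

fn main() {
    recover_database(&StubContext, DatabaseId(1)).unwrap();
}
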
src/meta/src/barrier/context/mod.rs (9 changes: 8 additions & 1 deletion)
@@ -29,7 +29,8 @@ use crate::barrier::command::CommandContext;
use crate::barrier::progress::TrackingJob;
use crate::barrier::schedule::ScheduledBarriers;
use crate::barrier::{
- BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, RecoveryReason, Scheduled,
+ BarrierManagerStatus, BarrierWorkerRuntimeInfoSnapshot, DatabaseRuntimeInfoSnapshot,
+ RecoveryReason, Scheduled,
};
use crate::hummock::{CommitEpochInfo, HummockManagerRef};
use crate::manager::{MetaSrvEnv, MetadataManager};
@@ -69,6 +70,12 @@ pub(super) trait GlobalBarrierWorkerContext: Send + Sync + 'static {
) -> MetaResult<StreamingControlHandle>;

async fn reload_runtime_info(&self) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot>;

#[expect(dead_code)]
async fn reload_database_runtime_info(
&self,
database_id: DatabaseId,
) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>>;
}

pub(super) struct GlobalBarrierWorkerContextImpl {
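
Note the `#[expect(dead_code)]` on the new trait method: the caller is evidently left to a follow-up change. Unlike `#[allow]`, `#[expect]` (stable since Rust 1.81) itself warns once the expected lint stops firing, so the attribute cannot silently outlive its purpose. A standalone illustration:

// Compiles cleanly today because the function really is unused.
#[expect(dead_code)]
fn reload_database_runtime_info_stub() {}

fn main() {
    // Once a caller is added, the `dead_code` lint no longer fires and the
    // compiler emits `unfulfilled_lint_expectations`, prompting removal of
    // the attribute.
}
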
src/meta/src/barrier/context/recovery.rs (228 changes: 186 additions & 42 deletions)
@@ -16,6 +16,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;

use anyhow::{anyhow, Context};
use futures::future::try_join_all;
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
@@ -30,7 +31,7 @@ use tracing::{debug, info, warn};
use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::barrier::info::InflightDatabaseInfo;
- use crate::barrier::InflightSubscriptionInfo;
+ use crate::barrier::{DatabaseRuntimeInfoSnapshot, InflightSubscriptionInfo};
use crate::controller::fragment::InflightFragmentInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, StreamJobFragments, TableParallelism};
@@ -39,15 +40,16 @@ use crate::{model, MetaResult};

impl GlobalBarrierWorkerContextImpl {
/// Clean up catalog entries for creating streaming jobs that are in foreground mode or whose table fragments are not persisted.
- async fn clean_dirty_streaming_jobs(&self) -> MetaResult<()> {
+ async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
+ let database_id = database_id.map(|database_id| database_id.database_id as _);
self.metadata_manager
.catalog_controller
- .clean_dirty_subscription()
+ .clean_dirty_subscription(database_id)
.await?;
let source_ids = self
.metadata_manager
.catalog_controller
- .clean_dirty_creating_jobs()
+ .clean_dirty_creating_jobs(database_id)
.await?;

// unregister cleaned sources.
@@ -64,54 +66,61 @@ impl GlobalBarrierWorkerContextImpl {
Ok(())
}
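
Cleanup is now scoped by an `Option<DatabaseId>`: `None` preserves the old whole-cluster behavior, while `Some(..)` restricts it to one database, with the catalog-level newtype narrowed once (`as _`) to the metastore's integer key. A rough sketch of the pattern, using hypothetical types rather than the real controllers:

// `None` = clean every database; `Some(id)` = clean only that database.
#[derive(Clone, Copy, Debug)]
struct DatabaseId {
    database_id: u32,
}

// The metastore layer keys databases by a plain integer.
fn clean_dirty_subscription(database_id: Option<i32>) {
    match database_id {
        Some(id) => println!("cleaning dirty subscriptions of database {id}"),
        None => println!("cleaning dirty subscriptions of all databases"),
    }
}

fn clean_dirty_streaming_jobs(database_id: Option<DatabaseId>) {
    // Narrow the newtype to the metastore key once, then thread the same
    // Option through every cleanup call.
    let database_id = database_id.map(|database_id| database_id.database_id as i32);
    clean_dirty_subscription(database_id);
}

fn main() {
    clean_dirty_streaming_jobs(None);
    clean_dirty_streaming_jobs(Some(DatabaseId { database_id: 1 }));
}
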

- async fn recover_background_mv_progress(
- &self,
- ) -> MetaResult<HashMap<TableId, (String, StreamJobFragments)>> {
+ async fn list_background_mv_progress(&self) -> MetaResult<Vec<(String, StreamJobFragments)>> {
let mgr = &self.metadata_manager;
let mviews = mgr
.catalog_controller
.list_background_creating_mviews(false)
.await?;

- let mut mview_map = HashMap::new();
- for mview in &mviews {
+ try_join_all(mviews.into_iter().map(|mview| async move {
let table_id = TableId::new(mview.table_id as _);
let table_fragments = mgr
.catalog_controller
.get_job_fragments_by_id(mview.table_id)
.await?;
let stream_job_fragments = StreamJobFragments::from_protobuf(table_fragments);
- if stream_job_fragments
- .tracking_progress_actor_ids()
- .is_empty()
- {
- // If there's no tracking actor in the mview, we can finish the job directly.
- mgr.catalog_controller
- .finish_streaming_job(mview.table_id, None)
- .await?;
- } else {
- mview_map.insert(table_id, (mview.definition.clone(), stream_job_fragments));
- }
- }

+ assert_eq!(stream_job_fragments.stream_job_id(), table_id);
+ Ok((mview.definition, stream_job_fragments))
+ }))
+ .await
+ // If failed, enter recovery mode.

- Ok(mview_map)
}
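
The renamed `list_background_mv_progress` also swaps the sequential loop for concurrent fetches: `futures::future::try_join_all` polls all futures at once and short-circuits on the first error, and the finish-or-track decision moves to the callers. A minimal sketch of the concurrency pattern, assuming the `futures` and `tokio` crates; the job ids and fetch function are stand-ins:

use futures::future::try_join_all;

// Stand-in for catalog_controller.get_job_fragments_by_id(..).
async fn fetch_fragments(job_id: i32) -> Result<String, String> {
    Ok(format!("fragments-of-{job_id}"))
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let job_ids = vec![1, 2, 3];

    // All fetches are polled concurrently; the first `Err` aborts the whole
    // batch, which in the meta service sends the barrier worker back into
    // recovery rather than proceeding with partial state.
    let fragments: Vec<String> =
        try_join_all(job_ids.into_iter().map(fetch_fragments)).await?;

    println!("{fragments:?}");
    Ok(())
}
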

/// Resolve actor information from the cluster, the fragment manager, and `ChangedTableId`.
/// We use `changed_table_id` to modify the actors to be sent or collected, because these actors
/// will be created or dropped before this barrier flows through them.
- async fn resolve_graph_info(&self) -> MetaResult<HashMap<DatabaseId, InflightDatabaseInfo>> {
+ async fn resolve_graph_info(
+ &self,
+ database_id: Option<DatabaseId>,
+ ) -> MetaResult<HashMap<DatabaseId, InflightDatabaseInfo>> {
+ let database_id = database_id.map(|database_id| database_id.database_id as _);
let all_actor_infos = self
.metadata_manager
.catalog_controller
- .load_all_actors()
+ .load_all_actors(database_id)
.await?;

Ok(all_actor_infos
.into_iter()
- .map(|(database_id, actor_infos)| (database_id, InflightDatabaseInfo::new(actor_infos)))
+ .map(|(loaded_database_id, actor_infos)| {
+ if let Some(database_id) = database_id {
+ assert_eq!(database_id, loaded_database_id);
+ }
+ (
+ DatabaseId::new(loaded_database_id as _),
+ InflightDatabaseInfo::new(actor_infos.into_iter().map(
+ |(job_id, actor_infos)| {
+ (
+ TableId::new(job_id as _),
+ actor_infos
+ .into_iter()
+ .map(|(fragment_id, info)| (fragment_id as _, info)),
+ )
+ },
+ )),
+ )
+ })
.collect())
}
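
When a `database_id` filter is pushed down, the closure re-checks that every loaded row belongs to the requested database before re-keying the raw metastore ids into catalog-level ids. A simplified sketch of this load-then-assert shape (stand-in types, synchronous for brevity):

use std::collections::HashMap;

// Stand-in for catalog_controller.load_all_actors(database_id).
fn load_all_actors(database_id: Option<i32>) -> HashMap<i32, Vec<&'static str>> {
    let mut all = HashMap::from([(1, vec!["actor-a"]), (2, vec!["actor-b"])]);
    if let Some(id) = database_id {
        all.retain(|db, _| *db == id);
    }
    all
}

fn resolve_graph_info(database_id: Option<i32>) -> HashMap<i32, Vec<&'static str>> {
    load_all_actors(database_id)
        .into_iter()
        .map(|(loaded_database_id, actors)| {
            // Defensive check: a scoped load must only return the database
            // that was asked for.
            if let Some(database_id) = database_id {
                assert_eq!(database_id, loaded_database_id);
            }
            (loaded_database_id, actors)
        })
        .collect()
}

fn main() {
    assert_eq!(resolve_graph_info(Some(2)).len(), 1);
    assert_eq!(resolve_graph_info(None).len(), 2);
}
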

@@ -121,16 +130,42 @@
{
{
{
- self.clean_dirty_streaming_jobs()
+ self.clean_dirty_streaming_jobs(None)
.await
.context("clean dirty streaming jobs")?;

// Mview progress needs to be recovered.
tracing::info!("recovering mview progress");
- let background_jobs = self
- .recover_background_mv_progress()
- .await
- .context("recover mview progress should not fail")?;
+ let background_jobs = {
+ let jobs = self
+ .list_background_mv_progress()
+ .await
+ .context("recover mview progress should not fail")?;
+ let mut background_jobs = HashMap::new();
+ for (definition, stream_job_fragments) in jobs {
+ if stream_job_fragments
+ .tracking_progress_actor_ids()
+ .is_empty()
+ {
+ // If there's no tracking actor in the mview, we can finish the job directly.
+ self.metadata_manager
+ .catalog_controller
+ .finish_streaming_job(
+ stream_job_fragments.stream_job_id().table_id as _,
+ None,
+ )
+ .await?;
+ } else {
+ background_jobs
+ .try_insert(
+ stream_job_fragments.stream_job_id(),
+ (definition, stream_job_fragments),
+ )
+ .expect("non-duplicate");
+ }
+ }
+ background_jobs
+ };
tracing::info!("recovered mview progress");

// This is a quick path to accelerate the process of dropping and canceling streaming jobs.
@@ -140,11 +175,7 @@
ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
.await?;

- let background_streaming_jobs = self
- .metadata_manager
- .list_background_creating_jobs()
- .await?;
-
+ let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
info!(
"background streaming jobs: {:?} total {}",
background_streaming_jobs,
@@ -165,7 +196,7 @@
warn!(error = %err.as_report(), "scale actors failed");
})?;

- self.resolve_graph_info().await.inspect_err(|err| {
+ self.resolve_graph_info(None).await.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
})?
} else {
@@ -179,7 +210,7 @@
};

if self.scheduled_barriers.pre_apply_drop_cancel(None) {
- info = self.resolve_graph_info().await.inspect_err(|err| {
+ info = self.resolve_graph_info(None).await.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
})?
}
@@ -209,7 +240,7 @@

let subscription_infos = self
.metadata_manager
- .get_mv_depended_subscriptions()
+ .get_mv_depended_subscriptions(None)
.await?
.into_iter()
.map(|(database_id, mv_depended_subscriptions)| {
@@ -243,6 +274,119 @@
}
}
}

#[expect(dead_code)]
pub(super) async fn reload_database_runtime_info_impl(
&self,
database_id: DatabaseId,
) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
self.clean_dirty_streaming_jobs(Some(database_id))
.await
.context("clean dirty streaming jobs")?;

// Mview progress needs to be recovered.
tracing::info!(?database_id, "recovering mview progress of database");
let background_jobs = self
.list_background_mv_progress()
.await
.context("recover mview progress of database should not fail")?;
tracing::info!(?database_id, "recovered mview progress");

// This is a quick path to accelerate the process of dropping and canceling streaming jobs.
let _ = self
.scheduled_barriers
.pre_apply_drop_cancel(Some(database_id));

let info = self
.resolve_graph_info(Some(database_id))
.await
.inspect_err(|err| {
warn!(error = %err.as_report(), "resolve actor info failed");
})?;
assert!(info.len() <= 1);
let Some(info) = info.into_iter().next().map(|(loaded_database_id, info)| {
assert_eq!(loaded_database_id, database_id);
info
}) else {
return Ok(None);
};

let background_jobs = {
let jobs = background_jobs;
let mut background_jobs = HashMap::new();
for (definition, stream_job_fragments) in jobs {
if !info.contains_job(stream_job_fragments.stream_job_id()) {
continue;
}
if stream_job_fragments
.tracking_progress_actor_ids()
.is_empty()
{
// If there's no tracking actor in the mview, we can finish the job directly.
self.metadata_manager
.catalog_controller
.finish_streaming_job(
stream_job_fragments.stream_job_id().table_id as _,
None,
)
.await?;
} else {
background_jobs
.try_insert(
stream_job_fragments.stream_job_id(),
(definition, stream_job_fragments),
)
.expect("non-duplicate");
}
}
background_jobs
};

let state_table_committed_epochs: HashMap<_, _> = self
.hummock_manager
.on_current_version(|version| {
version
.state_table_info
.info()
.iter()
.map(|(table_id, info)| (*table_id, info.committed_epoch))
.collect()
})
.await;

let subscription_infos = self
.metadata_manager
.get_mv_depended_subscriptions(Some(database_id))
.await?;
assert!(subscription_infos.len() <= 1);
let mv_depended_subscriptions = subscription_infos
.into_iter()
.next()
.map(|(loaded_database_id, subscriptions)| {
assert_eq!(loaded_database_id, database_id);
subscriptions
})
.unwrap_or_default();
let subscription_info = InflightSubscriptionInfo {
mv_depended_subscriptions,
};

// update and build all actors.
let stream_actors = self.load_all_actors().await.inspect_err(|err| {
warn!(error = %err.as_report(), "update actors failed");
})?;

// get split assignments for all actors
let source_splits = self.source_manager.list_assignments().await;
Ok(Some(DatabaseRuntimeInfoSnapshot {
database_fragment_info: info,
state_table_committed_epochs,
subscription_info,
stream_actors,
source_splits,
background_jobs,
}))
}
}
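
Both recovery paths partition the listed background jobs the same way: a job with no tracking actors left is finished immediately, and everything else is kept for progress tracking; the per-database variant additionally skips jobs absent from the resolved graph (`info.contains_job(..)`). The diff relies on the nightly `HashMap::try_insert` (`map_try_insert` feature) to assert job ids are unique; a stable sketch of the same partition with hypothetical types:

use std::collections::HashMap;

#[derive(Debug)]
struct StreamJobFragments {
    job_id: u32,
    tracking_actor_ids: Vec<u32>,
}

fn partition_background_jobs(
    jobs: Vec<(String, StreamJobFragments)>,
) -> HashMap<u32, (String, StreamJobFragments)> {
    let mut background_jobs = HashMap::new();
    for (definition, fragments) in jobs {
        if fragments.tracking_actor_ids.is_empty() {
            // Stand-in for catalog_controller.finish_streaming_job(..):
            // nothing left to track, so the job is completed on the spot.
            println!("finishing job {} immediately", fragments.job_id);
        } else {
            // `insert` + assert mirrors the intent of the nightly
            // `try_insert(..).expect("non-duplicate")` used in the diff.
            let prev = background_jobs.insert(fragments.job_id, (definition, fragments));
            assert!(prev.is_none(), "non-duplicate");
        }
    }
    background_jobs
}

fn main() {
    let jobs = vec![
        (
            "mview a".to_string(),
            StreamJobFragments { job_id: 1, tracking_actor_ids: vec![] },
        ),
        (
            "mview b".to_string(),
            StreamJobFragments { job_id: 2, tracking_actor_ids: vec![10, 11] },
        ),
    ];
    let tracked = partition_background_jobs(jobs);
    assert!(tracked.contains_key(&2) && !tracked.contains_key(&1));
}
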

impl GlobalBarrierWorkerContextImpl {
@@ -277,7 +421,7 @@

if expired_worker_slots.is_empty() {
debug!("no expired worker slots, skipping.");
- return self.resolve_graph_info().await;
+ return self.resolve_graph_info(None).await;
}

debug!("start migrate actors.");
@@ -387,7 +531,7 @@

info!("migrate actors succeed.");

- self.resolve_graph_info().await
+ self.resolve_graph_info(None).await
}

async fn scale_actors(&self, active_nodes: &ActiveStreamingWorkerNodes) -> MetaResult<()> {
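
Taken together, `reload_database_runtime_info_impl` is a scoped replay of the global recovery path: clean dirty jobs of one database, list background progress, resolve the in-flight graph, then assemble committed epochs, subscriptions, actors, and splits into the snapshot. A condensed sketch of that control flow; every type and call below is a simplified stand-in, not the RisingWave API:

// Condensed control flow of the per-database reload, with stand-in types.
type MetaResult<T> = Result<T, String>;

#[derive(Clone, Copy, Debug)]
struct DatabaseId(u32);

#[derive(Debug, Default)]
struct DatabaseRuntimeInfoSnapshot;

struct Ctx;

impl Ctx {
    fn clean_dirty_streaming_jobs(&self, _db: Option<DatabaseId>) -> MetaResult<()> {
        Ok(())
    }

    fn list_background_mv_progress(&self) -> MetaResult<Vec<String>> {
        Ok(vec![])
    }

    // `None` here models "database not found after cleanup".
    fn resolve_graph_info(&self, _db: Option<DatabaseId>) -> MetaResult<Option<()>> {
        Ok(Some(()))
    }

    fn build_snapshot(&self) -> MetaResult<DatabaseRuntimeInfoSnapshot> {
        Ok(DatabaseRuntimeInfoSnapshot::default())
    }

    fn reload_database_runtime_info(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        // 1. Drop foreground/unpersisted creating jobs of this database only.
        self.clean_dirty_streaming_jobs(Some(database_id))?;
        // 2. Recover background mview progress.
        let _jobs = self.list_background_mv_progress()?;
        // 3. Resolve the database's in-flight graph; bail out with `None`
        //    if the database vanished in the meantime.
        let Some(_info) = self.resolve_graph_info(Some(database_id))? else {
            return Ok(None);
        };
        // 4. Assemble committed epochs, subscriptions, actors, and splits.
        Ok(Some(self.build_snapshot()?))
    }
}

fn main() {
    let snapshot = Ctx.reload_database_runtime_info(DatabaseId(1)).unwrap();
    assert!(snapshot.is_some());
}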