Skip to content

Commit

Permalink
Refactor serving.rs for efficient worker-to-vnode mapping, update imp…
Browse files Browse the repository at this point in the history
…orts.
  • Loading branch information
shanicky committed May 11, 2024
1 parent f107855 commit b6e2700
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 29 deletions.
38 changes: 14 additions & 24 deletions src/ctl/src/cmd_impl/meta/serving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,26 @@ use std::collections::HashMap;

use comfy_table::{Row, Table};
use itertools::Itertools;
use risingwave_common::hash::{ParallelUnitId, VirtualNode};
use risingwave_common::hash::{ParallelUnitId, VirtualNode, WorkerId};
use risingwave_pb::common::{WorkerNode, WorkerType};

use crate::CtlContext;

pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;
let mappings = meta_client.list_serving_vnode_mappings().await?;
let workers = meta_client
let workers: HashMap<_, _> = meta_client
.list_worker_nodes(Some(WorkerType::ComputeNode))
.await?;
let mut pu_to_worker: HashMap<ParallelUnitId, &WorkerNode> = HashMap::new();
for w in &workers {
for pu in &w.parallel_units {
pu_to_worker.insert(pu.id, w);
}
}
.await?
.into_iter()
.map(|worker| (worker.id, worker))
.collect();

let mut table = Table::new();
table.set_header({
let mut row = Row::new();
row.add_cell("Table Id".into());
row.add_cell("Fragment Id".into());
row.add_cell("Parallel Unit Id".into());
row.add_cell("Virtual Node".into());
row.add_cell("Worker".into());
row
Expand All @@ -48,28 +44,22 @@ pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Res
let rows = mappings
.iter()
.flat_map(|(fragment_id, (table_id, mapping))| {
let mut pu_vnodes: HashMap<ParallelUnitId, Vec<VirtualNode>> = HashMap::new();
for (vnode, pu) in mapping.iter_with_vnode() {
pu_vnodes.entry(pu).or_default().push(vnode);
let mut worker_nodes: HashMap<WorkerId, Vec<VirtualNode>> = HashMap::new();
for (vnode, worker) in mapping.iter_with_vnode() {
worker_nodes.entry(worker).or_default().push(vnode);
}
pu_vnodes.into_iter().map(|(pu_id, vnodes)| {
(
*table_id,
*fragment_id,
pu_id,
vnodes,
pu_to_worker.get(&pu_id),
)
worker_nodes.into_iter().map(|(worker_id, vnodes)| {
(*table_id, *fragment_id, vnodes, workers.get(&worker_id))
})
})
.collect_vec();
for (table_id, fragment_id, pu_id, vnodes, worker) in
rows.into_iter().sorted_by_key(|(t, f, p, ..)| (*t, *f, *p))

for (table_id, fragment_id, vnodes, worker) in
rows.into_iter().sorted_by_key(|(t, f, ..)| (*t, *f))
{
let mut row = Row::new();
row.add_cell(table_id.into());
row.add_cell(fragment_id.into());
row.add_cell(pu_id.into());
row.add_cell(
format!(
"{} in total: {}",
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ use crate::controller::rename::{alter_relation_rename, alter_relation_rename_ref
use crate::controller::utils::{
check_connection_name_duplicate, check_database_name_duplicate,
check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate,
ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id, get_fragment_mappings_by_jobs, get_parallel_unit_to_worker_map,
get_referring_objects, get_referring_objects_cascade, get_user_privilege,
list_user_info_by_ids, resolve_source_register_info_for_jobs, PartialObject,
ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id,
get_fragment_mappings_by_jobs, get_parallel_unit_to_worker_map, get_referring_objects,
get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids,
resolve_source_register_info_for_jobs, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION};
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use risingwave_pb::meta::subscribe_response::{
};
use risingwave_pb::meta::table_fragments::PbActorStatus;
use risingwave_pb::meta::{
FragmentWorkerMapping,
PbFragmentWorkerMapping, PbRelation, PbRelationGroup, PbTableFragments, Relation,
FragmentWorkerMapping, PbFragmentWorkerMapping, PbRelation, PbRelationGroup, PbTableFragments,
Relation,
};
use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits};
use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism;
Expand Down

0 comments on commit b6e2700

Please sign in to comment.