fix: Fix the panic issue with the parallelism() call (#19849) (#19880)
Signed-off-by: Shanicky Chen <[email protected]>
Co-authored-by: Shanicky Chen <[email protected]>
github-actions[bot] and shanicky authored Dec 23, 2024
1 parent 33ed977 commit b655733
Showing 15 changed files with 39 additions and 26 deletions.
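
Why this fixes a panic, in brief: the old `common::WorkerNode::parallelism()` (see the `src/prost/src/lib.rs` hunk below) asserted that the worker was a compute node and unwrapped its `property`, yet call sites such as `show.rs` and `diagnose.rs` invoked it for workers of every type, so `SHOW CLUSTER` and the diagnose report could trip the assertion on meta, frontend, or compactor nodes. The commit splits the API: the strict accessor is renamed `compute_node_parallelism()` and kept at call sites that already filter to compute nodes, while a new `parallelism()` returns `Option<usize>` for mixed worker lists. Below is a minimal, self-contained sketch of that split using simplified stand-in types (not the real prost-generated ones):

```rust
// Simplified stand-ins for the prost-generated types; field and method names
// follow the diff below, but this is an illustrative sketch, not the real code.
#[derive(Clone, Copy, PartialEq, Debug)]
enum WorkerType {
    ComputeNode,
    Meta,
}

struct Property {
    parallelism: i32,
}

struct WorkerNode {
    r#type: WorkerType,
    property: Option<Property>,
}

impl WorkerNode {
    /// Strict accessor: callers must already know this is a compute node.
    fn compute_node_parallelism(&self) -> usize {
        assert_eq!(self.r#type, WorkerType::ComputeNode);
        self.property
            .as_ref()
            .expect("property should exist")
            .parallelism as usize
    }

    /// Lenient accessor: `None` for non-compute workers instead of a panic.
    fn parallelism(&self) -> Option<usize> {
        if self.r#type == WorkerType::ComputeNode {
            Some(self.compute_node_parallelism())
        } else {
            None
        }
    }
}

fn main() {
    let meta = WorkerNode {
        r#type: WorkerType::Meta,
        property: None,
    };
    // Pre-fix behavior: the single `parallelism()` method carried the
    // assertion, so a call like this on a meta node would panic.
    // With the split API it degrades gracefully:
    assert_eq!(meta.parallelism(), None);
}
```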
2 changes: 1 addition & 1 deletion src/batch/executors/src/executor/join/local_lookup_join.rs
@@ -404,7 +404,7 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
         let worker_slot_mapping: HashMap<WorkerSlotId, WorkerNode> = worker_nodes
             .iter()
             .flat_map(|worker| {
-                (0..(worker.parallelism()))
+                (0..(worker.compute_node_parallelism()))
                     .map(|i| (WorkerSlotId::new(worker.id, i), worker.clone()))
             })
             .collect();
5 changes: 4 additions & 1 deletion src/batch/src/worker_manager/worker_node_manager.rs
@@ -336,7 +336,10 @@ impl WorkerNodeSelector {
         } else {
             self.apply_worker_node_mask(self.manager.list_serving_worker_nodes())
         };
-        worker_nodes.iter().map(|node| node.parallelism()).sum()
+        worker_nodes
+            .iter()
+            .map(|node| node.compute_node_parallelism())
+            .sum()
     }

     pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result<WorkerSlotMapping> {
2 changes: 1 addition & 1 deletion src/common/src/vnode_mapping/vnode_placement.rs
@@ -42,7 +42,7 @@ pub fn place_vnode(
         .iter()
         .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving))
         .sorted_by_key(|w| w.id)
-        .map(|w| (0..w.parallelism()).map(|idx| WorkerSlotId::new(w.id, idx)))
+        .map(|w| (0..w.compute_node_parallelism()).map(|idx| WorkerSlotId::new(w.id, idx)))
         .collect();

     // Set serving parallelism to the minimum of total number of worker slots, specified
@@ -29,7 +29,7 @@ struct RwWorkerNode {
     port: Option<String>,
     r#type: String,
     state: String,
-    parallelism: i32,
+    parallelism: Option<i32>,
     is_streaming: Option<bool>,
     is_serving: Option<bool>,
     is_unschedulable: Option<bool>,
@@ -59,11 +59,7 @@ async fn read_rw_worker_nodes_info(reader: &SysCatalogReaderImpl) -> Result<Vec<
             port: host.map(|h| h.port.to_string()),
             r#type: worker.get_type().unwrap().as_str_name().into(),
             state: worker.get_state().unwrap().as_str_name().into(),
-            parallelism: if is_compute {
-                worker.parallelism() as i32
-            } else {
-                0
-            },
+            parallelism: worker.parallelism().map(|parallelism| parallelism as _),
             is_streaming: if is_compute {
                 property.map(|p| p.is_streaming)
             } else {
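
Worth noting for this hunk: the `parallelism` column of the `rw_worker_nodes` system table changes from a plain `i32` (with a placeholder `0` for non-compute workers) to `Option<i32>`, so those workers now presumably surface as SQL `NULL` instead of a misleading `0`. The `is_compute` guard also disappears from this field because the new `parallelism()` accessor performs the worker-type check itself.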
4 changes: 2 additions & 2 deletions src/frontend/src/handler/show.rs
@@ -216,7 +216,7 @@ struct ShowClusterRow {
     addr: String,
     r#type: String,
     state: String,
-    parallelism: i32,
+    parallelism: Option<i32>,
     is_streaming: Option<bool>,
     is_serving: Option<bool>,
     is_unschedulable: Option<bool>,
@@ -483,7 +483,7 @@ pub async fn handle_show_object(
             addr: addr.to_string(),
             r#type: worker.get_type().unwrap().as_str_name().into(),
             state: worker.get_state().unwrap().as_str_name().to_owned(),
-            parallelism: worker.parallelism() as _,
+            parallelism: worker.parallelism().map(|parallelism| parallelism as i32),
             is_streaming: property.map(|p| p.is_streaming),
             is_serving: property.map(|p| p.is_serving),
             is_unschedulable: property.map(|p| p.is_unschedulable),
2 changes: 1 addition & 1 deletion src/meta/service/src/cluster_limit_service.rs
@@ -51,7 +51,7 @@ impl ClusterLimitServiceImpl {
             .list_worker_node(Some(WorkerType::ComputeNode), Some(State::Running))
             .await?
             .into_iter()
-            .map(|e| (e.id as _, e.parallelism()))
+            .map(|e| (e.id as _, e.compute_node_parallelism()))
             .collect();
         let worker_actor_count: HashMap<u32, WorkerActorCount> = self
             .metadata_manager
13 changes: 8 additions & 5 deletions src/meta/src/barrier/context/recovery.rs
@@ -410,7 +410,9 @@ impl GlobalBarrierWorkerContextImpl {
         let active_worker_slots: HashSet<_> = active_nodes
             .current()
             .values()
-            .flat_map(|node| (0..node.parallelism()).map(|idx| WorkerSlotId::new(node.id, idx)))
+            .flat_map(|node| {
+                (0..node.compute_node_parallelism()).map(|idx| WorkerSlotId::new(node.id, idx))
+            })
             .collect();

         let expired_worker_slots: BTreeSet<_> = all_inuse_worker_slots
@@ -439,7 +441,8 @@
             .current()
             .values()
             .flat_map(|worker| {
-                (0..worker.parallelism()).map(move |i| WorkerSlotId::new(worker.id, i as _))
+                (0..worker.compute_node_parallelism())
+                    .map(move |i| WorkerSlotId::new(worker.id, i as _))
             })
             .collect_vec();

@@ -457,7 +460,7 @@
             .current()
             .values()
             .flat_map(|worker| {
-                (0..worker.parallelism() * factor)
+                (0..worker.compute_node_parallelism() * factor)
                     .map(move |i| WorkerSlotId::new(worker.id, i as _))
             })
             .collect_vec();
@@ -513,7 +516,7 @@
         let current_nodes = active_nodes
             .current()
             .values()
-            .map(|node| (node.id, &node.host, node.parallelism()))
+            .map(|node| (node.id, &node.host, node.compute_node_parallelism()))
             .collect_vec();
         warn!(
             current_nodes = ?current_nodes,
@@ -554,7 +557,7 @@
         let available_parallelism = active_nodes
             .current()
             .values()
-            .map(|worker_node| worker_node.parallelism())
+            .map(|worker_node| worker_node.compute_node_parallelism())
             .sum();

         let table_parallelisms: HashMap<_, _> = {
2 changes: 1 addition & 1 deletion src/meta/src/controller/cluster.rs
@@ -400,7 +400,7 @@ impl StreamingClusterInfo {
     pub fn parallelism(&self) -> usize {
         self.worker_nodes
             .values()
-            .map(|worker| worker.parallelism())
+            .map(|worker| worker.compute_node_parallelism())
             .sum()
     }
 }
2 changes: 1 addition & 1 deletion src/meta/src/manager/diagnose.rs
@@ -171,7 +171,7 @@ impl DiagnoseCommand {
             &mut row,
             worker_node.get_state().ok().map(|s| s.as_str_name()),
         );
-        row.add_cell(worker_node.parallelism().into());
+        try_add_cell(&mut row, worker_node.parallelism());
         try_add_cell(
             &mut row,
             worker_node.property.as_ref().map(|p| p.is_streaming),
4 changes: 2 additions & 2 deletions src/meta/src/stream/scale.rs
@@ -1818,7 +1818,7 @@ impl ScaleController {

         let schedulable_worker_slots = workers
             .values()
-            .map(|worker| (worker.id as WorkerId, worker.parallelism()))
+            .map(|worker| (worker.id as WorkerId, worker.compute_node_parallelism()))
             .collect::<BTreeMap<_, _>>();

         // index for no shuffle relation
@@ -2579,7 +2579,7 @@ impl GlobalStreamManager {

             match prev_worker {
                 // todo, add label checking in further changes
-                Some(prev_worker) if prev_worker.parallelism() != worker.parallelism() => {
+                Some(prev_worker) if prev_worker.compute_node_parallelism() != worker.compute_node_parallelism() => {
                     tracing::info!(worker = worker.id, "worker parallelism changed");
                     should_trigger = true;
                 }
2 changes: 1 addition & 1 deletion src/meta/src/stream/stream_graph/schedule.rs
@@ -223,7 +223,7 @@ impl Scheduler {

         let slots = workers
             .iter()
-            .map(|(worker_id, worker)| (*worker_id as WorkerId, worker.parallelism()))
+            .map(|(worker_id, worker)| (*worker_id as WorkerId, worker.compute_node_parallelism()))
             .collect();

         let parallelism = default_parallelism.get();
5 changes: 4 additions & 1 deletion src/meta/src/stream/stream_manager.rs
@@ -639,7 +639,10 @@ impl GlobalStreamManager {
             .collect::<BTreeSet<_>>();

         // Check if the provided parallelism is valid.
-        let available_parallelism = worker_nodes.iter().map(|w| w.parallelism()).sum::<usize>();
+        let available_parallelism = worker_nodes
+            .iter()
+            .map(|w| w.compute_node_parallelism())
+            .sum::<usize>();
         let max_parallelism = self
             .metadata_manager
             .get_job_max_parallelism(table_id)
10 changes: 9 additions & 1 deletion src/prost/src/lib.rs
@@ -221,13 +221,21 @@ impl stream_plan::MaterializeNode {

 // Encapsulating the use of parallelism.
 impl common::WorkerNode {
-    pub fn parallelism(&self) -> usize {
+    pub fn compute_node_parallelism(&self) -> usize {
         assert_eq!(self.r#type(), WorkerType::ComputeNode);
         self.property
             .as_ref()
             .expect("property should be exist")
             .parallelism as usize
     }
+
+    pub fn parallelism(&self) -> Option<usize> {
+        if WorkerType::ComputeNode == self.r#type() {
+            Some(self.compute_node_parallelism())
+        } else {
+            None
+        }
+    }
 }

 impl stream_plan::SourceNode {
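
Caller-side sketch: with the split API, code iterating over a mixed worker list can use the lenient accessor and simply skip non-compute workers, while code that has already filtered to compute nodes (as most call sites above do) keeps the strict one. A hypothetical aggregation helper, reusing the simplified `WorkerNode` stand-in from the sketch near the top of this page rather than the real prost-generated type:

```rust
// Hypothetical helper; assumes the simplified `WorkerNode` defined in the
// earlier sketch, not the actual prost-generated type.
fn total_compute_parallelism(workers: &[WorkerNode]) -> usize {
    workers
        .iter()
        // `filter_map` drops the `None` returned for meta/frontend/compactor
        // workers, where the old API would have panicked.
        .filter_map(|w| w.parallelism())
        .sum()
}
```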
2 changes: 1 addition & 1 deletion src/tests/simulation/src/ctl_ext.rs
@@ -227,7 +227,7 @@ impl Fragment {
         self.r
             .worker_nodes
             .iter()
-            .map(|w| (w.id, w.parallelism()))
+            .map(|w| (w.id, w.compute_node_parallelism()))
             .collect()
     }

@@ -39,7 +39,7 @@ async fn test_cordon_normal() -> Result<()> {
     let rest_worker_slots: HashSet<_> = workers
         .iter()
         .flat_map(|worker| {
-            (0..worker.parallelism()).map(|idx| WorkerSlotId::new(worker.id, idx as _))
+            (0..worker.compute_node_parallelism()).map(|idx| WorkerSlotId::new(worker.id, idx as _))
         })
         .collect();

