diff --git a/metadata.sqlite b/metadata.sqlite new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/proto/common.proto b/proto/common.proto index 4f0d56b4823a9..fab50dcfecac2 100644 --- a/proto/common.proto +++ b/proto/common.proto @@ -92,6 +92,12 @@ message ParallelUnitMapping { repeated uint32 data = 2; } +// Vnode mapping for stream fragments. Stores mapping from virtual node to worker id. +message WorkerMapping { + repeated uint32 original_indices = 1; + repeated uint32 data = 2; +} + message BatchQueryEpoch { oneof epoch { uint64 committed = 1; diff --git a/proto/meta.proto b/proto/meta.proto index 149b2940e2a27..df266b6d4eded 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -111,6 +111,16 @@ message FragmentParallelUnitMappings { repeated FragmentParallelUnitMapping mappings = 1; } +/// Worker mapping with fragment id, used for notification. +message FragmentWorkerMapping { + uint32 fragment_id = 1; + common.WorkerMapping mapping = 2; +} + +message FragmentWorkerMappings { + repeated FragmentWorkerMapping mappings = 1; +} + // TODO: remove this when dashboard refactored. message ActorLocation { common.WorkerNode node = 1; @@ -378,8 +388,10 @@ message SubscribeRequest { message MetaSnapshot { message SnapshotVersion { uint64 catalog_version = 1; - uint64 parallel_unit_mapping_version = 2; + reserved 2; + reserved "parallel_unit_mapping_version"; uint64 worker_node_version = 3; + uint64 streaming_worker_mapping_version = 4; } repeated catalog.Database databases = 1; repeated catalog.Schema schemas = 2; @@ -392,16 +404,20 @@ message MetaSnapshot { repeated catalog.Connection connections = 17; repeated catalog.Subscription subscriptions = 19; repeated user.UserInfo users = 8; + reserved 9; + reserved "parallel_unit_mappings"; GetSessionParamsResponse session_params = 20; - // for streaming - repeated FragmentParallelUnitMapping parallel_unit_mappings = 9; repeated common.WorkerNode nodes = 10; hummock.HummockSnapshot hummock_snapshot = 11; hummock.HummockVersion hummock_version = 12; backup_service.MetaBackupManifestId meta_backup_manifest_id = 14; hummock.WriteLimits hummock_write_limits = 16; - // for serving - repeated FragmentParallelUnitMapping serving_parallel_unit_mappings = 18; + reserved 18; + reserved "serving_parallel_unit_mappings"; + + // for streaming + repeated FragmentWorkerMapping streaming_worker_mappings = 21; + repeated FragmentWorkerMapping serving_worker_mappings = 22; SnapshotVersion version = 13; } @@ -440,8 +456,6 @@ message SubscribeResponse { catalog.Function function = 6; user.UserInfo user = 11; SetSessionParamRequest session_param = 26; - // for streaming - FragmentParallelUnitMapping parallel_unit_mapping = 12; common.WorkerNode node = 13; hummock.HummockSnapshot hummock_snapshot = 14; hummock.HummockVersionDeltas hummock_version_deltas = 15; @@ -451,10 +465,15 @@ message SubscribeResponse { hummock.WriteLimits hummock_write_limits = 20; RelationGroup relation_group = 21; catalog.Connection connection = 22; - FragmentParallelUnitMappings serving_parallel_unit_mappings = 23; hummock.HummockVersionStats hummock_stats = 24; Recovery recovery = 25; + FragmentWorkerMapping streaming_worker_mapping = 27; + FragmentWorkerMappings serving_worker_mappings = 28; } + reserved 12; + reserved "parallel_unit_mapping"; + reserved 23; + reserved "serving_parallel_unit_mappings"; } service NotificationService { @@ -629,8 +648,10 @@ service SessionParamService { message GetServingVnodeMappingsRequest {} message GetServingVnodeMappingsResponse { - repeated FragmentParallelUnitMapping mappings = 1; + reserved 1; + reserved "mappings"; map fragment_to_table = 2; + repeated FragmentWorkerMapping worker_mappings = 3; } service ServingService { diff --git a/src/batch/src/executor/join/local_lookup_join.rs b/src/batch/src/executor/join/local_lookup_join.rs index 17b257106fb5b..838e4b738e082 100644 --- a/src/batch/src/executor/join/local_lookup_join.rs +++ b/src/batch/src/executor/join/local_lookup_join.rs @@ -21,7 +21,7 @@ use risingwave_common::buffer::BitmapBuilder; use risingwave_common::catalog::{ColumnDesc, Field, Schema}; use risingwave_common::hash::table_distribution::TableDistribution; use risingwave_common::hash::{ - ExpandedParallelUnitMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, + ExpandedWorkerMapping, HashKey, HashKeyDispatcher, ParallelUnitId, VirtualNode, WorkerId, }; use risingwave_common::memory::MemoryContext; use risingwave_common::types::{DataType, Datum}; @@ -29,7 +29,6 @@ use risingwave_common::util::chunk_coalesce::DataChunkBuilder; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::scan_range::ScanRange; use risingwave_common::util::tracing::TracingContext; -use risingwave_common::util::worker_util::get_pu_to_worker_mapping; use risingwave_expr::expr::{build_from_prost, BoxedExpression}; use risingwave_pb::batch_plan::exchange_info::DistributionMode; use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::Plan; @@ -52,7 +51,7 @@ use crate::task::{BatchTaskContext, ShutdownToken, TaskId}; struct InnerSideExecutorBuilder { table_desc: StorageTableDesc, table_distribution: TableDistribution, - vnode_mapping: ExpandedParallelUnitMapping, + vnode_mapping: ExpandedWorkerMapping, outer_side_key_types: Vec, inner_side_schema: Schema, inner_side_column_ids: Vec, @@ -61,8 +60,8 @@ struct InnerSideExecutorBuilder { context: C, task_id: TaskId, epoch: BatchQueryEpoch, - pu_to_worker_mapping: HashMap, - pu_to_scan_range_mapping: HashMap>, + worker_mapping: HashMap, + worker_to_scan_range_mapping: HashMap>, chunk_size: usize, shutdown_rx: ShutdownToken, next_stage_id: usize, @@ -92,7 +91,7 @@ impl InnerSideExecutorBuilder { /// Creates the `RowSeqScanNode` that will be used for scanning the inner side table /// based on the passed `scan_range` and virtual node. fn create_row_seq_scan_node(&self, id: &ParallelUnitId) -> Result { - let list = self.pu_to_scan_range_mapping.get(id).unwrap(); + let list = self.worker_to_scan_range_mapping.get(id).unwrap(); let mut scan_ranges = vec![]; let mut vnode_bitmap = BitmapBuilder::zeroed(self.vnode_mapping.len()); @@ -114,11 +113,11 @@ impl InnerSideExecutorBuilder { } /// Creates the `PbExchangeSource` using the given `id`. - fn build_prost_exchange_source(&self, id: &ParallelUnitId) -> Result { + fn build_prost_exchange_source(&self, id: &WorkerId) -> Result { let worker = self - .pu_to_worker_mapping + .worker_mapping .get(id) - .context("No worker node found for the given parallel unit id.")?; + .context("No worker node found for the given worker id.")?; let local_execute_plan = LocalExecutePlan { plan: Some(PlanFragment { @@ -160,7 +159,7 @@ impl InnerSideExecutorBuilder { #[async_trait::async_trait] impl LookupExecutorBuilder for InnerSideExecutorBuilder { fn reset(&mut self) { - self.pu_to_scan_range_mapping = HashMap::new(); + self.worker_to_scan_range_mapping = HashMap::new(); } /// Adds the scan range made from the given `kwy_scalar_impls` into the parallel unit id @@ -191,11 +190,11 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder } let vnode = self.get_virtual_node(&scan_range)?; - let parallel_unit_id = self.vnode_mapping[vnode.to_index()]; + let worker_id = self.vnode_mapping[vnode.to_index()]; let list = self - .pu_to_scan_range_mapping - .entry(parallel_unit_id) + .worker_to_scan_range_mapping + .entry(worker_id) .or_default(); list.push((scan_range, vnode)); @@ -207,7 +206,7 @@ impl LookupExecutorBuilder for InnerSideExecutorBuilder async fn build_executor(&mut self) -> Result { self.next_stage_id += 1; let mut sources = vec![]; - for id in self.pu_to_scan_range_mapping.keys() { + for id in self.worker_to_scan_range_mapping.keys() { sources.push(self.build_prost_exchange_source(id)?); } @@ -373,6 +372,14 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { let chunk_size = source.context.get_config().developer.chunk_size; + let worker_nodes = lookup_join_node.get_worker_nodes(); + let worker_mapping: HashMap = worker_nodes + .iter() + .map(|worker| (worker.id, worker.clone())) + .collect(); + + assert_eq!(worker_mapping.len(), worker_nodes.len()); + let inner_side_builder = InnerSideExecutorBuilder { table_desc: table_desc.clone(), table_distribution: TableDistribution::new_from_storage_table_desc( @@ -388,11 +395,11 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder { context: source.context().clone(), task_id: source.task_id.clone(), epoch: source.epoch(), - pu_to_worker_mapping: get_pu_to_worker_mapping(lookup_join_node.get_worker_nodes()), - pu_to_scan_range_mapping: HashMap::new(), + worker_to_scan_range_mapping: HashMap::new(), chunk_size, shutdown_rx: source.shutdown_rx.clone(), next_stage_id: 0, + worker_mapping, }; let identity = source.plan_node().get_identity().clone(); diff --git a/src/batch/src/task/task_execution.rs b/src/batch/src/task/task_execution.rs index 87d94bc1d90b9..c881ed867ae75 100644 --- a/src/batch/src/task/task_execution.rs +++ b/src/batch/src/task/task_execution.rs @@ -400,6 +400,8 @@ impl BatchTaskExecution { ) .await?; + println!("plan {:?}", self.plan); + let sender = self.sender.clone(); let _failure = self.failure.clone(); let task_id = self.task_id.clone(); diff --git a/src/batch/src/worker_manager/worker_node_manager.rs b/src/batch/src/worker_manager/worker_node_manager.rs index 5b0813186fd1c..0105b171b9442 100644 --- a/src/batch/src/worker_manager/worker_node_manager.rs +++ b/src/batch/src/worker_manager/worker_node_manager.rs @@ -18,8 +18,7 @@ use std::time::Duration; use rand::seq::SliceRandom; use risingwave_common::bail; -use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping}; -use risingwave_common::util::worker_util::get_pu_to_worker_mapping; +use risingwave_common::hash::{WorkerId, WorkerMapping}; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; @@ -36,12 +35,10 @@ pub struct WorkerNodeManager { struct WorkerNodeManagerInner { worker_nodes: Vec, - /// A cache for parallel units to worker nodes. It should be consistent with `worker_nodes`. - pu_to_worker: HashMap, /// fragment vnode mapping info for streaming - streaming_fragment_vnode_mapping: HashMap, + streaming_fragment_vnode_mapping: HashMap, /// fragment vnode mapping info for serving - serving_fragment_vnode_mapping: HashMap, + serving_fragment_vnode_mapping: HashMap, } pub type WorkerNodeManagerRef = Arc; @@ -57,7 +54,6 @@ impl WorkerNodeManager { Self { inner: RwLock::new(WorkerNodeManagerInner { worker_nodes: Default::default(), - pu_to_worker: Default::default(), streaming_fragment_vnode_mapping: Default::default(), serving_fragment_vnode_mapping: Default::default(), }), @@ -68,7 +64,6 @@ impl WorkerNodeManager { /// Used in tests. pub fn mock(worker_nodes: Vec) -> Self { let inner = RwLock::new(WorkerNodeManagerInner { - pu_to_worker: get_pu_to_worker_mapping(&worker_nodes), worker_nodes, streaming_fragment_vnode_mapping: HashMap::new(), serving_fragment_vnode_mapping: HashMap::new(), @@ -120,23 +115,18 @@ impl WorkerNodeManager { *w = node; } } - // Update `pu_to_worker` - write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes); } pub fn remove_worker_node(&self, node: WorkerNode) { let mut write_guard = self.inner.write().unwrap(); write_guard.worker_nodes.retain(|x| x.id != node.id); - - // Update `pu_to_worker` - write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes); } pub fn refresh( &self, nodes: Vec, - streaming_mapping: HashMap, - serving_mapping: HashMap, + streaming_mapping: HashMap, + serving_mapping: HashMap, ) { let mut write_guard = self.inner.write().unwrap(); tracing::debug!("Refresh worker nodes {:?}.", nodes); @@ -149,42 +139,43 @@ impl WorkerNodeManager { serving_mapping.keys() ); write_guard.worker_nodes = nodes; - // Update `pu_to_worker` - write_guard.pu_to_worker = get_pu_to_worker_mapping(&write_guard.worker_nodes); write_guard.streaming_fragment_vnode_mapping = streaming_mapping; write_guard.serving_fragment_vnode_mapping = serving_mapping; } - /// If parallel unit ids is empty, the scheduler may fail to schedule any task and stuck at + /// If worker ids is empty, the scheduler may fail to schedule any task and stuck at /// schedule next stage. If we do not return error in this case, needs more complex control /// logic above. Report in this function makes the schedule root fail reason more clear. - pub fn get_workers_by_parallel_unit_ids( - &self, - parallel_unit_ids: &[ParallelUnitId], - ) -> Result> { - if parallel_unit_ids.is_empty() { + pub fn get_workers_by_worker_ids(&self, worker_ids: &[WorkerId]) -> Result> { + if worker_ids.is_empty() { return Err(BatchError::EmptyWorkerNodes); } let guard = self.inner.read().unwrap(); - let mut workers = Vec::with_capacity(parallel_unit_ids.len()); - for parallel_unit_id in parallel_unit_ids { - match guard.pu_to_worker.get(parallel_unit_id) { + // TODO: Does the return order of this function need to match the order of the parameters? + let worker_index: HashMap<_, _> = guard + .worker_nodes + .iter() + .map(|worker| (worker.id, worker.clone())) + .collect(); + + let mut workers = Vec::with_capacity(worker_ids.len()); + + for worker_id in worker_ids { + match worker_index.get(worker_id) { Some(worker) => workers.push(worker.clone()), - None => bail!( - "No worker node found for parallel unit id: {}", - parallel_unit_id - ), + None => bail!("No worker node found for worker id: {}", worker_id), } } + Ok(workers) } pub fn get_streaming_fragment_mapping( &self, fragment_id: &FragmentId, - ) -> Result { + ) -> Result { self.inner .read() .unwrap() @@ -197,7 +188,7 @@ impl WorkerNodeManager { pub fn insert_streaming_fragment_mapping( &self, fragment_id: FragmentId, - vnode_mapping: ParallelUnitMapping, + vnode_mapping: WorkerMapping, ) { self.inner .write() @@ -210,7 +201,7 @@ impl WorkerNodeManager { pub fn update_streaming_fragment_mapping( &self, fragment_id: FragmentId, - vnode_mapping: ParallelUnitMapping, + vnode_mapping: WorkerMapping, ) { let mut guard = self.inner.write().unwrap(); guard @@ -228,7 +219,7 @@ impl WorkerNodeManager { } /// Returns fragment's vnode mapping for serving. - fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result { + fn serving_fragment_mapping(&self, fragment_id: FragmentId) -> Result { self.inner .read() .unwrap() @@ -236,7 +227,7 @@ impl WorkerNodeManager { .ok_or_else(|| BatchError::ServingVnodeMappingNotFound(fragment_id)) } - pub fn set_serving_fragment_mapping(&self, mappings: HashMap) { + pub fn set_serving_fragment_mapping(&self, mappings: HashMap) { let mut guard = self.inner.write().unwrap(); tracing::debug!( "Set serving vnode mapping for fragments {:?}", @@ -245,10 +236,7 @@ impl WorkerNodeManager { guard.serving_fragment_vnode_mapping = mappings; } - pub fn upsert_serving_fragment_mapping( - &self, - mappings: HashMap, - ) { + pub fn upsert_serving_fragment_mapping(&self, mappings: HashMap) { let mut guard = self.inner.write().unwrap(); tracing::debug!( "Upsert serving vnode mapping for fragments {:?}", @@ -299,7 +287,7 @@ impl WorkerNodeManager { } impl WorkerNodeManagerInner { - fn get_serving_fragment_mapping(&self, fragment_id: FragmentId) -> Option { + fn get_serving_fragment_mapping(&self, fragment_id: FragmentId) -> Option { self.serving_fragment_vnode_mapping .get(&fragment_id) .cloned() @@ -330,6 +318,19 @@ impl WorkerNodeSelector { } } + pub fn schedule_unit_count_map(&self) -> HashMap { + let worker_nodes = if self.enable_barrier_read { + self.manager.list_streaming_worker_nodes() + } else { + self.apply_worker_node_mask(self.manager.list_serving_worker_nodes()) + }; + + worker_nodes + .iter() + .map(|node| (node.id, node.parallel_units.len())) + .collect() + } + pub fn schedule_unit_count(&self) -> usize { let worker_nodes = if self.enable_barrier_read { self.manager.list_streaming_worker_nodes() @@ -342,7 +343,7 @@ impl WorkerNodeSelector { .sum() } - pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result { + pub fn fragment_mapping(&self, fragment_id: FragmentId) -> Result { if self.enable_barrier_read { self.manager.get_streaming_fragment_mapping(&fragment_id) } else { diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index 3f18be7697520..8ff0a6f9261c4 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -145,9 +145,6 @@ where | Info::Function(_) => { notification.version > info.version.as_ref().unwrap().catalog_version } - Info::ParallelUnitMapping(_) => { - notification.version > info.version.as_ref().unwrap().parallel_unit_mapping_version - } Info::Node(_) => { notification.version > info.version.as_ref().unwrap().worker_node_version } @@ -157,10 +154,18 @@ where Info::HummockSnapshot(_) => true, Info::MetaBackupManifestId(_) => true, Info::SystemParams(_) | Info::SessionParam(_) => true, - Info::ServingParallelUnitMappings(_) => true, Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(), Info::HummockStats(_) => true, Info::Recovery(_) => true, + Info::StreamingWorkerMapping(_) => { + notification.version + > info + .version + .as_ref() + .unwrap() + .streaming_worker_mapping_version + } + Info::ServingWorkerMappings(_) => true, }); self.observer_states diff --git a/src/common/src/hash/consistent_hash/mapping.rs b/src/common/src/hash/consistent_hash/mapping.rs index 07e62b7eac275..c15491510b587 100644 --- a/src/common/src/hash/consistent_hash/mapping.rs +++ b/src/common/src/hash/consistent_hash/mapping.rs @@ -19,7 +19,9 @@ use std::ops::Index; use educe::Educe; use itertools::Itertools; -use risingwave_pb::common::{ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto}; +use risingwave_pb::common::{ + ParallelUnit, ParallelUnitMapping as ParallelUnitMappingProto, PbWorkerMapping, +}; use risingwave_pb::stream_plan::ActorMapping as ActorMappingProto; use super::bitmap::VnodeBitmapExt; @@ -30,6 +32,7 @@ use crate::util::iter_util::ZipEqDebug; // TODO: find a better place for this. pub type ActorId = u32; +pub type WorkerId = u32; /// Trait for items that can be used as keys in [`VnodeMapping`]. pub trait VnodeMappingItem { @@ -254,6 +257,12 @@ pub mod marker { impl VnodeMappingItem for ParallelUnit { type Item = ParallelUnitId; } + + /// A marker type for items of [`WorkerId`]. + pub struct Worker; + impl VnodeMappingItem for Worker { + type Item = WorkerId; + } } /// A mapping from [`VirtualNode`] to [`ActorId`]. @@ -266,6 +275,11 @@ pub type ParallelUnitMapping = VnodeMapping; /// An expanded mapping from [`VirtualNode`] to [`ParallelUnitId`]. pub type ExpandedParallelUnitMapping = ExpandedMapping; +/// A mapping from [`VirtualNode`] to [`WorkerId`]. +pub type WorkerMapping = VnodeMapping; +/// An expanded mapping from [`VirtualNode`] to [`WorkerId`]. +pub type ExpandedWorkerMapping = ExpandedMapping; + impl ActorMapping { /// Transform this actor mapping to a parallel unit mapping, essentially `transform`. pub fn to_parallel_unit(&self, to_map: &M) -> ParallelUnitMapping @@ -293,6 +307,30 @@ impl ActorMapping { } } +impl WorkerMapping { + /// Create a uniform worker mapping from the given worker ids + pub fn build_from_ids(worker_ids: &[WorkerId]) -> Self { + Self::new_uniform(worker_ids.iter().cloned()) + } + + /// Create a worker mapping from the protobuf representation. + pub fn from_protobuf(proto: &PbWorkerMapping) -> Self { + assert_eq!(proto.original_indices.len(), proto.data.len()); + Self { + original_indices: proto.original_indices.clone(), + data: proto.data.clone(), + } + } + + /// Convert this worker mapping to the protobuf representation. + pub fn to_protobuf(&self) -> PbWorkerMapping { + PbWorkerMapping { + original_indices: self.original_indices.clone(), + data: self.data.clone(), + } + } +} + impl ParallelUnitMapping { /// Create a uniform parallel unit mapping from the given parallel units, essentially /// `new_uniform`. @@ -310,6 +348,11 @@ impl ParallelUnitMapping { self.transform(to_map) } + /// Transform this parallel unit mapping to an worker mapping, essentially `transform`. + pub fn to_worker(&self, to_map: &HashMap) -> WorkerMapping { + self.transform(to_map) + } + /// Create a parallel unit mapping from the protobuf representation. pub fn from_protobuf(proto: &ParallelUnitMappingProto) -> Self { assert_eq!(proto.original_indices.len(), proto.data.len()); diff --git a/src/common/src/vnode_mapping/vnode_placement.rs b/src/common/src/vnode_mapping/vnode_placement.rs index 49f45d66512eb..0c1086ffdb3dd 100644 --- a/src/common/src/vnode_mapping/vnode_placement.rs +++ b/src/common/src/vnode_mapping/vnode_placement.rs @@ -13,46 +13,56 @@ // limitations under the License. use std::collections::{HashMap, HashSet, LinkedList, VecDeque}; +use std::ops::BitOrAssign; use itertools::Itertools; use num_integer::Integer; +use risingwave_common::hash::WorkerId; use risingwave_pb::common::WorkerNode; use crate::buffer::{Bitmap, BitmapBuilder}; -use crate::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode}; +use crate::hash::{VirtualNode, WorkerMapping}; /// Calculate a new vnode mapping, keeping locality and balance on a best effort basis. /// The strategy is similar to `rebalance_actor_vnode` used in meta node, but is modified to /// consider `max_parallelism` too. pub fn place_vnode( - hint_pu_mapping: Option<&ParallelUnitMapping>, - new_workers: &[WorkerNode], + hint_worker_mapping: Option<&WorkerMapping>, + workers: &[WorkerNode], max_parallelism: Option, -) -> Option { - // Get all serving parallel units from all available workers, grouped by worker id and ordered - // by parallel unit id in each group. - let mut new_pus: LinkedList<_> = new_workers +) -> Option { + #[derive(Copy, Clone, Hash, Eq, PartialEq, Debug)] + struct WorkerSlot(WorkerId, usize); + + impl WorkerSlot { + fn worker_id(&self) -> WorkerId { + self.0 + } + } + // Get all serving worker slots from all available workers, grouped by worker id and ordered + // by worker slot id in each group. + let mut worker_slots: LinkedList<_> = workers .iter() .filter(|w| w.property.as_ref().map_or(false, |p| p.is_serving)) .sorted_by_key(|w| w.id) - .map(|w| w.parallel_units.clone().into_iter().sorted_by_key(|p| p.id)) + .map(|w| (0..w.parallel_units.len()).map(|idx| WorkerSlot(w.id, idx))) .collect(); - // Set serving parallelism to the minimum of total number of parallel units, specified + // Set serving parallelism to the minimum of total number of worker slots, specified // `max_parallelism` and total number of virtual nodes. let serving_parallelism = std::cmp::min( - new_pus.iter().map(|pus| pus.len()).sum(), + worker_slots.iter().map(|slots| slots.len()).sum(), std::cmp::min(max_parallelism.unwrap_or(usize::MAX), VirtualNode::COUNT), ); - // Select `serving_parallelism` parallel units in a round-robin fashion, to distribute workload + // Select `serving_parallelism` worker slots in a round-robin fashion, to distribute workload // evenly among workers. - let mut selected_pu_ids = Vec::new(); - while !new_pus.is_empty() { - new_pus - .extract_if(|ps| { - if let Some(p) = ps.next() { - selected_pu_ids.push(p.id); + let mut selected_slots = Vec::new(); + while !worker_slots.is_empty() { + worker_slots + .extract_if(|slots| { + if let Some(slot) = slots.next() { + selected_slots.push(slot); false } else { true @@ -60,57 +70,63 @@ pub fn place_vnode( }) .for_each(drop); } - selected_pu_ids.drain(serving_parallelism..); - let selected_pu_id_set: HashSet = selected_pu_ids.iter().cloned().collect(); - if selected_pu_id_set.is_empty() { + selected_slots.drain(serving_parallelism..); + let selected_slots_set: HashSet = selected_slots.iter().cloned().collect(); + if selected_slots_set.is_empty() { return None; } - // Calculate balance for each selected parallel unit. Initially, each parallel unit is assigned + // Calculate balance for each selected worker slot. Initially, each worker slot is assigned // no vnodes. Thus its negative balance means that many vnodes should be assigned to it later. - // `is_temp` is a mark for a special temporary parallel unit, only to simplify implementation. + // `is_temp` is a mark for a special temporary worker slot, only to simplify implementation. #[derive(Debug)] struct Balance { - pu_id: ParallelUnitId, + slot: WorkerSlot, balance: i32, builder: BitmapBuilder, is_temp: bool, } - let (expected, mut remain) = VirtualNode::COUNT.div_rem(&selected_pu_ids.len()); - let mut balances: HashMap = HashMap::default(); - for pu_id in &selected_pu_ids { + + let (expected, mut remain) = VirtualNode::COUNT.div_rem(&selected_slots.len()); + let mut balances: HashMap = HashMap::default(); + + for slot in &selected_slots { let mut balance = Balance { - pu_id: *pu_id, + slot: *slot, balance: -(expected as i32), builder: BitmapBuilder::zeroed(VirtualNode::COUNT), is_temp: false, }; + if remain > 0 { balance.balance -= 1; remain -= 1; } - balances.insert(*pu_id, balance); + balances.insert(*slot, balance); } // Now to maintain affinity, if a hint has been provided via `hint_pu_mapping`, follow // that mapping to adjust balances. - let mut temp_pu = Balance { - pu_id: 0, // This id doesn't matter for `temp_pu`. It's distinguishable via `is_temp`. + let mut temp_slot = Balance { + slot: WorkerSlot(0, usize::MAX), /* This id doesn't matter for `temp_pu`. It's distinguishable via `is_temp`. */ balance: 0, builder: BitmapBuilder::zeroed(VirtualNode::COUNT), is_temp: true, }; - match hint_pu_mapping { - Some(hint_pu_mapping) => { - for (vnode, pu_id) in hint_pu_mapping.iter_with_vnode() { - let b = if selected_pu_id_set.contains(&pu_id) { - // Assign vnode to the same parallel unit as hint. - balances.get_mut(&pu_id).unwrap() + match hint_worker_mapping { + Some(hint_worker_mapping) => { + for (vnode, worker_id) in hint_worker_mapping.iter_with_vnode() { + let worker_slot = WorkerSlot(worker_id, 0); + + let b = if selected_slots_set.contains(&worker_slot) { + // Assign vnode to the same worker slot as hint. + balances.get_mut(&worker_slot).unwrap() } else { - // Assign vnode that doesn't belong to any parallel unit to `temp_pu` + // Assign vnode that doesn't belong to any worker slot to `temp_pu` // temporarily. They will be reassigned later. - &mut temp_pu + &mut temp_slot }; + b.balance += 1; b.builder.set(vnode.to_index(), true); } @@ -118,31 +134,33 @@ pub fn place_vnode( None => { // No hint is provided, assign all vnodes to `temp_pu`. for vnode in VirtualNode::all() { - temp_pu.balance += 1; - temp_pu.builder.set(vnode.to_index(), true); + temp_slot.balance += 1; + temp_slot.builder.set(vnode.to_index(), true); } } } - // The final step is to move vnodes from parallel units with positive balance to parallel units - // with negative balance, until all parallel units are of 0 balance. - // A double-ended queue with parallel units ordered by balance in descending order is consumed: - // 1. Peek 2 parallel units from front and back. + // The final step is to move vnodes from worker slots with positive balance to worker slots + // with negative balance, until all worker slots are of 0 balance. + // A double-ended queue with worker slots ordered by balance in descending order is consumed: + // 1. Peek 2 worker slots from front and back. // 2. It any of them is of 0 balance, pop it and go to step 1. // 3. Otherwise, move vnodes from front to back. let mut balances: VecDeque<_> = balances .into_values() - .chain(std::iter::once(temp_pu)) + .chain(std::iter::once(temp_slot)) .sorted_by_key(|b| b.balance) .rev() .collect(); - let mut results: HashMap = HashMap::default(); + + let mut results: HashMap = HashMap::default(); + while !balances.is_empty() { if balances.len() == 1 { let single = balances.pop_front().unwrap(); assert_eq!(single.balance, 0); if !single.is_temp { - results.insert(single.pu_id, single.builder.finish()); + results.insert(single.slot, single.builder.finish()); } break; } @@ -166,32 +184,43 @@ pub fn place_vnode( if src.balance != 0 { balances.push_front(src); } else if !src.is_temp { - results.insert(src.pu_id, src.builder.finish()); + results.insert(src.slot, src.builder.finish()); } if dst.balance != 0 { balances.push_back(dst); } else if !dst.is_temp { - results.insert(dst.pu_id, dst.builder.finish()); + results.insert(dst.slot, dst.builder.finish()); } } - Some(ParallelUnitMapping::from_bitmaps(&results)) + let mut worker_result = HashMap::new(); + + for (worker_slot, bitmap) in results { + let worker_id = worker_slot.worker_id(); + worker_result + .entry(worker_id) + .or_insert(BitmapBuilder::zeroed(VirtualNode::COUNT).finish()) + .bitor_assign(&bitmap); + } + + Some(WorkerMapping::from_bitmaps(&worker_result)) } #[cfg(test)] mod tests { use std::collections::HashMap; + use risingwave_common::hash::WorkerMapping; use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::{ParallelUnit, WorkerNode}; - use crate::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode}; + use crate::hash::{ParallelUnitId, VirtualNode}; use crate::vnode_mapping::vnode_placement::place_vnode; - #[test] fn test_place_vnode() { assert_eq!(VirtualNode::COUNT, 256); + let mut pu_id_counter: ParallelUnitId = 0; let mut pu_to_worker: HashMap = Default::default(); let serving_property = Property { @@ -216,13 +245,13 @@ mod tests { results }; - let count_same_vnode_mapping = |pm1: &ParallelUnitMapping, pm2: &ParallelUnitMapping| { - assert_eq!(pm1.len(), 256); - assert_eq!(pm2.len(), 256); + let count_same_vnode_mapping = |wm1: &WorkerMapping, wm2: &WorkerMapping| { + assert_eq!(wm1.len(), 256); + assert_eq!(wm2.len(), 256); let mut count: usize = 0; for idx in 0..VirtualNode::COUNT { let vnode = VirtualNode::from_index(idx); - if pm1.get(vnode) == pm2.get(vnode) { + if wm1.get(vnode) == wm2.get(vnode) { count += 1; } } @@ -235,29 +264,32 @@ mod tests { property: Some(serving_property.clone()), ..Default::default() }; + assert!( place_vnode(None, &[worker_1.clone()], Some(0)).is_none(), "max_parallelism should >= 0" ); - let re_pu_mapping_2 = place_vnode(None, &[worker_1.clone()], None).unwrap(); - assert_eq!(re_pu_mapping_2.iter_unique().count(), 1); + let re_worker_mapping_2 = place_vnode(None, &[worker_1.clone()], None).unwrap(); + assert_eq!(re_worker_mapping_2.iter_unique().count(), 1); + let worker_2 = WorkerNode { id: 2, parallel_units: gen_pus_for_worker(2, 50, &mut pu_to_worker), property: Some(serving_property.clone()), ..Default::default() }; - let re_pu_mapping = place_vnode( - Some(&re_pu_mapping_2), + + let re_worker_mapping = place_vnode( + Some(&re_worker_mapping_2), &[worker_1.clone(), worker_2.clone()], None, ) .unwrap(); - assert_eq!(re_pu_mapping.iter_unique().count(), 51); + assert_eq!(re_worker_mapping.iter_unique().count(), 2); // 1 * 256 + 0 -> 51 * 5 + 1 - let score = count_same_vnode_mapping(&re_pu_mapping_2, &re_pu_mapping); + let score = count_same_vnode_mapping(&re_worker_mapping_2, &re_worker_mapping); assert!(score >= 5); let worker_3 = WorkerNode { @@ -267,16 +299,16 @@ mod tests { ..Default::default() }; let re_pu_mapping_2 = place_vnode( - Some(&re_pu_mapping), + Some(&re_worker_mapping), &[worker_1.clone(), worker_2.clone(), worker_3.clone()], None, ) .unwrap(); // limited by total pu number - assert_eq!(re_pu_mapping_2.iter_unique().count(), 111); + assert_eq!(re_pu_mapping_2.iter_unique().count(), 3); // 51 * 5 + 1 -> 111 * 2 + 34 - let score = count_same_vnode_mapping(&re_pu_mapping_2, &re_pu_mapping); + let score = count_same_vnode_mapping(&re_pu_mapping_2, &re_worker_mapping); assert!(score >= (2 + 50 * 2)); let re_pu_mapping = place_vnode( Some(&re_pu_mapping_2), @@ -285,7 +317,7 @@ mod tests { ) .unwrap(); // limited by max_parallelism - assert_eq!(re_pu_mapping.iter_unique().count(), 50); + assert_eq!(re_pu_mapping.iter_unique().count(), 3); // 111 * 2 + 34 -> 50 * 5 + 6 let score = count_same_vnode_mapping(&re_pu_mapping, &re_pu_mapping_2); assert!(score >= 50 * 2); @@ -295,20 +327,20 @@ mod tests { None, ) .unwrap(); - assert_eq!(re_pu_mapping_2.iter_unique().count(), 111); + assert_eq!(re_pu_mapping_2.iter_unique().count(), 3); // 50 * 5 + 6 -> 111 * 2 + 34 let score = count_same_vnode_mapping(&re_pu_mapping_2, &re_pu_mapping); assert!(score >= 50 * 2); let re_pu_mapping = place_vnode(Some(&re_pu_mapping_2), &[worker_1, worker_3.clone()], None).unwrap(); // limited by total pu number - assert_eq!(re_pu_mapping.iter_unique().count(), 61); + assert_eq!(re_pu_mapping.iter_unique().count(), 2); // 111 * 2 + 34 -> 61 * 4 + 12 let score = count_same_vnode_mapping(&re_pu_mapping, &re_pu_mapping_2); assert!(score >= 61 * 2); assert!(place_vnode(Some(&re_pu_mapping), &[], None).is_none()); let re_pu_mapping = place_vnode(Some(&re_pu_mapping), &[worker_3], None).unwrap(); - assert_eq!(re_pu_mapping.iter_unique().count(), 60); + assert_eq!(re_pu_mapping.iter_unique().count(), 1); assert!(place_vnode(Some(&re_pu_mapping), &[], None).is_none()); } } diff --git a/src/ctl/src/cmd_impl/meta/serving.rs b/src/ctl/src/cmd_impl/meta/serving.rs index c6c5d3cf81985..4cccf825ef862 100644 --- a/src/ctl/src/cmd_impl/meta/serving.rs +++ b/src/ctl/src/cmd_impl/meta/serving.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use comfy_table::{Row, Table}; use itertools::Itertools; -use risingwave_common::hash::{ParallelUnitId, VirtualNode}; +use risingwave_common::hash::{ParallelUnitId, VirtualNode, WorkerId}; use risingwave_pb::common::{WorkerNode, WorkerType}; use crate::CtlContext; @@ -24,22 +24,18 @@ use crate::CtlContext; pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Result<()> { let meta_client = context.meta_client().await?; let mappings = meta_client.list_serving_vnode_mappings().await?; - let workers = meta_client + let workers: HashMap<_, _> = meta_client .list_worker_nodes(Some(WorkerType::ComputeNode)) - .await?; - let mut pu_to_worker: HashMap = HashMap::new(); - for w in &workers { - for pu in &w.parallel_units { - pu_to_worker.insert(pu.id, w); - } - } + .await? + .into_iter() + .map(|worker| (worker.id, worker)) + .collect(); let mut table = Table::new(); table.set_header({ let mut row = Row::new(); row.add_cell("Table Id".into()); row.add_cell("Fragment Id".into()); - row.add_cell("Parallel Unit Id".into()); row.add_cell("Virtual Node".into()); row.add_cell("Worker".into()); row @@ -48,28 +44,22 @@ pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Res let rows = mappings .iter() .flat_map(|(fragment_id, (table_id, mapping))| { - let mut pu_vnodes: HashMap> = HashMap::new(); - for (vnode, pu) in mapping.iter_with_vnode() { - pu_vnodes.entry(pu).or_default().push(vnode); + let mut worker_nodes: HashMap> = HashMap::new(); + for (vnode, worker) in mapping.iter_with_vnode() { + worker_nodes.entry(worker).or_default().push(vnode); } - pu_vnodes.into_iter().map(|(pu_id, vnodes)| { - ( - *table_id, - *fragment_id, - pu_id, - vnodes, - pu_to_worker.get(&pu_id), - ) + worker_nodes.into_iter().map(|(worker_id, vnodes)| { + (*table_id, *fragment_id, vnodes, workers.get(&worker_id)) }) }) .collect_vec(); - for (table_id, fragment_id, pu_id, vnodes, worker) in - rows.into_iter().sorted_by_key(|(t, f, p, ..)| (*t, *f, *p)) + + for (table_id, fragment_id, vnodes, worker) in + rows.into_iter().sorted_by_key(|(t, f, ..)| (*t, *f)) { let mut row = Row::new(); row.add_cell(table_id.into()); row.add_cell(fragment_id.into()); - row.add_cell(pu_id.into()); row.add_cell( format!( "{} in total: {}", diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index ddf6ca489bf0c..050ae1c089f6c 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -19,7 +19,7 @@ use itertools::Itertools; use parking_lot::RwLock; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeManagerRef; use risingwave_common::catalog::CatalogVersion; -use risingwave_common::hash::ParallelUnitMapping; +use risingwave_common::hash::WorkerMapping; use risingwave_common::session_config::SessionConfig; use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef; use risingwave_common_service::observer_manager::{ObserverState, SubscribeFrontend}; @@ -27,7 +27,7 @@ use risingwave_pb::common::WorkerNode; use risingwave_pb::hummock::HummockVersionStats; use risingwave_pb::meta::relation::RelationInfo; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use risingwave_pb::meta::{FragmentParallelUnitMapping, MetaSnapshot, SubscribeResponse}; +use risingwave_pb::meta::{FragmentWorkerMapping, MetaSnapshot, SubscribeResponse}; use risingwave_rpc_client::ComputeClientPoolRef; use tokio::sync::watch::Sender; @@ -72,7 +72,6 @@ impl ObserverState for FrontendObserverNode { Info::User(_) => { self.handle_user_notification(resp); } - Info::ParallelUnitMapping(_) => self.handle_fragment_mapping_notification(resp), Info::Snapshot(_) => { panic!( "receiving a snapshot in the middle is unsupported now {:?}", @@ -103,8 +102,9 @@ impl ObserverState for FrontendObserverNode { Info::HummockStats(stats) => { self.handle_table_stats_notification(stats); } - Info::ServingParallelUnitMappings(m) => { - self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation()); + Info::StreamingWorkerMapping(_) => self.handle_fragment_mapping_notification(resp), + Info::ServingWorkerMappings(m) => { + self.handle_fragment_serving_mapping_notification(m.mappings, resp.operation()) } Info::Recovery(_) => { self.compute_client_pool.invalidate_all(); @@ -133,13 +133,13 @@ impl ObserverState for FrontendObserverNode { functions, connections, users, - parallel_unit_mappings, - serving_parallel_unit_mappings, nodes, hummock_snapshot, hummock_version: _, meta_backup_manifest_id: _, hummock_write_limits: _, + streaming_worker_mappings, + serving_worker_mappings, session_params, version, } = snapshot; @@ -177,10 +177,11 @@ impl ObserverState for FrontendObserverNode { for user in users { user_guard.create_user(user) } + self.worker_node_manager.refresh( nodes, - convert_pu_mapping(¶llel_unit_mappings), - convert_pu_mapping(&serving_parallel_unit_mappings), + convert_worker_mapping(&streaming_worker_mappings), + convert_worker_mapping(&serving_worker_mappings), ); self.hummock_snapshot_manager .update(hummock_snapshot.unwrap()); @@ -387,12 +388,10 @@ impl FrontendObserverNode { return; }; match info { - Info::ParallelUnitMapping(parallel_unit_mapping) => { - let fragment_id = parallel_unit_mapping.fragment_id; + Info::StreamingWorkerMapping(streaming_worker_mapping) => { + let fragment_id = streaming_worker_mapping.fragment_id; let mapping = || { - ParallelUnitMapping::from_protobuf( - parallel_unit_mapping.mapping.as_ref().unwrap(), - ) + WorkerMapping::from_protobuf(streaming_worker_mapping.mapping.as_ref().unwrap()) }; match resp.operation() { @@ -417,20 +416,20 @@ impl FrontendObserverNode { fn handle_fragment_serving_mapping_notification( &mut self, - mappings: Vec, + mappings: Vec, op: Operation, ) { match op { Operation::Add | Operation::Update => { self.worker_node_manager - .upsert_serving_fragment_mapping(convert_pu_mapping(&mappings)); + .upsert_serving_fragment_mapping(convert_worker_mapping(&mappings)); } Operation::Delete => self.worker_node_manager.remove_serving_fragment_mapping( &mappings.into_iter().map(|m| m.fragment_id).collect_vec(), ), Operation::Snapshot => { self.worker_node_manager - .set_serving_fragment_mapping(convert_pu_mapping(&mappings)); + .set_serving_fragment_mapping(convert_worker_mapping(&mappings)); } _ => panic!("receive an unsupported notify {:?}", op), } @@ -470,17 +469,17 @@ impl FrontendObserverNode { } } -fn convert_pu_mapping( - parallel_unit_mappings: &[FragmentParallelUnitMapping], -) -> HashMap { - parallel_unit_mappings +fn convert_worker_mapping( + worker_mappings: &[FragmentWorkerMapping], +) -> HashMap { + worker_mappings .iter() .map( - |FragmentParallelUnitMapping { + |FragmentWorkerMapping { fragment_id, mapping, }| { - let mapping = ParallelUnitMapping::from_protobuf(mapping.as_ref().unwrap()); + let mapping = WorkerMapping::from_protobuf(mapping.as_ref().unwrap()); (*fragment_id, mapping) }, ) diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs index 358ad934a9ccf..4c8eba53030f2 100644 --- a/src/frontend/src/optimizer/property/distribution.rs +++ b/src/frontend/src/optimizer/property/distribution.rs @@ -51,7 +51,7 @@ use generic::PhysicalPlanRef; use itertools::Itertools; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::catalog::{FieldDisplay, Schema, TableId}; -use risingwave_common::hash::ParallelUnitId; +use risingwave_common::hash::WorkerId; use risingwave_pb::batch_plan::exchange_info::{ ConsistentHashInfo, Distribution as DistributionPb, DistributionMode, HashInfo, }; @@ -148,14 +148,17 @@ impl Distribution { let vnode_mapping = worker_node_manager .fragment_mapping(Self::get_fragment_id(catalog_reader, table_id)?)?; - let pu2id_map: HashMap = vnode_mapping + let worker_to_id_map: HashMap = vnode_mapping .iter_unique() .enumerate() - .map(|(i, pu)| (pu, i as u32)) + .map(|(i, worker_id)| (worker_id, i as u32)) .collect(); Some(DistributionPb::ConsistentHashInfo(ConsistentHashInfo { - vmap: vnode_mapping.iter().map(|x| pu2id_map[&x]).collect_vec(), + vmap: vnode_mapping + .iter() + .map(|x| worker_to_id_map[&x]) + .collect_vec(), key: key.iter().map(|num| *num as u32).collect(), })) } diff --git a/src/frontend/src/scheduler/distributed/query.rs b/src/frontend/src/scheduler/distributed/query.rs index b556bc3af6c85..f5c9f9280f9d3 100644 --- a/src/frontend/src/scheduler/distributed/query.rs +++ b/src/frontend/src/scheduler/distributed/query.rs @@ -476,7 +476,7 @@ pub(crate) mod tests { ColumnCatalog, ColumnDesc, ConflictBehavior, CreateType, StreamJobStatus, DEFAULT_SUPER_USER_ID, }; - use risingwave_common::hash::ParallelUnitMapping; + use risingwave_common::hash::WorkerMapping; use risingwave_common::types::DataType; use risingwave_pb::common::worker_node::Property; use risingwave_pb::common::{HostAddress, ParallelUnit, WorkerNode, WorkerType}; @@ -721,10 +721,9 @@ pub(crate) mod tests { let workers = vec![worker1, worker2, worker3]; let worker_node_manager = Arc::new(WorkerNodeManager::mock(workers)); let worker_node_selector = WorkerNodeSelector::new(worker_node_manager.clone(), false); - worker_node_manager - .insert_streaming_fragment_mapping(0, ParallelUnitMapping::new_single(0)); + worker_node_manager.insert_streaming_fragment_mapping(0, WorkerMapping::new_single(0)); worker_node_manager.set_serving_fragment_mapping( - vec![(0, ParallelUnitMapping::new_single(0))] + vec![(0, WorkerMapping::new_single(0))] .into_iter() .collect(), ); diff --git a/src/frontend/src/scheduler/distributed/stage.rs b/src/frontend/src/scheduler/distributed/stage.rs index 59294169220e7..0b597f91a3cfb 100644 --- a/src/frontend/src/scheduler/distributed/stage.rs +++ b/src/frontend/src/scheduler/distributed/stage.rs @@ -32,7 +32,7 @@ use risingwave_batch::executor::ExecutorBuilder; use risingwave_batch::task::{ShutdownMsg, ShutdownSender, ShutdownToken, TaskId as TaskIdBatch}; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::array::DataChunk; -use risingwave_common::hash::ParallelUnitMapping; +use risingwave_common::hash::WorkerMapping; use risingwave_common::util::addr::HostAddr; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::source::SplitMetaData; @@ -353,12 +353,13 @@ impl StageRunner { // We let each task read one partition by setting the `vnode_ranges` of the scan node in // the task. // We schedule the task to the worker node that owns the data partition. - let parallel_unit_ids = vnode_bitmaps.keys().cloned().collect_vec(); + let worker_ids = vnode_bitmaps.keys().cloned().collect_vec(); let workers = self .worker_node_manager .manager - .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; - for (i, (parallel_unit_id, worker)) in parallel_unit_ids + .get_workers_by_worker_ids(&worker_ids)?; + + for (i, (worker_id, worker)) in worker_ids .into_iter() .zip_eq_fast(workers.into_iter()) .enumerate() @@ -368,7 +369,7 @@ impl StageRunner { stage_id: self.stage.id, task_id: i as u32, }; - let vnode_ranges = vnode_bitmaps[¶llel_unit_id].clone(); + let vnode_ranges = vnode_bitmaps[&worker_id].clone(); let plan_fragment = self.create_plan_fragment(i as u32, Some(PartitionInfo::Table(vnode_ranges))); futures.push(self.schedule_task( @@ -679,10 +680,7 @@ impl StageRunner { } #[inline(always)] - fn get_table_dml_vnode_mapping( - &self, - table_id: &TableId, - ) -> SchedulerResult { + fn get_table_dml_vnode_mapping(&self, table_id: &TableId) -> SchedulerResult { let guard = self.catalog_reader.read_guard(); let table = guard @@ -711,11 +709,11 @@ impl StageRunner { if let Some(table_id) = dml_table_id { let vnode_mapping = self.get_table_dml_vnode_mapping(&table_id)?; - let parallel_unit_ids = vnode_mapping.iter_unique().collect_vec(); + let worker_ids = vnode_mapping.iter_unique().collect_vec(); let candidates = self .worker_node_manager .manager - .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; + .get_workers_by_worker_ids(&worker_ids)?; if candidates.is_empty() { return Err(BatchError::EmptyWorkerNodes.into()); } @@ -741,17 +739,17 @@ impl StageRunner { .table_id .into(), )?; - let id2pu_vec = self + let id_to_workers = self .worker_node_manager .fragment_mapping(fragment_id)? .iter_unique() .collect_vec(); - let pu = id2pu_vec[task_id as usize]; + let worker_id = id_to_workers[task_id as usize]; let candidates = self .worker_node_manager .manager - .get_workers_by_parallel_unit_ids(&[pu])?; + .get_workers_by_worker_ids(&[worker_id])?; if candidates.is_empty() { return Err(BatchError::EmptyWorkerNodes.into()); } diff --git a/src/frontend/src/scheduler/local.rs b/src/frontend/src/scheduler/local.rs index 0ff7540a79d72..ec8b3a0d21184 100644 --- a/src/frontend/src/scheduler/local.rs +++ b/src/frontend/src/scheduler/local.rs @@ -30,7 +30,7 @@ use risingwave_batch::task::{ShutdownToken, TaskId}; use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector; use risingwave_common::array::DataChunk; use risingwave_common::bail; -use risingwave_common::hash::ParallelUnitMapping; +use risingwave_common::hash::WorkerMapping; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_common::util::tracing::{InstrumentStream, TracingContext}; use risingwave_connector::source::SplitMetaData; @@ -313,12 +313,12 @@ impl LocalQueryExecution { // Similar to the distributed case (StageRunner::schedule_tasks). // Set `vnode_ranges` of the scan node in `local_execute_plan` of each // `exchange_source`. - let (parallel_unit_ids, vnode_bitmaps): (Vec<_>, Vec<_>) = + let (worker_ids, vnode_bitmaps): (Vec<_>, Vec<_>) = vnode_bitmaps.clone().into_iter().unzip(); let workers = self .worker_node_manager .manager - .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; + .get_workers_by_worker_ids(&worker_ids)?; for (idx, (worker_node, partition)) in (workers.into_iter().zip_eq_fast(vnode_bitmaps.into_iter())).enumerate() { @@ -583,10 +583,7 @@ impl LocalQueryExecution { } #[inline(always)] - fn get_table_dml_vnode_mapping( - &self, - table_id: &TableId, - ) -> SchedulerResult { + fn get_table_dml_vnode_mapping(&self, table_id: &TableId) -> SchedulerResult { let guard = self.front_env.catalog_reader().read_guard(); let table = guard @@ -610,11 +607,11 @@ impl LocalQueryExecution { // dml should use streaming vnode mapping let vnode_mapping = self.get_table_dml_vnode_mapping(table_id)?; let worker_node = { - let parallel_unit_ids = vnode_mapping.iter_unique().collect_vec(); + let worker_ids = vnode_mapping.iter_unique().collect_vec(); let candidates = self .worker_node_manager .manager - .get_workers_by_parallel_unit_ids(¶llel_unit_ids)?; + .get_workers_by_worker_ids(&worker_ids)?; if candidates.is_empty() { return Err(BatchError::EmptyWorkerNodes.into()); } diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 06237c5756fba..15e19d14b6bd4 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -30,7 +30,7 @@ use risingwave_common::bail; use risingwave_common::buffer::{Bitmap, BitmapBuilder}; use risingwave_common::catalog::TableDesc; use risingwave_common::hash::table_distribution::TableDistribution; -use risingwave_common::hash::{ParallelUnitId, ParallelUnitMapping, VirtualNode}; +use risingwave_common::hash::{VirtualNode, WorkerId, WorkerMapping}; use risingwave_common::util::scan_range::ScanRange; use risingwave_connector::source::filesystem::opendal_source::opendal_enumerator::OpendalEnumerator; use risingwave_connector::source::filesystem::opendal_source::{OpendalGcs, OpendalS3}; @@ -392,12 +392,12 @@ pub struct TableScanInfo { /// full vnode bitmap, since we need to know where to schedule the singleton scan task. /// /// `None` iff the table is a system table. - partitions: Option>, + partitions: Option>, } impl TableScanInfo { /// For normal tables, `partitions` should always be `Some`. - pub fn new(name: String, partitions: HashMap) -> Self { + pub fn new(name: String, partitions: HashMap) -> Self { Self { name, partitions: Some(partitions), @@ -887,10 +887,19 @@ impl BatchPlanFragmenter { } _ => { if let Some(table_scan_info) = &table_scan_info { + let parallelism_map = self.worker_node_manager.schedule_unit_count_map(); table_scan_info .partitions .as_ref() - .map(|m| m.len()) + .map(|partitions| { + partitions + .keys() + .map(|worker_id| { + parallelism_map.get(worker_id).cloned().unwrap_or(0) + }) + .sum::() + .max(1) + }) .unwrap_or(1) } else if let Some(lookup_join_parallelism) = self.collect_stage_lookup_join_parallelism(root.clone())? @@ -904,6 +913,8 @@ impl BatchPlanFragmenter { } } }; + + println!("parallelism {}", parallelism); if source_info.is_none() && parallelism == 0 { return Err(BatchError::EmptyWorkerNodes.into()); } @@ -1158,10 +1169,10 @@ impl BatchPlanFragmenter { fn derive_partitions( scan_ranges: &[ScanRange], table_desc: &TableDesc, - vnode_mapping: &ParallelUnitMapping, -) -> SchedulerResult> { + vnode_mapping: &WorkerMapping, +) -> SchedulerResult> { let num_vnodes = vnode_mapping.len(); - let mut partitions: HashMap)> = HashMap::new(); + let mut partitions: HashMap)> = HashMap::new(); if scan_ranges.is_empty() { return Ok(vnode_mapping @@ -1189,24 +1200,25 @@ fn derive_partitions( match vnode { None => { // put this scan_range to all partitions - vnode_mapping.to_bitmaps().into_iter().for_each( - |(parallel_unit_id, vnode_bitmap)| { + vnode_mapping + .to_bitmaps() + .into_iter() + .for_each(|(worker_id, vnode_bitmap)| { let (bitmap, scan_ranges) = partitions - .entry(parallel_unit_id) + .entry(worker_id) .or_insert_with(|| (BitmapBuilder::zeroed(num_vnodes), vec![])); vnode_bitmap .iter() .enumerate() .for_each(|(vnode, b)| bitmap.set(vnode, b)); scan_ranges.push(scan_range.to_protobuf()); - }, - ); + }); } // scan a single partition Some(vnode) => { - let parallel_unit_id = vnode_mapping[vnode]; + let worker_id = vnode_mapping[vnode]; let (bitmap, scan_ranges) = partitions - .entry(parallel_unit_id) + .entry(worker_id) .or_insert_with(|| (BitmapBuilder::zeroed(num_vnodes), vec![])); bitmap.set(vnode.to_index(), true); scan_ranges.push(scan_range.to_protobuf()); diff --git a/src/meta/service/src/notification_service.rs b/src/meta/service/src/notification_service.rs index e9a5e4a017ad4..e7495535f9100 100644 --- a/src/meta/service/src/notification_service.rs +++ b/src/meta/service/src/notification_service.rs @@ -24,8 +24,7 @@ use risingwave_pb::hummock::WriteLimits; use risingwave_pb::meta::meta_snapshot::SnapshotVersion; use risingwave_pb::meta::notification_service_server::NotificationService; use risingwave_pb::meta::{ - FragmentParallelUnitMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest, - SubscribeType, + FragmentWorkerMapping, GetSessionParamsResponse, MetaSnapshot, SubscribeRequest, SubscribeType, }; use risingwave_pb::user::UserInfo; use tokio::sync::mpsc; @@ -138,9 +137,9 @@ impl NotificationServiceImpl { } } - async fn get_parallel_unit_mapping_snapshot( + async fn get_worker_mapping_snapshot( &self, - ) -> MetaResult<(Vec, NotificationVersion)> { + ) -> MetaResult<(Vec, NotificationVersion)> { match &self.metadata_manager { MetadataManager::V1(mgr) => { let fragment_guard = mgr.fragment_manager.get_fragment_read_guard().await; @@ -161,11 +160,11 @@ impl NotificationServiceImpl { } } - fn get_serving_vnode_mappings(&self) -> Vec { + fn get_serving_vnode_mappings(&self) -> Vec { self.serving_vnode_mapping .all() .iter() - .map(|(fragment_id, mapping)| FragmentParallelUnitMapping { + .map(|(fragment_id, mapping)| FragmentWorkerMapping { fragment_id: *fragment_id, mapping: Some(mapping.to_protobuf()), }) @@ -241,9 +240,11 @@ impl NotificationServiceImpl { users, catalog_version, ) = self.get_catalog_snapshot().await?; - let (parallel_unit_mappings, parallel_unit_mapping_version) = - self.get_parallel_unit_mapping_snapshot().await?; - let serving_parallel_unit_mappings = self.get_serving_vnode_mappings(); + + let (streaming_worker_mappings, streaming_worker_mapping_version) = + self.get_worker_mapping_snapshot().await?; + let serving_worker_mappings = self.get_serving_vnode_mappings(); + let (nodes, worker_node_version) = self.get_worker_node_snapshot().await?; let hummock_snapshot = Some(self.hummock_manager.latest_snapshot()); @@ -270,15 +271,15 @@ impl NotificationServiceImpl { functions, connections, users, - parallel_unit_mappings, nodes, hummock_snapshot, - serving_parallel_unit_mappings, version: Some(SnapshotVersion { catalog_version, - parallel_unit_mapping_version, worker_node_version, + streaming_worker_mapping_version, }), + serving_worker_mappings, + streaming_worker_mappings, session_params, ..Default::default() }) diff --git a/src/meta/service/src/serving_service.rs b/src/meta/service/src/serving_service.rs index d1b013e078e0f..bf2f10ba56207 100644 --- a/src/meta/service/src/serving_service.rs +++ b/src/meta/service/src/serving_service.rs @@ -16,7 +16,7 @@ use itertools::Itertools; use risingwave_meta::manager::MetadataManager; use risingwave_pb::meta::serving_service_server::ServingService; use risingwave_pb::meta::{ - FragmentParallelUnitMapping, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse, + FragmentWorkerMapping, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse, }; use tonic::{Request, Response, Status}; @@ -49,7 +49,7 @@ impl ServingService for ServingServiceImpl { .serving_vnode_mapping .all() .into_iter() - .map(|(fragment_id, mapping)| FragmentParallelUnitMapping { + .map(|(fragment_id, mapping)| FragmentWorkerMapping { fragment_id, mapping: Some(mapping.to_protobuf()), }) @@ -78,8 +78,8 @@ impl ServingService for ServingServiceImpl { } }; Ok(Response::new(GetServingVnodeMappingsResponse { - mappings, fragment_to_table, + worker_mappings: mappings, })) } } diff --git a/src/meta/src/controller/catalog.rs b/src/meta/src/controller/catalog.rs index f8ec470a74918..7e3cd8a1fe6ce 100644 --- a/src/meta/src/controller/catalog.rs +++ b/src/meta/src/controller/catalog.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use anyhow::anyhow; use itertools::Itertools; use risingwave_common::catalog::{TableOption, DEFAULT_SCHEMA_NAME, SYSTEM_SCHEMAS}; +use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont; use risingwave_common::{bail, current_cluster_version}; use risingwave_connector::source::UPSTREAM_SOURCE_KEY; @@ -45,7 +46,9 @@ use risingwave_pb::meta::relation::PbRelationInfo; use risingwave_pb::meta::subscribe_response::{ Info as NotificationInfo, Info, Operation as NotificationOperation, Operation, }; -use risingwave_pb::meta::{PbRelation, PbRelationGroup}; +use risingwave_pb::meta::{ + FragmentParallelUnitMapping, PbFragmentWorkerMapping, PbRelation, PbRelationGroup, +}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::FragmentTypeFlag; use risingwave_pb::user::PbUserInfo; @@ -64,9 +67,9 @@ use crate::controller::utils::{ check_connection_name_duplicate, check_database_name_duplicate, check_function_signature_duplicate, check_relation_name_duplicate, check_schema_name_duplicate, ensure_object_id, ensure_object_not_refer, ensure_schema_empty, ensure_user_id, - get_fragment_mappings_by_jobs, get_referring_objects, get_referring_objects_cascade, - get_user_privilege, list_user_info_by_ids, resolve_source_register_info_for_jobs, - PartialObject, + get_fragment_mappings_by_jobs, get_parallel_unit_to_worker_map, get_referring_objects, + get_referring_objects_cascade, get_user_privilege, list_user_info_by_ids, + resolve_source_register_info_for_jobs, PartialObject, }; use crate::controller::ObjectModel; use crate::manager::{Catalog, MetaSrvEnv, NotificationVersion, IGNORED_NOTIFICATION_VERSION}; @@ -275,8 +278,30 @@ impl CatalogController { .into_tuple() .all(&txn) .await?; + + let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + let fragment_mappings = get_fragment_mappings_by_jobs(&txn, streaming_jobs.clone()).await?; + let fragment_mappings = fragment_mappings + .into_iter() + .map( + |FragmentParallelUnitMapping { + fragment_id, + mapping, + }| { + PbFragmentWorkerMapping { + fragment_id, + mapping: Some( + ParallelUnitMapping::from_protobuf(&mapping.unwrap()) + .to_worker(¶llel_unit_to_worker) + .to_protobuf(), + ), + } + }, + ) + .collect(); + // The schema and objects in the database will be delete cascade. let res = Object::delete_by_id(database_id).exec(&txn).await?; if res.rows_affected == 0 { @@ -296,6 +321,7 @@ impl CatalogController { }), ) .await; + self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings) .await; Ok(( @@ -1973,6 +1999,7 @@ impl CatalogController { let (source_fragments, removed_actors) = resolve_source_register_info_for_jobs(&txn, to_drop_streaming_jobs.clone()).await?; + let fragment_mappings = get_fragment_mappings_by_jobs(&txn, to_drop_streaming_jobs.clone()).await?; @@ -1999,6 +2026,8 @@ impl CatalogController { } let user_infos = list_user_info_by_ids(to_update_user_ids, &txn).await?; + let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + txn.commit().await?; // notify about them. @@ -2073,6 +2102,26 @@ impl CatalogController { NotificationInfo::RelationGroup(PbRelationGroup { relations }), ) .await; + + let fragment_mappings = fragment_mappings + .into_iter() + .map( + |FragmentParallelUnitMapping { + fragment_id, + mapping, + }| { + PbFragmentWorkerMapping { + fragment_id, + mapping: Some( + ParallelUnitMapping::from_protobuf(&mapping.unwrap()) + .to_worker(¶llel_unit_to_worker) + .to_protobuf(), + ), + } + }, + ) + .collect(); + self.notify_fragment_mapping(NotificationOperation::Delete, fragment_mappings) .await; diff --git a/src/meta/src/controller/fragment.rs b/src/meta/src/controller/fragment.rs index 7bf5b02f971af..5f3dc49570d8b 100644 --- a/src/meta/src/controller/fragment.rs +++ b/src/meta/src/controller/fragment.rs @@ -18,6 +18,7 @@ use std::mem::swap; use anyhow::Context; use itertools::Itertools; use risingwave_common::bail; +use risingwave_common::hash::ParallelUnitMapping; use risingwave_common::util::stream_graph_visitor::visit_stream_node; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::prelude::{Actor, ActorDispatcher, Fragment, Sink, StreamingJob}; @@ -33,9 +34,7 @@ use risingwave_pb::meta::subscribe_response::{ use risingwave_pb::meta::table_fragments::actor_status::PbActorState; use risingwave_pb::meta::table_fragments::fragment::PbFragmentDistributionType; use risingwave_pb::meta::table_fragments::{PbActorStatus, PbFragment, PbState}; -use risingwave_pb::meta::{ - FragmentParallelUnitMapping, PbFragmentParallelUnitMapping, PbTableFragments, -}; +use risingwave_pb::meta::{FragmentWorkerMapping, PbFragmentWorkerMapping, PbTableFragments}; use risingwave_pb::source::PbConnectorSplits; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ @@ -50,7 +49,8 @@ use sea_orm::{ use crate::controller::catalog::{CatalogController, CatalogControllerInner}; use crate::controller::utils::{ - get_actor_dispatchers, FragmentDesc, PartialActorLocation, PartialFragmentStateTables, + get_actor_dispatchers, get_parallel_unit_to_worker_map, FragmentDesc, PartialActorLocation, + PartialFragmentStateTables, }; use crate::manager::{ActorInfos, LocalNotification}; use crate::model::TableParallelism; @@ -61,7 +61,9 @@ impl CatalogControllerInner { /// List all fragment vnode mapping info for all CREATED streaming jobs. pub async fn all_running_fragment_mappings( &self, - ) -> MetaResult + '_> { + ) -> MetaResult + '_> { + let txn = self.db.begin().await?; + let fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find() .join(JoinType::InnerJoin, fragment::Relation::Object.def()) .join(JoinType::InnerJoin, object::Relation::StreamingJob.def()) @@ -69,14 +71,23 @@ impl CatalogControllerInner { .columns([fragment::Column::FragmentId, fragment::Column::VnodeMapping]) .filter(streaming_job::Column::JobStatus.eq(JobStatus::Created)) .into_tuple() - .all(&self.db) + .all(&txn) .await?; - Ok(fragment_mappings.into_iter().map(|(fragment_id, mapping)| { - FragmentParallelUnitMapping { - fragment_id: fragment_id as _, - mapping: Some(mapping.to_protobuf()), - } - })) + + let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + + Ok(fragment_mappings + .into_iter() + .map(move |(fragment_id, mapping)| { + let worker_mapping = ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) + .to_worker(¶llel_unit_to_worker) + .to_protobuf(); + + FragmentWorkerMapping { + fragment_id: fragment_id as _, + mapping: Some(worker_mapping), + } + })) } } @@ -84,7 +95,7 @@ impl CatalogController { pub(crate) async fn notify_fragment_mapping( &self, operation: NotificationOperation, - fragment_mappings: Vec, + fragment_mappings: Vec, ) { let fragment_ids = fragment_mappings .iter() @@ -96,7 +107,7 @@ impl CatalogController { .notification_manager() .notify_frontend( operation, - NotificationInfo::ParallelUnitMapping(fragment_mapping), + NotificationInfo::StreamingWorkerMapping(fragment_mapping), ) .await; } @@ -936,15 +947,21 @@ impl CatalogController { .await?; } + let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + txn.commit().await?; self.notify_fragment_mapping( NotificationOperation::Update, fragment_mapping .into_iter() - .map(|(fragment_id, mapping)| PbFragmentParallelUnitMapping { + .map(|(fragment_id, mapping)| PbFragmentWorkerMapping { fragment_id: fragment_id as _, - mapping: Some(mapping.to_protobuf()), + mapping: Some( + ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) + .to_worker(¶llel_unit_to_worker) + .to_protobuf(), + ), }) .collect(), ) diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index db9c1d03eab4b..0dcad7df5d9f1 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -43,8 +43,8 @@ use risingwave_pb::meta::subscribe_response::{ }; use risingwave_pb::meta::table_fragments::PbActorStatus; use risingwave_pb::meta::{ - FragmentParallelUnitMapping, PbFragmentParallelUnitMapping, PbRelation, PbRelationGroup, - PbTableFragments, Relation, + FragmentWorkerMapping, PbFragmentWorkerMapping, PbRelation, PbRelationGroup, PbTableFragments, + Relation, }; use risingwave_pb::source::{PbConnectorSplit, PbConnectorSplits}; use risingwave_pb::stream_plan::stream_fragment_graph::Parallelism; @@ -66,7 +66,7 @@ use crate::controller::catalog::CatalogController; use crate::controller::rename::ReplaceTableExprRewriter; use crate::controller::utils::{ check_relation_name_duplicate, check_sink_into_table_cycle, ensure_object_id, ensure_user_id, - get_fragment_actor_ids, get_fragment_mappings, + get_fragment_actor_ids, get_fragment_mappings, get_parallel_unit_to_worker_map, }; use crate::controller::ObjectModel; use crate::manager::{NotificationVersion, SinkId, StreamingJob}; @@ -803,7 +803,7 @@ impl CatalogController { dropping_sink_id: Option, txn: &DatabaseTransaction, streaming_job: StreamingJob, - ) -> MetaResult<(Vec, Vec)> { + ) -> MetaResult<(Vec, Vec)> { // Question: The source catalog should be remain unchanged? let StreamingJob::Table(_, table, ..) = streaming_job else { unreachable!("unexpected job: {streaming_job:?}") @@ -1007,8 +1007,7 @@ impl CatalogController { } } - let fragment_mapping: Vec = - get_fragment_mappings(txn, job_id as _).await?; + let fragment_mapping: Vec<_> = get_fragment_mappings(txn, job_id as _).await?; Ok((relations, fragment_mapping)) } @@ -1224,6 +1223,8 @@ impl CatalogController { let txn = inner.db.begin().await?; + let parallel_unit_to_worker = get_parallel_unit_to_worker_map(&txn).await?; + let mut fragment_mapping_to_notify = vec![]; // for assert only @@ -1399,9 +1400,13 @@ impl CatalogController { fragment.vnode_mapping = Set((&vnode_mapping).into()); fragment.update(&txn).await?; - fragment_mapping_to_notify.push(FragmentParallelUnitMapping { + let worker_mapping = ParallelUnitMapping::from_protobuf(&vnode_mapping) + .to_worker(¶llel_unit_to_worker) + .to_protobuf(); + + fragment_mapping_to_notify.push(FragmentWorkerMapping { fragment_id: fragment_id as u32, - mapping: Some(vnode_mapping), + mapping: Some(worker_mapping), }); // for downstream and upstream diff --git a/src/meta/src/controller/utils.rs b/src/meta/src/controller/utils.rs index 7d8f4769e8264..702a485ed3187 100644 --- a/src/meta/src/controller/utils.rs +++ b/src/meta/src/controller/utils.rs @@ -16,6 +16,7 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use anyhow::anyhow; use itertools::Itertools; +use risingwave_common::hash::ParallelUnitMapping; use risingwave_meta_model_migration::WithQuery; use risingwave_meta_model_v2::actor::ActorStatus; use risingwave_meta_model_v2::fragment::DistributionType; @@ -24,11 +25,11 @@ use risingwave_meta_model_v2::prelude::*; use risingwave_meta_model_v2::{ actor, actor_dispatcher, connection, database, fragment, function, index, object, object_dependency, schema, sink, source, subscription, table, user, user_privilege, view, - ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, I32Array, ObjectId, - PrivilegeId, SchemaId, SourceId, StreamNode, UserId, + worker_property, ActorId, DataTypeArray, DatabaseId, FragmentId, FragmentVnodeMapping, + I32Array, ObjectId, PrivilegeId, SchemaId, SourceId, StreamNode, UserId, WorkerId, }; use risingwave_pb::catalog::{PbConnection, PbFunction, PbSubscription}; -use risingwave_pb::meta::PbFragmentParallelUnitMapping; +use risingwave_pb::meta::{PbFragmentParallelUnitMapping, PbFragmentWorkerMapping}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{PbFragmentTypeFlag, PbStreamNode, StreamSource}; use risingwave_pb::user::grant_privilege::{PbAction, PbActionWithGrantOption, PbObject}; @@ -815,10 +816,12 @@ where pub async fn get_fragment_mappings( db: &C, job_id: ObjectId, -) -> MetaResult> +) -> MetaResult> where C: ConnectionTrait, { + let parallel_unit_to_worker = get_parallel_unit_to_worker_map(db).await?; + let fragment_mappings: Vec<(FragmentId, FragmentVnodeMapping)> = Fragment::find() .select_only() .columns([fragment::Column::FragmentId, fragment::Column::VnodeMapping]) @@ -829,9 +832,13 @@ where Ok(fragment_mappings .into_iter() - .map(|(fragment_id, mapping)| PbFragmentParallelUnitMapping { + .map(|(fragment_id, mapping)| PbFragmentWorkerMapping { fragment_id: fragment_id as _, - mapping: Some(mapping.to_protobuf()), + mapping: Some( + ParallelUnitMapping::from_protobuf(&mapping.to_protobuf()) + .to_worker(¶llel_unit_to_worker) + .to_protobuf(), + ), }) .collect()) } @@ -948,3 +955,30 @@ where Ok((source_fragment_ids, actors.into_iter().collect())) } + +pub(crate) async fn get_parallel_unit_to_worker_map(db: &C) -> MetaResult> +where + C: ConnectionTrait, +{ + let worker_parallel_units = WorkerProperty::find() + .select_only() + .columns([ + worker_property::Column::WorkerId, + worker_property::Column::ParallelUnitIds, + ]) + .into_tuple::<(WorkerId, I32Array)>() + .all(db) + .await?; + + let parallel_unit_to_worker = worker_parallel_units + .into_iter() + .flat_map(|(worker_id, parallel_unit_ids)| { + parallel_unit_ids + .into_inner() + .into_iter() + .map(move |parallel_unit_id| (parallel_unit_id as u32, worker_id as u32)) + }) + .collect::>(); + + Ok(parallel_unit_to_worker) +} diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index c52598875d4f2..d12680de968cd 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -24,10 +24,11 @@ use risingwave_common::hash::{ActorMapping, ParallelUnitId, ParallelUnitMapping} use risingwave_common::util::stream_graph_visitor::{visit_stream_node, visit_stream_node_cont}; use risingwave_connector::source::SplitImpl; use risingwave_meta_model_v2::SourceId; +use risingwave_pb::common::{PbParallelUnitMapping, PbWorkerMapping}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; use risingwave_pb::meta::table_fragments::actor_status::ActorState; use risingwave_pb::meta::table_fragments::{ActorStatus, Fragment, State}; -use risingwave_pb::meta::FragmentParallelUnitMapping; +use risingwave_pb::meta::FragmentWorkerMapping; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::update_mutation::MergeUpdate; use risingwave_pb::stream_plan::{ @@ -56,18 +57,21 @@ impl FragmentManagerCore { /// List all fragment vnode mapping info that not in `State::Initial`. pub fn all_running_fragment_mappings( &self, - ) -> impl Iterator + '_ { + ) -> impl Iterator + '_ { self.table_fragments .values() .filter(|tf| tf.state() != State::Initial) .flat_map(|table_fragments| { - table_fragments.fragments.values().map(|fragment| { - let parallel_unit_mapping = fragment.vnode_mapping.clone().unwrap(); - FragmentParallelUnitMapping { + table_fragments + .fragments + .values() + .map(move |fragment| FragmentWorkerMapping { fragment_id: fragment.fragment_id, - mapping: Some(parallel_unit_mapping), - } - }) + mapping: Some(FragmentManager::convert_mapping( + &table_fragments.actor_status, + fragment.vnode_mapping.as_ref().unwrap(), + )), + }) }) } @@ -192,18 +196,20 @@ impl FragmentManager { async fn notify_fragment_mapping(&self, table_fragment: &TableFragments, operation: Operation) { // Notify all fragment mapping to frontend nodes for fragment in table_fragment.fragments.values() { - let mapping = fragment - .vnode_mapping - .clone() - .expect("no data distribution found"); - let fragment_mapping = FragmentParallelUnitMapping { + let fragment_mapping = FragmentWorkerMapping { fragment_id: fragment.fragment_id, - mapping: Some(mapping), + mapping: Some(Self::convert_mapping( + &table_fragment.actor_status, + fragment + .vnode_mapping + .as_ref() + .expect("no data distribution found"), + )), }; self.env .notification_manager() - .notify_frontend(operation, Info::ParallelUnitMapping(fragment_mapping)) + .notify_frontend(operation, Info::StreamingWorkerMapping(fragment_mapping)) .await; } @@ -1262,11 +1268,14 @@ impl FragmentManager { *fragment.vnode_mapping.as_mut().unwrap() = vnode_mapping.clone(); + let worker_mapping = Self::convert_mapping(&actor_status, &vnode_mapping); + // Notify fragment mapping to frontend nodes. - let fragment_mapping = FragmentParallelUnitMapping { + let fragment_mapping = FragmentWorkerMapping { fragment_id: *fragment_id as FragmentId, - mapping: Some(vnode_mapping), + mapping: Some(worker_mapping), }; + fragment_mapping_to_notify.push(fragment_mapping); } @@ -1386,13 +1395,30 @@ impl FragmentManager { for mapping in fragment_mapping_to_notify { self.env .notification_manager() - .notify_frontend(Operation::Update, Info::ParallelUnitMapping(mapping)) + .notify_frontend(Operation::Update, Info::StreamingWorkerMapping(mapping)) .await; } Ok(()) } + fn convert_mapping( + actor_status: &BTreeMap, + vnode_mapping: &PbParallelUnitMapping, + ) -> PbWorkerMapping { + let parallel_unit_to_worker = actor_status + .values() + .map(|actor_status| { + let parallel_unit = actor_status.get_parallel_unit().unwrap(); + (parallel_unit.id, parallel_unit.worker_node_id) + }) + .collect(); + + ParallelUnitMapping::from_protobuf(vnode_mapping) + .to_worker(¶llel_unit_to_worker) + .to_protobuf() + } + pub async fn table_node_actors( &self, table_ids: &HashSet, diff --git a/src/meta/src/serving/mod.rs b/src/meta/src/serving/mod.rs index 36e7b77ccf63a..67926d9b6c4b7 100644 --- a/src/meta/src/serving/mod.rs +++ b/src/meta/src/serving/mod.rs @@ -16,11 +16,11 @@ use std::collections::HashMap; use std::sync::Arc; use parking_lot::RwLock; -use risingwave_common::hash::ParallelUnitMapping; +use risingwave_common::hash::WorkerMapping; use risingwave_common::vnode_mapping::vnode_placement::place_vnode; use risingwave_pb::common::{WorkerNode, WorkerType}; use risingwave_pb::meta::subscribe_response::{Info, Operation}; -use risingwave_pb::meta::{FragmentParallelUnitMapping, FragmentParallelUnitMappings}; +use risingwave_pb::meta::{FragmentWorkerMapping, FragmentWorkerMappings}; use tokio::sync::oneshot::Sender; use tokio::task::JoinHandle; @@ -31,11 +31,11 @@ pub type ServingVnodeMappingRef = Arc; #[derive(Default)] pub struct ServingVnodeMapping { - serving_vnode_mappings: RwLock>, + serving_vnode_mappings: RwLock>, } impl ServingVnodeMapping { - pub fn all(&self) -> HashMap { + pub fn all(&self) -> HashMap { self.serving_vnode_mappings.read().clone() } @@ -45,9 +45,9 @@ impl ServingVnodeMapping { &self, streaming_parallelisms: HashMap, workers: &[WorkerNode], - ) -> (HashMap, Vec) { + ) -> (HashMap, Vec) { let mut serving_vnode_mappings = self.serving_vnode_mappings.write(); - let mut upserted: HashMap = HashMap::default(); + let mut upserted: HashMap = HashMap::default(); let mut failed: Vec = vec![]; for (fragment_id, streaming_parallelism) in streaming_parallelisms { let new_mapping = { @@ -81,24 +81,24 @@ impl ServingVnodeMapping { } } -pub(crate) fn to_fragment_parallel_unit_mapping( - mappings: &HashMap, -) -> Vec { +pub(crate) fn to_fragment_worker_mapping( + mappings: &HashMap, +) -> Vec { mappings .iter() - .map(|(fragment_id, mapping)| FragmentParallelUnitMapping { + .map(|(fragment_id, mapping)| FragmentWorkerMapping { fragment_id: *fragment_id, mapping: Some(mapping.to_protobuf()), }) .collect() } -pub(crate) fn to_deleted_fragment_parallel_unit_mapping( +pub(crate) fn to_deleted_fragment_worker_mapping( fragment_ids: &[FragmentId], -) -> Vec { +) -> Vec { fragment_ids .iter() - .map(|fragment_id| FragmentParallelUnitMapping { + .map(|fragment_id| FragmentWorkerMapping { fragment_id: *fragment_id, mapping: None, }) @@ -120,8 +120,8 @@ pub async fn on_meta_start( ); notification_manager.notify_frontend_without_version( Operation::Snapshot, - Info::ServingParallelUnitMappings(FragmentParallelUnitMappings { - mappings: to_fragment_parallel_unit_mapping(&mappings), + Info::ServingWorkerMappings(FragmentWorkerMappings { + mappings: to_fragment_worker_mapping(&mappings), }), ); } @@ -185,7 +185,7 @@ pub async fn start_serving_vnode_mapping_worker( let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await; let (mappings, _) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers); tracing::debug!("Update serving vnode mapping snapshot for fragments {:?}.", mappings.keys()); - notification_manager.notify_frontend_without_version(Operation::Snapshot, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&mappings) })); + notification_manager.notify_frontend_without_version(Operation::Snapshot, Info::ServingWorkerMappings(FragmentWorkerMappings{ mappings: to_fragment_worker_mapping(&mappings) })); } LocalNotification::FragmentMappingsUpsert(fragment_ids) => { if fragment_ids.is_empty() { @@ -195,11 +195,11 @@ pub async fn start_serving_vnode_mapping_worker( let (upserted, failed) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers); if !upserted.is_empty() { tracing::debug!("Update serving vnode mapping for fragments {:?}.", upserted.keys()); - notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_fragment_parallel_unit_mapping(&upserted) })); + notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingWorkerMappings(FragmentWorkerMappings{ mappings: to_fragment_worker_mapping(&upserted) })); } if !failed.is_empty() { tracing::debug!("Fail to update serving vnode mapping for fragments {:?}.", failed); - notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_deleted_fragment_parallel_unit_mapping(&failed)})); + notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerMappings(FragmentWorkerMappings{ mappings: to_deleted_fragment_worker_mapping(&failed)})); } } LocalNotification::FragmentMappingsDelete(fragment_ids) => { @@ -208,7 +208,7 @@ pub async fn start_serving_vnode_mapping_worker( } tracing::debug!("Delete serving vnode mapping for fragments {:?}.", fragment_ids); serving_vnode_mapping.remove(&fragment_ids); - notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingParallelUnitMappings(FragmentParallelUnitMappings{ mappings: to_deleted_fragment_parallel_unit_mapping(&fragment_ids) })); + notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerMappings(FragmentWorkerMappings{ mappings: to_deleted_fragment_worker_mapping(&fragment_ids) })); } _ => {} } diff --git a/src/meta/src/stream/scale.rs b/src/meta/src/stream/scale.rs index 5fc455f48c643..6ea8d6046ece8 100644 --- a/src/meta/src/stream/scale.rs +++ b/src/meta/src/stream/scale.rs @@ -41,7 +41,7 @@ use risingwave_pb::meta::table_fragments::fragment::{ FragmentDistributionType, PbFragmentDistributionType, }; use risingwave_pb::meta::table_fragments::{self, ActorStatus, PbFragment, State}; -use risingwave_pb::meta::FragmentParallelUnitMappings; +use risingwave_pb::meta::FragmentWorkerMappings; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ Dispatcher, DispatcherType, FragmentTypeFlag, PbStreamActor, StreamNode, @@ -61,8 +61,7 @@ use crate::manager::{ }; use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments, TableParallelism}; use crate::serving::{ - to_deleted_fragment_parallel_unit_mapping, to_fragment_parallel_unit_mapping, - ServingVnodeMapping, + to_deleted_fragment_worker_mapping, to_fragment_worker_mapping, ServingVnodeMapping, }; use crate::storage::{MetaStore, MetaStoreError, MetaStoreRef, Transaction, DEFAULT_COLUMN_FAMILY}; use crate::stream::{GlobalStreamManager, SourceManagerRef}; @@ -1725,8 +1724,8 @@ impl ScaleController { .notification_manager() .notify_frontend_without_version( Operation::Update, - Info::ServingParallelUnitMappings(FragmentParallelUnitMappings { - mappings: to_fragment_parallel_unit_mapping(&upserted), + Info::ServingWorkerMappings(FragmentWorkerMappings { + mappings: to_fragment_worker_mapping(&upserted), }), ); } @@ -1739,8 +1738,8 @@ impl ScaleController { .notification_manager() .notify_frontend_without_version( Operation::Delete, - Info::ServingParallelUnitMappings(FragmentParallelUnitMappings { - mappings: to_deleted_fragment_parallel_unit_mapping(&failed), + Info::ServingWorkerMappings(FragmentWorkerMappings { + mappings: to_deleted_fragment_worker_mapping(&failed), }), ); } diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs index 7fd32f3b8bab0..50b6c0444041b 100644 --- a/src/rpc_client/src/meta_client.rs +++ b/src/rpc_client/src/meta_client.rs @@ -25,7 +25,7 @@ use futures::stream::BoxStream; use lru::LruCache; use risingwave_common::catalog::{CatalogVersion, FunctionId, IndexId, TableId}; use risingwave_common::config::{MetaConfig, MAX_CONNECTION_WINDOW_SIZE}; -use risingwave_common::hash::ParallelUnitMapping; +use risingwave_common::hash::WorkerMapping; use risingwave_common::system_param::reader::SystemParamsReader; use risingwave_common::telemetry::report::TelemetryInfoFetcher; use risingwave_common::util::addr::HostAddr; @@ -1169,13 +1169,11 @@ impl MetaClient { Ok(resp.tables) } - pub async fn list_serving_vnode_mappings( - &self, - ) -> Result> { + pub async fn list_serving_vnode_mappings(&self) -> Result> { let req = GetServingVnodeMappingsRequest {}; let resp = self.inner.get_serving_vnode_mappings(req).await?; let mappings = resp - .mappings + .worker_mappings .into_iter() .map(|p| { ( @@ -1185,7 +1183,7 @@ impl MetaClient { .get(&p.fragment_id) .cloned() .unwrap_or(0), - ParallelUnitMapping::from_protobuf(p.mapping.as_ref().unwrap()), + WorkerMapping::from_protobuf(p.mapping.as_ref().unwrap()), ), ) })