diff --git a/README.md b/README.md index 319c7f7025ea3..05f8ed8e3fb75 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@
- + ### 🌊Stream Processing Redefined.
@@ -67,8 +67,8 @@ tar xvf risingwave-v1.3.0-x86_64-unknown-linux.tar.gz ``` **Mac** ``` -brew tap risingwavelabs/risingwave -brew install risingwave +brew tap risingwavelabs/risingwave +brew install risingwave risingwave playground ``` Now connect to RisingWave using `psql`: diff --git a/src/frontend/src/handler/create_mv.rs b/src/frontend/src/handler/create_mv.rs index 3fa9129f39743..053ba5aa30f19 100644 --- a/src/frontend/src/handler/create_mv.rs +++ b/src/frontend/src/handler/create_mv.rs @@ -26,6 +26,7 @@ use crate::binder::{Binder, BoundQuery, BoundSetExpr}; use crate::catalog::{check_valid_column_name, CatalogError}; use crate::handler::privilege::resolve_query_privileges; use crate::handler::HandlerArgs; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::Explain; use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor}; use crate::planner::Planner; @@ -175,7 +176,7 @@ It only indicates the physical clustering of the data, which may improve the per let (plan, table) = gen_create_mv_plan(&session, context.into(), query, name, columns, emit_mode)?; - let context = plan.plan_base().ctx.clone(); + let context = plan.plan_base().ctx().clone(); let mut graph = build_graph(plan); graph.parallelism = session .config() diff --git a/src/frontend/src/optimizer/plan_node/batch.rs b/src/frontend/src/optimizer/plan_node/batch.rs index d62a85095d21c..2cb2360b3e51d 100644 --- a/src/frontend/src/optimizer/plan_node/batch.rs +++ b/src/frontend/src/optimizer/plan_node/batch.rs @@ -12,14 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::generic::GenericPlanRef; +use super::generic::PhysicalPlanRef; use crate::optimizer::property::Order; -/// A subtrait of [`GenericPlanRef`] for batch plans. +/// A subtrait of [`PhysicalPlanRef`] for batch plans. /// /// Due to the lack of refactoring, all plan nodes currently implement this trait /// through [`super::PlanBase`]. One may still use this trait as a bound for -/// expecting a batch plan, in contrast to [`GenericPlanRef`]. -pub trait BatchPlanRef: GenericPlanRef { +/// accessing a batch plan, in contrast to [`GenericPlanRef`] or +/// [`PhysicalPlanRef`]. +/// +/// [`GenericPlanRef`]: super::generic::GenericPlanRef +pub trait BatchPlanRef: PhysicalPlanRef { fn order(&self) -> &Order; } diff --git a/src/frontend/src/optimizer/plan_node/batch_delete.rs b/src/frontend/src/optimizer/plan_node/batch_delete.rs index 42db0a1c4a774..85d22a46b450e 100644 --- a/src/frontend/src/optimizer/plan_node/batch_delete.rs +++ b/src/frontend/src/optimizer/plan_node/batch_delete.rs @@ -33,11 +33,8 @@ pub struct BatchDelete { impl BatchDelete { pub fn new(core: generic::Delete) -> Self { assert_eq!(core.input.distribution(), &Distribution::Single); - let base: PlanBase = PlanBase::new_batch_from_logical( - &core, - core.input.distribution().clone(), - Order::any(), - ); + let base: PlanBase = + PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any()); Self { base, core } } } diff --git a/src/frontend/src/optimizer/plan_node/batch_exchange.rs b/src/frontend/src/optimizer/plan_node/batch_exchange.rs index 583838e877c5e..6477c7ec213e2 100644 --- a/src/frontend/src/optimizer/plan_node/batch_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/batch_exchange.rs @@ -17,6 +17,8 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{ExchangeNode, MergeSortExchangeNode}; +use super::batch::BatchPlanRef; +use super::generic::{GenericPlanRef, PhysicalPlanRef}; use super::utils::{childless_record, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch}; use crate::optimizer::plan_node::ToLocalBatch; @@ -43,12 +45,12 @@ impl Distill for BatchExchange { fn distill<'a>(&self) -> XmlNode<'a> { let input_schema = self.input.schema(); let order = OrderDisplay { - order: &self.base.order, + order: self.base.order(), input_schema, } .distill(); let dist = Pretty::display(&DistributionDisplay { - distribution: &self.base.dist, + distribution: self.base.distribution(), input_schema, }); childless_record("BatchExchange", vec![("order", order), ("dist", dist)]) @@ -75,18 +77,18 @@ impl ToDistributedBatch for BatchExchange { /// The serialization of Batch Exchange is default cuz it will be rewritten in scheduler. impl ToBatchPb for BatchExchange { fn to_batch_prost_body(&self) -> NodeBody { - if self.base.order.is_any() { + if self.base.order().is_any() { NodeBody::Exchange(ExchangeNode { sources: vec![], - input_schema: self.base.schema.to_prost(), + input_schema: self.base.schema().to_prost(), }) } else { NodeBody::MergeSortExchange(MergeSortExchangeNode { exchange: Some(ExchangeNode { sources: vec![], - input_schema: self.base.schema.to_prost(), + input_schema: self.base.schema().to_prost(), }), - column_orders: self.base.order.to_protobuf(), + column_orders: self.base.order().to_protobuf(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_expand.rs b/src/frontend/src/optimizer/plan_node/batch_expand.rs index 870368701be44..af4413b9e5152 100644 --- a/src/frontend/src/optimizer/plan_node/batch_expand.rs +++ b/src/frontend/src/optimizer/plan_node/batch_expand.rs @@ -41,7 +41,7 @@ impl BatchExpand { | Distribution::UpstreamHashShard(_, _) => Distribution::SomeShard, Distribution::Broadcast => unreachable!(), }; - let base = PlanBase::new_batch_from_logical(&core, dist, Order::any()); + let base = PlanBase::new_batch_with_core(&core, dist, Order::any()); BatchExpand { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/batch_filter.rs b/src/frontend/src/optimizer/plan_node/batch_filter.rs index 6bc5086c7a29b..4bff7cbfee3c0 100644 --- a/src/frontend/src/optimizer/plan_node/batch_filter.rs +++ b/src/frontend/src/optimizer/plan_node/batch_filter.rs @@ -32,7 +32,7 @@ pub struct BatchFilter { impl BatchFilter { pub fn new(core: generic::Filter) -> Self { // TODO: derive from input - let base = PlanBase::new_batch_from_logical( + let base = PlanBase::new_batch_with_core( &core, core.input.distribution().clone(), core.input.order().clone(), diff --git a/src/frontend/src/optimizer/plan_node/batch_group_topn.rs b/src/frontend/src/optimizer/plan_node/batch_group_topn.rs index 8f6684dc4d85b..70ee8328623f5 100644 --- a/src/frontend/src/optimizer/plan_node/batch_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/batch_group_topn.rs @@ -33,11 +33,8 @@ pub struct BatchGroupTopN { impl BatchGroupTopN { pub fn new(core: generic::TopN) -> Self { assert!(!core.group_key.is_empty()); - let base = PlanBase::new_batch_from_logical( - &core, - core.input.distribution().clone(), - Order::any(), - ); + let base = + PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any()); BatchGroupTopN { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs b/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs index fa14076912689..b4ab3341ace29 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hash_agg.rs @@ -42,7 +42,7 @@ impl BatchHashAgg { let dist = core .i2o_col_mapping() .rewrite_provided_distribution(input_dist); - let base = PlanBase::new_batch_from_logical(&core, dist, Order::any()); + let base = PlanBase::new_batch_with_core(&core, dist, Order::any()); BatchHashAgg { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/batch_hash_join.rs b/src/frontend/src/optimizer/plan_node/batch_hash_join.rs index 5adfa6f5fd622..bad586d4af1e4 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hash_join.rs @@ -46,7 +46,7 @@ pub struct BatchHashJoin { impl BatchHashJoin { pub fn new(core: generic::Join, eq_join_predicate: EqJoinPredicate) -> Self { let dist = Self::derive_dist(core.left.distribution(), core.right.distribution(), &core); - let base = PlanBase::new_batch_from_logical(&core, dist, Order::any()); + let base = PlanBase::new_batch_with_core(&core, dist, Order::any()); Self { base, @@ -91,7 +91,7 @@ impl BatchHashJoin { impl Distill for BatchHashJoin { fn distill<'a>(&self) -> XmlNode<'a> { - let verbose = self.base.ctx.is_explain_verbose(); + let verbose = self.base.ctx().is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); vec.push(("type", Pretty::debug(&self.core.join_type))); diff --git a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs index 68381956b8a9a..2a4a27f9a0583 100644 --- a/src/frontend/src/optimizer/plan_node/batch_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/batch_hop_window.rs @@ -44,11 +44,8 @@ impl BatchHopWindow { let distribution = core .i2o_col_mapping() .rewrite_provided_distribution(core.input.distribution()); - let base = PlanBase::new_batch_from_logical( - &core, - distribution, - core.get_out_column_index_order(), - ); + let base = + PlanBase::new_batch_with_core(&core, distribution, core.get_out_column_index_order()); BatchHopWindow { base, core, diff --git a/src/frontend/src/optimizer/plan_node/batch_insert.rs b/src/frontend/src/optimizer/plan_node/batch_insert.rs index 0a2d2dddec8c5..aec05eee145b8 100644 --- a/src/frontend/src/optimizer/plan_node/batch_insert.rs +++ b/src/frontend/src/optimizer/plan_node/batch_insert.rs @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::InsertNode; use risingwave_pb::plan_common::{DefaultColumns, IndexAndExpr}; +use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill}; use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch}; use crate::expr::Expr; @@ -34,11 +35,8 @@ pub struct BatchInsert { impl BatchInsert { pub fn new(core: generic::Insert) -> Self { assert_eq!(core.input.distribution(), &Distribution::Single); - let base: PlanBase = PlanBase::new_batch_from_logical( - &core, - core.input.distribution().clone(), - Order::any(), - ); + let base: PlanBase = + PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any()); BatchInsert { base, core } } @@ -46,7 +44,9 @@ impl BatchInsert { impl Distill for BatchInsert { fn distill<'a>(&self) -> XmlNode<'a> { - let vec = self.core.fields_pretty(self.base.ctx.is_explain_verbose()); + let vec = self + .core + .fields_pretty(self.base.ctx().is_explain_verbose()); childless_record("BatchInsert", vec) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_limit.rs b/src/frontend/src/optimizer/plan_node/batch_limit.rs index 17ee2c3fb69f3..93b14d0198979 100644 --- a/src/frontend/src/optimizer/plan_node/batch_limit.rs +++ b/src/frontend/src/optimizer/plan_node/batch_limit.rs @@ -16,6 +16,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::LimitNode; +use super::generic::PhysicalPlanRef; use super::utils::impl_distill_by_unit; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, @@ -32,7 +33,7 @@ pub struct BatchLimit { impl BatchLimit { pub fn new(core: generic::Limit) -> Self { - let base = PlanBase::new_batch_from_logical( + let base = PlanBase::new_batch_with_core( &core, core.input.distribution().clone(), core.input.order().clone(), diff --git a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs index 99eb905da4661..48f99668c3af7 100644 --- a/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_lookup_join.rs @@ -18,7 +18,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode}; -use super::generic::{self}; +use super::generic::{self, GenericPlanRef}; use super::utils::{childless_record, Distill}; use super::ExprRewritable; use crate::expr::{Expr, ExprRewriter}; @@ -68,7 +68,7 @@ impl BatchLookupJoin { assert!(eq_join_predicate.has_eq()); assert!(eq_join_predicate.eq_keys_are_type_aligned()); let dist = Self::derive_dist(core.left.distribution(), &core); - let base = PlanBase::new_batch_from_logical(&core, dist, Order::any()); + let base = PlanBase::new_batch_with_core(&core, dist, Order::any()); Self { base, core, @@ -112,7 +112,7 @@ impl BatchLookupJoin { impl Distill for BatchLookupJoin { fn distill<'a>(&self) -> XmlNode<'a> { - let verbose = self.base.ctx.is_explain_verbose(); + let verbose = self.base.ctx().is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); vec.push(("type", Pretty::debug(&self.core.join_type))); diff --git a/src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs b/src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs index d743523d05911..8980ad2f23f6d 100644 --- a/src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs +++ b/src/frontend/src/optimizer/plan_node/batch_nested_loop_join.rs @@ -17,7 +17,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::NestedLoopJoinNode; -use super::generic::{self}; +use super::generic::{self, GenericPlanRef}; use super::utils::{childless_record, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchPb, ToDistributedBatch}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; @@ -37,7 +37,7 @@ pub struct BatchNestedLoopJoin { impl BatchNestedLoopJoin { pub fn new(core: generic::Join) -> Self { let dist = Self::derive_dist(core.left.distribution(), core.right.distribution()); - let base = PlanBase::new_batch_from_logical(&core, dist, Order::any()); + let base = PlanBase::new_batch_with_core(&core, dist, Order::any()); Self { base, core } } @@ -51,7 +51,7 @@ impl BatchNestedLoopJoin { impl Distill for BatchNestedLoopJoin { fn distill<'a>(&self) -> XmlNode<'a> { - let verbose = self.base.ctx.is_explain_verbose(); + let verbose = self.base.ctx().is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); vec.push(("type", Pretty::debug(&self.core.join_type))); diff --git a/src/frontend/src/optimizer/plan_node/batch_over_window.rs b/src/frontend/src/optimizer/plan_node/batch_over_window.rs index f04587059aecd..fb455758f331a 100644 --- a/src/frontend/src/optimizer/plan_node/batch_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/batch_over_window.rs @@ -17,6 +17,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::SortOverWindowNode; +use super::batch::BatchPlanRef; use super::generic::PlanWindowFunction; use super::utils::impl_distill_by_unit; use super::{ @@ -46,7 +47,7 @@ impl BatchOverWindow { .collect(), ); - let base = PlanBase::new_batch_from_logical(&core, input_dist, order); + let base = PlanBase::new_batch_with_core(&core, input_dist, order); BatchOverWindow { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/batch_project.rs b/src/frontend/src/optimizer/plan_node/batch_project.rs index 591d7d13caed8..642683967c5c3 100644 --- a/src/frontend/src/optimizer/plan_node/batch_project.rs +++ b/src/frontend/src/optimizer/plan_node/batch_project.rs @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::ProjectNode; use risingwave_pb::expr::ExprNode; +use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill}; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, @@ -43,7 +44,7 @@ impl BatchProject { .i2o_col_mapping() .rewrite_provided_order(core.input.order()); - let base = PlanBase::new_batch_from_logical(&core, distribution, order); + let base = PlanBase::new_batch_with_core(&core, distribution, order); BatchProject { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/batch_project_set.rs b/src/frontend/src/optimizer/plan_node/batch_project_set.rs index 331ca8e5235de..5888df9d15889 100644 --- a/src/frontend/src/optimizer/plan_node/batch_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/batch_project_set.rs @@ -38,11 +38,8 @@ impl BatchProjectSet { .i2o_col_mapping() .rewrite_provided_distribution(core.input.distribution()); - let base = PlanBase::new_batch_from_logical( - &core, - distribution, - core.get_out_column_index_order(), - ); + let base = + PlanBase::new_batch_with_core(&core, distribution, core.get_out_column_index_order()); BatchProjectSet { base, core } } } diff --git a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs index dc0e553caf308..6834ed29353b9 100644 --- a/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs +++ b/src/frontend/src/optimizer/plan_node/batch_seq_scan.rs @@ -24,6 +24,8 @@ use risingwave_pb::batch_plan::row_seq_scan_node::ChunkSize; use risingwave_pb::batch_plan::{RowSeqScanNode, SysRowSeqScanNode}; use risingwave_pb::plan_common::PbColumnDesc; +use super::batch::BatchPlanRef; +use super::generic::{GenericPlanRef, PhysicalPlanRef}; use super::utils::{childless_record, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch}; use crate::catalog::ColumnId; @@ -46,7 +48,7 @@ impl BatchSeqScan { } else { core.get_out_column_index_order() }; - let base = PlanBase::new_batch_from_logical(&core, dist, order); + let base = PlanBase::new_batch_with_core(&core, dist, order); { // validate scan_range @@ -180,7 +182,7 @@ fn range_to_string(name: &str, range: &(Bound, Bound)) - impl Distill for BatchSeqScan { fn distill<'a>(&self) -> XmlNode<'a> { - let verbose = self.base.ctx.is_explain_verbose(); + let verbose = self.base.ctx().is_explain_verbose(); let mut vec = Vec::with_capacity(4); vec.push(("table", Pretty::from(self.core.table_name.clone()))); vec.push(("columns", self.core.columns_pretty(verbose))); @@ -196,7 +198,7 @@ impl Distill for BatchSeqScan { if verbose { let dist = Pretty::display(&DistributionDisplay { distribution: self.distribution(), - input_schema: &self.base.schema, + input_schema: self.base.schema(), }); vec.push(("distribution", dist)); } diff --git a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs index ff184324a5fb9..bae8d70c2eedf 100644 --- a/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_simple_agg.rs @@ -16,7 +16,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::SortAggNode; -use super::generic::{self, PlanAggCall}; +use super::generic::{self, GenericPlanRef, PlanAggCall}; use super::utils::impl_distill_by_unit; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch}; use crate::expr::ExprRewriter; @@ -32,7 +32,7 @@ pub struct BatchSimpleAgg { impl BatchSimpleAgg { pub fn new(core: generic::Agg) -> Self { let input_dist = core.input.distribution().clone(); - let base = PlanBase::new_batch_from_logical(&core, input_dist, Order::any()); + let base = PlanBase::new_batch_with_core(&core, input_dist, Order::any()); BatchSimpleAgg { base, core } } @@ -41,8 +41,11 @@ impl BatchSimpleAgg { } fn two_phase_agg_enabled(&self) -> bool { - let session_ctx = self.base.ctx.session_ctx(); - session_ctx.config().get_enable_two_phase_agg() + self.base + .ctx() + .session_ctx() + .config() + .get_enable_two_phase_agg() } pub(crate) fn can_two_phase_agg(&self) -> bool { diff --git a/src/frontend/src/optimizer/plan_node/batch_sort.rs b/src/frontend/src/optimizer/plan_node/batch_sort.rs index 8576a18c19333..e7bff6d51d85b 100644 --- a/src/frontend/src/optimizer/plan_node/batch_sort.rs +++ b/src/frontend/src/optimizer/plan_node/batch_sort.rs @@ -17,6 +17,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::SortNode; +use super::batch::BatchPlanRef; use super::utils::{childless_record, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch}; use crate::optimizer::plan_node::ToLocalBatch; @@ -56,7 +57,7 @@ impl PlanTreeNodeUnary for BatchSort { } fn clone_with_input(&self, input: PlanRef) -> Self { - Self::new(input, self.base.order.clone()) + Self::new(input, self.base.order().clone()) } } impl_plan_tree_node_for_unary! {BatchSort} @@ -70,7 +71,7 @@ impl ToDistributedBatch for BatchSort { impl ToBatchPb for BatchSort { fn to_batch_prost_body(&self) -> NodeBody { - let column_orders = self.base.order.to_protobuf(); + let column_orders = self.base.order().to_protobuf(); NodeBody::Sort(SortNode { column_orders }) } } diff --git a/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs b/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs index 00facef473a37..2252d4c0c0ee0 100644 --- a/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs +++ b/src/frontend/src/optimizer/plan_node/batch_sort_agg.rs @@ -54,7 +54,7 @@ impl BatchSortAgg { let order = core.i2o_col_mapping().rewrite_provided_order(&input_order); - let base = PlanBase::new_batch_from_logical(&core, dist, order); + let base = PlanBase::new_batch_with_core(&core, dist, order); BatchSortAgg { base, diff --git a/src/frontend/src/optimizer/plan_node/batch_source.rs b/src/frontend/src/optimizer/plan_node/batch_source.rs index 8d43b4a7f6663..9e2cd6006db0b 100644 --- a/src/frontend/src/optimizer/plan_node/batch_source.rs +++ b/src/frontend/src/optimizer/plan_node/batch_source.rs @@ -19,6 +19,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::SourceNode; +use super::generic::GenericPlanRef; use super::utils::{childless_record, column_names_pretty, Distill}; use super::{ generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch, ToLocalBatch, @@ -35,7 +36,7 @@ pub struct BatchSource { impl BatchSource { pub fn new(core: generic::Source) -> Self { - let base = PlanBase::new_batch_from_logical( + let base = PlanBase::new_batch_with_core( &core, // Use `Single` by default, will be updated later with `clone_with_dist`. Distribution::Single, @@ -58,8 +59,9 @@ impl BatchSource { } pub fn clone_with_dist(&self) -> Self { - let mut base = self.base.clone(); - base.dist = Distribution::SomeShard; + let base = self + .base + .clone_with_new_distribution(Distribution::SomeShard); Self { base, core: self.core.clone(), diff --git a/src/frontend/src/optimizer/plan_node/batch_table_function.rs b/src/frontend/src/optimizer/plan_node/batch_table_function.rs index 91aa1af0abbe7..0b9887cd4aaba 100644 --- a/src/frontend/src/optimizer/plan_node/batch_table_function.rs +++ b/src/frontend/src/optimizer/plan_node/batch_table_function.rs @@ -17,6 +17,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::TableFunctionNode; +use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeLeaf, ToBatchPb, ToDistributedBatch}; use crate::expr::ExprRewriter; @@ -39,7 +40,7 @@ impl BatchTableFunction { } pub fn with_dist(logical: LogicalTableFunction, dist: Distribution) -> Self { - let ctx = logical.base.ctx.clone(); + let ctx = logical.base.ctx().clone(); let base = PlanBase::new_batch(ctx, logical.schema().clone(), dist, Order::any()); BatchTableFunction { base, logical } } diff --git a/src/frontend/src/optimizer/plan_node/batch_topn.rs b/src/frontend/src/optimizer/plan_node/batch_topn.rs index e5f44bd2ef0e2..b2eda24046d28 100644 --- a/src/frontend/src/optimizer/plan_node/batch_topn.rs +++ b/src/frontend/src/optimizer/plan_node/batch_topn.rs @@ -35,7 +35,7 @@ pub struct BatchTopN { impl BatchTopN { pub fn new(core: generic::TopN) -> Self { assert!(core.group_key.is_empty()); - let base = PlanBase::new_batch_from_logical( + let base = PlanBase::new_batch_with_core( &core, core.input.distribution().clone(), // BatchTopN outputs data in the order of specified order diff --git a/src/frontend/src/optimizer/plan_node/batch_union.rs b/src/frontend/src/optimizer/plan_node/batch_union.rs index 31b4a541dfe4a..c7c71111174c6 100644 --- a/src/frontend/src/optimizer/plan_node/batch_union.rs +++ b/src/frontend/src/optimizer/plan_node/batch_union.rs @@ -40,7 +40,7 @@ impl BatchUnion { Distribution::SomeShard }; - let base = PlanBase::new_batch_from_logical(&core, dist, Order::any()); + let base = PlanBase::new_batch_with_core(&core, dist, Order::any()); BatchUnion { base, core } } } diff --git a/src/frontend/src/optimizer/plan_node/batch_update.rs b/src/frontend/src/optimizer/plan_node/batch_update.rs index feebedeb07aaf..20e4b8b6b966c 100644 --- a/src/frontend/src/optimizer/plan_node/batch_update.rs +++ b/src/frontend/src/optimizer/plan_node/batch_update.rs @@ -18,6 +18,7 @@ use risingwave_common::error::Result; use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::UpdateNode; +use super::generic::GenericPlanRef; use super::utils::impl_distill_by_unit; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch, diff --git a/src/frontend/src/optimizer/plan_node/batch_values.rs b/src/frontend/src/optimizer/plan_node/batch_values.rs index 5f4e2308493a9..9348cddba7422 100644 --- a/src/frontend/src/optimizer/plan_node/batch_values.rs +++ b/src/frontend/src/optimizer/plan_node/batch_values.rs @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody; use risingwave_pb::batch_plan::values_node::ExprTuple; use risingwave_pb::batch_plan::ValuesNode; +use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill}; use super::{ ExprRewritable, LogicalValues, PlanBase, PlanRef, PlanTreeNodeLeaf, ToBatchPb, @@ -42,7 +43,7 @@ impl BatchValues { } pub fn with_dist(logical: LogicalValues, dist: Distribution) -> Self { - let ctx = logical.base.ctx.clone(); + let ctx = logical.base.ctx().clone(); let base = PlanBase::new_batch(ctx, logical.schema().clone(), dist, Order::any()); BatchValues { base, logical } } diff --git a/src/frontend/src/optimizer/plan_node/generic/mod.rs b/src/frontend/src/optimizer/plan_node/generic/mod.rs index 49038500b4301..aec59c90bcc4e 100644 --- a/src/frontend/src/optimizer/plan_node/generic/mod.rs +++ b/src/frontend/src/optimizer/plan_node/generic/mod.rs @@ -18,9 +18,9 @@ use std::hash::Hash; use pretty_xmlish::XmlNode; use risingwave_common::catalog::Schema; -use super::{stream, EqJoinPredicate}; +use super::{stream, EqJoinPredicate, PlanNodeId}; use crate::optimizer::optimizer_context::OptimizerContextRef; -use crate::optimizer::property::FunctionalDependencySet; +use crate::optimizer::property::{Distribution, FunctionalDependencySet}; pub mod dynamic_filter; pub use dynamic_filter::*; @@ -85,21 +85,18 @@ macro_rules! impl_distill_unit_from_fields { pub(super) use impl_distill_unit_from_fields; pub trait GenericPlanRef: Eq + Hash { + fn id(&self) -> PlanNodeId; fn schema(&self) -> &Schema; fn stream_key(&self) -> Option<&[usize]>; fn functional_dependency(&self) -> &FunctionalDependencySet; fn ctx(&self) -> OptimizerContextRef; } +pub trait PhysicalPlanRef: GenericPlanRef { + fn distribution(&self) -> &Distribution; +} + pub trait GenericPlanNode { - /// return (schema, `stream_key`, fds) - fn logical_properties(&self) -> (Schema, Option>, FunctionalDependencySet) { - ( - self.schema(), - self.stream_key(), - self.functional_dependency(), - ) - } fn functional_dependency(&self) -> FunctionalDependencySet; fn schema(&self) -> Schema; fn stream_key(&self) -> Option>; diff --git a/src/frontend/src/optimizer/plan_node/logical_apply.rs b/src/frontend/src/optimizer/plan_node/logical_apply.rs index 7640f093fc933..b398ce7494f61 100644 --- a/src/frontend/src/optimizer/plan_node/logical_apply.rs +++ b/src/frontend/src/optimizer/plan_node/logical_apply.rs @@ -18,7 +18,9 @@ use risingwave_common::catalog::Schema; use risingwave_common::error::{ErrorCode, Result, RwError}; use risingwave_pb::plan_common::JoinType; -use super::generic::{self, push_down_into_join, push_down_join_condition, GenericPlanNode}; +use super::generic::{ + self, push_down_into_join, push_down_join_condition, GenericPlanNode, GenericPlanRef, +}; use super::utils::{childless_record, Distill}; use super::{ ColPrunable, LogicalJoin, LogicalProject, PlanBase, PlanRef, PlanTreeNodeBinary, diff --git a/src/frontend/src/optimizer/plan_node/logical_expand.rs b/src/frontend/src/optimizer/plan_node/logical_expand.rs index b32374e6dc427..d1f3b666feef5 100644 --- a/src/frontend/src/optimizer/plan_node/logical_expand.rs +++ b/src/frontend/src/optimizer/plan_node/logical_expand.rs @@ -15,6 +15,7 @@ use itertools::Itertools; use risingwave_common::error::Result; +use super::generic::GenericPlanRef; use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchExpand, ColPrunable, ExprRewritable, PlanBase, PlanRef, @@ -192,7 +193,7 @@ mod tests { let mut values = LogicalValues::new(vec![], Schema { fields }, ctx); values .base - .functional_dependency + .functional_dependency_mut() .add_functional_dependency_by_column_indices(&[0], &[1, 2]); let column_subsets = vec![vec![0, 1], vec![2]]; diff --git a/src/frontend/src/optimizer/plan_node/logical_filter.rs b/src/frontend/src/optimizer/plan_node/logical_filter.rs index 72ee7d246b83d..a62b91aac5277 100644 --- a/src/frontend/src/optimizer/plan_node/logical_filter.rs +++ b/src/frontend/src/optimizer/plan_node/logical_filter.rs @@ -18,6 +18,7 @@ use risingwave_common::bail; use risingwave_common::error::Result; use risingwave_common::types::DataType; +use super::generic::GenericPlanRef; use super::utils::impl_distill_by_unit; use super::{ generic, ColPrunable, ExprRewritable, LogicalProject, PlanBase, PlanRef, PlanTreeNodeUnary, @@ -462,7 +463,7 @@ mod tests { // 3 --> 1, 2 values .base - .functional_dependency + .functional_dependency_mut() .add_functional_dependency_by_column_indices(&[3], &[1, 2]); // v1 = 0 AND v2 = v3 let predicate = ExprImpl::FunctionCall(Box::new( diff --git a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs index e4bd65efa647c..da2ec2138c3d1 100644 --- a/src/frontend/src/optimizer/plan_node/logical_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/logical_hop_window.rs @@ -17,7 +17,7 @@ use itertools::Itertools; use risingwave_common::error::Result; use risingwave_common::types::Interval; -use super::generic::GenericPlanNode; +use super::generic::{GenericPlanNode, GenericPlanRef}; use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchHopWindow, ColPrunable, ExprRewritable, LogicalFilter, @@ -446,7 +446,7 @@ mod test { // 0, 1 --> 2 values .base - .functional_dependency + .functional_dependency_mut() .add_functional_dependency_by_column_indices(&[0, 1], &[2]); let hop_window: PlanRef = LogicalHopWindow::new( values.into(), diff --git a/src/frontend/src/optimizer/plan_node/logical_insert.rs b/src/frontend/src/optimizer/plan_node/logical_insert.rs index f44000b502223..e93b77d79c1f2 100644 --- a/src/frontend/src/optimizer/plan_node/logical_insert.rs +++ b/src/frontend/src/optimizer/plan_node/logical_insert.rs @@ -16,6 +16,7 @@ use pretty_xmlish::XmlNode; use risingwave_common::catalog::TableVersionId; use risingwave_common::error::Result; +use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill}; use super::{ gen_filter_and_pushdown, generic, BatchInsert, ColPrunable, ExprRewritable, LogicalProject, @@ -90,7 +91,9 @@ impl_plan_tree_node_for_unary! {LogicalInsert} impl Distill for LogicalInsert { fn distill<'a>(&self) -> XmlNode<'a> { - let vec = self.core.fields_pretty(self.base.ctx.is_explain_verbose()); + let vec = self + .core + .fields_pretty(self.base.ctx().is_explain_verbose()); childless_record("LogicalInsert", vec) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_join.rs b/src/frontend/src/optimizer/plan_node/logical_join.rs index cfc49a1da3353..a586af2f0bf42 100644 --- a/src/frontend/src/optimizer/plan_node/logical_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_join.rs @@ -55,7 +55,7 @@ pub struct LogicalJoin { impl Distill for LogicalJoin { fn distill<'a>(&self) -> XmlNode<'a> { - let verbose = self.base.ctx.is_explain_verbose(); + let verbose = self.base.ctx().is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); vec.push(("type", Pretty::debug(&self.join_type()))); @@ -1296,7 +1296,8 @@ impl ToBatch for LogicalJoin { logical_join.left = logical_join.left.to_batch()?; logical_join.right = logical_join.right.to_batch()?; - let config = self.base.ctx.session_ctx().config(); + let ctx = self.base.ctx(); + let config = ctx.session_ctx().config(); if predicate.has_eq() { if !predicate.eq_keys_are_type_aligned() { @@ -2000,7 +2001,7 @@ mod tests { // 0 --> 1 values .base - .functional_dependency + .functional_dependency_mut() .add_functional_dependency_by_column_indices(&[0], &[1]); values }; @@ -2014,7 +2015,7 @@ mod tests { // 0 --> 1, 2 values .base - .functional_dependency + .functional_dependency_mut() .add_functional_dependency_by_column_indices(&[0], &[1, 2]); values }; diff --git a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs index 1c0253ab7aafd..9b740abd7718e 100644 --- a/src/frontend/src/optimizer/plan_node/logical_multi_join.rs +++ b/src/frontend/src/optimizer/plan_node/logical_multi_join.rs @@ -863,6 +863,7 @@ mod test { use super::*; use crate::expr::{FunctionCall, InputRef}; use crate::optimizer::optimizer_context::OptimizerContext; + use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::LogicalValues; use crate::optimizer::property::FunctionalDependency; #[tokio::test] @@ -883,7 +884,7 @@ mod test { // 0 --> 1 values .base - .functional_dependency + .functional_dependency_mut() .add_functional_dependency_by_column_indices(&[0], &[1]); values }; @@ -897,7 +898,7 @@ mod test { // 0 --> 1, 2 values .base - .functional_dependency + .functional_dependency_mut() .add_functional_dependency_by_column_indices(&[0], &[1, 2]); values }; @@ -910,7 +911,7 @@ mod test { // {} --> 0 values .base - .functional_dependency + .functional_dependency_mut() .add_functional_dependency_by_column_indices(&[], &[0]); values }; diff --git a/src/frontend/src/optimizer/plan_node/logical_now.rs b/src/frontend/src/optimizer/plan_node/logical_now.rs index 2792c4848e3b3..1d720db15b71a 100644 --- a/src/frontend/src/optimizer/plan_node/logical_now.rs +++ b/src/frontend/src/optimizer/plan_node/logical_now.rs @@ -18,6 +18,7 @@ use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::Result; use risingwave_common::types::DataType; +use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill}; use super::{ ColPrunable, ColumnPruningContext, ExprRewritable, LogicalFilter, PlanBase, PlanRef, @@ -53,7 +54,7 @@ impl LogicalNow { impl Distill for LogicalNow { fn distill<'a>(&self) -> XmlNode<'a> { - let vec = if self.base.ctx.is_explain_verbose() { + let vec = if self.base.ctx().is_explain_verbose() { vec![("output", column_names_pretty(self.schema()))] } else { vec![] diff --git a/src/frontend/src/optimizer/plan_node/logical_project.rs b/src/frontend/src/optimizer/plan_node/logical_project.rs index f3bb51cc7f971..a96de7d91ecd5 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project.rs @@ -284,7 +284,7 @@ impl ToStream for LogicalProject { // But the target size of `out_col_change` should be the same as the length of the new // schema. let (map, _) = out_col_change.into_parts(); - let out_col_change = ColIndexMapping::with_target_size(map, proj.base.schema.len()); + let out_col_change = ColIndexMapping::with_target_size(map, proj.base.schema().len()); Ok((proj.into(), out_col_change)) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_project_set.rs b/src/frontend/src/optimizer/plan_node/logical_project_set.rs index cba907eeeb379..4bf6b18cdabe3 100644 --- a/src/frontend/src/optimizer/plan_node/logical_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/logical_project_set.rs @@ -427,7 +427,7 @@ mod test { let mut values = LogicalValues::new(vec![], Schema { fields }, ctx); values .base - .functional_dependency + .functional_dependency_mut() .add_functional_dependency_by_column_indices(&[1], &[2]); let project_set = LogicalProjectSet::new( values.into(), @@ -449,8 +449,9 @@ mod test { ); let fd_set: HashSet = project_set .base - .functional_dependency - .into_dependencies() + .functional_dependency() + .as_dependencies() + .clone() .into_iter() .collect(); let expected_fd_set: HashSet = diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs index a499a9c6ea3d3..07d2a6c7653e7 100644 --- a/src/frontend/src/optimizer/plan_node/logical_scan.rs +++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs @@ -273,7 +273,7 @@ impl LogicalScan { self.output_col_idx().to_vec(), self.core.table_desc.clone(), self.indexes().to_vec(), - self.base.ctx.clone(), + self.base.ctx().clone(), predicate, self.for_system_time_as_of_proctime(), self.table_cardinality(), @@ -288,7 +288,7 @@ impl LogicalScan { output_col_idx, self.core.table_desc.clone(), self.indexes().to_vec(), - self.base.ctx.clone(), + self.base.ctx().clone(), self.predicate().clone(), self.for_system_time_as_of_proctime(), self.table_cardinality(), @@ -309,7 +309,7 @@ impl_plan_tree_node_for_leaf! {LogicalScan} impl Distill for LogicalScan { fn distill<'a>(&self) -> XmlNode<'a> { - let verbose = self.base.ctx.is_explain_verbose(); + let verbose = self.base.ctx().is_explain_verbose(); let mut vec = Vec::with_capacity(5); vec.push(("table", Pretty::from(self.table_name().to_owned()))); let key_is_columns = @@ -440,7 +440,7 @@ impl LogicalScan { let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges( self.core.table_desc.clone(), self.base - .ctx + .ctx() .session_ctx() .config() .get_max_split_range_gap(), @@ -551,7 +551,7 @@ impl ToStream for LogicalScan { None.into(), ))); } - match self.base.stream_key.is_none() { + match self.base.stream_key().is_none() { true => { let mut col_ids = HashSet::new(); diff --git a/src/frontend/src/optimizer/plan_node/logical_share.rs b/src/frontend/src/optimizer/plan_node/logical_share.rs index d924ee7180168..d6b5711740a98 100644 --- a/src/frontend/src/optimizer/plan_node/logical_share.rs +++ b/src/frontend/src/optimizer/plan_node/logical_share.rs @@ -69,7 +69,7 @@ impl LogicalShare { } pub(super) fn pretty_fields<'a>(base: &PlanBase, name: &'a str) -> XmlNode<'a> { - childless_record(name, vec![("id", Pretty::debug(&base.id.0))]) + childless_record(name, vec![("id", Pretty::debug(&base.id().0))]) } } diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs index 1d37da9eaa40f..45a5fbcb2240f 100644 --- a/src/frontend/src/optimizer/plan_node/logical_source.rs +++ b/src/frontend/src/optimizer/plan_node/logical_source.rs @@ -28,6 +28,7 @@ use risingwave_connector::source::{ConnectorProperties, DataType}; use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn; use risingwave_pb::plan_common::GeneratedColumnDesc; +use super::generic::GenericPlanRef; use super::stream_watermark_filter::StreamWatermarkFilter; use super::utils::{childless_record, Distill}; use super::{ @@ -204,7 +205,7 @@ impl LogicalSource { ..self.core.clone() }; let mut new_s3_plan: PlanRef = StreamSource { - base: PlanBase::new_stream_with_logical( + base: PlanBase::new_stream_with_core( &logical_source, Distribution::Single, true, // `list` will keep listing all objects, it must be append-only @@ -506,7 +507,7 @@ impl PredicatePushdown for LogicalSource { let mut new_conjunctions = Vec::with_capacity(predicate.conjunctions.len()); for expr in predicate.conjunctions { - if let Some(e) = expr_to_kafka_timestamp_range(expr, &mut range, &self.base.schema) { + if let Some(e) = expr_to_kafka_timestamp_range(expr, &mut range, self.base.schema()) { // Not recognized, so push back new_conjunctions.push(e); } diff --git a/src/frontend/src/optimizer/plan_node/logical_union.rs b/src/frontend/src/optimizer/plan_node/logical_union.rs index 51e4e620cf4ca..1f02b026c0020 100644 --- a/src/frontend/src/optimizer/plan_node/logical_union.rs +++ b/src/frontend/src/optimizer/plan_node/logical_union.rs @@ -130,7 +130,7 @@ impl ToBatch for LogicalUnion { if !self.all() { let batch_union = BatchUnion::new(new_logical).into(); Ok(BatchHashAgg::new( - generic::Agg::new(vec![], (0..self.base.schema.len()).collect(), batch_union) + generic::Agg::new(vec![], (0..self.base.schema().len()).collect(), batch_union) .with_enable_two_phase(false), ) .into()) @@ -170,7 +170,7 @@ impl ToStream for LogicalUnion { &self, ctx: &mut RewriteStreamContext, ) -> Result<(PlanRef, ColIndexMapping)> { - let original_schema = self.base.schema.clone(); + let original_schema = self.base.schema().clone(); let original_schema_len = original_schema.len(); let mut rewrites = vec![]; for input in &self.core.inputs { @@ -353,7 +353,7 @@ mod tests { // Check the result let union = plan.as_logical_union().unwrap(); - assert_eq!(union.base.schema.len(), 2); + assert_eq!(union.base.schema().len(), 2); } #[tokio::test] diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs index 80e4f350d8edb..1dbe1d3d3c5c9 100644 --- a/src/frontend/src/optimizer/plan_node/logical_update.rs +++ b/src/frontend/src/optimizer/plan_node/logical_update.rs @@ -15,6 +15,7 @@ use risingwave_common::catalog::TableVersionId; use risingwave_common::error::Result; +use super::generic::GenericPlanRef; use super::utils::impl_distill_by_unit; use super::{ gen_filter_and_pushdown, generic, BatchUpdate, ColPrunable, ExprRewritable, LogicalProject, diff --git a/src/frontend/src/optimizer/plan_node/logical_values.rs b/src/frontend/src/optimizer/plan_node/logical_values.rs index c6a3d2ac0564e..e62c6400f2015 100644 --- a/src/frontend/src/optimizer/plan_node/logical_values.rs +++ b/src/frontend/src/optimizer/plan_node/logical_values.rs @@ -21,6 +21,7 @@ use risingwave_common::catalog::{Field, Schema}; use risingwave_common::error::Result; use risingwave_common::types::{DataType, ScalarImpl}; +use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill}; use super::{ BatchValues, ColPrunable, ExprRewritable, LogicalFilter, PlanBase, PlanRef, PredicatePushdown, @@ -144,7 +145,7 @@ impl ColPrunable for LogicalValues { .iter() .map(|i| self.schema().fields[*i].clone()) .collect(); - Self::new(rows, Schema { fields }, self.base.ctx.clone()).into() + Self::new(rows, Schema { fields }, self.base.ctx().clone()).into() } } diff --git a/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs b/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs index 73f82e86aa260..9f2e8d94634be 100644 --- a/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs +++ b/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use std::hash::Hash; +use super::generic::GenericPlanRef; use super::{EndoPlan, LogicalShare, PlanNodeId, PlanRef, PlanTreeNodeUnary, VisitPlan}; use crate::optimizer::plan_visitor; use crate::utils::{Endo, Visit}; diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs index 188787c93b8c0..f16ebfb0c792c 100644 --- a/src/frontend/src/optimizer/plan_node/mod.rs +++ b/src/frontend/src/optimizer/plan_node/mod.rs @@ -46,7 +46,7 @@ use serde::Serialize; use smallvec::SmallVec; use self::batch::BatchPlanRef; -use self::generic::GenericPlanRef; +use self::generic::{GenericPlanRef, PhysicalPlanRef}; use self::stream::StreamPlanRef; use self::utils::Distill; use super::property::{Distribution, FunctionalDependencySet, Order}; @@ -419,29 +419,31 @@ impl PlanTreeNode for PlanRef { } } -impl StreamPlanRef for PlanRef { - fn distribution(&self) -> &Distribution { - &self.plan_base().dist +impl PlanNodeMeta for PlanRef { + fn node_type(&self) -> PlanNodeType { + self.0.node_type() } - fn append_only(&self) -> bool { - self.plan_base().append_only + fn plan_base(&self) -> &PlanBase { + self.0.plan_base() } - fn emit_on_window_close(&self) -> bool { - self.plan_base().emit_on_window_close + fn convention(&self) -> Convention { + self.0.convention() } } -impl BatchPlanRef for PlanRef { - fn order(&self) -> &Order { - &self.plan_base().order +/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`]. +impl

GenericPlanRef for P +where + P: PlanNodeMeta + Eq + Hash, +{ + fn id(&self) -> PlanNodeId { + self.plan_base().id() } -} -impl GenericPlanRef for PlanRef { fn schema(&self) -> &Schema { - &self.plan_base().schema + self.plan_base().schema() } fn stream_key(&self) -> Option<&[usize]> { @@ -457,6 +459,47 @@ impl GenericPlanRef for PlanRef { } } +/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`]. +// TODO: further constrain the convention to be `Stream` or `Batch`. +impl

PhysicalPlanRef for P +where + P: PlanNodeMeta + Eq + Hash, +{ + fn distribution(&self) -> &Distribution { + self.plan_base().distribution() + } +} + +/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`]. +// TODO: further constrain the convention to be `Stream`. +impl

StreamPlanRef for P +where + P: PlanNodeMeta + Eq + Hash, +{ + fn append_only(&self) -> bool { + self.plan_base().append_only() + } + + fn emit_on_window_close(&self) -> bool { + self.plan_base().emit_on_window_close() + } + + fn watermark_columns(&self) -> &FixedBitSet { + self.plan_base().watermark_columns() + } +} + +/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`]. +// TODO: further constrain the convention to be `Batch`. +impl

BatchPlanRef for P +where + P: PlanNodeMeta + Eq + Hash, +{ + fn order(&self) -> &Order { + self.plan_base().order() + } +} + /// In order to let expression display id started from 1 for explaining, hidden column names and /// other places. We will reset expression display id to 0 and clone the whole plan to reset the /// schema. @@ -512,15 +555,15 @@ pub(crate) fn pretty_config() -> PrettyConfig { impl dyn PlanNode { pub fn id(&self) -> PlanNodeId { - self.plan_base().id + self.plan_base().id() } pub fn ctx(&self) -> OptimizerContextRef { - self.plan_base().ctx.clone() + self.plan_base().ctx().clone() } pub fn schema(&self) -> &Schema { - &self.plan_base().schema + self.plan_base().schema() } pub fn stream_key(&self) -> Option<&[usize]> { @@ -528,27 +571,28 @@ impl dyn PlanNode { } pub fn order(&self) -> &Order { - &self.plan_base().order + self.plan_base().order() } + // TODO: avoid no manual delegation pub fn distribution(&self) -> &Distribution { - &self.plan_base().dist + self.plan_base().distribution() } pub fn append_only(&self) -> bool { - self.plan_base().append_only + self.plan_base().append_only() } pub fn emit_on_window_close(&self) -> bool { - self.plan_base().emit_on_window_close + self.plan_base().emit_on_window_close() } pub fn functional_dependency(&self) -> &FunctionalDependencySet { - &self.plan_base().functional_dependency + self.plan_base().functional_dependency() } pub fn watermark_columns(&self) -> &FixedBitSet { - &self.plan_base().watermark_columns + self.plan_base().watermark_columns() } /// Serialize the plan node and its children to a stream plan proto. diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs index e9a5bf26885bf..51b1aa5f41141 100644 --- a/src/frontend/src/optimizer/plan_node/plan_base.rs +++ b/src/frontend/src/optimizer/plan_node/plan_base.rs @@ -14,47 +14,132 @@ use educe::Educe; use fixedbitset::FixedBitSet; -use paste::paste; use risingwave_common::catalog::Schema; use super::generic::GenericPlanNode; use super::*; -use crate::for_all_plan_nodes; use crate::optimizer::optimizer_context::OptimizerContextRef; use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order}; -/// the common fields of all nodes, please make a field named `base` in -/// every planNode and correctly value it when construct the planNode. -#[derive(Clone, Debug, Educe)] -#[educe(PartialEq, Eq, Hash)] -pub struct PlanBase { - #[educe(PartialEq(ignore))] - #[educe(Hash(ignore))] - pub id: PlanNodeId, - #[educe(PartialEq(ignore))] - #[educe(Hash(ignore))] - pub ctx: OptimizerContextRef, - pub schema: Schema, - /// the pk indices of the PlanNode's output, a empty stream key vec means there is no stream key - pub stream_key: Option>, - /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect - /// correctness, but insert unnecessary sort in plan - pub order: Order, +/// Common extra fields for physical plan nodes. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct PhysicalCommonExtra { /// The distribution property of the PlanNode's output, store an `Distribution::any()` here /// will not affect correctness, but insert unnecessary exchange in plan - pub dist: Distribution, + dist: Distribution, +} + +/// Extra fields for stream plan nodes. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct StreamExtra { + /// Common fields for physical plan nodes. + physical: PhysicalCommonExtra, + /// The append-only property of the PlanNode's output is a stream-only property. Append-only /// means the stream contains only insert operation. - pub append_only: bool, + append_only: bool, /// Whether the output is emitted on window close. - pub emit_on_window_close: bool, - pub functional_dependency: FunctionalDependencySet, + emit_on_window_close: bool, /// The watermark column indices of the PlanNode's output. There could be watermark output from /// this stream operator. - pub watermark_columns: FixedBitSet, + watermark_columns: FixedBitSet, +} + +/// Extra fields for batch plan nodes. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +struct BatchExtra { + /// Common fields for physical plan nodes. + physical: PhysicalCommonExtra, + + /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect + /// correctness, but insert unnecessary sort in plan + order: Order, +} + +/// Extra fields for physical plan nodes. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +enum PhysicalExtra { + Stream(StreamExtra), + Batch(BatchExtra), +} + +impl PhysicalExtra { + fn common(&self) -> &PhysicalCommonExtra { + match self { + PhysicalExtra::Stream(stream) => &stream.physical, + PhysicalExtra::Batch(batch) => &batch.physical, + } + } + + fn common_mut(&mut self) -> &mut PhysicalCommonExtra { + match self { + PhysicalExtra::Stream(stream) => &mut stream.physical, + PhysicalExtra::Batch(batch) => &mut batch.physical, + } + } + + fn stream(&self) -> &StreamExtra { + match self { + PhysicalExtra::Stream(extra) => extra, + _ => panic!("access stream properties from batch plan node"), + } + } + + fn batch(&self) -> &BatchExtra { + match self { + PhysicalExtra::Batch(extra) => extra, + _ => panic!("access batch properties from stream plan node"), + } + } +} + +/// the common fields of all nodes, please make a field named `base` in +/// every planNode and correctly value it when construct the planNode. +/// +/// All fields are intentionally made private and immutable, as they should +/// normally be the same as the given [`GenericPlanNode`] when constructing. +/// +/// - To access them, use traits including [`GenericPlanRef`], +/// [`PhysicalPlanRef`], [`StreamPlanRef`] and [`BatchPlanRef`]. +/// - To mutate them, use methods like `new_*` or `clone_with_*`. +#[derive(Clone, Debug, Educe)] +#[educe(PartialEq, Eq, Hash)] +pub struct PlanBase { + // -- common fields -- + #[educe(PartialEq(ignore), Hash(ignore))] + id: PlanNodeId, + #[educe(PartialEq(ignore), Hash(ignore))] + ctx: OptimizerContextRef, + + schema: Schema, + /// the pk indices of the PlanNode's output, a empty stream key vec means there is no stream key + // TODO: this is actually a logical and stream only property + stream_key: Option>, + functional_dependency: FunctionalDependencySet, + + /// Extra fields if the plan node is physical. + physical_extra: Option, +} + +impl PlanBase { + fn physical_extra(&self) -> &PhysicalExtra { + self.physical_extra + .as_ref() + .expect("access physical properties from logical plan node") + } + + fn physical_extra_mut(&mut self) -> &mut PhysicalExtra { + self.physical_extra + .as_mut() + .expect("access physical properties from logical plan node") + } } impl generic::GenericPlanRef for PlanBase { + fn id(&self) -> PlanNodeId { + self.id + } + fn schema(&self) -> &Schema { &self.schema } @@ -72,23 +157,29 @@ impl generic::GenericPlanRef for PlanBase { } } -impl stream::StreamPlanRef for PlanBase { +impl generic::PhysicalPlanRef for PlanBase { fn distribution(&self) -> &Distribution { - &self.dist + &self.physical_extra().common().dist } +} +impl stream::StreamPlanRef for PlanBase { fn append_only(&self) -> bool { - self.append_only + self.physical_extra().stream().append_only } fn emit_on_window_close(&self) -> bool { - self.emit_on_window_close + self.physical_extra().stream().emit_on_window_close + } + + fn watermark_columns(&self) -> &FixedBitSet { + &self.physical_extra().stream().watermark_columns } } impl batch::BatchPlanRef for PlanBase { fn order(&self) -> &Order { - &self.order + &self.physical_extra().batch().order } } @@ -100,47 +191,22 @@ impl PlanBase { functional_dependency: FunctionalDependencySet, ) -> Self { let id = ctx.next_plan_node_id(); - let watermark_columns = FixedBitSet::with_capacity(schema.len()); Self { id, ctx, schema, stream_key, - dist: Distribution::Single, - order: Order::any(), - // Logical plan node won't touch `append_only` field - append_only: true, - emit_on_window_close: false, functional_dependency, - watermark_columns, + physical_extra: None, } } - pub fn new_logical_with_core(node: &impl GenericPlanNode) -> Self { + pub fn new_logical_with_core(core: &impl GenericPlanNode) -> Self { Self::new_logical( - node.ctx(), - node.schema(), - node.stream_key(), - node.functional_dependency(), - ) - } - - pub fn new_stream_with_logical( - logical: &impl GenericPlanNode, - dist: Distribution, - append_only: bool, - emit_on_window_close: bool, - watermark_columns: FixedBitSet, - ) -> Self { - Self::new_stream( - logical.ctx(), - logical.schema(), - logical.stream_key(), - logical.functional_dependency(), - dist, - append_only, - emit_on_window_close, - watermark_columns, + core.ctx(), + core.schema(), + core.stream_key(), + core.functional_dependency(), ) } @@ -160,22 +226,36 @@ impl PlanBase { id, ctx, schema, - dist, - order: Order::any(), stream_key, - append_only, - emit_on_window_close, functional_dependency, - watermark_columns, + physical_extra: Some(PhysicalExtra::Stream({ + StreamExtra { + physical: PhysicalCommonExtra { dist }, + append_only, + emit_on_window_close, + watermark_columns, + } + })), } } - pub fn new_batch_from_logical( - logical: &impl GenericPlanNode, + pub fn new_stream_with_core( + core: &impl GenericPlanNode, dist: Distribution, - order: Order, + append_only: bool, + emit_on_window_close: bool, + watermark_columns: FixedBitSet, ) -> Self { - Self::new_batch(logical.ctx(), logical.schema(), dist, order) + Self::new_stream( + core.ctx(), + core.schema(), + core.stream_key(), + core.functional_dependency(), + dist, + append_only, + emit_on_window_close, + watermark_columns, + ) } pub fn new_batch( @@ -186,75 +266,49 @@ impl PlanBase { ) -> Self { let id = ctx.next_plan_node_id(); let functional_dependency = FunctionalDependencySet::new(schema.len()); - let watermark_columns = FixedBitSet::with_capacity(schema.len()); Self { id, ctx, schema, - dist, - order, stream_key: None, - // Batch plan node won't touch `append_only` field - append_only: true, - emit_on_window_close: false, // TODO(rc): batch EOWC support? functional_dependency, - watermark_columns, + physical_extra: Some(PhysicalExtra::Batch({ + BatchExtra { + physical: PhysicalCommonExtra { dist }, + order, + } + })), } } - pub fn derive_stream_plan_base(plan_node: &PlanRef) -> Self { - PlanBase::new_stream( - plan_node.ctx(), - plan_node.schema().clone(), - plan_node.stream_key().map(|v| v.to_vec()), - plan_node.functional_dependency().clone(), - plan_node.distribution().clone(), - plan_node.append_only(), - plan_node.emit_on_window_close(), - plan_node.watermark_columns().clone(), - ) + pub fn new_batch_with_core( + core: &impl GenericPlanNode, + dist: Distribution, + order: Order, + ) -> Self { + Self::new_batch(core.ctx(), core.schema(), dist, order) } pub fn clone_with_new_plan_id(&self) -> Self { let mut new = self.clone(); - new.id = self.ctx.next_plan_node_id(); + new.id = self.ctx().next_plan_node_id(); + new + } + + /// Clone the plan node with a new distribution. + /// + /// Panics if the plan node is not physical. + pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self { + let mut new = self.clone(); + new.physical_extra_mut().common_mut().dist = dist; new } } -macro_rules! impl_base_delegate { - ($( { $convention:ident, $name:ident }),*) => { - $(paste! { - impl [<$convention $name>] { - pub fn id(&self) -> PlanNodeId { - self.plan_base().id - } - pub fn ctx(&self) -> OptimizerContextRef { - self.plan_base().ctx() - } - pub fn schema(&self) -> &Schema { - &self.plan_base().schema - } - pub fn stream_key(&self) -> Option<&[usize]> { - self.plan_base().stream_key() - } - pub fn order(&self) -> &Order { - &self.plan_base().order - } - pub fn distribution(&self) -> &Distribution { - &self.plan_base().dist - } - pub fn append_only(&self) -> bool { - self.plan_base().append_only - } - pub fn emit_on_window_close(&self) -> bool { - self.plan_base().emit_on_window_close - } - pub fn functional_dependency(&self) -> &FunctionalDependencySet { - &self.plan_base().functional_dependency - } - } - })* +// Mutators for testing only. +#[cfg(test)] +impl PlanBase { + pub fn functional_dependency_mut(&mut self) -> &mut FunctionalDependencySet { + &mut self.functional_dependency } } -for_all_plan_nodes! { impl_base_delegate } diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs index 2edf997bf91fd..866c62c2413a5 100644 --- a/src/frontend/src/optimizer/plan_node/stream.rs +++ b/src/frontend/src/optimizer/plan_node/stream.rs @@ -12,16 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. -use super::generic::GenericPlanRef; -use crate::optimizer::property::Distribution; +use fixedbitset::FixedBitSet; -/// A subtrait of [`GenericPlanRef`] for stream plans. +use super::generic::PhysicalPlanRef; + +/// A subtrait of [`PhysicalPlanRef`] for stream plans. /// /// Due to the lack of refactoring, all plan nodes currently implement this trait /// through [`super::PlanBase`]. One may still use this trait as a bound for -/// expecting a stream plan, in contrast to [`GenericPlanRef`]. -pub trait StreamPlanRef: GenericPlanRef { - fn distribution(&self) -> &Distribution; +/// accessing a stream plan, in contrast to [`GenericPlanRef`] or +/// [`PhysicalPlanRef`]. +/// +/// [`GenericPlanRef`]: super::generic::GenericPlanRef +pub trait StreamPlanRef: PhysicalPlanRef { fn append_only(&self) -> bool; fn emit_on_window_close(&self) -> bool; + fn watermark_columns(&self) -> &FixedBitSet; } diff --git a/src/frontend/src/optimizer/plan_node/stream_dedup.rs b/src/frontend/src/optimizer/plan_node/stream_dedup.rs index 6e96d0eab0e93..51b5e589e886e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dedup.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dedup.rs @@ -17,7 +17,7 @@ use risingwave_common::util::sort_util::OrderType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::DedupNode; -use super::generic::{self, GenericPlanNode, GenericPlanRef}; +use super::generic::{self, GenericPlanNode, GenericPlanRef, PhysicalPlanRef}; use super::stream::StreamPlanRef; use super::utils::{impl_distill_by_unit, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode}; @@ -37,7 +37,7 @@ impl StreamDedup { // A dedup operator must be append-only. assert!(input.append_only()); - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, input.distribution().clone(), true, diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs index db9e6ac296bbf..bb18f9cffdf0f 100644 --- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs @@ -20,7 +20,7 @@ use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode}; -use super::generic::{self}; +use super::generic::{self, GenericPlanRef}; use super::utils::{childless_record, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode}; use crate::expr::{Expr, ExprRewriter}; @@ -67,7 +67,7 @@ impl StreamDeltaJoin { core.i2o_col_mapping().rewrite_bitset(&watermark_columns) }; // TODO: derive from input - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, dist, append_only, @@ -90,7 +90,7 @@ impl StreamDeltaJoin { impl Distill for StreamDeltaJoin { fn distill<'a>(&self) -> XmlNode<'a> { - let verbose = self.base.ctx.is_explain_verbose(); + let verbose = self.base.ctx().is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); vec.push(("type", Pretty::debug(&self.core.join_type))); diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs index c9f969384c3a4..9b000974786e4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dml.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs @@ -17,6 +17,7 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_common::catalog::{ColumnDesc, INITIAL_TABLE_VERSION_ID}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; +use super::stream::StreamPlanRef; use super::utils::{childless_record, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::stream_fragmenter::BuildFragmentGraphState; diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs index e1ca18da937e9..a4b74f37208e7 100644 --- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs @@ -17,7 +17,8 @@ pub use risingwave_pb::expr::expr_node::Type as ExprType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::DynamicFilterNode; -use super::generic::DynamicFilter; +use super::generic::{DynamicFilter, GenericPlanRef}; +use super::stream::StreamPlanRef; use super::utils::{childless_record, column_names_pretty, watermark_pretty, Distill}; use super::{generic, ExprRewritable}; use crate::expr::{Expr, ExprImpl}; @@ -37,7 +38,7 @@ impl StreamDynamicFilter { let watermark_columns = core.watermark_columns(core.right().watermark_columns()[0]); // TODO: derive from input - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, core.left().distribution().clone(), false, /* we can have a new abstraction for append only and monotonically increasing @@ -78,11 +79,11 @@ impl StreamDynamicFilter { impl Distill for StreamDynamicFilter { fn distill<'a>(&self) -> XmlNode<'a> { - let verbose = self.base.ctx.is_explain_verbose(); + let verbose = self.base.ctx().is_explain_verbose(); let pred = self.core.pretty_field(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); vec.push(("predicate", pred)); - if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) { + if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) { vec.push(("output_watermarks", ow)); } vec.push(("output", column_names_pretty(self.schema()))); diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs index 9418af8e4a364..d8c5a9635ce59 100644 --- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs @@ -18,7 +18,7 @@ use fixedbitset::FixedBitSet; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use super::generic::{self, PlanWindowFunction}; +use super::generic::{self, GenericPlanRef, PlanWindowFunction}; use super::utils::{impl_distill_by_unit, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -50,7 +50,7 @@ impl StreamEowcOverWindow { // ancient rows in some rarely updated partitions that are emitted at the end of time. let watermark_columns = FixedBitSet::with_capacity(core.output_len()); - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, input.distribution().clone(), true, diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs index 0fa1713bf4488..99e6c3c5161a1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs +++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs @@ -16,6 +16,8 @@ use pretty_xmlish::{Pretty, XmlNode}; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode}; +use super::generic::{GenericPlanRef, PhysicalPlanRef}; +use super::stream::StreamPlanRef; use super::utils::{childless_record, plan_node_name, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::property::{Distribution, DistributionDisplay}; @@ -78,7 +80,7 @@ impl StreamExchange { impl Distill for StreamExchange { fn distill<'a>(&self) -> XmlNode<'a> { let distribution_display = DistributionDisplay { - distribution: &self.base.dist, + distribution: self.base.distribution(), input_schema: self.input.schema(), }; childless_record( @@ -117,13 +119,13 @@ impl StreamNode for StreamExchange { }) } else { Some(DispatchStrategy { - r#type: match &self.base.dist { + r#type: match &self.base.distribution() { Distribution::HashShard(_) => DispatcherType::Hash, Distribution::Single => DispatcherType::Simple, Distribution::Broadcast => DispatcherType::Broadcast, _ => panic!("Do not allow Any or AnyShard in serialization process"), } as i32, - dist_key_indices: match &self.base.dist { + dist_key_indices: match &self.base.distribution() { Distribution::HashShard(keys) => { keys.iter().map(|num| *num as u32).collect() } diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs index e0f8852a19fb5..5959b8d6be4d2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_expand.rs +++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs @@ -48,7 +48,7 @@ impl StreamExpand { .map(|idx| idx + input.schema().len()), ); - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, dist, input.append_only(), diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs index ff4d344607776..0f000e6b8c0db 100644 --- a/src/frontend/src/optimizer/plan_node/stream_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs @@ -34,7 +34,7 @@ impl StreamFilter { let input = core.input.clone(); let dist = input.distribution().clone(); // Filter executor won't change the append-only behavior of the stream. - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, dist, input.append_only(), diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs index 190c05c0a5ba1..95fd72e9f6aa0 100644 --- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs +++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs @@ -48,7 +48,7 @@ impl_plan_tree_node_for_unary! { StreamFsFetch } impl StreamFsFetch { pub fn new(input: PlanRef, source: generic::Source) -> Self { - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &source, Distribution::SomeShard, source.catalog.as_ref().map_or(true, |s| s.append_only), diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs index 14711d353f9d8..3e8f3c00206c4 100644 --- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs @@ -16,7 +16,8 @@ use fixedbitset::FixedBitSet; use pretty_xmlish::XmlNode; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use super::generic::{DistillUnit, TopNLimit}; +use super::generic::{DistillUnit, GenericPlanRef, TopNLimit}; +use super::stream::StreamPlanRef; use super::utils::{plan_node_name, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode}; use crate::optimizer::plan_node::generic::GenericPlanNode; @@ -135,7 +136,7 @@ impl Distill for StreamGroupTopN { { "append_only", self.input().append_only() }, ); let mut node = self.core.distill_with_name(name); - if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) { + if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) { node.fields.push(("output_watermarks".into(), ow)); } node diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs index 25e1ac801f97c..55ab6b5906e59 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs @@ -18,10 +18,11 @@ use pretty_xmlish::XmlNode; use risingwave_common::error::{ErrorCode, Result}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; -use super::generic::{self, PlanAggCall}; +use super::generic::{self, GenericPlanRef, PlanAggCall}; use super::utils::{childless_record, plan_node_name, watermark_pretty, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::ExprRewriter; +use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, IndexSet}; @@ -85,7 +86,7 @@ impl StreamHashAgg { } // Hash agg executor might change the append-only behavior of the stream. - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, dist, emit_on_window_close, // in EOWC mode, we produce append only output @@ -142,7 +143,7 @@ impl StreamHashAgg { impl Distill for StreamHashAgg { fn distill<'a>(&self) -> XmlNode<'a> { let mut vec = self.core.fields_pretty(); - if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) { + if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) { vec.push(("output_watermarks", ow)); } childless_record( @@ -214,7 +215,7 @@ impl StreamNode for StreamHashAgg { }) .collect(), row_count_index: self.row_count_idx as u32, - emit_on_window_close: self.base.emit_on_window_close, + emit_on_window_close: self.base.emit_on_window_close(), }) } } diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs index 0075b1730b4eb..9d9c41425c4b1 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs @@ -20,7 +20,8 @@ use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::{DeltaExpression, HashJoinNode, PbInequalityPair}; -use super::generic::Join; +use super::generic::{GenericPlanRef, Join}; +use super::stream::StreamPlanRef; use super::utils::{childless_record, plan_node_name, watermark_pretty, Distill}; use super::{ generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode, @@ -178,7 +179,7 @@ impl StreamHashJoin { }; // TODO: derive from input - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, dist, append_only, @@ -291,7 +292,7 @@ impl Distill for StreamHashJoin { { "interval", self.clean_left_state_conjunction_idx.is_some() && self.clean_right_state_conjunction_idx.is_some() }, { "append_only", self.is_append_only }, ); - let verbose = self.base.ctx.is_explain_verbose(); + let verbose = self.base.ctx().is_explain_verbose(); let mut vec = Vec::with_capacity(6); vec.push(("type", Pretty::debug(&self.core.join_type))); @@ -316,7 +317,7 @@ impl Distill for StreamHashJoin { if let Some(i) = self.clean_right_state_conjunction_idx { vec.push(("conditions_to_clean_right_state_table", get_cond(i))); } - if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) { + if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) { vec.push(("output_watermarks", ow)); } diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs index c68b1b307d470..e177be6942360 100644 --- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs @@ -17,6 +17,8 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::HopWindowNode; +use super::generic::GenericPlanRef; +use super::stream::StreamPlanRef; use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{Expr, ExprImpl, ExprRewriter}; @@ -56,7 +58,7 @@ impl StreamHopWindow { ) .rewrite_bitset(&watermark_columns); - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, dist, input.append_only(), @@ -75,7 +77,7 @@ impl StreamHopWindow { impl Distill for StreamHopWindow { fn distill<'a>(&self) -> XmlNode<'a> { let mut vec = self.core.fields_pretty(); - if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) { + if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) { vec.push(("output_watermarks", ow)); } childless_record("StreamHopWindow", vec) diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs index fb17537bc90e6..d8972436d5c78 100644 --- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs +++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs @@ -24,11 +24,13 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType}; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::derive::derive_columns; +use super::stream::StreamPlanRef; use super::utils::{childless_record, Distill}; use super::{reorganize_elements_id, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::catalog::table_catalog::{CreateType, TableCatalog, TableType, TableVersion}; use crate::catalog::FragmentId; use crate::optimizer::plan_node::derive::derive_pk; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta}; use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -273,8 +275,8 @@ impl Distill for StreamMaterialize { vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior))); - let watermark_columns = &self.base.watermark_columns; - if self.base.watermark_columns.count_ones(..) > 0 { + let watermark_columns = &self.base.watermark_columns(); + if self.base.watermark_columns().count_ones(..) > 0 { let watermark_column_names = watermark_columns .ones() .map(|i| table.columns()[i].name_with_hidden().to_string()) @@ -294,16 +296,16 @@ impl PlanTreeNodeUnary for StreamMaterialize { fn clone_with_input(&self, input: PlanRef) -> Self { let new = Self::new(input, self.table().clone()); new.base - .schema + .schema() .fields .iter() - .zip_eq_fast(self.base.schema.fields.iter()) + .zip_eq_fast(self.base.schema().fields.iter()) .for_each(|(a, b)| { assert_eq!(a.data_type, b.data_type); assert_eq!(a.type_name, b.type_name); assert_eq!(a.sub_fields, b.sub_fields); }); - assert_eq!(new.plan_base().stream_key, self.plan_base().stream_key); + assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key()); new } } diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs index 9eb0a0e0f143e..91ebc344fa51d 100644 --- a/src/frontend/src/optimizer/plan_node/stream_now.rs +++ b/src/frontend/src/optimizer/plan_node/stream_now.rs @@ -19,8 +19,7 @@ use risingwave_common::types::DataType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::NowNode; -use super::generic::GenericPlanRef; -use super::stream::StreamPlanRef; +use super::generic::{GenericPlanRef, PhysicalPlanRef}; use super::utils::{childless_record, Distill, TableCatalogBuilder}; use super::{ExprRewritable, LogicalNow, PlanBase, StreamNode}; use crate::optimizer::plan_node::utils::column_names_pretty; @@ -59,7 +58,7 @@ impl StreamNow { impl Distill for StreamNow { fn distill<'a>(&self) -> XmlNode<'a> { - let vec = if self.base.ctx.is_explain_verbose() { + let vec = if self.base.ctx().is_explain_verbose() { vec![("output", column_names_pretty(self.schema()))] } else { vec![] diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_over_window.rs index 0d749f0c7b0e6..5a2f9d98f1340 100644 --- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs +++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs @@ -21,6 +21,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody; use super::generic::{GenericPlanNode, PlanWindowFunction}; use super::utils::{impl_distill_by_unit, TableCatalogBuilder}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::stream_fragmenter::BuildFragmentGraphState; use crate::TableCatalog; @@ -37,7 +38,7 @@ impl StreamOverWindow { let input = &core.input; let watermark_columns = FixedBitSet::with_capacity(core.output_len()); - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, input.distribution().clone(), false, // general over window cannot be append-only @@ -122,7 +123,7 @@ impl StreamNode for StreamOverWindow { .to_internal_table_prost(); let cache_policy = self .base - .ctx + .ctx() .session_ctx() .config() .get_streaming_over_window_cache_policy(); diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs index 8a7665881e0cf..c0ff0d1cf2f43 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project.rs @@ -17,6 +17,8 @@ use pretty_xmlish::XmlNode; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::ProjectNode; +use super::generic::GenericPlanRef; +use super::stream::StreamPlanRef; use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{try_derive_watermark, Expr, ExprImpl, ExprRewriter, WatermarkDerivation}; @@ -41,7 +43,7 @@ impl Distill for StreamProject { let schema = self.schema(); let mut vec = self.core.fields_pretty(schema); if let Some(display_output_watermarks) = - watermark_pretty(&self.base.watermark_columns, schema) + watermark_pretty(self.base.watermark_columns(), schema) { vec.push(("output_watermarks", display_output_watermarks)); } @@ -79,7 +81,7 @@ impl StreamProject { } // Project executor won't change the append-only behavior of the stream, so it depends on // input's `append_only`. - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, distribution, input.append_only(), diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs index cadd600f3c3b7..ba09d79c96c60 100644 --- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs +++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs @@ -66,7 +66,7 @@ impl StreamProjectSet { // ProjectSet executor won't change the append-only behavior of the stream, so it depends on // input's `append_only`. - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, distribution, input.append_only(), diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs index 8b406005f40a6..3acf0b132805e 100644 --- a/src/frontend/src/optimizer/plan_node/stream_share.rs +++ b/src/frontend/src/optimizer/plan_node/stream_share.rs @@ -16,6 +16,8 @@ use pretty_xmlish::XmlNode; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::PbStreamNode; +use super::generic::GenericPlanRef; +use super::stream::StreamPlanRef; use super::utils::Distill; use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamExchange, StreamNode}; use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode}; @@ -34,7 +36,7 @@ impl StreamShare { let input = core.input.borrow().0.clone(); let dist = input.distribution().clone(); // Filter executor won't change the append-only behavior of the stream. - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, dist, input.append_only(), @@ -79,7 +81,7 @@ impl StreamNode for StreamShare { impl StreamShare { pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> PbStreamNode { - let operator_id = self.base.id.0 as u32; + let operator_id = self.base.id().0 as u32; match state.get_share_stream_node(operator_id) { None => { diff --git a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs index 59311dd22226c..92d96fdf21b08 100644 --- a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs @@ -21,6 +21,8 @@ use super::generic::{self, PlanAggCall}; use super::utils::{childless_record, plan_node_name, Distill}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::ExprRewriter; +use crate::optimizer::plan_node::generic::PhysicalPlanRef; +use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::property::Distribution; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -48,7 +50,7 @@ impl StreamSimpleAgg { let watermark_columns = FixedBitSet::with_capacity(core.output_len()); // Simple agg executor might change the append-only behavior of the stream. - let base = PlanBase::new_stream_with_logical(&core, dist, false, false, watermark_columns); + let base = PlanBase::new_stream_with_core(&core, dist, false, false, watermark_columns); StreamSimpleAgg { base, core, @@ -99,7 +101,7 @@ impl StreamNode for StreamSimpleAgg { .collect(), distribution_key: self .base - .dist + .distribution() .dist_column_indices() .iter() .map(|idx| *idx as u32) diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index a51380d630331..32e9fb487910c 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -37,6 +37,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody; use tracing::info; use super::derive::{derive_columns, derive_pk}; +use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill, IndicesDisplay, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanRef, StreamNode}; use crate::optimizer::plan_node::PlanTreeNodeUnary; @@ -57,7 +58,7 @@ pub struct StreamSink { impl StreamSink { #[must_use] pub fn new(input: PlanRef, sink_desc: SinkDesc) -> Self { - let base = PlanBase::derive_stream_plan_base(&input); + let base = input.plan_base().clone_with_new_plan_id(); Self { base, input, @@ -389,7 +390,7 @@ impl Distill for StreamSink { .iter() .map(|k| k.column_index) .collect_vec(), - schema: &self.base.schema, + schema: self.base.schema(), }; vec.push(("pk", pk.distill())); } @@ -409,7 +410,7 @@ impl StreamNode for StreamSink { PbNodeBody::Sink(SinkNode { sink_desc: Some(self.sink_desc.to_proto()), table: Some(table.to_internal_table_prost()), - log_store_type: match self.base.ctx.session_ctx().config().get_sink_decouple() { + log_store_type: match self.base.ctx().session_ctx().config().get_sink_decouple() { SinkDecouple::Default => { let enable_sink_decouple = match_sink_name_str!( diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs index b82d71068d817..41a56a0fd5df2 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sort.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs @@ -20,6 +20,8 @@ use risingwave_common::catalog::FieldDisplay; use risingwave_common::util::sort_util::OrderType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; +use super::generic::{GenericPlanRef, PhysicalPlanRef}; +use super::stream::StreamPlanRef; use super::utils::{childless_record, Distill, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -84,7 +86,7 @@ impl StreamEowcSort { tbl_builder.add_order_column(self.sort_column_index, OrderType::ascending()); order_cols.insert(self.sort_column_index); - let dist_key = self.base.dist.dist_column_indices().to_vec(); + let dist_key = self.base.distribution().dist_column_indices().to_vec(); for idx in &dist_key { if !order_cols.contains(idx) { tbl_builder.add_order_column(*idx, OrderType::ascending()); diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs index 377e2704776bb..ae66cf568118b 100644 --- a/src/frontend/src/optimizer/plan_node/stream_source.rs +++ b/src/frontend/src/optimizer/plan_node/stream_source.rs @@ -37,7 +37,7 @@ pub struct StreamSource { impl StreamSource { pub fn new(core: generic::Source) -> Self { - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, Distribution::SomeShard, core.catalog.as_ref().map_or(true, |s| s.append_only), diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs index 0af7ebded94d9..474582ec877c7 100644 --- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs +++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs @@ -20,6 +20,7 @@ use super::generic::{self, PlanAggCall}; use super::utils::impl_distill_by_unit; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::ExprRewriter; +use crate::optimizer::plan_node::generic::PhysicalPlanRef; use crate::optimizer::property::RequiredDist; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -49,7 +50,7 @@ impl StreamStatelessSimpleAgg { } } - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, input_dist.clone(), input.append_only(), diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs index 907a41db28525..965ca217a3369 100644 --- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs +++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs @@ -24,11 +24,13 @@ use risingwave_common::util::sort_util::OrderType; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::{ChainType, PbStreamNode}; +use super::generic::PhysicalPlanRef; use super::utils::{childless_record, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode}; use crate::catalog::ColumnId; use crate::expr::{ExprRewriter, FunctionCall}; use crate::optimizer::plan_node::generic::GenericPlanRef; +use crate::optimizer::plan_node::stream::StreamPlanRef; use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder}; use crate::optimizer::property::{Distribution, DistributionDisplay}; use crate::stream_fragmenter::BuildFragmentGraphState; @@ -66,7 +68,7 @@ impl StreamTableScan { None => Distribution::SomeShard, } }; - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, distribution, core.table_desc.append_only, @@ -192,7 +194,7 @@ impl_plan_tree_node_for_leaf! { StreamTableScan } impl Distill for StreamTableScan { fn distill<'a>(&self) -> XmlNode<'a> { - let verbose = self.base.ctx.is_explain_verbose(); + let verbose = self.base.ctx().is_explain_verbose(); let mut vec = Vec::with_capacity(4); vec.push(("table", Pretty::from(self.core.table_name.clone()))); vec.push(("columns", self.core.columns_pretty(verbose))); @@ -200,12 +202,12 @@ impl Distill for StreamTableScan { if verbose { let pk = IndicesDisplay { indices: self.stream_key().unwrap_or_default(), - schema: &self.base.schema, + schema: self.base.schema(), }; vec.push(("pk", pk.distill())); let dist = Pretty::display(&DistributionDisplay { distribution: self.distribution(), - input_schema: &self.base.schema, + input_schema: self.base.schema(), }); vec.push(("dist", dist)); } @@ -325,7 +327,7 @@ impl StreamTableScan { ..Default::default() })), stream_key, - operator_id: self.base.id.0 as u64, + operator_id: self.base.id().0 as u64, identity: { let s = self.distill_to_string(); s.replace("StreamTableScan", "Chain") diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs index 2191ca322342d..675dbeb9ab381 100644 --- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs +++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs @@ -18,6 +18,8 @@ use risingwave_pb::plan_common::JoinType; use risingwave_pb::stream_plan::stream_node::NodeBody; use risingwave_pb::stream_plan::TemporalJoinNode; +use super::generic::GenericPlanRef; +use super::stream::StreamPlanRef; use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode}; use crate::expr::{Expr, ExprRewriter}; @@ -61,7 +63,7 @@ impl StreamTemporalJoin { .rewrite_bitset(core.left.watermark_columns()), ); - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, dist, true, @@ -88,7 +90,7 @@ impl StreamTemporalJoin { impl Distill for StreamTemporalJoin { fn distill<'a>(&self) -> XmlNode<'a> { - let verbose = self.base.ctx.is_explain_verbose(); + let verbose = self.base.ctx().is_explain_verbose(); let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 }); vec.push(("type", Pretty::debug(&self.core.join_type))); @@ -101,7 +103,7 @@ impl Distill for StreamTemporalJoin { }), )); - if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) { + if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) { vec.push(("output_watermarks", ow)); } diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs index e7a880fa7d757..87890625f6be7 100644 --- a/src/frontend/src/optimizer/plan_node/stream_topn.rs +++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs @@ -40,7 +40,7 @@ impl StreamTopN { }; let watermark_columns = FixedBitSet::with_capacity(input.schema().len()); - let base = PlanBase::new_stream_with_logical(&core, dist, false, false, watermark_columns); + let base = PlanBase::new_stream_with_core(&core, dist, false, false, watermark_columns); StreamTopN { base, core } } diff --git a/src/frontend/src/optimizer/plan_node/stream_union.rs b/src/frontend/src/optimizer/plan_node/stream_union.rs index 8f6353d6be44c..6d6dca2d8dd02 100644 --- a/src/frontend/src/optimizer/plan_node/stream_union.rs +++ b/src/frontend/src/optimizer/plan_node/stream_union.rs @@ -19,6 +19,8 @@ use pretty_xmlish::XmlNode; use risingwave_pb::stream_plan::stream_node::PbNodeBody; use risingwave_pb::stream_plan::UnionNode; +use super::generic::GenericPlanRef; +use super::stream::StreamPlanRef; use super::utils::{childless_record, watermark_pretty, Distill}; use super::{generic, ExprRewritable, PlanRef}; use crate::optimizer::plan_node::generic::GenericPlanNode; @@ -46,7 +48,7 @@ impl StreamUnion { |acc_watermark_columns, input| acc_watermark_columns.bitand(input.watermark_columns()), ); - let base = PlanBase::new_stream_with_logical( + let base = PlanBase::new_stream_with_core( &core, dist, inputs.iter().all(|x| x.append_only()), @@ -60,7 +62,7 @@ impl StreamUnion { impl Distill for StreamUnion { fn distill<'a>(&self) -> XmlNode<'a> { let mut vec = self.core.fields_pretty(); - if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) { + if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) { vec.push(("output_watermarks", ow)); } childless_record("StreamUnion", vec) diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs index fb0b844411f63..f8cc5db851159 100644 --- a/src/frontend/src/optimizer/plan_node/stream_values.rs +++ b/src/frontend/src/optimizer/plan_node/stream_values.rs @@ -18,6 +18,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode; use risingwave_pb::stream_plan::values_node::ExprTuple; use risingwave_pb::stream_plan::ValuesNode; +use super::generic::GenericPlanRef; use super::utils::{childless_record, Distill}; use super::{ExprRewritable, LogicalValues, PlanBase, StreamNode}; use crate::expr::{Expr, ExprImpl}; diff --git a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs index ed5a946603ee4..066bc9a234ca5 100644 --- a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs +++ b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs @@ -21,6 +21,7 @@ use risingwave_common::util::sort_util::OrderType; use risingwave_pb::catalog::WatermarkDesc; use risingwave_pb::stream_plan::stream_node::PbNodeBody; +use super::stream::StreamPlanRef; use super::utils::{childless_record, watermark_pretty, Distill, TableCatalogBuilder}; use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode}; use crate::expr::{ExprDisplay, ExprImpl}; @@ -85,7 +86,7 @@ impl Distill for StreamWatermarkFilter { }) .collect(); let display_output_watermarks = - watermark_pretty(&self.base.watermark_columns, input_schema).unwrap(); + watermark_pretty(self.base.watermark_columns(), input_schema).unwrap(); let fields = vec![ ("watermark_descs", Pretty::Array(display_watermark_descs)), ("output_watermarks", display_output_watermarks), diff --git a/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs b/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs index f30f3d9fa4966..7e53b903ac962 100644 --- a/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs +++ b/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs @@ -16,6 +16,7 @@ use std::collections::HashMap; use itertools::Itertools; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNode, StreamShare}; use crate::optimizer::PlanRewriter; use crate::PlanRef; diff --git a/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs b/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs index 9ab0d4d580ddc..5b9efb9fc7c94 100644 --- a/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs +++ b/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs @@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet}; use itertools::Itertools; use crate::catalog::SourceId; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{ LogicalShare, LogicalSource, PlanNodeId, PlanTreeNode, StreamShare, }; diff --git a/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs b/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs index 7d538392f9361..7950b5d81a49c 100644 --- a/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs +++ b/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs @@ -15,6 +15,7 @@ use std::collections::HashMap; use super::{DefaultBehavior, DefaultValue}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNodeUnary}; use crate::optimizer::plan_visitor::PlanVisitor; diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs index 4fcaf959eac87..b6e7715dd155f 100644 --- a/src/frontend/src/optimizer/property/distribution.rs +++ b/src/frontend/src/optimizer/property/distribution.rs @@ -295,10 +295,12 @@ impl RequiredDist { pub fn enforce_if_not_satisfies( &self, - plan: PlanRef, + mut plan: PlanRef, required_order: &Order, ) -> Result { - let plan = required_order.enforce_if_not_satisfies(plan)?; + if let Convention::Batch = plan.convention() { + plan = required_order.enforce_if_not_satisfies(plan)?; + } if !plan.distribution().satisfies(self) { Ok(self.enforce(plan, required_order)) } else { diff --git a/src/frontend/src/optimizer/property/order.rs b/src/frontend/src/optimizer/property/order.rs index a70bffb13a8ba..19ad7586e1c11 100644 --- a/src/frontend/src/optimizer/property/order.rs +++ b/src/frontend/src/optimizer/property/order.rs @@ -92,7 +92,7 @@ impl Order { } } - pub fn enforce(&self, plan: PlanRef) -> PlanRef { + fn enforce(&self, plan: PlanRef) -> PlanRef { assert_eq!(plan.convention(), Convention::Batch); BatchSort::new(plan, self.clone()).into() } diff --git a/src/frontend/src/optimizer/rule/always_false_filter_rule.rs b/src/frontend/src/optimizer/rule/always_false_filter_rule.rs index 02165232372e4..eeba7d9f3be3b 100644 --- a/src/frontend/src/optimizer/rule/always_false_filter_rule.rs +++ b/src/frontend/src/optimizer/rule/always_false_filter_rule.rs @@ -15,6 +15,7 @@ use risingwave_common::types::ScalarImpl; use super::Rule; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{LogicalFilter, LogicalValues}; use crate::PlanRef; diff --git a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs index 66579248a76f9..7ac121692c81d 100644 --- a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs +++ b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs @@ -23,6 +23,7 @@ use crate::expr::{ CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall, InputRef, }; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNodeBinary}; use crate::optimizer::plan_visitor::{ExprCorrelatedIdFinder, PlanCorrelatedIdFinder}; use crate::optimizer::rule::apply_offset_rewriter::ApplyCorrelatedIndicesConverter; diff --git a/src/frontend/src/optimizer/rule/expand_to_project_rule.rs b/src/frontend/src/optimizer/rule/expand_to_project_rule.rs index 1ed1da0037aba..01a39042efd98 100644 --- a/src/frontend/src/optimizer/rule/expand_to_project_rule.rs +++ b/src/frontend/src/optimizer/rule/expand_to_project_rule.rs @@ -36,7 +36,7 @@ impl Rule for ExpandToProjectRule { let column_subset = column_subsets.get(0).unwrap(); // if `column_subsets` len equals 1, convert it into a project - let mut exprs = Vec::with_capacity(expand.base.schema.len()); + let mut exprs = Vec::with_capacity(expand.base.schema().len()); // Add original input column first for i in 0..input.schema().len() { exprs.push(ExprImpl::InputRef( diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs index 9103d1bc906bc..323cc59ef3558 100644 --- a/src/frontend/src/optimizer/rule/index_selection_rule.rs +++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs @@ -66,6 +66,7 @@ use crate::expr::{ FunctionCall, InputRef, }; use crate::optimizer::optimizer_context::OptimizerContextRef; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{ generic, ColumnPruningContext, LogicalJoin, LogicalScan, LogicalUnion, PlanTreeNode, PlanTreeNodeBinary, PredicatePushdown, PredicatePushdownContext, diff --git a/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs index dcbb6f7b015ee..bd2db0ac67cca 100644 --- a/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs +++ b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs @@ -47,6 +47,7 @@ mod tests { use super::*; use crate::expr::{ExprImpl, FunctionCall, InputRef}; use crate::optimizer::optimizer_context::OptimizerContext; + use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::utils::Condition; #[tokio::test] diff --git a/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs b/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs index c496a906400ae..8682db8491a1d 100644 --- a/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs +++ b/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs @@ -46,6 +46,7 @@ mod tests { use super::*; use crate::expr::{ExprImpl, FunctionCall, InputRef}; use crate::optimizer::optimizer_context::OptimizerContext; + use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::utils::Condition; #[tokio::test] diff --git a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs index ea8386bc227f8..c32ae40531cd0 100644 --- a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs +++ b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs @@ -27,7 +27,7 @@ use risingwave_expr::aggregate::AggKind; use super::{BoxedRule, Rule}; use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef}; -use crate::optimizer::plan_node::generic::Agg; +use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef}; use crate::optimizer::plan_node::{ LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary, }; diff --git a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs index dfb6963c7fb4f..93637d3ba8193 100644 --- a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs +++ b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs @@ -18,6 +18,7 @@ use risingwave_expr::window_function::WindowFuncKind; use super::Rule; use crate::expr::{collect_input_refs, ExprImpl, ExprType}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{LogicalFilter, LogicalTopN, PlanTreeNodeUnary}; use crate::optimizer::property::Order; use crate::planner::LIMIT_ALL_COUNT; diff --git a/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs b/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs index dc5f9c2bc9aba..f34146ba80050 100644 --- a/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs +++ b/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs @@ -18,6 +18,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping; use super::super::plan_node::*; use super::{BoxedRule, Rule}; use crate::expr::{CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, InputRef}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_visitor::{PlanCorrelatedIdFinder, PlanVisitor}; use crate::optimizer::PlanRef; use crate::utils::Condition; diff --git a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs index 5a6f1187fdd02..f85ffc2318459 100644 --- a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs +++ b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs @@ -18,6 +18,7 @@ use risingwave_common::types::DataType; use super::{BoxedRule, Rule}; use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{ LogicalProject, LogicalProjectSet, LogicalTableFunction, LogicalValues, PlanTreeNodeUnary, }; @@ -51,7 +52,7 @@ impl Rule for TableFunctionToProjectSetRule { let logical_values = LogicalValues::create( vec![vec![]], Schema::new(vec![]), - logical_table_function.base.ctx.clone(), + logical_table_function.base.ctx().clone(), ); let logical_project_set = LogicalProjectSet::create(logical_values, vec![table_function]); // We need a project to align schema type because diff --git a/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs b/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs index 9759739490fe6..a13bef3baa9d9 100644 --- a/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs +++ b/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs @@ -13,6 +13,7 @@ // limitations under the License. use super::{BoxedRule, Rule}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::{LogicalValues, PlanTreeNodeUnary}; use crate::optimizer::plan_visitor::{LogicalCardinalityExt, SideEffectVisitor}; use crate::optimizer::{PlanRef, PlanVisitor}; diff --git a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs index 8119b8847b600..7b83c017ab781 100644 --- a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs +++ b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs @@ -13,6 +13,7 @@ // limitations under the License. use super::{BoxedRule, Rule}; +use crate::optimizer::plan_node::generic::GenericPlanRef; use crate::optimizer::plan_node::LogicalValues; use crate::optimizer::{PlanRef, PlanTreeNode}; diff --git a/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs b/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs index 2a12f6b712e0d..f1d203fba1350 100644 --- a/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs +++ b/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs @@ -13,7 +13,7 @@ // limitations under the License. use super::{BoxedRule, Rule}; -use crate::optimizer::plan_node::generic::Agg; +use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef}; use crate::optimizer::plan_node::{LogicalUnion, PlanTreeNode}; use crate::optimizer::PlanRef; @@ -24,7 +24,7 @@ impl Rule for UnionToDistinctRule { let union: &LogicalUnion = plan.as_logical_union()?; if !union.all() { let union_all = LogicalUnion::create(true, union.inputs().into_iter().collect()); - let distinct = Agg::new(vec![], (0..union.base.schema.len()).collect(), union_all) + let distinct = Agg::new(vec![], (0..union.base.schema().len()).collect(), union_all) .with_enable_two_phase(false); Some(distinct.into()) } else { diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs index 4e16bc6cd0b21..cb20103b3e76f 100644 --- a/src/frontend/src/scheduler/plan_fragmenter.rs +++ b/src/frontend/src/scheduler/plan_fragmenter.rs @@ -103,7 +103,7 @@ impl Serialize for ExecutionPlanNode { impl From for ExecutionPlanNode { fn from(plan_node: PlanRef) -> Self { Self { - plan_node_id: plan_node.plan_base().id, + plan_node_id: plan_node.plan_base().id(), plan_node_type: plan_node.node_type(), node: plan_node.to_batch_prost_body(), children: vec![],