=
diff --git a/src/frontend/src/optimizer/plan_node/logical_scan.rs b/src/frontend/src/optimizer/plan_node/logical_scan.rs
index a499a9c6ea3d3..07d2a6c7653e7 100644
--- a/src/frontend/src/optimizer/plan_node/logical_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_scan.rs
@@ -273,7 +273,7 @@ impl LogicalScan {
self.output_col_idx().to_vec(),
self.core.table_desc.clone(),
self.indexes().to_vec(),
- self.base.ctx.clone(),
+ self.base.ctx().clone(),
predicate,
self.for_system_time_as_of_proctime(),
self.table_cardinality(),
@@ -288,7 +288,7 @@ impl LogicalScan {
output_col_idx,
self.core.table_desc.clone(),
self.indexes().to_vec(),
- self.base.ctx.clone(),
+ self.base.ctx().clone(),
self.predicate().clone(),
self.for_system_time_as_of_proctime(),
self.table_cardinality(),
@@ -309,7 +309,7 @@ impl_plan_tree_node_for_leaf! {LogicalScan}
impl Distill for LogicalScan {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(5);
vec.push(("table", Pretty::from(self.table_name().to_owned())));
let key_is_columns =
@@ -440,7 +440,7 @@ impl LogicalScan {
let (scan_ranges, predicate) = self.predicate().clone().split_to_scan_ranges(
self.core.table_desc.clone(),
self.base
- .ctx
+ .ctx()
.session_ctx()
.config()
.get_max_split_range_gap(),
@@ -551,7 +551,7 @@ impl ToStream for LogicalScan {
None.into(),
)));
}
- match self.base.stream_key.is_none() {
+ match self.base.stream_key().is_none() {
true => {
let mut col_ids = HashSet::new();
diff --git a/src/frontend/src/optimizer/plan_node/logical_share.rs b/src/frontend/src/optimizer/plan_node/logical_share.rs
index d924ee7180168..d6b5711740a98 100644
--- a/src/frontend/src/optimizer/plan_node/logical_share.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_share.rs
@@ -69,7 +69,7 @@ impl LogicalShare {
}
pub(super) fn pretty_fields<'a>(base: &PlanBase, name: &'a str) -> XmlNode<'a> {
- childless_record(name, vec![("id", Pretty::debug(&base.id.0))])
+ childless_record(name, vec![("id", Pretty::debug(&base.id().0))])
}
}
diff --git a/src/frontend/src/optimizer/plan_node/logical_source.rs b/src/frontend/src/optimizer/plan_node/logical_source.rs
index 1d37da9eaa40f..45a5fbcb2240f 100644
--- a/src/frontend/src/optimizer/plan_node/logical_source.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_source.rs
@@ -28,6 +28,7 @@ use risingwave_connector::source::{ConnectorProperties, DataType};
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::GeneratedColumnDesc;
+use super::generic::GenericPlanRef;
use super::stream_watermark_filter::StreamWatermarkFilter;
use super::utils::{childless_record, Distill};
use super::{
@@ -204,7 +205,7 @@ impl LogicalSource {
..self.core.clone()
};
let mut new_s3_plan: PlanRef = StreamSource {
- base: PlanBase::new_stream_with_logical(
+ base: PlanBase::new_stream_with_core(
&logical_source,
Distribution::Single,
true, // `list` will keep listing all objects, it must be append-only
@@ -506,7 +507,7 @@ impl PredicatePushdown for LogicalSource {
let mut new_conjunctions = Vec::with_capacity(predicate.conjunctions.len());
for expr in predicate.conjunctions {
- if let Some(e) = expr_to_kafka_timestamp_range(expr, &mut range, &self.base.schema) {
+ if let Some(e) = expr_to_kafka_timestamp_range(expr, &mut range, self.base.schema()) {
// Not recognized, so push back
new_conjunctions.push(e);
}
diff --git a/src/frontend/src/optimizer/plan_node/logical_union.rs b/src/frontend/src/optimizer/plan_node/logical_union.rs
index 51e4e620cf4ca..1f02b026c0020 100644
--- a/src/frontend/src/optimizer/plan_node/logical_union.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_union.rs
@@ -130,7 +130,7 @@ impl ToBatch for LogicalUnion {
if !self.all() {
let batch_union = BatchUnion::new(new_logical).into();
Ok(BatchHashAgg::new(
- generic::Agg::new(vec![], (0..self.base.schema.len()).collect(), batch_union)
+ generic::Agg::new(vec![], (0..self.base.schema().len()).collect(), batch_union)
.with_enable_two_phase(false),
)
.into())
@@ -170,7 +170,7 @@ impl ToStream for LogicalUnion {
&self,
ctx: &mut RewriteStreamContext,
) -> Result<(PlanRef, ColIndexMapping)> {
- let original_schema = self.base.schema.clone();
+ let original_schema = self.base.schema().clone();
let original_schema_len = original_schema.len();
let mut rewrites = vec![];
for input in &self.core.inputs {
@@ -353,7 +353,7 @@ mod tests {
// Check the result
let union = plan.as_logical_union().unwrap();
- assert_eq!(union.base.schema.len(), 2);
+ assert_eq!(union.base.schema().len(), 2);
}
#[tokio::test]
diff --git a/src/frontend/src/optimizer/plan_node/logical_update.rs b/src/frontend/src/optimizer/plan_node/logical_update.rs
index 80e4f350d8edb..1dbe1d3d3c5c9 100644
--- a/src/frontend/src/optimizer/plan_node/logical_update.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_update.rs
@@ -15,6 +15,7 @@
use risingwave_common::catalog::TableVersionId;
use risingwave_common::error::Result;
+use super::generic::GenericPlanRef;
use super::utils::impl_distill_by_unit;
use super::{
gen_filter_and_pushdown, generic, BatchUpdate, ColPrunable, ExprRewritable, LogicalProject,
diff --git a/src/frontend/src/optimizer/plan_node/logical_values.rs b/src/frontend/src/optimizer/plan_node/logical_values.rs
index c6a3d2ac0564e..e62c6400f2015 100644
--- a/src/frontend/src/optimizer/plan_node/logical_values.rs
+++ b/src/frontend/src/optimizer/plan_node/logical_values.rs
@@ -21,6 +21,7 @@ use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::Result;
use risingwave_common::types::{DataType, ScalarImpl};
+use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{
BatchValues, ColPrunable, ExprRewritable, LogicalFilter, PlanBase, PlanRef, PredicatePushdown,
@@ -144,7 +145,7 @@ impl ColPrunable for LogicalValues {
.iter()
.map(|i| self.schema().fields[*i].clone())
.collect();
- Self::new(rows, Schema { fields }, self.base.ctx.clone()).into()
+ Self::new(rows, Schema { fields }, self.base.ctx().clone()).into()
}
}
diff --git a/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs b/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
index 73f82e86aa260..9f2e8d94634be 100644
--- a/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
+++ b/src/frontend/src/optimizer/plan_node/merge_eq_nodes.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use std::hash::Hash;
+use super::generic::GenericPlanRef;
use super::{EndoPlan, LogicalShare, PlanNodeId, PlanRef, PlanTreeNodeUnary, VisitPlan};
use crate::optimizer::plan_visitor;
use crate::utils::{Endo, Visit};
diff --git a/src/frontend/src/optimizer/plan_node/mod.rs b/src/frontend/src/optimizer/plan_node/mod.rs
index 188787c93b8c0..f16ebfb0c792c 100644
--- a/src/frontend/src/optimizer/plan_node/mod.rs
+++ b/src/frontend/src/optimizer/plan_node/mod.rs
@@ -46,7 +46,7 @@ use serde::Serialize;
use smallvec::SmallVec;
use self::batch::BatchPlanRef;
-use self::generic::GenericPlanRef;
+use self::generic::{GenericPlanRef, PhysicalPlanRef};
use self::stream::StreamPlanRef;
use self::utils::Distill;
use super::property::{Distribution, FunctionalDependencySet, Order};
@@ -419,29 +419,31 @@ impl PlanTreeNode for PlanRef {
}
}
-impl StreamPlanRef for PlanRef {
- fn distribution(&self) -> &Distribution {
- &self.plan_base().dist
+impl PlanNodeMeta for PlanRef {
+ fn node_type(&self) -> PlanNodeType {
+ self.0.node_type()
}
- fn append_only(&self) -> bool {
- self.plan_base().append_only
+ fn plan_base(&self) -> &PlanBase {
+ self.0.plan_base()
}
- fn emit_on_window_close(&self) -> bool {
- self.plan_base().emit_on_window_close
+ fn convention(&self) -> Convention {
+ self.0.convention()
}
}
-impl BatchPlanRef for PlanRef {
- fn order(&self) -> &Order {
- &self.plan_base().order
+/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
+impl GenericPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn id(&self) -> PlanNodeId {
+ self.plan_base().id()
}
-}
-impl GenericPlanRef for PlanRef {
fn schema(&self) -> &Schema {
- &self.plan_base().schema
+ self.plan_base().schema()
}
fn stream_key(&self) -> Option<&[usize]> {
@@ -457,6 +459,47 @@ impl GenericPlanRef for PlanRef {
}
}
+/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
+// TODO: further constrain the convention to be `Stream` or `Batch`.
+impl
PhysicalPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn distribution(&self) -> &Distribution {
+ self.plan_base().distribution()
+ }
+}
+
+/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
+// TODO: further constrain the convention to be `Stream`.
+impl
StreamPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn append_only(&self) -> bool {
+ self.plan_base().append_only()
+ }
+
+ fn emit_on_window_close(&self) -> bool {
+ self.plan_base().emit_on_window_close()
+ }
+
+ fn watermark_columns(&self) -> &FixedBitSet {
+ self.plan_base().watermark_columns()
+ }
+}
+
+/// Implement for every type that provides [`PlanBase`] through [`PlanNodeMeta`].
+// TODO: further constrain the convention to be `Batch`.
+impl
BatchPlanRef for P
+where
+ P: PlanNodeMeta + Eq + Hash,
+{
+ fn order(&self) -> &Order {
+ self.plan_base().order()
+ }
+}
+
/// In order to let expression display id started from 1 for explaining, hidden column names and
/// other places. We will reset expression display id to 0 and clone the whole plan to reset the
/// schema.
@@ -512,15 +555,15 @@ pub(crate) fn pretty_config() -> PrettyConfig {
impl dyn PlanNode {
pub fn id(&self) -> PlanNodeId {
- self.plan_base().id
+ self.plan_base().id()
}
pub fn ctx(&self) -> OptimizerContextRef {
- self.plan_base().ctx.clone()
+ self.plan_base().ctx().clone()
}
pub fn schema(&self) -> &Schema {
- &self.plan_base().schema
+ self.plan_base().schema()
}
pub fn stream_key(&self) -> Option<&[usize]> {
@@ -528,27 +571,28 @@ impl dyn PlanNode {
}
pub fn order(&self) -> &Order {
- &self.plan_base().order
+ self.plan_base().order()
}
+ // TODO: avoid no manual delegation
pub fn distribution(&self) -> &Distribution {
- &self.plan_base().dist
+ self.plan_base().distribution()
}
pub fn append_only(&self) -> bool {
- self.plan_base().append_only
+ self.plan_base().append_only()
}
pub fn emit_on_window_close(&self) -> bool {
- self.plan_base().emit_on_window_close
+ self.plan_base().emit_on_window_close()
}
pub fn functional_dependency(&self) -> &FunctionalDependencySet {
- &self.plan_base().functional_dependency
+ self.plan_base().functional_dependency()
}
pub fn watermark_columns(&self) -> &FixedBitSet {
- &self.plan_base().watermark_columns
+ self.plan_base().watermark_columns()
}
/// Serialize the plan node and its children to a stream plan proto.
diff --git a/src/frontend/src/optimizer/plan_node/plan_base.rs b/src/frontend/src/optimizer/plan_node/plan_base.rs
index e9a5bf26885bf..51b1aa5f41141 100644
--- a/src/frontend/src/optimizer/plan_node/plan_base.rs
+++ b/src/frontend/src/optimizer/plan_node/plan_base.rs
@@ -14,47 +14,132 @@
use educe::Educe;
use fixedbitset::FixedBitSet;
-use paste::paste;
use risingwave_common::catalog::Schema;
use super::generic::GenericPlanNode;
use super::*;
-use crate::for_all_plan_nodes;
use crate::optimizer::optimizer_context::OptimizerContextRef;
use crate::optimizer::property::{Distribution, FunctionalDependencySet, Order};
-/// the common fields of all nodes, please make a field named `base` in
-/// every planNode and correctly value it when construct the planNode.
-#[derive(Clone, Debug, Educe)]
-#[educe(PartialEq, Eq, Hash)]
-pub struct PlanBase {
- #[educe(PartialEq(ignore))]
- #[educe(Hash(ignore))]
- pub id: PlanNodeId,
- #[educe(PartialEq(ignore))]
- #[educe(Hash(ignore))]
- pub ctx: OptimizerContextRef,
- pub schema: Schema,
- /// the pk indices of the PlanNode's output, a empty stream key vec means there is no stream key
- pub stream_key: Option>,
- /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect
- /// correctness, but insert unnecessary sort in plan
- pub order: Order,
+/// Common extra fields for physical plan nodes.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct PhysicalCommonExtra {
/// The distribution property of the PlanNode's output, store an `Distribution::any()` here
/// will not affect correctness, but insert unnecessary exchange in plan
- pub dist: Distribution,
+ dist: Distribution,
+}
+
+/// Extra fields for stream plan nodes.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct StreamExtra {
+ /// Common fields for physical plan nodes.
+ physical: PhysicalCommonExtra,
+
/// The append-only property of the PlanNode's output is a stream-only property. Append-only
/// means the stream contains only insert operation.
- pub append_only: bool,
+ append_only: bool,
/// Whether the output is emitted on window close.
- pub emit_on_window_close: bool,
- pub functional_dependency: FunctionalDependencySet,
+ emit_on_window_close: bool,
/// The watermark column indices of the PlanNode's output. There could be watermark output from
/// this stream operator.
- pub watermark_columns: FixedBitSet,
+ watermark_columns: FixedBitSet,
+}
+
+/// Extra fields for batch plan nodes.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+struct BatchExtra {
+ /// Common fields for physical plan nodes.
+ physical: PhysicalCommonExtra,
+
+ /// The order property of the PlanNode's output, store an `&Order::any()` here will not affect
+ /// correctness, but insert unnecessary sort in plan
+ order: Order,
+}
+
+/// Extra fields for physical plan nodes.
+#[derive(Clone, Debug, PartialEq, Eq, Hash)]
+enum PhysicalExtra {
+ Stream(StreamExtra),
+ Batch(BatchExtra),
+}
+
+impl PhysicalExtra {
+ fn common(&self) -> &PhysicalCommonExtra {
+ match self {
+ PhysicalExtra::Stream(stream) => &stream.physical,
+ PhysicalExtra::Batch(batch) => &batch.physical,
+ }
+ }
+
+ fn common_mut(&mut self) -> &mut PhysicalCommonExtra {
+ match self {
+ PhysicalExtra::Stream(stream) => &mut stream.physical,
+ PhysicalExtra::Batch(batch) => &mut batch.physical,
+ }
+ }
+
+ fn stream(&self) -> &StreamExtra {
+ match self {
+ PhysicalExtra::Stream(extra) => extra,
+ _ => panic!("access stream properties from batch plan node"),
+ }
+ }
+
+ fn batch(&self) -> &BatchExtra {
+ match self {
+ PhysicalExtra::Batch(extra) => extra,
+ _ => panic!("access batch properties from stream plan node"),
+ }
+ }
+}
+
+/// the common fields of all nodes, please make a field named `base` in
+/// every planNode and correctly value it when construct the planNode.
+///
+/// All fields are intentionally made private and immutable, as they should
+/// normally be the same as the given [`GenericPlanNode`] when constructing.
+///
+/// - To access them, use traits including [`GenericPlanRef`],
+/// [`PhysicalPlanRef`], [`StreamPlanRef`] and [`BatchPlanRef`].
+/// - To mutate them, use methods like `new_*` or `clone_with_*`.
+#[derive(Clone, Debug, Educe)]
+#[educe(PartialEq, Eq, Hash)]
+pub struct PlanBase {
+ // -- common fields --
+ #[educe(PartialEq(ignore), Hash(ignore))]
+ id: PlanNodeId,
+ #[educe(PartialEq(ignore), Hash(ignore))]
+ ctx: OptimizerContextRef,
+
+ schema: Schema,
+ /// the pk indices of the PlanNode's output, a empty stream key vec means there is no stream key
+ // TODO: this is actually a logical and stream only property
+ stream_key: Option>,
+ functional_dependency: FunctionalDependencySet,
+
+ /// Extra fields if the plan node is physical.
+ physical_extra: Option,
+}
+
+impl PlanBase {
+ fn physical_extra(&self) -> &PhysicalExtra {
+ self.physical_extra
+ .as_ref()
+ .expect("access physical properties from logical plan node")
+ }
+
+ fn physical_extra_mut(&mut self) -> &mut PhysicalExtra {
+ self.physical_extra
+ .as_mut()
+ .expect("access physical properties from logical plan node")
+ }
}
impl generic::GenericPlanRef for PlanBase {
+ fn id(&self) -> PlanNodeId {
+ self.id
+ }
+
fn schema(&self) -> &Schema {
&self.schema
}
@@ -72,23 +157,29 @@ impl generic::GenericPlanRef for PlanBase {
}
}
-impl stream::StreamPlanRef for PlanBase {
+impl generic::PhysicalPlanRef for PlanBase {
fn distribution(&self) -> &Distribution {
- &self.dist
+ &self.physical_extra().common().dist
}
+}
+impl stream::StreamPlanRef for PlanBase {
fn append_only(&self) -> bool {
- self.append_only
+ self.physical_extra().stream().append_only
}
fn emit_on_window_close(&self) -> bool {
- self.emit_on_window_close
+ self.physical_extra().stream().emit_on_window_close
+ }
+
+ fn watermark_columns(&self) -> &FixedBitSet {
+ &self.physical_extra().stream().watermark_columns
}
}
impl batch::BatchPlanRef for PlanBase {
fn order(&self) -> &Order {
- &self.order
+ &self.physical_extra().batch().order
}
}
@@ -100,47 +191,22 @@ impl PlanBase {
functional_dependency: FunctionalDependencySet,
) -> Self {
let id = ctx.next_plan_node_id();
- let watermark_columns = FixedBitSet::with_capacity(schema.len());
Self {
id,
ctx,
schema,
stream_key,
- dist: Distribution::Single,
- order: Order::any(),
- // Logical plan node won't touch `append_only` field
- append_only: true,
- emit_on_window_close: false,
functional_dependency,
- watermark_columns,
+ physical_extra: None,
}
}
- pub fn new_logical_with_core(node: &impl GenericPlanNode) -> Self {
+ pub fn new_logical_with_core(core: &impl GenericPlanNode) -> Self {
Self::new_logical(
- node.ctx(),
- node.schema(),
- node.stream_key(),
- node.functional_dependency(),
- )
- }
-
- pub fn new_stream_with_logical(
- logical: &impl GenericPlanNode,
- dist: Distribution,
- append_only: bool,
- emit_on_window_close: bool,
- watermark_columns: FixedBitSet,
- ) -> Self {
- Self::new_stream(
- logical.ctx(),
- logical.schema(),
- logical.stream_key(),
- logical.functional_dependency(),
- dist,
- append_only,
- emit_on_window_close,
- watermark_columns,
+ core.ctx(),
+ core.schema(),
+ core.stream_key(),
+ core.functional_dependency(),
)
}
@@ -160,22 +226,36 @@ impl PlanBase {
id,
ctx,
schema,
- dist,
- order: Order::any(),
stream_key,
- append_only,
- emit_on_window_close,
functional_dependency,
- watermark_columns,
+ physical_extra: Some(PhysicalExtra::Stream({
+ StreamExtra {
+ physical: PhysicalCommonExtra { dist },
+ append_only,
+ emit_on_window_close,
+ watermark_columns,
+ }
+ })),
}
}
- pub fn new_batch_from_logical(
- logical: &impl GenericPlanNode,
+ pub fn new_stream_with_core(
+ core: &impl GenericPlanNode,
dist: Distribution,
- order: Order,
+ append_only: bool,
+ emit_on_window_close: bool,
+ watermark_columns: FixedBitSet,
) -> Self {
- Self::new_batch(logical.ctx(), logical.schema(), dist, order)
+ Self::new_stream(
+ core.ctx(),
+ core.schema(),
+ core.stream_key(),
+ core.functional_dependency(),
+ dist,
+ append_only,
+ emit_on_window_close,
+ watermark_columns,
+ )
}
pub fn new_batch(
@@ -186,75 +266,49 @@ impl PlanBase {
) -> Self {
let id = ctx.next_plan_node_id();
let functional_dependency = FunctionalDependencySet::new(schema.len());
- let watermark_columns = FixedBitSet::with_capacity(schema.len());
Self {
id,
ctx,
schema,
- dist,
- order,
stream_key: None,
- // Batch plan node won't touch `append_only` field
- append_only: true,
- emit_on_window_close: false, // TODO(rc): batch EOWC support?
functional_dependency,
- watermark_columns,
+ physical_extra: Some(PhysicalExtra::Batch({
+ BatchExtra {
+ physical: PhysicalCommonExtra { dist },
+ order,
+ }
+ })),
}
}
- pub fn derive_stream_plan_base(plan_node: &PlanRef) -> Self {
- PlanBase::new_stream(
- plan_node.ctx(),
- plan_node.schema().clone(),
- plan_node.stream_key().map(|v| v.to_vec()),
- plan_node.functional_dependency().clone(),
- plan_node.distribution().clone(),
- plan_node.append_only(),
- plan_node.emit_on_window_close(),
- plan_node.watermark_columns().clone(),
- )
+ pub fn new_batch_with_core(
+ core: &impl GenericPlanNode,
+ dist: Distribution,
+ order: Order,
+ ) -> Self {
+ Self::new_batch(core.ctx(), core.schema(), dist, order)
}
pub fn clone_with_new_plan_id(&self) -> Self {
let mut new = self.clone();
- new.id = self.ctx.next_plan_node_id();
+ new.id = self.ctx().next_plan_node_id();
+ new
+ }
+
+ /// Clone the plan node with a new distribution.
+ ///
+ /// Panics if the plan node is not physical.
+ pub fn clone_with_new_distribution(&self, dist: Distribution) -> Self {
+ let mut new = self.clone();
+ new.physical_extra_mut().common_mut().dist = dist;
new
}
}
-macro_rules! impl_base_delegate {
- ($( { $convention:ident, $name:ident }),*) => {
- $(paste! {
- impl [<$convention $name>] {
- pub fn id(&self) -> PlanNodeId {
- self.plan_base().id
- }
- pub fn ctx(&self) -> OptimizerContextRef {
- self.plan_base().ctx()
- }
- pub fn schema(&self) -> &Schema {
- &self.plan_base().schema
- }
- pub fn stream_key(&self) -> Option<&[usize]> {
- self.plan_base().stream_key()
- }
- pub fn order(&self) -> &Order {
- &self.plan_base().order
- }
- pub fn distribution(&self) -> &Distribution {
- &self.plan_base().dist
- }
- pub fn append_only(&self) -> bool {
- self.plan_base().append_only
- }
- pub fn emit_on_window_close(&self) -> bool {
- self.plan_base().emit_on_window_close
- }
- pub fn functional_dependency(&self) -> &FunctionalDependencySet {
- &self.plan_base().functional_dependency
- }
- }
- })*
+// Mutators for testing only.
+#[cfg(test)]
+impl PlanBase {
+ pub fn functional_dependency_mut(&mut self) -> &mut FunctionalDependencySet {
+ &mut self.functional_dependency
}
}
-for_all_plan_nodes! { impl_base_delegate }
diff --git a/src/frontend/src/optimizer/plan_node/stream.rs b/src/frontend/src/optimizer/plan_node/stream.rs
index 2edf997bf91fd..866c62c2413a5 100644
--- a/src/frontend/src/optimizer/plan_node/stream.rs
+++ b/src/frontend/src/optimizer/plan_node/stream.rs
@@ -12,16 +12,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-use super::generic::GenericPlanRef;
-use crate::optimizer::property::Distribution;
+use fixedbitset::FixedBitSet;
-/// A subtrait of [`GenericPlanRef`] for stream plans.
+use super::generic::PhysicalPlanRef;
+
+/// A subtrait of [`PhysicalPlanRef`] for stream plans.
///
/// Due to the lack of refactoring, all plan nodes currently implement this trait
/// through [`super::PlanBase`]. One may still use this trait as a bound for
-/// expecting a stream plan, in contrast to [`GenericPlanRef`].
-pub trait StreamPlanRef: GenericPlanRef {
- fn distribution(&self) -> &Distribution;
+/// accessing a stream plan, in contrast to [`GenericPlanRef`] or
+/// [`PhysicalPlanRef`].
+///
+/// [`GenericPlanRef`]: super::generic::GenericPlanRef
+pub trait StreamPlanRef: PhysicalPlanRef {
fn append_only(&self) -> bool;
fn emit_on_window_close(&self) -> bool;
+ fn watermark_columns(&self) -> &FixedBitSet;
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_dedup.rs b/src/frontend/src/optimizer/plan_node/stream_dedup.rs
index 6e96d0eab0e93..51b5e589e886e 100644
--- a/src/frontend/src/optimizer/plan_node/stream_dedup.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_dedup.rs
@@ -17,7 +17,7 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::DedupNode;
-use super::generic::{self, GenericPlanNode, GenericPlanRef};
+use super::generic::{self, GenericPlanNode, GenericPlanRef, PhysicalPlanRef};
use super::stream::StreamPlanRef;
use super::utils::{impl_distill_by_unit, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode};
@@ -37,7 +37,7 @@ impl StreamDedup {
// A dedup operator must be append-only.
assert!(input.append_only());
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
input.distribution().clone(),
true,
diff --git a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs
index db9e6ac296bbf..bb18f9cffdf0f 100644
--- a/src/frontend/src/optimizer/plan_node/stream_delta_join.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_delta_join.rs
@@ -20,7 +20,7 @@ use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{ArrangementInfo, DeltaIndexJoinNode};
-use super::generic::{self};
+use super::generic::{self, GenericPlanRef};
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use crate::expr::{Expr, ExprRewriter};
@@ -67,7 +67,7 @@ impl StreamDeltaJoin {
core.i2o_col_mapping().rewrite_bitset(&watermark_columns)
};
// TODO: derive from input
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
append_only,
@@ -90,7 +90,7 @@ impl StreamDeltaJoin {
impl Distill for StreamDeltaJoin {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.core.join_type)));
diff --git a/src/frontend/src/optimizer/plan_node/stream_dml.rs b/src/frontend/src/optimizer/plan_node/stream_dml.rs
index c9f969384c3a4..9b000974786e4 100644
--- a/src/frontend/src/optimizer/plan_node/stream_dml.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_dml.rs
@@ -17,6 +17,7 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_common::catalog::{ColumnDesc, INITIAL_TABLE_VERSION_ID};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;
diff --git a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
index e1ca18da937e9..a4b74f37208e7 100644
--- a/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_dynamic_filter.rs
@@ -17,7 +17,8 @@ pub use risingwave_pb::expr::expr_node::Type as ExprType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::DynamicFilterNode;
-use super::generic::DynamicFilter;
+use super::generic::{DynamicFilter, GenericPlanRef};
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, column_names_pretty, watermark_pretty, Distill};
use super::{generic, ExprRewritable};
use crate::expr::{Expr, ExprImpl};
@@ -37,7 +38,7 @@ impl StreamDynamicFilter {
let watermark_columns = core.watermark_columns(core.right().watermark_columns()[0]);
// TODO: derive from input
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
core.left().distribution().clone(),
false, /* we can have a new abstraction for append only and monotonically increasing
@@ -78,11 +79,11 @@ impl StreamDynamicFilter {
impl Distill for StreamDynamicFilter {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let pred = self.core.pretty_field();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("predicate", pred));
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
vec.push(("output", column_names_pretty(self.schema())));
diff --git a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs
index 9418af8e4a364..d8c5a9635ce59 100644
--- a/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_eowc_over_window.rs
@@ -18,7 +18,7 @@ use fixedbitset::FixedBitSet;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::{self, PlanWindowFunction};
+use super::generic::{self, GenericPlanRef, PlanWindowFunction};
use super::utils::{impl_distill_by_unit, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -50,7 +50,7 @@ impl StreamEowcOverWindow {
// ancient rows in some rarely updated partitions that are emitted at the end of time.
let watermark_columns = FixedBitSet::with_capacity(core.output_len());
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
input.distribution().clone(),
true,
diff --git a/src/frontend/src/optimizer/plan_node/stream_exchange.rs b/src/frontend/src/optimizer/plan_node/stream_exchange.rs
index 0fa1713bf4488..99e6c3c5161a1 100644
--- a/src/frontend/src/optimizer/plan_node/stream_exchange.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_exchange.rs
@@ -16,6 +16,8 @@ use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DispatchStrategy, DispatcherType, ExchangeNode};
+use super::generic::{GenericPlanRef, PhysicalPlanRef};
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, plan_node_name, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::property::{Distribution, DistributionDisplay};
@@ -78,7 +80,7 @@ impl StreamExchange {
impl Distill for StreamExchange {
fn distill<'a>(&self) -> XmlNode<'a> {
let distribution_display = DistributionDisplay {
- distribution: &self.base.dist,
+ distribution: self.base.distribution(),
input_schema: self.input.schema(),
};
childless_record(
@@ -117,13 +119,13 @@ impl StreamNode for StreamExchange {
})
} else {
Some(DispatchStrategy {
- r#type: match &self.base.dist {
+ r#type: match &self.base.distribution() {
Distribution::HashShard(_) => DispatcherType::Hash,
Distribution::Single => DispatcherType::Simple,
Distribution::Broadcast => DispatcherType::Broadcast,
_ => panic!("Do not allow Any or AnyShard in serialization process"),
} as i32,
- dist_key_indices: match &self.base.dist {
+ dist_key_indices: match &self.base.distribution() {
Distribution::HashShard(keys) => {
keys.iter().map(|num| *num as u32).collect()
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_expand.rs b/src/frontend/src/optimizer/plan_node/stream_expand.rs
index e0f8852a19fb5..5959b8d6be4d2 100644
--- a/src/frontend/src/optimizer/plan_node/stream_expand.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_expand.rs
@@ -48,7 +48,7 @@ impl StreamExpand {
.map(|idx| idx + input.schema().len()),
);
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_filter.rs b/src/frontend/src/optimizer/plan_node/stream_filter.rs
index ff4d344607776..0f000e6b8c0db 100644
--- a/src/frontend/src/optimizer/plan_node/stream_filter.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_filter.rs
@@ -34,7 +34,7 @@ impl StreamFilter {
let input = core.input.clone();
let dist = input.distribution().clone();
// Filter executor won't change the append-only behavior of the stream.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
index 190c05c0a5ba1..95fd72e9f6aa0 100644
--- a/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_fs_fetch.rs
@@ -48,7 +48,7 @@ impl_plan_tree_node_for_unary! { StreamFsFetch }
impl StreamFsFetch {
pub fn new(input: PlanRef, source: generic::Source) -> Self {
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&source,
Distribution::SomeShard,
source.catalog.as_ref().map_or(true, |s| s.append_only),
diff --git a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
index 14711d353f9d8..3e8f3c00206c4 100644
--- a/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_group_topn.rs
@@ -16,7 +16,8 @@ use fixedbitset::FixedBitSet;
use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::{DistillUnit, TopNLimit};
+use super::generic::{DistillUnit, GenericPlanRef, TopNLimit};
+use super::stream::StreamPlanRef;
use super::utils::{plan_node_name, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanTreeNodeUnary, StreamNode};
use crate::optimizer::plan_node::generic::GenericPlanNode;
@@ -135,7 +136,7 @@ impl Distill for StreamGroupTopN {
{ "append_only", self.input().append_only() },
);
let mut node = self.core.distill_with_name(name);
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
node.fields.push(("output_watermarks".into(), ow));
}
node
diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
index 25e1ac801f97c..55ab6b5906e59 100644
--- a/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_hash_agg.rs
@@ -18,10 +18,11 @@ use pretty_xmlish::XmlNode;
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
-use super::generic::{self, PlanAggCall};
+use super::generic::{self, GenericPlanRef, PlanAggCall};
use super::utils::{childless_record, plan_node_name, watermark_pretty, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
+use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::utils::{ColIndexMapping, ColIndexMappingRewriteExt, IndexSet};
@@ -85,7 +86,7 @@ impl StreamHashAgg {
}
// Hash agg executor might change the append-only behavior of the stream.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
emit_on_window_close, // in EOWC mode, we produce append only output
@@ -142,7 +143,7 @@ impl StreamHashAgg {
impl Distill for StreamHashAgg {
fn distill<'a>(&self) -> XmlNode<'a> {
let mut vec = self.core.fields_pretty();
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
childless_record(
@@ -214,7 +215,7 @@ impl StreamNode for StreamHashAgg {
})
.collect(),
row_count_index: self.row_count_idx as u32,
- emit_on_window_close: self.base.emit_on_window_close,
+ emit_on_window_close: self.base.emit_on_window_close(),
})
}
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs
index 0075b1730b4eb..9d9c41425c4b1 100644
--- a/src/frontend/src/optimizer/plan_node/stream_hash_join.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_hash_join.rs
@@ -20,7 +20,8 @@ use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::{DeltaExpression, HashJoinNode, PbInequalityPair};
-use super::generic::Join;
+use super::generic::{GenericPlanRef, Join};
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, plan_node_name, watermark_pretty, Distill};
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamDeltaJoin, StreamNode,
@@ -178,7 +179,7 @@ impl StreamHashJoin {
};
// TODO: derive from input
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
append_only,
@@ -291,7 +292,7 @@ impl Distill for StreamHashJoin {
{ "interval", self.clean_left_state_conjunction_idx.is_some() && self.clean_right_state_conjunction_idx.is_some() },
{ "append_only", self.is_append_only },
);
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(6);
vec.push(("type", Pretty::debug(&self.core.join_type)));
@@ -316,7 +317,7 @@ impl Distill for StreamHashJoin {
if let Some(i) = self.clean_right_state_conjunction_idx {
vec.push(("conditions_to_clean_right_state_table", get_cond(i)));
}
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs
index c68b1b307d470..e177be6942360 100644
--- a/src/frontend/src/optimizer/plan_node/stream_hop_window.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_hop_window.rs
@@ -17,6 +17,8 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::HopWindowNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
@@ -56,7 +58,7 @@ impl StreamHopWindow {
)
.rewrite_bitset(&watermark_columns);
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
@@ -75,7 +77,7 @@ impl StreamHopWindow {
impl Distill for StreamHopWindow {
fn distill<'a>(&self) -> XmlNode<'a> {
let mut vec = self.core.fields_pretty();
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
childless_record("StreamHopWindow", vec)
diff --git a/src/frontend/src/optimizer/plan_node/stream_materialize.rs b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
index fb17537bc90e6..d8972436d5c78 100644
--- a/src/frontend/src/optimizer/plan_node/stream_materialize.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_materialize.rs
@@ -24,11 +24,13 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use super::derive::derive_columns;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill};
use super::{reorganize_elements_id, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::catalog::table_catalog::{CreateType, TableCatalog, TableType, TableVersion};
use crate::catalog::FragmentId;
use crate::optimizer::plan_node::derive::derive_pk;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{PlanBase, PlanNodeMeta};
use crate::optimizer::property::{Cardinality, Distribution, Order, RequiredDist};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -273,8 +275,8 @@ impl Distill for StreamMaterialize {
vec.push(("pk_conflict", Pretty::from(pk_conflict_behavior)));
- let watermark_columns = &self.base.watermark_columns;
- if self.base.watermark_columns.count_ones(..) > 0 {
+ let watermark_columns = &self.base.watermark_columns();
+ if self.base.watermark_columns().count_ones(..) > 0 {
let watermark_column_names = watermark_columns
.ones()
.map(|i| table.columns()[i].name_with_hidden().to_string())
@@ -294,16 +296,16 @@ impl PlanTreeNodeUnary for StreamMaterialize {
fn clone_with_input(&self, input: PlanRef) -> Self {
let new = Self::new(input, self.table().clone());
new.base
- .schema
+ .schema()
.fields
.iter()
- .zip_eq_fast(self.base.schema.fields.iter())
+ .zip_eq_fast(self.base.schema().fields.iter())
.for_each(|(a, b)| {
assert_eq!(a.data_type, b.data_type);
assert_eq!(a.type_name, b.type_name);
assert_eq!(a.sub_fields, b.sub_fields);
});
- assert_eq!(new.plan_base().stream_key, self.plan_base().stream_key);
+ assert_eq!(new.plan_base().stream_key(), self.plan_base().stream_key());
new
}
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_now.rs b/src/frontend/src/optimizer/plan_node/stream_now.rs
index 9eb0a0e0f143e..91ebc344fa51d 100644
--- a/src/frontend/src/optimizer/plan_node/stream_now.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_now.rs
@@ -19,8 +19,7 @@ use risingwave_common::types::DataType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::NowNode;
-use super::generic::GenericPlanRef;
-use super::stream::StreamPlanRef;
+use super::generic::{GenericPlanRef, PhysicalPlanRef};
use super::utils::{childless_record, Distill, TableCatalogBuilder};
use super::{ExprRewritable, LogicalNow, PlanBase, StreamNode};
use crate::optimizer::plan_node::utils::column_names_pretty;
@@ -59,7 +58,7 @@ impl StreamNow {
impl Distill for StreamNow {
fn distill<'a>(&self) -> XmlNode<'a> {
- let vec = if self.base.ctx.is_explain_verbose() {
+ let vec = if self.base.ctx().is_explain_verbose() {
vec![("output", column_names_pretty(self.schema()))]
} else {
vec![]
diff --git a/src/frontend/src/optimizer/plan_node/stream_over_window.rs b/src/frontend/src/optimizer/plan_node/stream_over_window.rs
index 0d749f0c7b0e6..5a2f9d98f1340 100644
--- a/src/frontend/src/optimizer/plan_node/stream_over_window.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_over_window.rs
@@ -21,6 +21,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use super::generic::{GenericPlanNode, PlanWindowFunction};
use super::utils::{impl_distill_by_unit, TableCatalogBuilder};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::stream_fragmenter::BuildFragmentGraphState;
use crate::TableCatalog;
@@ -37,7 +38,7 @@ impl StreamOverWindow {
let input = &core.input;
let watermark_columns = FixedBitSet::with_capacity(core.output_len());
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
input.distribution().clone(),
false, // general over window cannot be append-only
@@ -122,7 +123,7 @@ impl StreamNode for StreamOverWindow {
.to_internal_table_prost();
let cache_policy = self
.base
- .ctx
+ .ctx()
.session_ctx()
.config()
.get_streaming_over_window_cache_policy();
diff --git a/src/frontend/src/optimizer/plan_node/stream_project.rs b/src/frontend/src/optimizer/plan_node/stream_project.rs
index 8a7665881e0cf..c0ff0d1cf2f43 100644
--- a/src/frontend/src/optimizer/plan_node/stream_project.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_project.rs
@@ -17,6 +17,8 @@ use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::ProjectNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{try_derive_watermark, Expr, ExprImpl, ExprRewriter, WatermarkDerivation};
@@ -41,7 +43,7 @@ impl Distill for StreamProject {
let schema = self.schema();
let mut vec = self.core.fields_pretty(schema);
if let Some(display_output_watermarks) =
- watermark_pretty(&self.base.watermark_columns, schema)
+ watermark_pretty(self.base.watermark_columns(), schema)
{
vec.push(("output_watermarks", display_output_watermarks));
}
@@ -79,7 +81,7 @@ impl StreamProject {
}
// Project executor won't change the append-only behavior of the stream, so it depends on
// input's `append_only`.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
distribution,
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_project_set.rs b/src/frontend/src/optimizer/plan_node/stream_project_set.rs
index cadd600f3c3b7..ba09d79c96c60 100644
--- a/src/frontend/src/optimizer/plan_node/stream_project_set.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_project_set.rs
@@ -66,7 +66,7 @@ impl StreamProjectSet {
// ProjectSet executor won't change the append-only behavior of the stream, so it depends on
// input's `append_only`.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
distribution,
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_share.rs b/src/frontend/src/optimizer/plan_node/stream_share.rs
index 8b406005f40a6..3acf0b132805e 100644
--- a/src/frontend/src/optimizer/plan_node/stream_share.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_share.rs
@@ -16,6 +16,8 @@ use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::PbStreamNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::Distill;
use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, StreamExchange, StreamNode};
use crate::optimizer::plan_node::{LogicalShare, PlanBase, PlanTreeNode};
@@ -34,7 +36,7 @@ impl StreamShare {
let input = core.input.borrow().0.clone();
let dist = input.distribution().clone();
// Filter executor won't change the append-only behavior of the stream.
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
input.append_only(),
@@ -79,7 +81,7 @@ impl StreamNode for StreamShare {
impl StreamShare {
pub fn adhoc_to_stream_prost(&self, state: &mut BuildFragmentGraphState) -> PbStreamNode {
- let operator_id = self.base.id.0 as u32;
+ let operator_id = self.base.id().0 as u32;
match state.get_share_stream_node(operator_id) {
None => {
diff --git a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
index 59311dd22226c..92d96fdf21b08 100644
--- a/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_simple_agg.rs
@@ -21,6 +21,8 @@ use super::generic::{self, PlanAggCall};
use super::utils::{childless_record, plan_node_name, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
+use crate::optimizer::plan_node::generic::PhysicalPlanRef;
+use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::property::Distribution;
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -48,7 +50,7 @@ impl StreamSimpleAgg {
let watermark_columns = FixedBitSet::with_capacity(core.output_len());
// Simple agg executor might change the append-only behavior of the stream.
- let base = PlanBase::new_stream_with_logical(&core, dist, false, false, watermark_columns);
+ let base = PlanBase::new_stream_with_core(&core, dist, false, false, watermark_columns);
StreamSimpleAgg {
base,
core,
@@ -99,7 +101,7 @@ impl StreamNode for StreamSimpleAgg {
.collect(),
distribution_key: self
.base
- .dist
+ .distribution()
.dist_column_indices()
.iter()
.map(|idx| *idx as u32)
diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs
index a51380d630331..32e9fb487910c 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sink.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs
@@ -37,6 +37,7 @@ use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use tracing::info;
use super::derive::{derive_columns, derive_pk};
+use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill, IndicesDisplay, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, StreamNode};
use crate::optimizer::plan_node::PlanTreeNodeUnary;
@@ -57,7 +58,7 @@ pub struct StreamSink {
impl StreamSink {
#[must_use]
pub fn new(input: PlanRef, sink_desc: SinkDesc) -> Self {
- let base = PlanBase::derive_stream_plan_base(&input);
+ let base = input.plan_base().clone_with_new_plan_id();
Self {
base,
input,
@@ -389,7 +390,7 @@ impl Distill for StreamSink {
.iter()
.map(|k| k.column_index)
.collect_vec(),
- schema: &self.base.schema,
+ schema: self.base.schema(),
};
vec.push(("pk", pk.distill()));
}
@@ -409,7 +410,7 @@ impl StreamNode for StreamSink {
PbNodeBody::Sink(SinkNode {
sink_desc: Some(self.sink_desc.to_proto()),
table: Some(table.to_internal_table_prost()),
- log_store_type: match self.base.ctx.session_ctx().config().get_sink_decouple() {
+ log_store_type: match self.base.ctx().session_ctx().config().get_sink_decouple() {
SinkDecouple::Default => {
let enable_sink_decouple =
match_sink_name_str!(
diff --git a/src/frontend/src/optimizer/plan_node/stream_sort.rs b/src/frontend/src/optimizer/plan_node/stream_sort.rs
index b82d71068d817..41a56a0fd5df2 100644
--- a/src/frontend/src/optimizer/plan_node/stream_sort.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_sort.rs
@@ -20,6 +20,8 @@ use risingwave_common::catalog::FieldDisplay;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
+use super::generic::{GenericPlanRef, PhysicalPlanRef};
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, Distill, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -84,7 +86,7 @@ impl StreamEowcSort {
tbl_builder.add_order_column(self.sort_column_index, OrderType::ascending());
order_cols.insert(self.sort_column_index);
- let dist_key = self.base.dist.dist_column_indices().to_vec();
+ let dist_key = self.base.distribution().dist_column_indices().to_vec();
for idx in &dist_key {
if !order_cols.contains(idx) {
tbl_builder.add_order_column(*idx, OrderType::ascending());
diff --git a/src/frontend/src/optimizer/plan_node/stream_source.rs b/src/frontend/src/optimizer/plan_node/stream_source.rs
index 377e2704776bb..ae66cf568118b 100644
--- a/src/frontend/src/optimizer/plan_node/stream_source.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_source.rs
@@ -37,7 +37,7 @@ pub struct StreamSource {
impl StreamSource {
pub fn new(core: generic::Source) -> Self {
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
Distribution::SomeShard,
core.catalog.as_ref().map_or(true, |s| s.append_only),
diff --git a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
index 0af7ebded94d9..474582ec877c7 100644
--- a/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_stateless_simple_agg.rs
@@ -20,6 +20,7 @@ use super::generic::{self, PlanAggCall};
use super::utils::impl_distill_by_unit;
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::ExprRewriter;
+use crate::optimizer::plan_node::generic::PhysicalPlanRef;
use crate::optimizer::property::RequiredDist;
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -49,7 +50,7 @@ impl StreamStatelessSimpleAgg {
}
}
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
input_dist.clone(),
input.append_only(),
diff --git a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
index 907a41db28525..965ca217a3369 100644
--- a/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_table_scan.rs
@@ -24,11 +24,13 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ChainType, PbStreamNode};
+use super::generic::PhysicalPlanRef;
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanNodeId, PlanRef, StreamNode};
use crate::catalog::ColumnId;
use crate::expr::{ExprRewriter, FunctionCall};
use crate::optimizer::plan_node::generic::GenericPlanRef;
+use crate::optimizer::plan_node::stream::StreamPlanRef;
use crate::optimizer::plan_node::utils::{IndicesDisplay, TableCatalogBuilder};
use crate::optimizer::property::{Distribution, DistributionDisplay};
use crate::stream_fragmenter::BuildFragmentGraphState;
@@ -66,7 +68,7 @@ impl StreamTableScan {
None => Distribution::SomeShard,
}
};
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
distribution,
core.table_desc.append_only,
@@ -192,7 +194,7 @@ impl_plan_tree_node_for_leaf! { StreamTableScan }
impl Distill for StreamTableScan {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(4);
vec.push(("table", Pretty::from(self.core.table_name.clone())));
vec.push(("columns", self.core.columns_pretty(verbose)));
@@ -200,12 +202,12 @@ impl Distill for StreamTableScan {
if verbose {
let pk = IndicesDisplay {
indices: self.stream_key().unwrap_or_default(),
- schema: &self.base.schema,
+ schema: self.base.schema(),
};
vec.push(("pk", pk.distill()));
let dist = Pretty::display(&DistributionDisplay {
distribution: self.distribution(),
- input_schema: &self.base.schema,
+ input_schema: self.base.schema(),
});
vec.push(("dist", dist));
}
@@ -325,7 +327,7 @@ impl StreamTableScan {
..Default::default()
})),
stream_key,
- operator_id: self.base.id.0 as u64,
+ operator_id: self.base.id().0 as u64,
identity: {
let s = self.distill_to_string();
s.replace("StreamTableScan", "Chain")
diff --git a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs
index 2191ca322342d..675dbeb9ab381 100644
--- a/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_temporal_join.rs
@@ -18,6 +18,8 @@ use risingwave_pb::plan_common::JoinType;
use risingwave_pb::stream_plan::stream_node::NodeBody;
use risingwave_pb::stream_plan::TemporalJoinNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, StreamNode};
use crate::expr::{Expr, ExprRewriter};
@@ -61,7 +63,7 @@ impl StreamTemporalJoin {
.rewrite_bitset(core.left.watermark_columns()),
);
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
true,
@@ -88,7 +90,7 @@ impl StreamTemporalJoin {
impl Distill for StreamTemporalJoin {
fn distill<'a>(&self) -> XmlNode<'a> {
- let verbose = self.base.ctx.is_explain_verbose();
+ let verbose = self.base.ctx().is_explain_verbose();
let mut vec = Vec::with_capacity(if verbose { 3 } else { 2 });
vec.push(("type", Pretty::debug(&self.core.join_type)));
@@ -101,7 +103,7 @@ impl Distill for StreamTemporalJoin {
}),
));
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_topn.rs b/src/frontend/src/optimizer/plan_node/stream_topn.rs
index e7a880fa7d757..87890625f6be7 100644
--- a/src/frontend/src/optimizer/plan_node/stream_topn.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_topn.rs
@@ -40,7 +40,7 @@ impl StreamTopN {
};
let watermark_columns = FixedBitSet::with_capacity(input.schema().len());
- let base = PlanBase::new_stream_with_logical(&core, dist, false, false, watermark_columns);
+ let base = PlanBase::new_stream_with_core(&core, dist, false, false, watermark_columns);
StreamTopN { base, core }
}
diff --git a/src/frontend/src/optimizer/plan_node/stream_union.rs b/src/frontend/src/optimizer/plan_node/stream_union.rs
index 8f6353d6be44c..6d6dca2d8dd02 100644
--- a/src/frontend/src/optimizer/plan_node/stream_union.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_union.rs
@@ -19,6 +19,8 @@ use pretty_xmlish::XmlNode;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::UnionNode;
+use super::generic::GenericPlanRef;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill};
use super::{generic, ExprRewritable, PlanRef};
use crate::optimizer::plan_node::generic::GenericPlanNode;
@@ -46,7 +48,7 @@ impl StreamUnion {
|acc_watermark_columns, input| acc_watermark_columns.bitand(input.watermark_columns()),
);
- let base = PlanBase::new_stream_with_logical(
+ let base = PlanBase::new_stream_with_core(
&core,
dist,
inputs.iter().all(|x| x.append_only()),
@@ -60,7 +62,7 @@ impl StreamUnion {
impl Distill for StreamUnion {
fn distill<'a>(&self) -> XmlNode<'a> {
let mut vec = self.core.fields_pretty();
- if let Some(ow) = watermark_pretty(&self.base.watermark_columns, self.schema()) {
+ if let Some(ow) = watermark_pretty(self.base.watermark_columns(), self.schema()) {
vec.push(("output_watermarks", ow));
}
childless_record("StreamUnion", vec)
diff --git a/src/frontend/src/optimizer/plan_node/stream_values.rs b/src/frontend/src/optimizer/plan_node/stream_values.rs
index fb0b844411f63..f8cc5db851159 100644
--- a/src/frontend/src/optimizer/plan_node/stream_values.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_values.rs
@@ -18,6 +18,7 @@ use risingwave_pb::stream_plan::stream_node::NodeBody as ProstStreamNode;
use risingwave_pb::stream_plan::values_node::ExprTuple;
use risingwave_pb::stream_plan::ValuesNode;
+use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, LogicalValues, PlanBase, StreamNode};
use crate::expr::{Expr, ExprImpl};
diff --git a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs
index ed5a946603ee4..066bc9a234ca5 100644
--- a/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs
+++ b/src/frontend/src/optimizer/plan_node/stream_watermark_filter.rs
@@ -21,6 +21,7 @@ use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::catalog::WatermarkDesc;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
+use super::stream::StreamPlanRef;
use super::utils::{childless_record, watermark_pretty, Distill, TableCatalogBuilder};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, StreamNode};
use crate::expr::{ExprDisplay, ExprImpl};
@@ -85,7 +86,7 @@ impl Distill for StreamWatermarkFilter {
})
.collect();
let display_output_watermarks =
- watermark_pretty(&self.base.watermark_columns, input_schema).unwrap();
+ watermark_pretty(self.base.watermark_columns(), input_schema).unwrap();
let fields = vec![
("watermark_descs", Pretty::Array(display_watermark_descs)),
("output_watermarks", display_output_watermarks),
diff --git a/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs b/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs
index f30f3d9fa4966..7e53b903ac962 100644
--- a/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs
+++ b/src/frontend/src/optimizer/plan_rewriter/plan_cloner.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
use itertools::Itertools;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNode, StreamShare};
use crate::optimizer::PlanRewriter;
use crate::PlanRef;
diff --git a/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs b/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs
index 9ab0d4d580ddc..5b9efb9fc7c94 100644
--- a/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs
+++ b/src/frontend/src/optimizer/plan_rewriter/share_source_rewriter.rs
@@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet};
use itertools::Itertools;
use crate::catalog::SourceId;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
LogicalShare, LogicalSource, PlanNodeId, PlanTreeNode, StreamShare,
};
diff --git a/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs b/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs
index 7d538392f9361..7950b5d81a49c 100644
--- a/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs
+++ b/src/frontend/src/optimizer/plan_visitor/share_parent_counter.rs
@@ -15,6 +15,7 @@
use std::collections::HashMap;
use super::{DefaultBehavior, DefaultValue};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalShare, PlanNodeId, PlanTreeNodeUnary};
use crate::optimizer::plan_visitor::PlanVisitor;
diff --git a/src/frontend/src/optimizer/property/distribution.rs b/src/frontend/src/optimizer/property/distribution.rs
index 4fcaf959eac87..b6e7715dd155f 100644
--- a/src/frontend/src/optimizer/property/distribution.rs
+++ b/src/frontend/src/optimizer/property/distribution.rs
@@ -295,10 +295,12 @@ impl RequiredDist {
pub fn enforce_if_not_satisfies(
&self,
- plan: PlanRef,
+ mut plan: PlanRef,
required_order: &Order,
) -> Result {
- let plan = required_order.enforce_if_not_satisfies(plan)?;
+ if let Convention::Batch = plan.convention() {
+ plan = required_order.enforce_if_not_satisfies(plan)?;
+ }
if !plan.distribution().satisfies(self) {
Ok(self.enforce(plan, required_order))
} else {
diff --git a/src/frontend/src/optimizer/property/order.rs b/src/frontend/src/optimizer/property/order.rs
index a70bffb13a8ba..19ad7586e1c11 100644
--- a/src/frontend/src/optimizer/property/order.rs
+++ b/src/frontend/src/optimizer/property/order.rs
@@ -92,7 +92,7 @@ impl Order {
}
}
- pub fn enforce(&self, plan: PlanRef) -> PlanRef {
+ fn enforce(&self, plan: PlanRef) -> PlanRef {
assert_eq!(plan.convention(), Convention::Batch);
BatchSort::new(plan, self.clone()).into()
}
diff --git a/src/frontend/src/optimizer/rule/always_false_filter_rule.rs b/src/frontend/src/optimizer/rule/always_false_filter_rule.rs
index 02165232372e4..eeba7d9f3be3b 100644
--- a/src/frontend/src/optimizer/rule/always_false_filter_rule.rs
+++ b/src/frontend/src/optimizer/rule/always_false_filter_rule.rs
@@ -15,6 +15,7 @@
use risingwave_common::types::ScalarImpl;
use super::Rule;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalFilter, LogicalValues};
use crate::PlanRef;
diff --git a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
index 66579248a76f9..7ac121692c81d 100644
--- a/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
+++ b/src/frontend/src/optimizer/rule/apply_join_transpose_rule.rs
@@ -23,6 +23,7 @@ use crate::expr::{
CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, ExprType, FunctionCall,
InputRef,
};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalApply, LogicalFilter, LogicalJoin, PlanTreeNodeBinary};
use crate::optimizer::plan_visitor::{ExprCorrelatedIdFinder, PlanCorrelatedIdFinder};
use crate::optimizer::rule::apply_offset_rewriter::ApplyCorrelatedIndicesConverter;
diff --git a/src/frontend/src/optimizer/rule/expand_to_project_rule.rs b/src/frontend/src/optimizer/rule/expand_to_project_rule.rs
index 1ed1da0037aba..01a39042efd98 100644
--- a/src/frontend/src/optimizer/rule/expand_to_project_rule.rs
+++ b/src/frontend/src/optimizer/rule/expand_to_project_rule.rs
@@ -36,7 +36,7 @@ impl Rule for ExpandToProjectRule {
let column_subset = column_subsets.get(0).unwrap();
// if `column_subsets` len equals 1, convert it into a project
- let mut exprs = Vec::with_capacity(expand.base.schema.len());
+ let mut exprs = Vec::with_capacity(expand.base.schema().len());
// Add original input column first
for i in 0..input.schema().len() {
exprs.push(ExprImpl::InputRef(
diff --git a/src/frontend/src/optimizer/rule/index_selection_rule.rs b/src/frontend/src/optimizer/rule/index_selection_rule.rs
index 9103d1bc906bc..323cc59ef3558 100644
--- a/src/frontend/src/optimizer/rule/index_selection_rule.rs
+++ b/src/frontend/src/optimizer/rule/index_selection_rule.rs
@@ -66,6 +66,7 @@ use crate::expr::{
FunctionCall, InputRef,
};
use crate::optimizer::optimizer_context::OptimizerContextRef;
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
generic, ColumnPruningContext, LogicalJoin, LogicalScan, LogicalUnion, PlanTreeNode,
PlanTreeNodeBinary, PredicatePushdown, PredicatePushdownContext,
diff --git a/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs
index dcbb6f7b015ee..bd2db0ac67cca 100644
--- a/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs
+++ b/src/frontend/src/optimizer/rule/left_deep_tree_join_ordering_rule.rs
@@ -47,6 +47,7 @@ mod tests {
use super::*;
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::optimizer_context::OptimizerContext;
+ use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::utils::Condition;
#[tokio::test]
diff --git a/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs b/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs
index c496a906400ae..8682db8491a1d 100644
--- a/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs
+++ b/src/frontend/src/optimizer/rule/merge_multijoin_rule.rs
@@ -46,6 +46,7 @@ mod tests {
use super::*;
use crate::expr::{ExprImpl, FunctionCall, InputRef};
use crate::optimizer::optimizer_context::OptimizerContext;
+ use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::utils::Condition;
#[tokio::test]
diff --git a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
index ea8386bc227f8..c32ae40531cd0 100644
--- a/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
+++ b/src/frontend/src/optimizer/rule/min_max_on_index_rule.rs
@@ -27,7 +27,7 @@ use risingwave_expr::aggregate::AggKind;
use super::{BoxedRule, Rule};
use crate::expr::{ExprImpl, ExprType, FunctionCall, InputRef};
-use crate::optimizer::plan_node::generic::Agg;
+use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
use crate::optimizer::plan_node::{
LogicalAgg, LogicalFilter, LogicalScan, LogicalTopN, PlanAggCall, PlanTreeNodeUnary,
};
diff --git a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
index dfb6963c7fb4f..93637d3ba8193 100644
--- a/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
+++ b/src/frontend/src/optimizer/rule/over_window_to_topn_rule.rs
@@ -18,6 +18,7 @@ use risingwave_expr::window_function::WindowFuncKind;
use super::Rule;
use crate::expr::{collect_input_refs, ExprImpl, ExprType};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalFilter, LogicalTopN, PlanTreeNodeUnary};
use crate::optimizer::property::Order;
use crate::planner::LIMIT_ALL_COUNT;
diff --git a/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs b/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs
index dc5f9c2bc9aba..f34146ba80050 100644
--- a/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs
+++ b/src/frontend/src/optimizer/rule/pull_up_correlated_predicate_rule.rs
@@ -18,6 +18,7 @@ use risingwave_common::util::column_index_mapping::ColIndexMapping;
use super::super::plan_node::*;
use super::{BoxedRule, Rule};
use crate::expr::{CorrelatedId, CorrelatedInputRef, Expr, ExprImpl, ExprRewriter, InputRef};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_visitor::{PlanCorrelatedIdFinder, PlanVisitor};
use crate::optimizer::PlanRef;
use crate::utils::Condition;
diff --git a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs
index 5a6f1187fdd02..f85ffc2318459 100644
--- a/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs
+++ b/src/frontend/src/optimizer/rule/table_function_to_project_set_rule.rs
@@ -18,6 +18,7 @@ use risingwave_common::types::DataType;
use super::{BoxedRule, Rule};
use crate::expr::{Expr, ExprImpl, ExprType, FunctionCall, InputRef};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{
LogicalProject, LogicalProjectSet, LogicalTableFunction, LogicalValues, PlanTreeNodeUnary,
};
@@ -51,7 +52,7 @@ impl Rule for TableFunctionToProjectSetRule {
let logical_values = LogicalValues::create(
vec![vec![]],
Schema::new(vec![]),
- logical_table_function.base.ctx.clone(),
+ logical_table_function.base.ctx().clone(),
);
let logical_project_set = LogicalProjectSet::create(logical_values, vec![table_function]);
// We need a project to align schema type because
diff --git a/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs b/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs
index 9759739490fe6..a13bef3baa9d9 100644
--- a/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs
+++ b/src/frontend/src/optimizer/rule/trivial_project_to_values_rule.rs
@@ -13,6 +13,7 @@
// limitations under the License.
use super::{BoxedRule, Rule};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{LogicalValues, PlanTreeNodeUnary};
use crate::optimizer::plan_visitor::{LogicalCardinalityExt, SideEffectVisitor};
use crate::optimizer::{PlanRef, PlanVisitor};
diff --git a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
index 8119b8847b600..7b83c017ab781 100644
--- a/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
+++ b/src/frontend/src/optimizer/rule/union_input_values_merge_rule.rs
@@ -13,6 +13,7 @@
// limitations under the License.
use super::{BoxedRule, Rule};
+use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::LogicalValues;
use crate::optimizer::{PlanRef, PlanTreeNode};
diff --git a/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs b/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs
index 2a12f6b712e0d..f1d203fba1350 100644
--- a/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs
+++ b/src/frontend/src/optimizer/rule/union_to_distinct_rule.rs
@@ -13,7 +13,7 @@
// limitations under the License.
use super::{BoxedRule, Rule};
-use crate::optimizer::plan_node::generic::Agg;
+use crate::optimizer::plan_node::generic::{Agg, GenericPlanRef};
use crate::optimizer::plan_node::{LogicalUnion, PlanTreeNode};
use crate::optimizer::PlanRef;
@@ -24,7 +24,7 @@ impl Rule for UnionToDistinctRule {
let union: &LogicalUnion = plan.as_logical_union()?;
if !union.all() {
let union_all = LogicalUnion::create(true, union.inputs().into_iter().collect());
- let distinct = Agg::new(vec![], (0..union.base.schema.len()).collect(), union_all)
+ let distinct = Agg::new(vec![], (0..union.base.schema().len()).collect(), union_all)
.with_enable_two_phase(false);
Some(distinct.into())
} else {
diff --git a/src/frontend/src/scheduler/plan_fragmenter.rs b/src/frontend/src/scheduler/plan_fragmenter.rs
index 4e16bc6cd0b21..cb20103b3e76f 100644
--- a/src/frontend/src/scheduler/plan_fragmenter.rs
+++ b/src/frontend/src/scheduler/plan_fragmenter.rs
@@ -103,7 +103,7 @@ impl Serialize for ExecutionPlanNode {
impl From for ExecutionPlanNode {
fn from(plan_node: PlanRef) -> Self {
Self {
- plan_node_id: plan_node.plan_base().id,
+ plan_node_id: plan_node.plan_base().id(),
plan_node_type: plan_node.node_type(),
node: plan_node.to_batch_prost_body(),
children: vec![],