Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(optimizer): type-safe plan base with compile-time convention check #13000

Merged
merged 15 commits into from
Oct 26, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ arrow-schema = { workspace = true }
async-recursion = "1.0.5"
async-trait = "0.1"
auto_enums = { version = "0.8", features = ["futures03"] }
auto_impl = "1"
bk-tree = "0.5.0"
bytes = "1"
clap = { version = "4", features = ["derive"] }
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/handler/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use super::create_table::{
use super::query::gen_batch_plan_by_statement;
use super::RwPgResponse;
use crate::handler::HandlerArgs;
use crate::optimizer::plan_node::generic::GenericPlanRef;
use crate::optimizer::plan_node::{Convention, Explain};
use crate::optimizer::OptimizerContext;
use crate::scheduler::worker_node_manager::WorkerNodeSelector;
Expand Down
8 changes: 5 additions & 3 deletions src/frontend/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::ops::DerefMut;

pub mod plan_node;
pub use plan_node::{Explain, PlanRef};

pub mod property;

mod delta_join_solver;
Expand Down Expand Up @@ -46,10 +47,11 @@ use risingwave_connector::sink::catalog::SinkFormatDesc;
use risingwave_pb::catalog::WatermarkDesc;

use self::heuristic_optimizer::ApplyOrder;
use self::plan_node::generic::{self, PhysicalPlanRef};
use self::plan_node::{
generic, stream_enforce_eowc_requirement, BatchProject, Convention, LogicalProject,
LogicalSource, StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink,
StreamWatermarkFilter, ToStreamContext,
stream_enforce_eowc_requirement, BatchProject, Convention, LogicalProject, LogicalSource,
StreamDml, StreamMaterialize, StreamProject, StreamRowIdGen, StreamSink, StreamWatermarkFilter,
ToStreamContext,
};
#[cfg(debug_assertions)]
use self::plan_visitor::InputRefValidator;
Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/optimizer/plan_node/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ use crate::optimizer::property::Order;
/// [`PhysicalPlanRef`].
///
/// [`GenericPlanRef`]: super::generic::GenericPlanRef
#[auto_impl::auto_impl(&)]
pub trait BatchPlanRef: PhysicalPlanRef {
fn order(&self) -> &Order;
}

/// Prelude for batch plan nodes.
pub mod prelude {
pub use super::super::generic::{GenericPlanRef, PhysicalPlanRef};
pub use super::super::Batch;
pub use super::BatchPlanRef;
}
6 changes: 4 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,26 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::DeleteNode;

use super::batch::prelude::*;
use super::utils::impl_distill_by_unit;
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
};
use crate::optimizer::plan_node::generic::PhysicalPlanRef;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redundant import

use crate::optimizer::plan_node::ToLocalBatch;
use crate::optimizer::property::{Distribution, Order, RequiredDist};

/// `BatchDelete` implements [`super::LogicalDelete`]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchDelete {
pub base: PlanBase,
pub base: PlanBase<Batch>,
pub core: generic::Delete<PlanRef>,
}

impl BatchDelete {
pub fn new(core: generic::Delete<PlanRef>) -> Self {
assert_eq!(core.input.distribution(), &Distribution::Single);
let base: PlanBase =
let base =
PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());
Self { base, core }
}
Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/plan_node/batch_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{ExchangeNode, MergeSortExchangeNode};

use super::batch::BatchPlanRef;
use super::generic::{GenericPlanRef, PhysicalPlanRef};
use super::batch::prelude::*;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
use crate::optimizer::plan_node::ToLocalBatch;
Expand All @@ -28,7 +27,7 @@ use crate::optimizer::property::{Distribution, DistributionDisplay, Order, Order
/// without changing its content.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchExchange {
pub base: PlanBase,
pub base: PlanBase<Batch>,
input: PlanRef,
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_expand.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::expand_node::Subset;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ExpandNode;

use super::batch::prelude::*;
use super::utils::impl_distill_by_unit;
use super::{generic, ExprRewritable};
use crate::optimizer::plan_node::{
Expand All @@ -28,7 +29,7 @@ use crate::optimizer::PlanRef;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchExpand {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::Expand<PlanRef>,
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::FilterNode;

use super::batch::prelude::*;
use super::utils::impl_distill_by_unit;
use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
use crate::expr::{Expr, ExprImpl, ExprRewriter};
Expand All @@ -25,7 +26,7 @@ use crate::utils::Condition;
/// `BatchFilter` implements [`super::LogicalFilter`]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchFilter {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::Filter<PlanRef>,
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_group_topn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::GroupTopNNode;

use super::batch::prelude::*;
use super::utils::impl_distill_by_unit;
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
Expand All @@ -26,7 +27,7 @@ use crate::optimizer::property::{Order, RequiredDist};
/// `BatchGroupTopN` implements [`super::LogicalTopN`] to find the top N elements with a heap
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchGroupTopN {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::TopN<PlanRef>,
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::HashAggNode;

use super::batch::prelude::*;
use super::generic::{self, GenericPlanRef, PlanAggCall};
use super::utils::impl_distill_by_unit;
use super::{
Expand All @@ -30,7 +31,7 @@ use crate::utils::{ColIndexMappingRewriteExt, IndexSet};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchHashAgg {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::Agg<PlanRef>,
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::HashJoinNode;
use risingwave_pb::plan_common::JoinType;

use super::batch::prelude::*;
use super::generic::{self, GenericPlanRef};
use super::utils::{childless_record, Distill};
use super::{
Expand All @@ -35,7 +36,7 @@ use crate::utils::ColIndexMappingRewriteExt;
/// get output rows.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchHashJoin {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::Join<PlanRef>,

/// The join condition must be equivalent to `logical.on`, but separated into equal and
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::HopWindowNode;

use super::batch::prelude::*;
use super::utils::impl_distill_by_unit;
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
Expand All @@ -29,7 +30,7 @@ use crate::utils::ColIndexMappingRewriteExt;
/// input rows
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchHopWindow {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::HopWindow<PlanRef>,
window_start_exprs: Vec<ExprImpl>,
window_end_exprs: Vec<ExprImpl>,
Expand Down
5 changes: 3 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::InsertNode;
use risingwave_pb::plan_common::{DefaultColumns, IndexAndExpr};

use super::batch::prelude::*;
use super::generic::GenericPlanRef;
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
Expand All @@ -28,14 +29,14 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist};
/// `BatchInsert` implements [`super::LogicalInsert`]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchInsert {
pub base: PlanBase,
pub base: PlanBase<Batch>,
pub core: generic::Insert<PlanRef>,
}

impl BatchInsert {
pub fn new(core: generic::Insert<PlanRef>) -> Self {
assert_eq!(core.input.distribution(), &Distribution::Single);
let base: PlanBase =
let base: PlanBase<Batch> =
PlanBase::new_batch_with_core(&core, core.input.distribution().clone(), Order::any());

BatchInsert { base, core }
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::LimitNode;

use super::batch::prelude::*;
use super::generic::PhysicalPlanRef;
use super::utils::impl_distill_by_unit;
use super::{
Expand All @@ -27,7 +28,7 @@ use crate::optimizer::property::{Order, RequiredDist};
/// `BatchLimit` implements [`super::LogicalLimit`] to fetch specified rows from input
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchLimit {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::Limit<PlanRef>,
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{DistributedLookupJoinNode, LocalLookupJoinNode};

use super::batch::prelude::*;
use super::generic::{self, GenericPlanRef};
use super::utils::{childless_record, Distill};
use super::ExprRewritable;
Expand All @@ -33,7 +34,7 @@ use crate::utils::ColIndexMappingRewriteExt;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchLookupJoin {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::Join<PlanRef>,

/// The join condition must be equivalent to `logical.on`, but separated into equal and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::NestedLoopJoinNode;

use super::batch::prelude::*;
use super::generic::{self, GenericPlanRef};
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeBinary, ToBatchPb, ToDistributedBatch};
Expand All @@ -30,7 +31,7 @@ use crate::utils::ConditionDisplay;
/// against all pairs of rows from inner & outer side within 2 layers of loops.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchNestedLoopJoin {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::Join<PlanRef>,
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_over_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SortOverWindowNode;

use super::batch::prelude::*;
use super::batch::BatchPlanRef;
use super::generic::PlanWindowFunction;
use super::utils::impl_distill_by_unit;
Expand All @@ -28,7 +29,7 @@ use crate::optimizer::property::{Order, RequiredDist};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchOverWindow {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::OverWindow<PlanRef>,
}

Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/optimizer/plan_node/batch_project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ProjectNode;
use risingwave_pb::expr::ExprNode;

use super::generic::GenericPlanRef;
use super::batch::prelude::*;
use super::utils::{childless_record, Distill};
use super::{
generic, ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch,
Expand All @@ -31,7 +31,7 @@ use crate::utils::ColIndexMappingRewriteExt;
/// rows
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchProject {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::Project<PlanRef>,
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_project_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::ProjectSetNode;

use super::batch::prelude::*;
use super::utils::impl_distill_by_unit;
use super::{generic, ExprRewritable};
use crate::expr::ExprRewriter;
Expand All @@ -28,7 +29,7 @@ use crate::utils::ColIndexMappingRewriteExt;

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchProjectSet {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::ProjectSet<PlanRef>,
}

Expand Down
5 changes: 2 additions & 3 deletions src/frontend/src/optimizer/plan_node/batch_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use risingwave_pb::batch_plan::row_seq_scan_node::ChunkSize;
use risingwave_pb::batch_plan::{RowSeqScanNode, SysRowSeqScanNode};
use risingwave_pb::plan_common::PbColumnDesc;

use super::batch::BatchPlanRef;
use super::generic::{GenericPlanRef, PhysicalPlanRef};
use super::batch::prelude::*;
use super::utils::{childless_record, Distill};
use super::{generic, ExprRewritable, PlanBase, PlanRef, ToBatchPb, ToDistributedBatch};
use crate::catalog::ColumnId;
Expand All @@ -36,7 +35,7 @@ use crate::optimizer::property::{Distribution, DistributionDisplay, Order};
/// `BatchSeqScan` implements [`super::LogicalScan`] to scan from a row-oriented table
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchSeqScan {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::Scan,
scan_ranges: Vec<ScanRange>,
}
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_simple_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SortAggNode;

use super::batch::prelude::*;
use super::generic::{self, GenericPlanRef, PlanAggCall};
use super::utils::impl_distill_by_unit;
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
Expand All @@ -25,7 +26,7 @@ use crate::optimizer::property::{Distribution, Order, RequiredDist};

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchSimpleAgg {
pub base: PlanBase,
pub base: PlanBase<Batch>,
core: generic::Agg<PlanRef>,
}

Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/optimizer/plan_node/batch_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SortNode;

use super::batch::prelude::*;
use super::batch::BatchPlanRef;
use super::utils::{childless_record, Distill};
use super::{ExprRewritable, PlanBase, PlanRef, PlanTreeNodeUnary, ToBatchPb, ToDistributedBatch};
Expand All @@ -27,7 +28,7 @@ use crate::optimizer::property::{Order, OrderDisplay};
/// collation required by user or parent plan node.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchSort {
pub base: PlanBase,
pub base: PlanBase<Batch>,
input: PlanRef,
}

Expand Down
Loading
Loading