Skip to content

Commit

Permalink
Refactor some physical plan
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 1, 2024
1 parent 4d3bc09 commit 8a2f601
Show file tree
Hide file tree
Showing 13 changed files with 52 additions and 53 deletions.
2 changes: 1 addition & 1 deletion bustubx/src/expression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::catalog::Schema;
use crate::catalog::{Column, DataType};
use crate::common::ScalarValue;
use crate::storage::Tuple;
use crate::{BustubxResult};
use crate::BustubxResult;

pub trait ExprTrait {
/// Get the data type of this expression, given the schema of the input
Expand Down
15 changes: 7 additions & 8 deletions bustubx/src/planner/logical_plan_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub use join::Join;
pub use limit::Limit;
pub use project::Project;
pub use sort::{OrderByExpr, Sort};
use std::sync::Arc;
pub use table_scan::TableScan;
pub use util::*;
pub use values::Values;
Expand All @@ -44,17 +43,17 @@ pub enum LogicalPlanV2 {
impl LogicalPlanV2 {
pub fn schema(&self) -> &SchemaRef {
match self {
LogicalPlanV2::CreateTable(_) => &Arc::new(Schema::empty()),
LogicalPlanV2::CreateIndex(_) => &Arc::new(Schema::empty()),
LogicalPlanV2::CreateTable(_) => todo!(),
LogicalPlanV2::CreateIndex(_) => todo!(),
LogicalPlanV2::Filter(Filter { input, .. }) => input.schema(),
LogicalPlanV2::Insert(_) => &Arc::new(Schema::new(vec![Column::new(
"insert_rows".to_string(),
DataType::Int32,
)])),
LogicalPlanV2::Insert(_) => todo!(),
LogicalPlanV2::Join(Join { schema, .. }) => schema,
LogicalPlanV2::Limit(Limit { input, .. }) => input.schema(),
LogicalPlanV2::Project(Project { schema, .. }) => schema,
LogicalPlanV2::TableScan(TableScan { schema, .. }) => schema,
LogicalPlanV2::TableScan(TableScan {
table_schema: schema,
..
}) => schema,
LogicalPlanV2::Sort(Sort { input, .. }) => input.schema(),
LogicalPlanV2::Values(Values { schema, .. }) => schema,
LogicalPlanV2::EmptyRelation(EmptyRelation { schema, .. }) => schema,
Expand Down
2 changes: 1 addition & 1 deletion bustubx/src/planner/logical_plan_v2/table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::expression::Expr;
#[derive(derive_new::new, Debug, Clone)]
pub struct TableScan {
pub table_ref: TableReference,
pub schema: SchemaRef,
pub table_schema: SchemaRef,
pub filters: Vec<Expr>,
pub limit: Option<usize>,
}
2 changes: 1 addition & 1 deletion bustubx/src/planner/logical_planner/logical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ impl<'a> LogicalPlanner<'a> {
match table_ref {
BoundTableRef::BaseTable(table) => LogicalPlan {
operator: LogicalOperator::new_scan_operator(
table.oid,
table.table.clone(),
table.schema.columns.clone(),
),
children: Vec::new(),
Expand Down
2 changes: 1 addition & 1 deletion bustubx/src/planner/logical_planner/plan_set_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl LogicalPlanner<'_> {
)?;
Ok(LogicalPlanV2::TableScan(TableScan {
table_ref,
schema,
table_schema: schema,
filters: vec![],
limit: None,
}))
Expand Down
4 changes: 2 additions & 2 deletions bustubx/src/planner/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ impl LogicalOperator {
pub fn new_values_operator(columns: Vec<ColumnRef>, tuples: Vec<Vec<Expr>>) -> LogicalOperator {
LogicalOperator::Values(LogicalValuesOperator::new(columns, tuples))
}
pub fn new_scan_operator(table_oid: TableOid, columns: Vec<ColumnRef>) -> LogicalOperator {
LogicalOperator::Scan(LogicalScanOperator::new(table_oid, columns))
pub fn new_scan_operator(table_name: String, columns: Vec<ColumnRef>) -> LogicalOperator {
LogicalOperator::Scan(LogicalScanOperator::new(table_name, columns))
}
pub fn new_project_operator(expressions: Vec<Expr>) -> LogicalOperator {
LogicalOperator::Project(LogicalProjectOperator::new(expressions))
Expand Down
2 changes: 1 addition & 1 deletion bustubx/src/planner/operator/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ use crate::catalog::TableOid;

#[derive(derive_new::new, Debug, Clone)]
pub struct LogicalScanOperator {
pub table_oid: TableOid,
pub table_name: String,
pub columns: Vec<ColumnRef>,
}
19 changes: 7 additions & 12 deletions bustubx/src/planner/physical_plan/create_index.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::catalog::SchemaRef;
use crate::common::table_ref::TableReference;
use crate::planner::logical_plan_v2::OrderByExpr;
use crate::{
catalog::Schema,
execution::{ExecutionContext, VolcanoExecutor},
Expand All @@ -9,26 +11,19 @@ use std::sync::Arc;

#[derive(Debug, derive_new::new)]
pub struct PhysicalCreateIndex {
pub index_name: String,
pub table_name: String,
pub name: String,
pub table: TableReference,
pub table_schema: SchemaRef,
pub key_attrs: Vec<u32>,
pub columns: Vec<OrderByExpr>,
}

impl VolcanoExecutor for PhysicalCreateIndex {
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
context.catalog.create_index(
self.index_name.clone(),
self.table_name.clone(),
self.key_attrs.clone(),
);
// TODO implement
Ok(None)
}
fn output_schema(&self) -> SchemaRef {
Arc::new(Schema::copy_schema(
self.table_schema.clone(),
&self.key_attrs,
))
Arc::new(Schema::empty())
}
}

Expand Down
10 changes: 6 additions & 4 deletions bustubx/src/planner/physical_plan/create_table.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::catalog::SchemaRef;
use crate::common::table_ref::TableReference;
use crate::{
catalog::Schema,
execution::{ExecutionContext, VolcanoExecutor},
Expand All @@ -9,15 +10,16 @@ use std::sync::Arc;

#[derive(derive_new::new, Debug)]
pub struct PhysicalCreateTable {
pub table_name: String,
pub table: TableReference,
pub schema: Schema,
}

impl VolcanoExecutor for PhysicalCreateTable {
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
context
.catalog
.create_table(self.table_name.clone(), Arc::new(self.schema.clone()));
context.catalog.create_table(
self.table.table().to_string(),
Arc::new(self.schema.clone()),
);
Ok(None)
}
fn output_schema(&self) -> SchemaRef {
Expand Down
9 changes: 5 additions & 4 deletions bustubx/src/planner/physical_plan/insert.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::{atomic::AtomicU32, Arc};

use crate::catalog::{ColumnRef, SchemaRef};
use crate::common::table_ref::TableReference;
use crate::{
catalog::{Column, DataType, Schema},
common::ScalarValue,
Expand All @@ -13,16 +14,16 @@ use super::PhysicalPlan;

#[derive(Debug)]
pub struct PhysicalInsert {
pub table_name: String,
pub table: TableReference,
pub columns: Vec<ColumnRef>,
pub input: Arc<PhysicalPlan>,

insert_rows: AtomicU32,
}
impl PhysicalInsert {
pub fn new(table_name: String, columns: Vec<ColumnRef>, input: Arc<PhysicalPlan>) -> Self {
pub fn new(table: TableReference, columns: Vec<ColumnRef>, input: Arc<PhysicalPlan>) -> Self {
Self {
table_name,
table,
columns,
input,
insert_rows: AtomicU32::new(0),
Expand Down Expand Up @@ -58,7 +59,7 @@ impl VolcanoExecutor for PhysicalInsert {
// TODO update index if needed
let table_heap = &mut context
.catalog
.get_mut_table_by_name(self.table_name.as_str())
.get_mut_table_by_name(self.table.table())
.unwrap()
.table;
let tuple_meta = TupleMeta {
Expand Down
4 changes: 2 additions & 2 deletions bustubx/src/planner/physical_plan/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::PhysicalPlan;

#[derive(derive_new::new, Debug)]
pub struct PhysicalProject {
pub expressions: Vec<Expr>,
pub exprs: Vec<Expr>,
pub input: Arc<PhysicalPlan>,
}

Expand All @@ -29,7 +29,7 @@ impl VolcanoExecutor for PhysicalProject {
}
let next_tuple = next_tuple.unwrap();
let mut new_values = Vec::new();
for expr in &self.expressions {
for expr in &self.exprs {
new_values.push(expr.evaluate(&next_tuple)?);
}
return Ok(Some(Tuple::new(self.output_schema(), new_values)));
Expand Down
19 changes: 9 additions & 10 deletions bustubx/src/planner/physical_plan/seq_scan.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::{Arc, Mutex};

use crate::catalog::{ColumnRef, SchemaRef};
use crate::common::table_ref::TableReference;
use crate::{
catalog::{Schema, TableOid},
execution::{ExecutionContext, VolcanoExecutor},
Expand All @@ -10,17 +11,17 @@ use crate::{

#[derive(Debug)]
pub struct PhysicalSeqScan {
pub table_oid: TableOid,
pub columns: Vec<ColumnRef>,
pub table: TableReference,
pub table_schema: SchemaRef,

iterator: Mutex<TableIterator>,
}

impl PhysicalSeqScan {
pub fn new(table_oid: TableOid, columns: Vec<ColumnRef>) -> Self {
pub fn new(table: TableReference, table_schema: SchemaRef) -> Self {
PhysicalSeqScan {
table_oid,
columns,
table,
table_schema,
iterator: Mutex::new(TableIterator::new(None, None)),
}
}
Expand All @@ -31,7 +32,7 @@ impl VolcanoExecutor for PhysicalSeqScan {
println!("init table scan executor");
let table_info = context
.catalog
.get_mut_table_by_oid(self.table_oid)
.get_mut_table_by_name(self.table.table())
.unwrap();
let inited_iterator = table_info.table.iter(None, None);
let mut iterator = self.iterator.lock().unwrap();
Expand All @@ -42,17 +43,15 @@ impl VolcanoExecutor for PhysicalSeqScan {
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
let table_info = context
.catalog
.get_mut_table_by_oid(self.table_oid)
.get_mut_table_by_name(self.table.table())
.unwrap();
let mut iterator = self.iterator.lock().unwrap();
let full_tuple = iterator.next(&mut table_info.table);
return Ok(full_tuple.map(|t| t.1));
}

fn output_schema(&self) -> SchemaRef {
Arc::new(Schema {
columns: self.columns.clone(),
})
self.table_schema.clone()
}
}

Expand Down
15 changes: 9 additions & 6 deletions bustubx/src/planner/physical_planner/physical_planner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::catalog::Schema;
use crate::common::table_ref::TableReference;
use std::sync::Arc;

use crate::planner::logical_plan::LogicalPlan;
Expand Down Expand Up @@ -33,23 +34,23 @@ pub fn build_plan(logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
let plan = match logical_plan.operator {
LogicalOperator::CreateTable(ref logic_create_table) => {
PhysicalPlan::CreateTable(PhysicalCreateTable::new(
logic_create_table.table_name.clone(),
TableReference::bare(logic_create_table.table_name.clone()),
logic_create_table.schema.clone(),
))
}
LogicalOperator::CreateIndex(ref logic_create_index) => {
PhysicalPlan::CreateIndex(PhysicalCreateIndex::new(
logic_create_index.index_name.clone(),
logic_create_index.table_name.clone(),
TableReference::bare(logic_create_index.table_name.clone()),
logic_create_index.table_schema.clone(),
logic_create_index.key_attrs.clone(),
vec![],
))
}
LogicalOperator::Insert(ref logic_insert) => {
let child_logical_node = logical_plan.children[0].clone();
let child_physical_node = build_plan(child_logical_node.clone());
PhysicalPlan::Insert(PhysicalInsert::new(
logic_insert.table_name.clone(),
TableReference::bare(logic_insert.table_name.clone()),
logic_insert.columns.clone(),
Arc::new(child_physical_node),
))
Expand Down Expand Up @@ -77,8 +78,10 @@ pub fn build_plan(logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
}
LogicalOperator::Scan(ref logical_table_scan) => {
PhysicalPlan::TableScan(PhysicalSeqScan::new(
logical_table_scan.table_oid.clone(),
logical_table_scan.columns.clone(),
TableReference::bare(logical_table_scan.table_name.clone()),
Arc::new(Schema {
columns: logical_table_scan.columns.clone(),
}),
))
}
LogicalOperator::Limit(ref logical_limit) => {
Expand Down

0 comments on commit 8a2f601

Please sign in to comment.