diff --git a/bustubx/src/catalog/catalog.rs b/bustubx/src/catalog/catalog.rs index 944cf37..b7dd14b 100644 --- a/bustubx/src/catalog/catalog.rs +++ b/bustubx/src/catalog/catalog.rs @@ -112,6 +112,10 @@ impl Catalog { ))) } + pub fn table_indexes(&self, table_ref: &TableReference) -> Vec> { + vec![] + } + pub fn create_index( &mut self, index_name: String, diff --git a/bustubx/src/execution/physical_plan/index_scan.rs b/bustubx/src/execution/physical_plan/index_scan.rs new file mode 100644 index 0000000..771b613 --- /dev/null +++ b/bustubx/src/execution/physical_plan/index_scan.rs @@ -0,0 +1,75 @@ +use crate::catalog::SchemaRef; +use crate::common::TableReference; +use crate::execution::{ExecutionContext, VolcanoExecutor}; +use crate::storage::index::TreeIndexIterator; +use crate::{BustubxError, BustubxResult, Tuple}; +use std::ops::{Bound, RangeBounds}; +use std::sync::Mutex; + +#[derive(Debug)] +pub struct PhysicalIndexScan { + table_ref: TableReference, + index_name: String, + table_schema: SchemaRef, + start_bound: Bound, + end_bound: Bound, + iterator: Mutex>, +} + +impl PhysicalIndexScan { + pub fn new>( + table_ref: TableReference, + index_name: String, + table_schema: SchemaRef, + range: R, + ) -> Self { + Self { + table_ref, + index_name, + table_schema, + start_bound: range.start_bound().cloned(), + end_bound: range.end_bound().cloned(), + iterator: Mutex::new(None), + } + } +} + +impl VolcanoExecutor for PhysicalIndexScan { + fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> { + let index = context + .catalog + .get_index_by_name(&self.table_ref, &self.index_name) + .unwrap(); + *self.iterator.lock().unwrap() = Some(TreeIndexIterator::new( + index, + (self.start_bound.clone(), self.end_bound.clone()), + )); + Ok(()) + } + + fn next(&self, context: &mut ExecutionContext) -> BustubxResult> { + let mut guard = self.iterator.lock().unwrap(); + let Some(iterator) = &mut *guard else { + return Err(BustubxError::Execution( + "index iterator not created".to_string(), + )); + }; + let table_heap = context.catalog.table_heap(&self.table_ref)?; + if let Some(rid) = iterator.next()? { + let (_, tuple) = table_heap.tuple(rid)?; + Ok(Some(tuple)) + } else { + Ok(None) + } + } + + fn output_schema(&self) -> SchemaRef { + self.table_schema.clone() + } +} + +impl std::fmt::Display for PhysicalIndexScan { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "IndexScan: {}", self.index_name) + } +} diff --git a/bustubx/src/execution/physical_plan/insert.rs b/bustubx/src/execution/physical_plan/insert.rs index 9bf4106..ffade65 100644 --- a/bustubx/src/execution/physical_plan/insert.rs +++ b/bustubx/src/execution/physical_plan/insert.rs @@ -74,19 +74,24 @@ impl VolcanoExecutor for PhysicalInsert { casted_data.push(value.cast_to(&target_type)?); } } - let tuple = Tuple { - schema: self.projected_schema.clone(), - data: casted_data, - }; + // TODO fill default values in data + let tuple = Tuple::new(self.table_schema.clone(), casted_data); - // TODO update index if needed let table_heap = context.catalog.table_heap(&self.table)?; let tuple_meta = TupleMeta { insert_txn_id: 0, delete_txn_id: 0, is_deleted: false, }; - table_heap.insert_tuple(&tuple_meta, &tuple)?; + let rid = table_heap.insert_tuple(&tuple_meta, &tuple)?; + + let indexes = context.catalog.table_indexes(&self.table); + for index in indexes { + if let Ok(key_tuple) = tuple.project_with_schema(index.key_schema.clone()) { + index.insert(&key_tuple, rid)?; + } + } + self.insert_rows .fetch_add(1, std::sync::atomic::Ordering::SeqCst); } diff --git a/bustubx/src/execution/physical_plan/mod.rs b/bustubx/src/execution/physical_plan/mod.rs index 4e1f995..055ae22 100644 --- a/bustubx/src/execution/physical_plan/mod.rs +++ b/bustubx/src/execution/physical_plan/mod.rs @@ -3,6 +3,7 @@ mod create_index; mod create_table; mod empty; mod filter; +mod index_scan; mod insert; mod limit; mod nested_loop_join; @@ -16,6 +17,7 @@ pub use create_index::PhysicalCreateIndex; pub use create_table::PhysicalCreateTable; pub use empty::PhysicalEmpty; pub use filter::PhysicalFilter; +pub use index_scan::PhysicalIndexScan; pub use insert::PhysicalInsert; pub use limit::PhysicalLimit; pub use nested_loop_join::PhysicalNestedLoopJoin; @@ -39,6 +41,7 @@ pub enum PhysicalPlan { Project(PhysicalProject), Filter(PhysicalFilter), TableScan(PhysicalSeqScan), + IndexScan(PhysicalIndexScan), Limit(PhysicalLimit), Insert(PhysicalInsert), Values(PhysicalValues), @@ -65,6 +68,7 @@ impl PhysicalPlan { | PhysicalPlan::CreateTable(_) | PhysicalPlan::CreateIndex(_) | PhysicalPlan::TableScan(_) + | PhysicalPlan::IndexScan(_) | PhysicalPlan::Values(_) => vec![], } } @@ -81,6 +85,7 @@ impl VolcanoExecutor for PhysicalPlan { PhysicalPlan::Project(op) => op.init(context), PhysicalPlan::Filter(op) => op.init(context), PhysicalPlan::TableScan(op) => op.init(context), + PhysicalPlan::IndexScan(op) => op.init(context), PhysicalPlan::Limit(op) => op.init(context), PhysicalPlan::NestedLoopJoin(op) => op.init(context), PhysicalPlan::Sort(op) => op.init(context), @@ -98,6 +103,7 @@ impl VolcanoExecutor for PhysicalPlan { PhysicalPlan::Project(op) => op.next(context), PhysicalPlan::Filter(op) => op.next(context), PhysicalPlan::TableScan(op) => op.next(context), + PhysicalPlan::IndexScan(op) => op.next(context), PhysicalPlan::Limit(op) => op.next(context), PhysicalPlan::NestedLoopJoin(op) => op.next(context), PhysicalPlan::Sort(op) => op.next(context), @@ -115,6 +121,7 @@ impl VolcanoExecutor for PhysicalPlan { Self::Project(op) => op.output_schema(), Self::Filter(op) => op.output_schema(), Self::TableScan(op) => op.output_schema(), + Self::IndexScan(op) => op.output_schema(), Self::Limit(op) => op.output_schema(), Self::NestedLoopJoin(op) => op.output_schema(), Self::Sort(op) => op.output_schema(), @@ -134,6 +141,7 @@ impl std::fmt::Display for PhysicalPlan { Self::Project(op) => write!(f, "{op}"), Self::Filter(op) => write!(f, "{op}"), Self::TableScan(op) => write!(f, "{op}"), + Self::IndexScan(op) => write!(f, "{op}"), Self::Limit(op) => write!(f, "{op}"), Self::NestedLoopJoin(op) => write!(f, "{op}"), Self::Sort(op) => write!(f, "{op}"), diff --git a/bustubx/src/storage/index.rs b/bustubx/src/storage/index.rs index 47bbbf9..0ec1a51 100644 --- a/bustubx/src/storage/index.rs +++ b/bustubx/src/storage/index.rs @@ -35,6 +35,7 @@ impl Context { } // B+树索引 +#[derive(Debug)] pub struct BPlusTreeIndex { pub key_schema: SchemaRef, pub buffer_pool: Arc, @@ -604,6 +605,7 @@ impl BPlusTreeIndex { } } +#[derive(Debug)] pub struct TreeIndexIterator { index: Arc, start_bound: Bound, diff --git a/bustubx/src/storage/tuple.rs b/bustubx/src/storage/tuple.rs index 606d070..b0466f5 100644 --- a/bustubx/src/storage/tuple.rs +++ b/bustubx/src/storage/tuple.rs @@ -15,6 +15,22 @@ impl Tuple { Self { schema, data } } + pub fn project_with_schema(&self, projected_schema: SchemaRef) -> BustubxResult { + let indices = projected_schema + .columns + .iter() + .map(|col| { + self.schema + .index_of(col.relation.as_ref(), col.name.as_str()) + }) + .collect::>>()?; + let projected_data = indices + .iter() + .map(|idx| self.data[*idx].clone()) + .collect::>(); + Ok(Self::new(projected_schema, projected_data)) + } + pub fn empty(schema: SchemaRef) -> Self { let mut data = vec![]; for col in schema.columns.iter() {