Skip to content

Commit

Permalink
Add physical index scan
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 25, 2024
1 parent 6d9df93 commit 3857645
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 6 deletions.
4 changes: 4 additions & 0 deletions bustubx/src/catalog/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ impl Catalog {
)))
}

pub fn table_indexes(&self, table_ref: &TableReference) -> Vec<Arc<BPlusTreeIndex>> {

Check warning on line 115 in bustubx/src/catalog/catalog.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused variable: `table_ref`
vec![]
}

pub fn create_index(
&mut self,
index_name: String,
Expand Down
75 changes: 75 additions & 0 deletions bustubx/src/execution/physical_plan/index_scan.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use crate::catalog::SchemaRef;
use crate::common::TableReference;
use crate::execution::{ExecutionContext, VolcanoExecutor};
use crate::storage::index::TreeIndexIterator;
use crate::{BustubxError, BustubxResult, Tuple};
use std::ops::{Bound, RangeBounds};
use std::sync::Mutex;

#[derive(Debug)]
pub struct PhysicalIndexScan {
table_ref: TableReference,
index_name: String,
table_schema: SchemaRef,
start_bound: Bound<Tuple>,
end_bound: Bound<Tuple>,
iterator: Mutex<Option<TreeIndexIterator>>,
}

impl PhysicalIndexScan {
pub fn new<R: RangeBounds<Tuple>>(

Check warning on line 20 in bustubx/src/execution/physical_plan/index_scan.rs

View workflow job for this annotation

GitHub Actions / Test Suite

associated function `new` is never used
table_ref: TableReference,
index_name: String,
table_schema: SchemaRef,
range: R,
) -> Self {
Self {
table_ref,
index_name,
table_schema,
start_bound: range.start_bound().cloned(),
end_bound: range.end_bound().cloned(),
iterator: Mutex::new(None),
}
}
}

impl VolcanoExecutor for PhysicalIndexScan {
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
let index = context
.catalog
.get_index_by_name(&self.table_ref, &self.index_name)
.unwrap();
*self.iterator.lock().unwrap() = Some(TreeIndexIterator::new(
index,
(self.start_bound.clone(), self.end_bound.clone()),
));
Ok(())
}

fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
let mut guard = self.iterator.lock().unwrap();
let Some(iterator) = &mut *guard else {
return Err(BustubxError::Execution(
"index iterator not created".to_string(),
));
};
let table_heap = context.catalog.table_heap(&self.table_ref)?;
if let Some(rid) = iterator.next()? {
let (_, tuple) = table_heap.tuple(rid)?;
Ok(Some(tuple))
} else {
Ok(None)
}
}

fn output_schema(&self) -> SchemaRef {
self.table_schema.clone()
}
}

impl std::fmt::Display for PhysicalIndexScan {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "IndexScan: {}", self.index_name)
}
}
17 changes: 11 additions & 6 deletions bustubx/src/execution/physical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,24 @@ impl VolcanoExecutor for PhysicalInsert {
casted_data.push(value.cast_to(&target_type)?);
}
}
let tuple = Tuple {
schema: self.projected_schema.clone(),
data: casted_data,
};
// TODO fill default values in data
let tuple = Tuple::new(self.table_schema.clone(), casted_data);

// TODO update index if needed
let table_heap = context.catalog.table_heap(&self.table)?;
let tuple_meta = TupleMeta {
insert_txn_id: 0,
delete_txn_id: 0,
is_deleted: false,
};
table_heap.insert_tuple(&tuple_meta, &tuple)?;
let rid = table_heap.insert_tuple(&tuple_meta, &tuple)?;

let indexes = context.catalog.table_indexes(&self.table);
for index in indexes {
if let Ok(key_tuple) = tuple.project_with_schema(index.key_schema.clone()) {
index.insert(&key_tuple, rid)?;
}
}

self.insert_rows
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
Expand Down
8 changes: 8 additions & 0 deletions bustubx/src/execution/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod create_index;
mod create_table;
mod empty;
mod filter;
mod index_scan;
mod insert;
mod limit;
mod nested_loop_join;
Expand All @@ -16,6 +17,7 @@ pub use create_index::PhysicalCreateIndex;
pub use create_table::PhysicalCreateTable;
pub use empty::PhysicalEmpty;
pub use filter::PhysicalFilter;
pub use index_scan::PhysicalIndexScan;
pub use insert::PhysicalInsert;
pub use limit::PhysicalLimit;
pub use nested_loop_join::PhysicalNestedLoopJoin;
Expand All @@ -39,6 +41,7 @@ pub enum PhysicalPlan {
Project(PhysicalProject),
Filter(PhysicalFilter),
TableScan(PhysicalSeqScan),
IndexScan(PhysicalIndexScan),

Check warning on line 44 in bustubx/src/execution/physical_plan/mod.rs

View workflow job for this annotation

GitHub Actions / Test Suite

variant `IndexScan` is never constructed
Limit(PhysicalLimit),
Insert(PhysicalInsert),
Values(PhysicalValues),
Expand All @@ -65,6 +68,7 @@ impl PhysicalPlan {
| PhysicalPlan::CreateTable(_)
| PhysicalPlan::CreateIndex(_)
| PhysicalPlan::TableScan(_)
| PhysicalPlan::IndexScan(_)
| PhysicalPlan::Values(_) => vec![],
}
}
Expand All @@ -81,6 +85,7 @@ impl VolcanoExecutor for PhysicalPlan {
PhysicalPlan::Project(op) => op.init(context),
PhysicalPlan::Filter(op) => op.init(context),
PhysicalPlan::TableScan(op) => op.init(context),
PhysicalPlan::IndexScan(op) => op.init(context),
PhysicalPlan::Limit(op) => op.init(context),
PhysicalPlan::NestedLoopJoin(op) => op.init(context),
PhysicalPlan::Sort(op) => op.init(context),
Expand All @@ -98,6 +103,7 @@ impl VolcanoExecutor for PhysicalPlan {
PhysicalPlan::Project(op) => op.next(context),
PhysicalPlan::Filter(op) => op.next(context),
PhysicalPlan::TableScan(op) => op.next(context),
PhysicalPlan::IndexScan(op) => op.next(context),
PhysicalPlan::Limit(op) => op.next(context),
PhysicalPlan::NestedLoopJoin(op) => op.next(context),
PhysicalPlan::Sort(op) => op.next(context),
Expand All @@ -115,6 +121,7 @@ impl VolcanoExecutor for PhysicalPlan {
Self::Project(op) => op.output_schema(),
Self::Filter(op) => op.output_schema(),
Self::TableScan(op) => op.output_schema(),
Self::IndexScan(op) => op.output_schema(),
Self::Limit(op) => op.output_schema(),
Self::NestedLoopJoin(op) => op.output_schema(),
Self::Sort(op) => op.output_schema(),
Expand All @@ -134,6 +141,7 @@ impl std::fmt::Display for PhysicalPlan {
Self::Project(op) => write!(f, "{op}"),
Self::Filter(op) => write!(f, "{op}"),
Self::TableScan(op) => write!(f, "{op}"),
Self::IndexScan(op) => write!(f, "{op}"),
Self::Limit(op) => write!(f, "{op}"),
Self::NestedLoopJoin(op) => write!(f, "{op}"),
Self::Sort(op) => write!(f, "{op}"),
Expand Down
2 changes: 2 additions & 0 deletions bustubx/src/storage/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl Context {
}

// B+树索引
#[derive(Debug)]
pub struct BPlusTreeIndex {
pub key_schema: SchemaRef,
pub buffer_pool: Arc<BufferPoolManager>,
Expand Down Expand Up @@ -604,6 +605,7 @@ impl BPlusTreeIndex {
}
}

#[derive(Debug)]
pub struct TreeIndexIterator {
index: Arc<BPlusTreeIndex>,
start_bound: Bound<Tuple>,
Expand Down
16 changes: 16 additions & 0 deletions bustubx/src/storage/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@ impl Tuple {
Self { schema, data }
}

pub fn project_with_schema(&self, projected_schema: SchemaRef) -> BustubxResult<Self> {
let indices = projected_schema
.columns
.iter()
.map(|col| {
self.schema
.index_of(col.relation.as_ref(), col.name.as_str())
})
.collect::<BustubxResult<Vec<usize>>>()?;
let projected_data = indices
.iter()
.map(|idx| self.data[*idx].clone())
.collect::<Vec<ScalarValue>>();
Ok(Self::new(projected_schema, projected_data))
}

pub fn empty(schema: SchemaRef) -> Self {
let mut data = vec![];
for col in schema.columns.iter() {
Expand Down

0 comments on commit 3857645

Please sign in to comment.