diff --git a/bustubx/src/catalog/catalog.rs b/bustubx/src/catalog/catalog.rs index efc1f9d..6bff6e1 100644 --- a/bustubx/src/catalog/catalog.rs +++ b/bustubx/src/catalog/catalog.rs @@ -19,16 +19,9 @@ pub static DEFAULT_SCHEMA_NAME: &str = "public"; /// catalog, schema, table, index pub type FullIndexRef = (String, String, String, String); -// index元信息 -pub struct IndexInfo { - pub key_schema: SchemaRef, - pub index: BPlusTreeIndex, - pub table_name: String, -} - pub struct Catalog { pub tables: HashMap>, - pub indexes: HashMap, + pub indexes: HashMap>, pub buffer_pool: Arc, } @@ -124,7 +117,7 @@ impl Catalog { index_name: String, table_ref: &TableReference, key_attrs: Vec, - ) -> BustubxResult<&IndexInfo> { + ) -> BustubxResult> { let (catalog, schema, table) = table_ref.extend_to_full(); let full_index_ref = (catalog, schema, table, index_name.clone()); @@ -139,14 +132,11 @@ impl Catalog { BPLUS_INTERNAL_PAGE_MAX_SIZE as u32, ); - let index_info = IndexInfo { - key_schema, - index: b_plus_tree_index, - table_name: table_ref.table().to_string(), - }; - self.indexes.insert(full_index_ref.clone(), index_info); + self.indexes + .insert(full_index_ref.clone(), Arc::new(b_plus_tree_index)); self.indexes .get(&full_index_ref) + .cloned() .ok_or(BustubxError::Internal("Failed to create table".to_string())) } @@ -154,10 +144,10 @@ impl Catalog { &self, table_ref: &TableReference, index_name: &str, - ) -> Option<&IndexInfo> { + ) -> Option> { let (catalog, schema, table) = table_ref.extend_to_full(); let full_index_ref = (catalog, schema, table, index_name.to_string()); - self.indexes.get(&full_index_ref) + self.indexes.get(&full_index_ref).cloned() } } @@ -231,7 +221,6 @@ mod tests { let index_info = catalog .create_index(index_name1.clone(), &table_ref, key_attrs) .unwrap(); - assert_eq!(index_info.table_name, table_ref.table()); assert_eq!(index_info.key_schema.column_count(), 2); assert_eq!( index_info.key_schema.column_with_index(0).unwrap().name, @@ -263,7 +252,6 @@ mod tests { let index_info = catalog .create_index(index_name2.clone(), &table_ref, key_attrs) .unwrap(); - assert_eq!(index_info.table_name, table_ref.table()); assert_eq!(index_info.key_schema.column_count(), 1); assert_eq!( index_info.key_schema.column_with_index(0).unwrap().name, diff --git a/bustubx/src/common/util.rs b/bustubx/src/common/util.rs index 8455842..5a69025 100644 --- a/bustubx/src/common/util.rs +++ b/bustubx/src/common/util.rs @@ -6,6 +6,7 @@ use crate::storage::index::BPlusTreeIndex; use crate::BustubxResult; use comfy_table::Cell; use std::collections::VecDeque; +use std::sync::atomic::Ordering; use crate::storage::{BPlusTreePage, Tuple}; @@ -79,7 +80,7 @@ pub(crate) fn pretty_format_index_tree(index: &BPlusTreeIndex) -> BustubxResult< } // 层序遍历 let mut curr_queue = VecDeque::new(); - curr_queue.push_back(index.root_page_id); + curr_queue.push_back(index.root_page_id.load(Ordering::SeqCst)); let mut level_index = 1; loop { diff --git a/bustubx/src/execution/physical_plan/seq_scan.rs b/bustubx/src/execution/physical_plan/seq_scan.rs index 314e1e5..e92cd55 100644 --- a/bustubx/src/execution/physical_plan/seq_scan.rs +++ b/bustubx/src/execution/physical_plan/seq_scan.rs @@ -29,7 +29,7 @@ impl PhysicalSeqScan { impl VolcanoExecutor for PhysicalSeqScan { fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> { let table_heap = context.catalog.table_heap(&self.table)?; - *self.iterator.lock().unwrap() = Some(TableIterator::new(table_heap, (..))); + *self.iterator.lock().unwrap() = Some(TableIterator::new(table_heap, ..)); Ok(()) } diff --git a/bustubx/src/storage/index.rs b/bustubx/src/storage/index.rs index 62a9884..47bbbf9 100644 --- a/bustubx/src/storage/index.rs +++ b/bustubx/src/storage/index.rs @@ -1,9 +1,10 @@ use std::collections::VecDeque; -use std::ops::{Bound, Range, RangeBounds}; +use std::ops::{Bound, RangeBounds}; +use std::sync::atomic::Ordering; use std::sync::{Arc, RwLock}; -use crate::buffer::{Page, PageId, INVALID_PAGE_ID}; -use crate::catalog::{Schema, SchemaRef}; +use crate::buffer::{AtomicPageId, Page, PageId, INVALID_PAGE_ID}; +use crate::catalog::SchemaRef; use crate::common::util::page_bytes_to_array; use crate::storage::codec::{ BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec, BPlusTreePageCodec, @@ -39,7 +40,7 @@ pub struct BPlusTreeIndex { pub buffer_pool: Arc, pub leaf_max_size: u32, pub internal_max_size: u32, - pub root_page_id: PageId, + pub root_page_id: AtomicPageId, } impl BPlusTreeIndex { @@ -54,20 +55,20 @@ impl BPlusTreeIndex { buffer_pool, leaf_max_size, internal_max_size, - root_page_id: INVALID_PAGE_ID, + root_page_id: AtomicPageId::new(INVALID_PAGE_ID), } } pub fn is_empty(&self) -> bool { - self.root_page_id == INVALID_PAGE_ID + self.root_page_id.load(Ordering::SeqCst) == INVALID_PAGE_ID } - pub fn insert(&mut self, key: &Tuple, rid: Rid) -> BustubxResult<()> { + pub fn insert(&self, key: &Tuple, rid: Rid) -> BustubxResult<()> { if self.is_empty() { self.start_new_tree(key, rid)?; return Ok(()); } - let mut context = Context::new(self.root_page_id); + let mut context = Context::new(self.root_page_id.load(Ordering::SeqCst)); // 找到leaf page let Some(leaf_page) = self.find_leaf_page(key, &mut context)? else { return Err(BustubxError::Storage( @@ -106,7 +107,7 @@ impl BPlusTreeIndex { curr_page = parent_page; curr_tree_page = parent_tree_page; - } else if curr_page_id == self.root_page_id { + } else if curr_page_id == self.root_page_id.load(Ordering::SeqCst) { // new 一个新的root page let new_root_page = self.buffer_pool.new_page()?; let new_root_page_id = new_root_page.read().unwrap().page_id; @@ -114,8 +115,10 @@ impl BPlusTreeIndex { BPlusTreeInternalPage::new(self.key_schema.clone(), self.internal_max_size); // internal page第一个kv对的key为空 - new_root_internal_page - .insert(Tuple::empty(self.key_schema.clone()), self.root_page_id); + new_root_internal_page.insert( + Tuple::empty(self.key_schema.clone()), + self.root_page_id.load(Ordering::SeqCst), + ); new_root_internal_page.insert(internalkv.0, internalkv.1); new_root_page.write().unwrap().data = page_bytes_to_array( @@ -124,7 +127,7 @@ impl BPlusTreeIndex { self.buffer_pool.unpin_page(new_root_page.clone(), true)?; // 更新root page id - self.root_page_id = new_root_page_id; + self.root_page_id.store(new_root_page_id, Ordering::SeqCst); curr_page = new_root_page; curr_tree_page = BPlusTreePage::Internal(new_root_internal_page); @@ -138,11 +141,11 @@ impl BPlusTreeIndex { Ok(()) } - pub fn delete(&mut self, key: &Tuple) -> BustubxResult<()> { + pub fn delete(&self, key: &Tuple) -> BustubxResult<()> { if self.is_empty() { return Ok(()); } - let mut context = Context::new(self.root_page_id); + let mut context = Context::new(self.root_page_id.load(Ordering::SeqCst)); // 找到leaf page let Some(leaf_page) = self.find_leaf_page(key, &mut context)? else { return Err(BustubxError::Storage( @@ -161,7 +164,8 @@ impl BPlusTreeIndex { let mut curr_page_id = leaf_page.read().unwrap().page_id; // leaf page未达到半满则从兄弟节点借一个或合并 - while curr_tree_page.is_underflow(self.root_page_id == curr_page_id) { + while curr_tree_page.is_underflow(self.root_page_id.load(Ordering::SeqCst) == curr_page_id) + { let Some(parent_page_id) = context.read_set.pop_back() else { return Err(BustubxError::Storage("Cannot find parent page".to_string())); }; @@ -214,7 +218,7 @@ impl BPlusTreeIndex { unimplemented!() } - fn start_new_tree(&mut self, key: &Tuple, rid: Rid) -> BustubxResult<()> { + fn start_new_tree(&self, key: &Tuple, rid: Rid) -> BustubxResult<()> { let new_page = self.buffer_pool.new_page()?; let new_page_id = new_page.read().unwrap().page_id; @@ -225,7 +229,7 @@ impl BPlusTreeIndex { page_bytes_to_array(&BPlusTreeLeafPageCodec::encode(&leaf_page)); // 更新root page id - self.root_page_id = new_page_id; + self.root_page_id.store(new_page_id, Ordering::SeqCst); self.buffer_pool.unpin_page_id(new_page_id, true)?; Ok(()) @@ -238,7 +242,7 @@ impl BPlusTreeIndex { } // 找到leaf page - let mut context = Context::new(self.root_page_id); + let mut context = Context::new(self.root_page_id.load(Ordering::SeqCst)); let Some(leaf_page) = self.find_leaf_page(key, &mut context)? else { return Ok(None); }; @@ -259,7 +263,9 @@ impl BPlusTreeIndex { if self.is_empty() { return Ok(None); } - let mut curr_page = self.buffer_pool.fetch_page(self.root_page_id)?; + let mut curr_page = self + .buffer_pool + .fetch_page(self.root_page_id.load(Ordering::SeqCst))?; let (mut curr_tree_page, _) = BPlusTreePageCodec::decode(&curr_page.read().unwrap().data, self.key_schema.clone())?; @@ -291,7 +297,7 @@ impl BPlusTreeIndex { } // 分裂page - fn split(&mut self, tree_page: &mut BPlusTreePage) -> BustubxResult { + fn split(&self, tree_page: &mut BPlusTreePage) -> BustubxResult { let new_page = self.buffer_pool.new_page()?; let new_page_id = new_page.read().unwrap().page_id; @@ -332,7 +338,7 @@ impl BPlusTreeIndex { } fn borrow_min_kv( - &mut self, + &self, parent_page_id: PageId, page_id: PageId, borrowed_page_id: PageId, @@ -341,7 +347,7 @@ impl BPlusTreeIndex { } fn borrow_max_kv( - &mut self, + &self, parent_page_id: PageId, page_id: PageId, borrowed_page_id: PageId, @@ -350,7 +356,7 @@ impl BPlusTreeIndex { } fn borrow( - &mut self, + &self, parent_page_id: PageId, page_id: PageId, borrowed_page_id: PageId, @@ -439,7 +445,7 @@ impl BPlusTreeIndex { } fn find_sibling_pages( - &mut self, + &self, parent_page_id: PageId, child_page_id: PageId, ) -> BustubxResult<(Option, Option)> { @@ -453,7 +459,7 @@ impl BPlusTreeIndex { } fn merge( - &mut self, + &self, parent_page_id: PageId, left_page_id: PageId, right_page_id: PageId, @@ -511,8 +517,10 @@ impl BPlusTreeIndex { parent_internal_page.delete_page_id(right_page_id); // 根节点只有一个子节点(叶子)时,则叶子节点成为新的根节点 - if parent_page_id == self.root_page_id && parent_internal_page.header.current_size == 1 { - self.root_page_id = left_page_id; + if parent_page_id == self.root_page_id.load(Ordering::SeqCst) + && parent_internal_page.header.current_size == 1 + { + self.root_page_id.store(left_page_id, Ordering::SeqCst); // 删除旧的根节点 self.buffer_pool.unpin_page_id(parent_page_id, false)?; self.buffer_pool.delete_page(parent_page_id)?; @@ -526,16 +534,16 @@ impl BPlusTreeIndex { } // 查找子树最小的leafKV - fn find_subtree_min_leafkv(&mut self, page_id: PageId) -> BustubxResult { + fn find_subtree_min_leafkv(&self, page_id: PageId) -> BustubxResult { self.find_subtree_leafkv(page_id, true) } // 查找子树最大的leafKV - fn find_subtree_max_leafkv(&mut self, page_id: PageId) -> BustubxResult { + fn find_subtree_max_leafkv(&self, page_id: PageId) -> BustubxResult { self.find_subtree_leafkv(page_id, false) } - fn find_subtree_leafkv(&mut self, page_id: PageId, min_or_max: bool) -> BustubxResult { + fn find_subtree_leafkv(&self, page_id: PageId, min_or_max: bool) -> BustubxResult { let curr_page = self.buffer_pool.fetch_page(page_id)?; let (mut curr_tree_page, _) = BPlusTreePageCodec::decode(&curr_page.read().unwrap().data, self.key_schema.clone())?; @@ -568,17 +576,192 @@ impl BPlusTreeIndex { } } } + + pub fn get_first_leaf_page(&self) -> BustubxResult { + let curr_page = self + .buffer_pool + .fetch_page(self.root_page_id.load(Ordering::SeqCst))?; + let (mut curr_tree_page, _) = + BPlusTreePageCodec::decode(&curr_page.read().unwrap().data, self.key_schema.clone())?; + self.buffer_pool.unpin_page(curr_page.clone(), false)?; + loop { + match curr_tree_page { + BPlusTreePage::Internal(internal_page) => { + let next_page_id = internal_page.value_at(0); + let next_page = self.buffer_pool.fetch_page(next_page_id)?; + curr_tree_page = BPlusTreePageCodec::decode( + &next_page.read().unwrap().data, + self.key_schema.clone(), + )? + .0; + self.buffer_pool.unpin_page(next_page, false)?; + } + BPlusTreePage::Leaf(leaf_page) => { + return Ok(leaf_page); + } + } + } + } +} + +pub struct TreeIndexIterator { + index: Arc, + start_bound: Bound, + end_bound: Bound, + leaf_page: BPlusTreeLeafPage, + cursor: usize, + started: bool, } -pub struct TreeIndexIterator {} +impl TreeIndexIterator { + pub fn new>(index: Arc, range: R) -> Self { + Self { + index, + start_bound: range.start_bound().cloned(), + end_bound: range.end_bound().cloned(), + leaf_page: BPlusTreeLeafPage::empty(), + cursor: 0, + started: false, + } + } + + pub fn load_next_leaf_page(&mut self) -> BustubxResult { + let next_page_id = self.leaf_page.header.next_page_id; + if next_page_id == INVALID_PAGE_ID { + Ok(false) + } else { + let next_page = self.index.buffer_pool.fetch_page(next_page_id)?; + self.leaf_page = BPlusTreeLeafPageCodec::decode( + &next_page.read().unwrap().data, + self.index.key_schema.clone(), + )? + .0; + self.index + .buffer_pool + .unpin_page(next_page.clone(), false)?; + Ok(true) + } + } + + pub fn next(&mut self) -> BustubxResult> { + if self.started { + match self.end_bound.as_ref() { + Bound::Included(end_tuple) => { + self.cursor += 1; + let end_tuple = end_tuple.clone(); + let kv = if self.cursor >= self.leaf_page.header.current_size as usize { + if self.load_next_leaf_page()? { + self.cursor = 0; + self.leaf_page.array[self.cursor].clone() + } else { + return Ok(None); + } + } else { + self.leaf_page.array[self.cursor].clone() + }; + if kv.0 <= end_tuple { + Ok(Some(kv.1)) + } else { + Ok(None) + } + } + Bound::Excluded(end_tuple) => { + self.cursor += 1; + let end_tuple = end_tuple.clone(); + let kv = if self.cursor >= self.leaf_page.header.current_size as usize { + if self.load_next_leaf_page()? { + self.cursor = 0; + self.leaf_page.array[self.cursor].clone() + } else { + return Ok(None); + } + } else { + self.leaf_page.array[self.cursor].clone() + }; + if kv.0 < end_tuple { + Ok(Some(kv.1)) + } else { + Ok(None) + } + } + Bound::Unbounded => { + self.cursor += 1; + if self.cursor >= self.leaf_page.header.current_size as usize { + if self.load_next_leaf_page()? { + self.cursor = 0; + Ok(Some(self.leaf_page.array[self.cursor].1)) + } else { + Ok(None) + } + } else { + Ok(Some(self.leaf_page.array[self.cursor].1)) + } + } + } + } else { + self.started = true; + match self.start_bound.as_ref() { + Bound::Included(start_tuple) => { + let mut context = Context::new(self.index.root_page_id.load(Ordering::SeqCst)); + let Some(leaf_page) = self.index.find_leaf_page(start_tuple, &mut context)? + else { + return Ok(None); + }; + self.leaf_page = BPlusTreeLeafPageCodec::decode( + &leaf_page.read().unwrap().data, + self.index.key_schema.clone(), + )? + .0; + if let Some(idx) = self.leaf_page.next_closest(start_tuple, true) { + self.cursor = idx; + Ok(Some(self.leaf_page.array[self.cursor].1)) + } else if self.load_next_leaf_page()? { + self.cursor = 0; + Ok(Some(self.leaf_page.array[self.cursor].1)) + } else { + Ok(None) + } + } + Bound::Excluded(start_tuple) => { + let mut context = Context::new(self.index.root_page_id.load(Ordering::SeqCst)); + let Some(leaf_page) = self.index.find_leaf_page(start_tuple, &mut context)? + else { + return Ok(None); + }; + self.leaf_page = BPlusTreeLeafPageCodec::decode( + &leaf_page.read().unwrap().data, + self.index.key_schema.clone(), + )? + .0; + if let Some(idx) = self.leaf_page.next_closest(start_tuple, false) { + self.cursor = idx; + Ok(Some(self.leaf_page.array[self.cursor].1)) + } else if self.load_next_leaf_page()? { + self.cursor = 0; + Ok(Some(self.leaf_page.array[self.cursor].1)) + } else { + Ok(None) + } + } + Bound::Unbounded => { + self.leaf_page = self.index.get_first_leaf_page()?; + self.cursor = 0; + Ok(Some(self.leaf_page.array[self.cursor].1)) + } + } + } + } +} #[cfg(test)] mod tests { + use std::ops::Bound; use std::sync::Arc; use tempfile::TempDir; use crate::catalog::SchemaRef; use crate::common::util::pretty_format_index_tree; + use crate::storage::index::TreeIndexIterator; use crate::{ buffer::BufferPoolManager, catalog::{Column, DataType, Schema}, @@ -598,7 +781,7 @@ mod tests { ])); let disk_manager = DiskManager::try_new(temp_path).unwrap(); let buffer_pool = Arc::new(BufferPoolManager::new(1000, Arc::new(disk_manager))); - let mut index = BPlusTreeIndex::new(key_schema.clone(), buffer_pool, 4, 4); + let index = BPlusTreeIndex::new(key_schema.clone(), buffer_pool, 4, 4); index .insert( @@ -709,7 +892,7 @@ B+ Tree Level No.3: #[test] pub fn test_index_delete() { - let (mut index, key_schema) = build_index(); + let (index, key_schema) = build_index(); index .delete(&Tuple::new( @@ -759,7 +942,7 @@ B+ Tree Level No.2: #[test] pub fn test_index_get() { - let (mut index, key_schema) = build_index(); + let (index, key_schema) = build_index(); assert_eq!( index .get(&Tuple::new( @@ -779,4 +962,40 @@ B+ Tree Level No.2: Some(Rid::new(10, 10)) ); } + + #[test] + pub fn test_index_iterator() { + let (index, key_schema) = build_index(); + let index = Arc::new(index); + + let end_tuple1 = Tuple::new(key_schema.clone(), vec![3i8.into(), 3i16.into()]); + let mut iterator1 = TreeIndexIterator::new(index.clone(), ..end_tuple1); + assert_eq!(iterator1.next().unwrap(), Some(Rid::new(1, 1))); + assert_eq!(iterator1.next().unwrap(), Some(Rid::new(2, 2))); + assert_eq!(iterator1.next().unwrap(), None); + + let start_tuple2 = Tuple::new(key_schema.clone(), vec![3i8.into(), 3i16.into()]); + let end_tuple2 = Tuple::new(key_schema.clone(), vec![5i8.into(), 5i16.into()]); + let mut iterator2 = TreeIndexIterator::new(index.clone(), start_tuple2..=end_tuple2); + assert_eq!(iterator2.next().unwrap(), Some(Rid::new(3, 3))); + assert_eq!(iterator2.next().unwrap(), Some(Rid::new(4, 4))); + assert_eq!(iterator2.next().unwrap(), Some(Rid::new(5, 5))); + assert_eq!(iterator2.next().unwrap(), None); + + let start_tuple3 = Tuple::new(key_schema.clone(), vec![6i8.into(), 6i16.into()]); + let end_tuple3 = Tuple::new(key_schema.clone(), vec![8i8.into(), 8i16.into()]); + let mut iterator3 = TreeIndexIterator::new( + index.clone(), + (Bound::Excluded(start_tuple3), Bound::Excluded(end_tuple3)), + ); + assert_eq!(iterator3.next().unwrap(), Some(Rid::new(7, 7))); + + let start_tuple4 = Tuple::new(key_schema.clone(), vec![9i8.into(), 9i16.into()]); + let mut iterator4 = TreeIndexIterator::new(index.clone(), start_tuple4..); + assert_eq!(iterator4.next().unwrap(), Some(Rid::new(9, 9))); + assert_eq!(iterator4.next().unwrap(), Some(Rid::new(10, 10))); + assert_eq!(iterator4.next().unwrap(), Some(Rid::new(11, 11))); + assert_eq!(iterator4.next().unwrap(), None); + assert_eq!(iterator4.next().unwrap(), None); + } } diff --git a/bustubx/src/storage/page/index_page.rs b/bustubx/src/storage/page/index_page.rs index 597f08d..aa8736b 100644 --- a/bustubx/src/storage/page/index_page.rs +++ b/bustubx/src/storage/page/index_page.rs @@ -1,6 +1,7 @@ use crate::buffer::{PageId, INVALID_PAGE_ID}; -use crate::catalog::SchemaRef; +use crate::catalog::{Schema, SchemaRef}; use crate::{common::rid::Rid, Tuple}; +use std::sync::Arc; pub const BPLUS_INTERNAL_PAGE_MAX_SIZE: usize = 10; pub const BPLUS_LEAF_PAGE_MAX_SIZE: usize = 10; @@ -305,6 +306,19 @@ impl BPlusTreeLeafPage { } } + pub fn empty() -> Self { + Self { + schema: Arc::new(Schema::empty()), + header: BPlusTreeLeafPageHeader { + page_type: BPlusTreePageType::LeafPage, + current_size: 0, + max_size: 0, + next_page_id: INVALID_PAGE_ID, + }, + array: Vec::new(), + } + } + pub fn min_size(&self) -> u32 { self.header.max_size / 2 } @@ -386,6 +400,18 @@ impl BPlusTreeLeafPage { } None } + + pub fn next_closest(&self, tuple: &Tuple, included: bool) -> Option { + for (idx, (key, _)) in self.array.iter().enumerate() { + if tuple == key && included { + return Some(idx); + } + if key > tuple { + return Some(idx); + } + } + None + } } #[cfg(test)] diff --git a/bustubx/src/storage/table_heap.rs b/bustubx/src/storage/table_heap.rs index 49be1a8..0de219b 100644 --- a/bustubx/src/storage/table_heap.rs +++ b/bustubx/src/storage/table_heap.rs @@ -247,7 +247,7 @@ impl TableIterator { self.started = true; match self.start_bound { Bound::Included(rid) => { - self.cursor = Some(rid.clone()); + self.cursor = Some(rid); Some(self.heap.tuple(rid).unwrap()) } Bound::Excluded(rid) => { @@ -265,7 +265,7 @@ impl TableIterator { #[cfg(test)] mod tests { - use std::ops::RangeFull; + use std::sync::Arc; use tempfile::TempDir; @@ -433,7 +433,7 @@ mod tests { ) .unwrap(); - let mut iterator = TableIterator::new(table_heap.clone(), (..)); + let mut iterator = TableIterator::new(table_heap.clone(), ..); let (meta, tuple) = iterator.next().unwrap(); assert_eq!(meta, meta1);