diff --git a/bustubx/src/buffer/buffer_pool.rs b/bustubx/src/buffer/buffer_pool.rs index 226e56c..be882c9 100644 --- a/bustubx/src/buffer/buffer_pool.rs +++ b/bustubx/src/buffer/buffer_pool.rs @@ -5,7 +5,13 @@ use std::{collections::VecDeque, sync::Arc}; use crate::buffer::page::{Page, PageId}; use crate::buffer::PageRef; -use crate::storage::DiskManager; +use crate::catalog::SchemaRef; +use crate::storage::codec::{ + BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec, BPlusTreePageCodec, TablePageCodec, +}; +use crate::storage::{ + BPlusTreeInternalPage, BPlusTreeLeafPage, BPlusTreePage, DiskManager, TablePage, +}; use crate::{BustubxError, BustubxResult}; use super::replacer::LRUKReplacer; @@ -112,6 +118,49 @@ impl BufferPoolManager { } } + pub fn fetch_table_page( + &self, + page_id: PageId, + schema: SchemaRef, + ) -> BustubxResult<(PageRef, TablePage)> { + let page = self.fetch_page(page_id)?; + let (table_page, _) = TablePageCodec::decode(page.read().unwrap().data(), schema.clone())?; + Ok((page, table_page)) + } + + pub fn fetch_tree_page( + &self, + page_id: PageId, + key_schema: SchemaRef, + ) -> BustubxResult<(PageRef, BPlusTreePage)> { + let page = self.fetch_page(page_id)?; + let (tree_page, _) = + BPlusTreePageCodec::decode(page.read().unwrap().data(), key_schema.clone())?; + Ok((page, tree_page)) + } + + pub fn fetch_tree_internal_page( + &self, + page_id: PageId, + key_schema: SchemaRef, + ) -> BustubxResult<(PageRef, BPlusTreeInternalPage)> { + let page = self.fetch_page(page_id)?; + let (tree_internal_page, _) = + BPlusTreeInternalPageCodec::decode(page.read().unwrap().data(), key_schema.clone())?; + Ok((page, tree_internal_page)) + } + + pub fn fetch_tree_leaf_page( + &self, + page_id: PageId, + key_schema: SchemaRef, + ) -> BustubxResult<(PageRef, BPlusTreeLeafPage)> { + let page = self.fetch_page(page_id)?; + let (tree_leaf_page, _) = + BPlusTreeLeafPageCodec::decode(page.read().unwrap().data(), key_schema.clone())?; + Ok((page, tree_leaf_page)) + } + // 将缓冲池中指定页写回磁盘 pub fn flush_page(&self, page_id: PageId) -> BustubxResult { if let Some(frame_id) = self.page_table.get(&page_id) { diff --git a/bustubx/src/catalog/information.rs b/bustubx/src/catalog/information.rs index fee9bf1..b84c0a0 100644 --- a/bustubx/src/catalog/information.rs +++ b/bustubx/src/catalog/information.rs @@ -291,8 +291,9 @@ fn load_table_last_page_id( ) -> BustubxResult { let mut page_id = first_page_id; loop { - let page = catalog.buffer_pool.fetch_page(page_id)?; - let (table_page, _) = TablePageCodec::decode(page.read().unwrap().data(), schema.clone())?; + let (_, table_page) = catalog + .buffer_pool + .fetch_table_page(page_id, schema.clone())?; if table_page.header.next_page_id == INVALID_PAGE_ID { return Ok(page_id); diff --git a/bustubx/src/common/util.rs b/bustubx/src/common/util.rs index 5e02086..3ed472f 100644 --- a/bustubx/src/common/util.rs +++ b/bustubx/src/common/util.rs @@ -98,9 +98,9 @@ pub(crate) fn pretty_format_index_tree(index: &BPlusTreeIndex) -> BustubxResult< let mut level_row = vec![]; while let Some(page_id) = curr_queue.pop_front() { - let page = index.buffer_pool.fetch_page(page_id)?; - let (curr_page, _) = - BPlusTreePageCodec::decode(page.read().unwrap().data(), index.key_schema.clone())?; + let (_, curr_page) = index + .buffer_pool + .fetch_tree_page(page_id, index.key_schema.clone())?; match curr_page { BPlusTreePage::Internal(internal_page) => { diff --git a/bustubx/src/storage/index.rs b/bustubx/src/storage/index.rs index 7e80005..b022d31 100644 --- a/bustubx/src/storage/index.rs +++ b/bustubx/src/storage/index.rs @@ -100,11 +100,9 @@ impl BPlusTreeIndex { let curr_page_id = curr_page.read().unwrap().page_id; if let Some(parent_page_id) = context.read_set.pop_back() { // 更新父节点 - let parent_page = self.buffer_pool.fetch_page(parent_page_id)?; - let (mut parent_tree_page, _) = BPlusTreePageCodec::decode( - parent_page.read().unwrap().data(), - self.key_schema.clone(), - )?; + let (parent_page, mut parent_tree_page) = self + .buffer_pool + .fetch_tree_page(parent_page_id, self.key_schema.clone())?; parent_tree_page.insert_internalkv(internalkv); curr_page = parent_page; @@ -205,11 +203,9 @@ impl BPlusTreeIndex { "Cannot process index page borrow or merge".to_string(), )); }; - let new_parent_page = self.buffer_pool.fetch_page(new_parent_page_id)?; - let (new_parent_tree_page, _) = BPlusTreePageCodec::decode( - new_parent_page.read().unwrap().data(), - self.key_schema.clone(), - )?; + let (_, new_parent_tree_page) = self + .buffer_pool + .fetch_tree_page(new_parent_page_id, self.key_schema.clone())?; curr_page_id = new_parent_page_id; curr_tree_page = new_parent_tree_page; @@ -261,11 +257,10 @@ impl BPlusTreeIndex { if self.is_empty() { return Ok(None); } - let mut curr_page = self - .buffer_pool - .fetch_page(self.root_page_id.load(Ordering::SeqCst))?; - let (mut curr_tree_page, _) = - BPlusTreePageCodec::decode(curr_page.read().unwrap().data(), self.key_schema.clone())?; + let (mut curr_page, mut curr_tree_page) = self.buffer_pool.fetch_tree_page( + self.root_page_id.load(Ordering::SeqCst), + self.key_schema.clone(), + )?; // 找到leaf page loop { @@ -276,11 +271,9 @@ impl BPlusTreeIndex { .push_back(curr_page.read().unwrap().page_id); // 查找下一页 let next_page_id = internal_page.look_up(key); - let next_page = self.buffer_pool.fetch_page(next_page_id)?; - let (next_tree_page, _) = BPlusTreePageCodec::decode( - next_page.read().unwrap().data(), - self.key_schema.clone(), - )?; + let (next_page, next_tree_page) = self + .buffer_pool + .fetch_tree_page(next_page_id, self.key_schema.clone())?; curr_page = next_page; curr_tree_page = next_tree_page; } @@ -357,18 +350,16 @@ impl BPlusTreeIndex { borrowed_page_id: PageId, min_max: bool, ) -> BustubxResult { - let borrowed_page = self.buffer_pool.fetch_page(borrowed_page_id)?; - let (mut borrowed_tree_page, _) = BPlusTreePageCodec::decode( - borrowed_page.read().unwrap().data(), - self.key_schema.clone(), - )?; + let (borrowed_page, mut borrowed_tree_page) = self + .buffer_pool + .fetch_tree_page(borrowed_page_id, self.key_schema.clone())?; if !borrowed_tree_page.can_borrow() { return Ok(false); } - let page = self.buffer_pool.fetch_page(page_id)?; - let (mut tree_page, _) = - BPlusTreePageCodec::decode(page.read().unwrap().data(), self.key_schema.clone())?; + let (page, mut tree_page) = self + .buffer_pool + .fetch_tree_page(page_id, self.key_schema.clone())?; let (old_internal_key, new_internal_key) = match borrowed_tree_page { BPlusTreePage::Internal(ref mut borrowed_internal_page) => { @@ -430,11 +421,9 @@ impl BPlusTreeIndex { ))); // 更新父节点 - let parent_page = self.buffer_pool.fetch_page(parent_page_id)?; - let (mut parent_internal_page, _) = BPlusTreeInternalPageCodec::decode( - parent_page.read().unwrap().data(), - self.key_schema.clone(), - )?; + let (parent_page, mut parent_internal_page) = self + .buffer_pool + .fetch_tree_internal_page(parent_page_id, self.key_schema.clone())?; parent_internal_page.replace_key(&old_internal_key, new_internal_key); parent_page.write().unwrap().set_data(page_bytes_to_array( @@ -448,12 +437,10 @@ impl BPlusTreeIndex { parent_page_id: PageId, child_page_id: PageId, ) -> BustubxResult<(Option, Option)> { - let parent_page = self.buffer_pool.fetch_page(parent_page_id)?; - let (parent_page, _) = BPlusTreeInternalPageCodec::decode( - parent_page.read().unwrap().data(), - self.key_schema.clone(), - )?; - Ok(parent_page.sibling_page_ids(child_page_id)) + let (_, parent_internal_page) = self + .buffer_pool + .fetch_tree_internal_page(parent_page_id, self.key_schema.clone())?; + Ok(parent_internal_page.sibling_page_ids(child_page_id)) } fn merge( @@ -462,12 +449,12 @@ impl BPlusTreeIndex { left_page_id: PageId, right_page_id: PageId, ) -> BustubxResult { - let left_page = self.buffer_pool.fetch_page(left_page_id)?; - let (mut left_tree_page, _) = - BPlusTreePageCodec::decode(left_page.read().unwrap().data(), self.key_schema.clone())?; - let right_page = self.buffer_pool.fetch_page(right_page_id)?; - let (mut right_tree_page, _) = - BPlusTreePageCodec::decode(right_page.read().unwrap().data(), self.key_schema.clone())?; + let (left_page, mut left_tree_page) = self + .buffer_pool + .fetch_tree_page(left_page_id, self.key_schema.clone())?; + let (_, mut right_tree_page) = self + .buffer_pool + .fetch_tree_page(right_page_id, self.key_schema.clone())?; // 向左合入 match left_tree_page { @@ -509,11 +496,9 @@ impl BPlusTreeIndex { self.buffer_pool.delete_page(right_page_id)?; // 更新父节点 - let parent_page = self.buffer_pool.fetch_page(parent_page_id)?; - let (mut parent_internal_page, _) = BPlusTreeInternalPageCodec::decode( - parent_page.read().unwrap().data(), - self.key_schema.clone(), - )?; + let (parent_page, mut parent_internal_page) = self + .buffer_pool + .fetch_tree_internal_page(parent_page_id, self.key_schema.clone())?; parent_internal_page.delete_page_id(right_page_id); // 根节点只有一个子节点(叶子)时,则叶子节点成为新的根节点 @@ -543,9 +528,9 @@ impl BPlusTreeIndex { } fn find_subtree_leafkv(&self, page_id: PageId, min_or_max: bool) -> BustubxResult { - let curr_page = self.buffer_pool.fetch_page(page_id)?; - let (mut curr_tree_page, _) = - BPlusTreePageCodec::decode(curr_page.read().unwrap().data(), self.key_schema.clone())?; + let (_, mut curr_tree_page) = self + .buffer_pool + .fetch_tree_page(page_id, self.key_schema.clone())?; loop { match curr_tree_page { BPlusTreePage::Internal(internal_page) => { @@ -555,12 +540,10 @@ impl BPlusTreeIndex { internal_page.header.current_size as usize - 1 }; let next_page_id = internal_page.value_at(index); - let next_page = self.buffer_pool.fetch_page(next_page_id)?; - curr_tree_page = BPlusTreePageCodec::decode( - next_page.read().unwrap().data(), - self.key_schema.clone(), - )? - .0; + let (_, next_tree_page) = self + .buffer_pool + .fetch_tree_page(next_page_id, self.key_schema.clone())?; + curr_tree_page = next_tree_page; } BPlusTreePage::Leaf(leaf_page) => { let index = if min_or_max { @@ -575,21 +558,18 @@ impl BPlusTreeIndex { } pub fn get_first_leaf_page(&self) -> BustubxResult { - let curr_page = self - .buffer_pool - .fetch_page(self.root_page_id.load(Ordering::SeqCst))?; - let (mut curr_tree_page, _) = - BPlusTreePageCodec::decode(curr_page.read().unwrap().data(), self.key_schema.clone())?; + let (_, mut curr_tree_page) = self.buffer_pool.fetch_tree_page( + self.root_page_id.load(Ordering::SeqCst), + self.key_schema.clone(), + )?; loop { match curr_tree_page { BPlusTreePage::Internal(internal_page) => { let next_page_id = internal_page.value_at(0); - let next_page = self.buffer_pool.fetch_page(next_page_id)?; - curr_tree_page = BPlusTreePageCodec::decode( - next_page.read().unwrap().data(), - self.key_schema.clone(), - )? - .0; + let (_, next_tree_page) = self + .buffer_pool + .fetch_tree_page(next_page_id, self.key_schema.clone())?; + curr_tree_page = next_tree_page; } BPlusTreePage::Leaf(leaf_page) => { return Ok(leaf_page); @@ -626,12 +606,11 @@ impl TreeIndexIterator { if next_page_id == INVALID_PAGE_ID { Ok(false) } else { - let next_page = self.index.buffer_pool.fetch_page(next_page_id)?; - self.leaf_page = BPlusTreeLeafPageCodec::decode( - next_page.read().unwrap().data(), - self.index.key_schema.clone(), - )? - .0; + let (_, next_leaf_page) = self + .index + .buffer_pool + .fetch_tree_leaf_page(next_page_id, self.index.key_schema.clone())?; + self.leaf_page = next_leaf_page; Ok(true) } } diff --git a/bustubx/src/storage/table_heap.rs b/bustubx/src/storage/table_heap.rs index b3c0e36..4879c80 100644 --- a/bustubx/src/storage/table_heap.rs +++ b/bustubx/src/storage/table_heap.rs @@ -52,11 +52,11 @@ impl TableHeap { /// An `Option` containing the `Rid` of the inserted tuple if successful, otherwise `None`. pub fn insert_tuple(&self, meta: &TupleMeta, tuple: &Tuple) -> BustubxResult { let mut last_page_id = self.last_page_id.load(Ordering::SeqCst); - let last_page = self.buffer_pool.fetch_page(last_page_id)?; + let (last_page, mut last_table_page) = self + .buffer_pool + .fetch_table_page(last_page_id, self.schema.clone())?; // Loop until a suitable page is found for inserting the tuple - let (mut last_table_page, _) = - TablePageCodec::decode(last_page.read().unwrap().data(), self.schema.clone())?; loop { if last_table_page.next_tuple_offset(tuple).is_ok() { break; @@ -110,9 +110,9 @@ impl TableHeap { } pub fn update_tuple(&self, rid: RecordId, tuple: Tuple) -> BustubxResult<()> { - let page = self.buffer_pool.fetch_page(rid.page_id)?; - let (mut table_page, _) = - TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?; + let (page, mut table_page) = self + .buffer_pool + .fetch_table_page(rid.page_id, self.schema.clone())?; table_page.update_tuple(tuple, rid.slot_num as u16)?; page.write() @@ -122,9 +122,9 @@ impl TableHeap { } pub fn update_tuple_meta(&self, meta: TupleMeta, rid: RecordId) -> BustubxResult<()> { - let page = self.buffer_pool.fetch_page(rid.page_id)?; - let (mut table_page, _) = - TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?; + let (page, mut table_page) = self + .buffer_pool + .fetch_table_page(rid.page_id, self.schema.clone())?; table_page.update_tuple_meta(meta, rid.slot_num as u16)?; page.write() @@ -134,9 +134,9 @@ impl TableHeap { } pub fn full_tuple(&self, rid: RecordId) -> BustubxResult<(TupleMeta, Tuple)> { - let page = self.buffer_pool.fetch_page(rid.page_id)?; - let (table_page, _) = - TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?; + let (_, table_page) = self + .buffer_pool + .fetch_table_page(rid.page_id, self.schema.clone())?; let result = table_page.tuple(rid.slot_num as u16)?; Ok(result) } @@ -153,9 +153,9 @@ impl TableHeap { pub fn get_first_rid(&self) -> BustubxResult> { let first_page_id = self.first_page_id.load(Ordering::SeqCst); - let page = self.buffer_pool.fetch_page(first_page_id)?; - let (table_page, _) = - TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?; + let (_, table_page) = self + .buffer_pool + .fetch_table_page(first_page_id, self.schema.clone())?; if table_page.header.num_tuples == 0 { // TODO 忽略删除的tuple Ok(None) @@ -165,9 +165,9 @@ impl TableHeap { } pub fn get_next_rid(&self, rid: RecordId) -> BustubxResult> { - let page = self.buffer_pool.fetch_page(rid.page_id)?; - let (table_page, _) = - TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?; + let (_, table_page) = self + .buffer_pool + .fetch_table_page(rid.page_id, self.schema.clone())?; let next_rid = table_page.get_next_rid(&rid); if next_rid.is_some() { return Ok(next_rid); @@ -176,11 +176,9 @@ impl TableHeap { if table_page.header.next_page_id == INVALID_PAGE_ID { return Ok(None); } - let next_page = self + let (_, next_table_page) = self .buffer_pool - .fetch_page(table_page.header.next_page_id)?; - let (next_table_page, _) = - TablePageCodec::decode(next_page.read().unwrap().data(), self.schema.clone())?; + .fetch_table_page(table_page.header.next_page_id, self.schema.clone())?; if next_table_page.header.num_tuples == 0 { // TODO 忽略删除的tuple Ok(None)