diff --git a/bustubx/src/buffer/buffer_pool.rs b/bustubx/src/buffer/buffer_pool.rs
index c567094..23a3eb3 100644
--- a/bustubx/src/buffer/buffer_pool.rs
+++ b/bustubx/src/buffer/buffer_pool.rs
@@ -3,7 +3,7 @@ use std::sync::RwLock;
 use std::{collections::VecDeque, sync::Arc};
 
 use crate::buffer::page::{Page, PageId};
-use crate::buffer::BUSTUBX_PAGE_SIZE;
+
 use crate::storage::DiskManager;
 use crate::{BustubxError, BustubxResult};
 
diff --git a/bustubx/src/common/rid.rs b/bustubx/src/common/rid.rs
index 09fcf93..43a97b0 100644
--- a/bustubx/src/common/rid.rs
+++ b/bustubx/src/common/rid.rs
@@ -1,4 +1,9 @@
-use crate::buffer::PageId;
+use crate::buffer::{PageId, INVALID_PAGE_ID};
+
+pub const INVALID_RID: Rid = Rid {
+    page_id: INVALID_PAGE_ID,
+    slot_num: 0,
+};
 
 // TODO should move to table page?
 // Record Identifier
@@ -7,10 +12,3 @@ pub struct Rid {
     pub page_id: PageId,
     pub slot_num: u32,
 }
-
-impl Rid {
-    pub const INVALID_RID: Self = Self {
-        page_id: u32::MAX,
-        slot_num: u32::MAX,
-    };
-}
diff --git a/bustubx/src/execution/physical_plan/seq_scan.rs b/bustubx/src/execution/physical_plan/seq_scan.rs
index e92cd55..8424c29 100644
--- a/bustubx/src/execution/physical_plan/seq_scan.rs
+++ b/bustubx/src/execution/physical_plan/seq_scan.rs
@@ -34,13 +34,12 @@ impl VolcanoExecutor for PhysicalSeqScan {
     }
 
     fn next(&self, _context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
-        let mut guard = self.iterator.lock().unwrap();
-        match &mut *guard {
-            Some(x) => Ok(x.next().map(|full| full.1)),
-            None => Err(BustubxError::Execution(
+        let Some(iterator) = &mut *self.iterator.lock().unwrap() else {
+            return Err(BustubxError::Execution(
                 "table iterator not created".to_string(),
-            )),
-        }
+            ));
+        };
+        Ok(iterator.next()?.map(|full| full.1))
     }
 
     fn output_schema(&self) -> SchemaRef {
diff --git a/bustubx/src/storage/table_heap.rs b/bustubx/src/storage/table_heap.rs
index 00b8921..6194b13 100644
--- a/bustubx/src/storage/table_heap.rs
+++ b/bustubx/src/storage/table_heap.rs
@@ -1,5 +1,6 @@
 use crate::buffer::{AtomicPageId, INVALID_PAGE_ID};
 use crate::catalog::SchemaRef;
+use crate::common::rid::INVALID_RID;
 use crate::common::util::page_bytes_to_array;
 use crate::storage::codec::TablePageCodec;
 use crate::storage::{TablePage, TupleMeta};
@@ -125,6 +126,7 @@ impl TableHeap {
         Ok(())
     }
 
+    // FIXME
    pub fn tuple(&self, rid: Rid) -> BustubxResult<(TupleMeta, Tuple)> {
        let page = self.buffer_pool.fetch_page(rid.page_id)?;
        let (table_page, _) =
@@ -143,53 +145,45 @@
         Ok(result)
     }
 
-    pub fn get_first_rid(&self) -> Option<Rid> {
+    pub fn get_first_rid(&self) -> BustubxResult<Option<Rid>> {
         let first_page_id = self.first_page_id.load(Ordering::SeqCst);
-        let page = self
-            .buffer_pool
-            .fetch_page(first_page_id)
-            .expect("Can not fetch page");
+        let page = self.buffer_pool.fetch_page(first_page_id)?;
         let (table_page, _) =
-            TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone()).unwrap();
-        self.buffer_pool.unpin_page_id(first_page_id).unwrap();
+            TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?;
+        self.buffer_pool.unpin_page_id(first_page_id)?;
         if table_page.header.num_tuples == 0 {
             // TODO ignore deleted tuples
-            None
+            Ok(None)
         } else {
-            Some(Rid::new(first_page_id, 0))
+            Ok(Some(Rid::new(first_page_id, 0)))
         }
     }
 
-    pub fn get_next_rid(&self, rid: Rid) -> Option<Rid> {
-        let page = self
-            .buffer_pool
-            .fetch_page(rid.page_id)
-            .expect("Can not fetch page");
+    pub fn get_next_rid(&self, rid: Rid) -> BustubxResult<Option<Rid>> {
+        let page = self.buffer_pool.fetch_page(rid.page_id)?;
         let (table_page, _) =
-            TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone()).unwrap();
-        self.buffer_pool.unpin_page_id(rid.page_id).unwrap();
+            TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?;
+        self.buffer_pool.unpin_page_id(rid.page_id)?;
         let next_rid = table_page.get_next_rid(&rid);
         if next_rid.is_some() {
-            return next_rid;
+            return Ok(next_rid);
         }
         if table_page.header.next_page_id == INVALID_PAGE_ID {
-            return None;
+            return Ok(None);
         }
 
         let next_page = self
             .buffer_pool
-            .fetch_page(table_page.header.next_page_id)
-            .expect("Can not fetch page");
+            .fetch_page(table_page.header.next_page_id)?;
         let (next_table_page, _) =
-            TablePageCodec::decode(next_page.read().unwrap().data(), self.schema.clone()).unwrap();
+            TablePageCodec::decode(next_page.read().unwrap().data(), self.schema.clone())?;
         self.buffer_pool
-            .unpin_page_id(table_page.header.next_page_id)
-            .unwrap();
+            .unpin_page_id(table_page.header.next_page_id)?;
         if next_table_page.header.num_tuples == 0 {
             // TODO ignore deleted tuples
-            None
+            Ok(None)
         } else {
-            Some(Rid::new(table_page.header.next_page_id, 0))
+            Ok(Some(Rid::new(table_page.header.next_page_id, 0)))
         }
     }
 }
@@ -199,7 +193,7 @@ pub struct TableIterator {
     heap: Arc<TableHeap>,
     start_bound: Bound<Rid>,
     end_bound: Bound<Rid>,
-    cursor: Option<Rid>,
+    cursor: Rid,
     started: bool,
     ended: bool,
 }
@@ -210,62 +204,75 @@ impl TableIterator {
             heap,
             start_bound: range.start_bound().cloned(),
             end_bound: range.end_bound().cloned(),
-            cursor: None,
+            cursor: INVALID_RID,
             started: false,
             ended: false,
         }
     }
 
-    pub fn next(&mut self) -> Option<(TupleMeta, Tuple)> {
+    pub fn next(&mut self) -> BustubxResult<Option<(TupleMeta, Tuple)>> {
         if self.ended {
-            return None;
+            return Ok(None);
         }
 
         if self.started {
             match self.end_bound {
                 Bound::Included(rid) => {
-                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor.unwrap()) {
+                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
                         if next_rid == rid {
                             self.ended = true;
                         }
-                        self.cursor = Some(next_rid);
-                        self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
+                        self.cursor = next_rid;
+                        Ok(self.heap.tuple(self.cursor).ok())
                     } else {
-                        None
+                        Ok(None)
                     }
                 }
                 Bound::Excluded(rid) => {
-                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor.unwrap()) {
+                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
                         if next_rid == rid {
-                            None
+                            Ok(None)
                         } else {
-                            self.cursor = Some(next_rid);
-                            self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
+                            self.cursor = next_rid;
+                            Ok(self.heap.tuple(self.cursor).ok())
                         }
                     } else {
-                        None
+                        Ok(None)
                    }
                }
                 Bound::Unbounded => {
-                    let next_rid = self.heap.get_next_rid(self.cursor.unwrap());
-                    self.cursor = next_rid;
-                    self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
+                    if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
+                        self.cursor = next_rid;
+                        Ok(self.heap.tuple(self.cursor).ok())
+                    } else {
+                        Ok(None)
+                    }
                 }
             }
         } else {
             self.started = true;
             match self.start_bound {
                 Bound::Included(rid) => {
-                    self.cursor = Some(rid);
-                    Some(self.heap.tuple(rid).unwrap())
+                    self.cursor = rid;
+                    Ok(Some(self.heap.tuple(rid).unwrap()))
                 }
                 Bound::Excluded(rid) => {
-                    self.cursor = self.heap.get_next_rid(rid);
-                    self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
+                    if let Some(next_rid) = self.heap.get_next_rid(rid)? {
+                        self.cursor = next_rid;
+                        Ok(self.heap.tuple(self.cursor).ok())
+                    } else {
+                        self.ended = true;
+                        Ok(None)
+                    }
                 }
                 Bound::Unbounded => {
-                    self.cursor = self.heap.get_first_rid();
-                    self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
+                    if let Some(first_rid) = self.heap.get_first_rid()? {
+                        self.cursor = first_rid;
+                        Ok(self.heap.tuple(self.cursor).ok())
+                    } else {
+                        self.ended = true;
+                        Ok(None)
+                    }
                 }
             }
         }
@@ -444,18 +451,18 @@ mod tests {
 
         let mut iterator = TableIterator::new(table_heap.clone(), ..);
 
-        let (meta, tuple) = iterator.next().unwrap();
+        let (meta, tuple) = iterator.next().unwrap().unwrap();
         assert_eq!(meta, meta1);
         assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);
 
-        let (meta, tuple) = iterator.next().unwrap();
+        let (meta, tuple) = iterator.next().unwrap().unwrap();
         assert_eq!(meta, meta2);
         assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);
 
-        let (meta, tuple) = iterator.next().unwrap();
+        let (meta, tuple) = iterator.next().unwrap().unwrap();
         assert_eq!(meta, meta3);
         assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);
 
-        assert!(iterator.next().is_none());
+        assert!(iterator.next().unwrap().is_none());
     }
 }
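
For reference, a minimal sketch (not part of the patch) of how the refactored iterator is consumed after this change. It assumes the items and imports already present in bustubx/src/storage/table_heap.rs are in scope; `collect_all_tuples` is a hypothetical helper name.

// Illustrative sketch only: drains a TableIterator with the new
// BustubxResult-based API. Assumes Arc, TableHeap, TableIterator, Tuple,
// TupleMeta, and BustubxResult from table_heap.rs are in scope.
pub fn collect_all_tuples(table_heap: Arc<TableHeap>) -> BustubxResult<Vec<Tuple>> {
    // Full-range scan, same as the test: TableIterator::new(heap, ..)
    let mut iterator = TableIterator::new(table_heap, ..);
    let mut tuples = Vec::new();
    // `next` now returns BustubxResult<Option<(TupleMeta, Tuple)>>, so `?`
    // propagates page-fetch/decode errors while `None` still ends the scan.
    while let Some((_meta, tuple)) = iterator.next()? {
        tuples.push(tuple);
    }
    Ok(tuples)
}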