Skip to content

Commit

Permalink
Improve table iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 25, 2024
1 parent 27f1690 commit fa6c987
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 66 deletions.
2 changes: 1 addition & 1 deletion bustubx/src/buffer/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::RwLock;
use std::{collections::VecDeque, sync::Arc};

use crate::buffer::page::{Page, PageId};
use crate::buffer::BUSTUBX_PAGE_SIZE;

use crate::storage::DiskManager;
use crate::{BustubxError, BustubxResult};

Expand Down
14 changes: 6 additions & 8 deletions bustubx/src/common/rid.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use crate::buffer::PageId;
use crate::buffer::{PageId, INVALID_PAGE_ID};

pub const INVALID_RID: Rid = Rid {
page_id: INVALID_PAGE_ID,
slot_num: 0,
};

// TODO should move to table page?
// Record Identifier
Expand All @@ -7,10 +12,3 @@ pub struct Rid {
pub page_id: PageId,
pub slot_num: u32,
}

impl Rid {
pub const INVALID_RID: Self = Self {
page_id: u32::MAX,
slot_num: u32::MAX,
};
}
11 changes: 5 additions & 6 deletions bustubx/src/execution/physical_plan/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,12 @@ impl VolcanoExecutor for PhysicalSeqScan {
}

fn next(&self, _context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
let mut guard = self.iterator.lock().unwrap();
match &mut *guard {
Some(x) => Ok(x.next().map(|full| full.1)),
None => Err(BustubxError::Execution(
let Some(iterator) = &mut *self.iterator.lock().unwrap() else {
return Err(BustubxError::Execution(
"table iterator not created".to_string(),
)),
}
));
};
Ok(iterator.next()?.map(|full| full.1))
}

fn output_schema(&self) -> SchemaRef {
Expand Down
109 changes: 58 additions & 51 deletions bustubx/src/storage/table_heap.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::buffer::{AtomicPageId, INVALID_PAGE_ID};
use crate::catalog::SchemaRef;
use crate::common::rid::INVALID_RID;
use crate::common::util::page_bytes_to_array;
use crate::storage::codec::TablePageCodec;
use crate::storage::{TablePage, TupleMeta};
Expand Down Expand Up @@ -125,6 +126,7 @@ impl TableHeap {
Ok(())
}

// FIXME
pub fn tuple(&self, rid: Rid) -> BustubxResult<(TupleMeta, Tuple)> {
let page = self.buffer_pool.fetch_page(rid.page_id)?;
let (table_page, _) =
Expand All @@ -143,53 +145,45 @@ impl TableHeap {
Ok(result)
}

pub fn get_first_rid(&self) -> Option<Rid> {
pub fn get_first_rid(&self) -> BustubxResult<Option<Rid>> {
let first_page_id = self.first_page_id.load(Ordering::SeqCst);
let page = self
.buffer_pool
.fetch_page(first_page_id)
.expect("Can not fetch page");
let page = self.buffer_pool.fetch_page(first_page_id)?;
let (table_page, _) =
TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone()).unwrap();
self.buffer_pool.unpin_page_id(first_page_id).unwrap();
TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?;
self.buffer_pool.unpin_page_id(first_page_id)?;
if table_page.header.num_tuples == 0 {
// TODO: skip tuples marked as deleted
None
Ok(None)
} else {
Some(Rid::new(first_page_id, 0))
Ok(Some(Rid::new(first_page_id, 0)))
}
}

pub fn get_next_rid(&self, rid: Rid) -> Option<Rid> {
let page = self
.buffer_pool
.fetch_page(rid.page_id)
.expect("Can not fetch page");
pub fn get_next_rid(&self, rid: Rid) -> BustubxResult<Option<Rid>> {
let page = self.buffer_pool.fetch_page(rid.page_id)?;
let (table_page, _) =
TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone()).unwrap();
self.buffer_pool.unpin_page_id(rid.page_id).unwrap();
TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?;
self.buffer_pool.unpin_page_id(rid.page_id)?;
let next_rid = table_page.get_next_rid(&rid);
if next_rid.is_some() {
return next_rid;
return Ok(next_rid);
}

if table_page.header.next_page_id == INVALID_PAGE_ID {
return None;
return Ok(None);
}
let next_page = self
.buffer_pool
.fetch_page(table_page.header.next_page_id)
.expect("Can not fetch page");
.fetch_page(table_page.header.next_page_id)?;
let (next_table_page, _) =
TablePageCodec::decode(next_page.read().unwrap().data(), self.schema.clone()).unwrap();
TablePageCodec::decode(next_page.read().unwrap().data(), self.schema.clone())?;
self.buffer_pool
.unpin_page_id(table_page.header.next_page_id)
.unwrap();
.unpin_page_id(table_page.header.next_page_id)?;
if next_table_page.header.num_tuples == 0 {
// TODO: skip tuples marked as deleted
None
Ok(None)
} else {
Some(Rid::new(table_page.header.next_page_id, 0))
Ok(Some(Rid::new(table_page.header.next_page_id, 0)))
}
}
}
Expand All @@ -199,7 +193,7 @@ pub struct TableIterator {
heap: Arc<TableHeap>,
start_bound: Bound<Rid>,
end_bound: Bound<Rid>,
cursor: Option<Rid>,
cursor: Rid,
started: bool,
ended: bool,
}
Expand All @@ -210,62 +204,75 @@ impl TableIterator {
heap,
start_bound: range.start_bound().cloned(),
end_bound: range.end_bound().cloned(),
cursor: None,
cursor: INVALID_RID,
started: false,
ended: false,
}
}

pub fn next(&mut self) -> Option<(TupleMeta, Tuple)> {
pub fn next(&mut self) -> BustubxResult<Option<(TupleMeta, Tuple)>> {
if self.ended {
return None;
return Ok(None);
}

if self.started {
match self.end_bound {
Bound::Included(rid) => {
if let Some(next_rid) = self.heap.get_next_rid(self.cursor.unwrap()) {
if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
if next_rid == rid {
self.ended = true;
}
self.cursor = Some(next_rid);
self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
self.cursor = next_rid;
Ok(self.heap.tuple(self.cursor).ok())
} else {
None
Ok(None)
}
}
Bound::Excluded(rid) => {
if let Some(next_rid) = self.heap.get_next_rid(self.cursor.unwrap()) {
if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
if next_rid == rid {
None
Ok(None)
} else {
self.cursor = Some(next_rid);
self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
self.cursor = next_rid;
Ok(self.heap.tuple(self.cursor).ok())
}
} else {
None
Ok(None)
}
}
Bound::Unbounded => {
let next_rid = self.heap.get_next_rid(self.cursor.unwrap());
self.cursor = next_rid;
self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
self.cursor = next_rid;
Ok(self.heap.tuple(self.cursor).ok())
} else {
Ok(None)
}
}
}
} else {
self.started = true;
match self.start_bound {
Bound::Included(rid) => {
self.cursor = Some(rid);
Some(self.heap.tuple(rid).unwrap())
self.cursor = rid;
Ok(Some(self.heap.tuple(rid).unwrap()))
}
Bound::Excluded(rid) => {
self.cursor = self.heap.get_next_rid(rid);
self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
if let Some(next_rid) = self.heap.get_next_rid(rid)? {
self.cursor = next_rid;
Ok(self.heap.tuple(self.cursor).ok())
} else {
self.ended = true;
Ok(None)
}
}
Bound::Unbounded => {
self.cursor = self.heap.get_first_rid();
self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
if let Some(first_rid) = self.heap.get_first_rid()? {
self.cursor = first_rid;
Ok(self.heap.tuple(self.cursor).ok())
} else {
self.ended = true;
Ok(None)
}
}
}
}
Expand Down Expand Up @@ -444,18 +451,18 @@ mod tests {

let mut iterator = TableIterator::new(table_heap.clone(), ..);

let (meta, tuple) = iterator.next().unwrap();
let (meta, tuple) = iterator.next().unwrap().unwrap();
assert_eq!(meta, meta1);
assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);

let (meta, tuple) = iterator.next().unwrap();
let (meta, tuple) = iterator.next().unwrap().unwrap();
assert_eq!(meta, meta2);
assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);

let (meta, tuple) = iterator.next().unwrap();
let (meta, tuple) = iterator.next().unwrap().unwrap();
assert_eq!(meta, meta3);
assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);

assert!(iterator.next().is_none());
assert!(iterator.next().unwrap().is_none());
}
}

0 comments on commit fa6c987

Please sign in to comment.