Skip to content

Commit

Permalink
Fix table heap iterator
Browse files — browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 24, 2024
1 parent c563dae commit ef5f736
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 49 deletions.
30 changes: 12 additions & 18 deletions bustubx/src/execution/physical_plan/seq_scan.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,46 @@
use log::debug;
use std::ops::{Bound, RangeBounds, RangeFull};
use std::sync::Mutex;

use crate::catalog::SchemaRef;
use crate::common::rid::Rid;
use crate::common::TableReference;
use crate::{
execution::{ExecutionContext, VolcanoExecutor},
storage::{TableIterator, Tuple},
BustubxResult,
BustubxError, BustubxResult,
};

#[derive(Debug)]
pub struct PhysicalSeqScan {
pub table: TableReference,
pub table_schema: SchemaRef,

iterator: Mutex<TableIterator>,
iterator: Mutex<Option<TableIterator>>,
}

impl PhysicalSeqScan {
pub fn new(table: TableReference, table_schema: SchemaRef) -> Self {
PhysicalSeqScan {
table,
table_schema,
iterator: Mutex::new(TableIterator::new(
Bound::Unbounded,
Bound::Unbounded,
None,
false,
false,
)),
iterator: Mutex::new(None),
}
}
}

impl VolcanoExecutor for PhysicalSeqScan {
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
let table_heap = context.catalog.table_heap(&self.table)?;
*self.iterator.lock().unwrap() = table_heap.scan(RangeFull);
*self.iterator.lock().unwrap() = Some(TableIterator::new(table_heap, (..)));

Check warning on line 32 in bustubx/src/execution/physical_plan/seq_scan.rs (view workflow job for this annotation) — GitHub Actions / Test Suite: unnecessary parentheses around function argument
Ok(())
}

fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
let table_heap = context.catalog.table_heap(&self.table)?;
let mut iterator = self.iterator.lock().unwrap();
let full_tuple = iterator.next(&table_heap);
Ok(full_tuple.map(|t| t.1))
fn next(&self, _context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
let mut guard = self.iterator.lock().unwrap();
match &mut *guard {
Some(x) => Ok(x.next().map(|full| full.1)),
None => Err(BustubxError::Execution(
"table iterator not created".to_string(),
)),
}
}

fn output_schema(&self) -> SchemaRef {
Expand Down
4 changes: 2 additions & 2 deletions bustubx/src/storage/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ pub struct BPlusTreeIndex {
pub root_page_id: PageId,
}

pub struct TreeIndexIterator {}

impl BPlusTreeIndex {
pub fn new(
key_schema: SchemaRef,
Expand Down Expand Up @@ -572,6 +570,8 @@ impl BPlusTreeIndex {
}
}

pub struct TreeIndexIterator {}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
62 changes: 33 additions & 29 deletions bustubx/src/storage/table_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,11 @@ impl TableHeap {
Some(Rid::new(table_page.header.next_page_id, 0))
}
}

pub fn scan<R: RangeBounds<Rid>>(&self, rang: R) -> TableIterator {
TableIterator {
start_bound: rang.start_bound().cloned(),
end_bound: rang.end_bound().cloned(),
cursor: None,
started: false,
ended: false,
}
}
}

#[derive(derive_new::new, Debug)]
#[derive(Debug)]
pub struct TableIterator {
heap: Arc<TableHeap>,
start_bound: Bound<Rid>,
end_bound: Bound<Rid>,
cursor: Option<Rid>,
Expand All @@ -205,55 +196,67 @@ pub struct TableIterator {
}

impl TableIterator {
pub fn next(&mut self, table_heap: &TableHeap) -> Option<(TupleMeta, Tuple)> {
pub fn new<R: RangeBounds<Rid>>(heap: Arc<TableHeap>, range: R) -> Self {
Self {
heap,
start_bound: range.start_bound().cloned(),
end_bound: range.end_bound().cloned(),
cursor: None,
started: false,
ended: false,
}
}

pub fn next(&mut self) -> Option<(TupleMeta, Tuple)> {
if self.ended {
return None;
}

if self.started {
match self.end_bound {
Bound::Included(rid) => {
if let Some(next_rid) = table_heap.get_next_rid(self.cursor.unwrap()) {
if let Some(next_rid) = self.heap.get_next_rid(self.cursor.unwrap()) {
if next_rid == rid {
self.ended = true;
}
self.cursor = Some(next_rid);
self.cursor.map(|rid| table_heap.tuple(rid).unwrap())
self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
} else {
None
}
}
Bound::Excluded(rid) => {
if let Some(next_rid) = table_heap.get_next_rid(self.cursor.unwrap()) {
if let Some(next_rid) = self.heap.get_next_rid(self.cursor.unwrap()) {
if next_rid == rid {
None
} else {
self.cursor = Some(next_rid);
self.cursor.map(|rid| table_heap.tuple(rid).unwrap())
self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
}
} else {
None
}
}
Bound::Unbounded => {
let next_rid = table_heap.get_next_rid(self.cursor.unwrap());
let next_rid = self.heap.get_next_rid(self.cursor.unwrap());
self.cursor = next_rid;
self.cursor.map(|rid| table_heap.tuple(rid).unwrap())
self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
}
}
} else {
self.started = true;
match self.start_bound {
Bound::Included(rid) => {
self.cursor = Some(rid.clone());
Some(table_heap.tuple(rid).unwrap())
Some(self.heap.tuple(rid).unwrap())
}
Bound::Excluded(rid) => {
self.cursor = table_heap.get_next_rid(rid);
self.cursor.map(|rid| table_heap.tuple(rid).unwrap())
self.cursor = self.heap.get_next_rid(rid);
self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
}
Bound::Unbounded => {
self.cursor = table_heap.get_first_rid();
self.cursor.map(|rid| table_heap.tuple(rid).unwrap())
self.cursor = self.heap.get_first_rid();
self.cursor.map(|rid| self.heap.tuple(rid).unwrap())
}
}
}
Expand All @@ -267,6 +270,7 @@ mod tests {
use tempfile::TempDir;

use crate::catalog::{Column, DataType, Schema};
use crate::storage::TableIterator;
use crate::{
buffer::BufferPoolManager,
storage::{table_heap::TableHeap, DiskManager, Tuple},
Expand Down Expand Up @@ -393,7 +397,7 @@ mod tests {

let disk_manager = DiskManager::try_new(temp_path).unwrap();
let buffer_pool = Arc::new(BufferPoolManager::new(1000, Arc::new(disk_manager)));
let table_heap = TableHeap::try_new(schema.clone(), buffer_pool).unwrap();
let table_heap = Arc::new(TableHeap::try_new(schema.clone(), buffer_pool).unwrap());

let meta1 = super::TupleMeta {
insert_txn_id: 1,
Expand Down Expand Up @@ -429,20 +433,20 @@ mod tests {
)
.unwrap();

let mut iterator = table_heap.scan(RangeFull);
let mut iterator = TableIterator::new(table_heap.clone(), (..));

let (meta, tuple) = iterator.next(&table_heap).unwrap();
let (meta, tuple) = iterator.next().unwrap();
assert_eq!(meta, meta1);
assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);

let (meta, tuple) = iterator.next(&table_heap).unwrap();
let (meta, tuple) = iterator.next().unwrap();
assert_eq!(meta, meta2);
assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);

let (meta, tuple) = iterator.next(&table_heap).unwrap();
let (meta, tuple) = iterator.next().unwrap();
assert_eq!(meta, meta3);
assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);

assert!(iterator.next(&table_heap).is_none());
assert!(iterator.next().is_none());
}
}

0 comments on commit ef5f736

Please sign in to comment.