Skip to content

Commit

Permalink
Implement physical update
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 29, 2024
1 parent fd3930e commit fefef00
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 42 deletions.
3 changes: 1 addition & 2 deletions bustubx/src/execution/physical_plan/index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ impl VolcanoExecutor for PhysicalIndexScan {
};
let table_heap = context.catalog.table_heap(&self.table_ref)?;
if let Some(rid) = iterator.next()? {
let (_, tuple) = table_heap.tuple(rid)?;
Ok(Some(tuple))
Ok(Some(table_heap.tuple(rid)?))
} else {
Ok(None)
}
Expand Down
3 changes: 1 addition & 2 deletions bustubx/src/execution/physical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ impl VolcanoExecutor for PhysicalInsert {
return if self.insert_rows.load(Ordering::SeqCst) == 0 {
Ok(None)
} else {
let insert_rows = self.insert_rows.load(Ordering::SeqCst);
self.insert_rows.store(0, Ordering::SeqCst);
let insert_rows = self.insert_rows.swap(0, Ordering::SeqCst);
Ok(Some(Tuple::new(
self.output_schema(),
vec![ScalarValue::Int32(Some(insert_rows as i32))],
Expand Down
37 changes: 33 additions & 4 deletions bustubx/src/execution/physical_plan/update.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::catalog::{SchemaRef, UPDATE_OUTPUT_SCHEMA_REF};
use crate::common::TableReference;
use crate::common::{ScalarValue, TableReference};
use crate::execution::{ExecutionContext, VolcanoExecutor};
use crate::expression::Expr;
use crate::expression::{Expr, ExprTrait};
use crate::storage::{TableIterator, EMPTY_TUPLE};
use crate::{BustubxResult, Tuple};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
Expand Down Expand Up @@ -40,8 +41,36 @@ impl VolcanoExecutor for PhysicalUpdate {
}

fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
// TODO implementation
Ok(None)
// TODO may scan index
let table_heap = context.catalog.table_heap(&self.table)?;
let mut table_iterator = TableIterator::new(table_heap.clone(), ..);
loop {
if let Some((rid, mut tuple)) = table_iterator.next()? {
if let Some(selection) = &self.selection {
if !selection.evaluate(&tuple)?.as_boolean()?.unwrap_or(false) {
continue;
}
}
// update tuple data
for (col_name, value_expr) in self.assignments.iter() {
let new_value = value_expr.evaluate(&EMPTY_TUPLE)?;
let index = tuple.schema.index_of(None, &col_name)?;
tuple.data[index] = new_value;
}
table_heap.update_tuple(rid, tuple)?;
self.update_rows.fetch_add(1, Ordering::SeqCst);
} else {
return if self.update_rows.load(Ordering::SeqCst) == 0 {
Ok(None)
} else {
let update_rows = self.update_rows.swap(0, Ordering::SeqCst);
Ok(Some(Tuple::new(
self.output_schema(),
vec![ScalarValue::Int32(Some(update_rows as i32))],
)))
};
}
}
}

fn output_schema(&self) -> SchemaRef {
Expand Down
3 changes: 2 additions & 1 deletion bustubx/src/execution/physical_plan/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;
use crate::catalog::SchemaRef;
use crate::common::ScalarValue;
use crate::expression::{Expr, ExprTrait};
use crate::storage::EMPTY_TUPLE;
use crate::{
catalog::Schema,

Check warning on line 9 in bustubx/src/execution/physical_plan/values.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `catalog::Schema`
execution::{ExecutionContext, VolcanoExecutor},
Expand Down Expand Up @@ -35,7 +36,7 @@ impl VolcanoExecutor for PhysicalValues {
if cursor < self.rows.len() {
let values = self.rows[cursor]
.iter()
.map(|e| e.evaluate(&Tuple::empty(Arc::new(Schema::empty()))))
.map(|e| e.evaluate(&EMPTY_TUPLE))
.collect::<BustubxResult<Vec<ScalarValue>>>()?;
debug_assert_eq!(self.schema.column_count(), values.len());

Expand Down
2 changes: 1 addition & 1 deletion bustubx/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ mod tuple;
pub use disk_manager::DiskManager;
pub use page::*;
pub use table_heap::{TableHeap, TableIterator};
pub use tuple::Tuple;
pub use tuple::*;
25 changes: 22 additions & 3 deletions bustubx/src/storage/page/table_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl TablePage {
Ok(tuple_id)
}

pub fn update_tuple_meta(&mut self, meta: &TupleMeta, slot_num: u16) -> BustubxResult<()> {
pub fn update_tuple_meta(&mut self, meta: TupleMeta, slot_num: u16) -> BustubxResult<()> {
if slot_num >= self.header.num_tuples {
return Err(BustubxError::Storage(format!(
"tuple_id {} out of range",
Expand All @@ -163,7 +163,26 @@ impl TablePage {
self.header.num_deleted_tuples += 1;
}

self.header.tuple_infos[slot_num as usize].meta = *meta;
self.header.tuple_infos[slot_num as usize].meta = meta;
Ok(())
}

pub fn update_tuple(&mut self, tuple: Tuple, slot_num: u16) -> BustubxResult<()> {
if slot_num >= self.header.num_tuples {
return Err(BustubxError::Storage(format!(
"tuple_id {} out of range",
slot_num
)));
}
let offset = self.header.tuple_infos[slot_num as usize].offset as usize;
let size = self.header.tuple_infos[slot_num as usize].size as usize;
let tuple_bytes = TupleCodec::encode(&tuple);
if tuple_bytes.len() == size {
self.data[offset..(offset + size)].copy_from_slice(&tuple_bytes);
} else {
// need move other tuples
todo!()
}
Ok(())
}

Expand Down Expand Up @@ -283,7 +302,7 @@ mod tests {
tuple_meta.delete_txn_id = 1;
tuple_meta.insert_txn_id = 2;

table_page.update_tuple_meta(&tuple_meta, 0).unwrap();
table_page.update_tuple_meta(tuple_meta, 0).unwrap();
let tuple_meta = table_page.tuple_meta(0).unwrap();
assert!(tuple_meta.is_deleted);
assert_eq!(tuple_meta.delete_txn_id, 1);
Expand Down
93 changes: 65 additions & 28 deletions bustubx/src/storage/table_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,19 @@ impl TableHeap {
Ok(RecordId::new(last_page_id, slot_id as u32))
}

pub fn update_tuple_meta(&self, meta: &TupleMeta, rid: RecordId) -> BustubxResult<()> {
pub fn update_tuple(&self, rid: RecordId, tuple: Tuple) -> BustubxResult<()> {
let page = self.buffer_pool.fetch_page(rid.page_id)?;
let (mut table_page, _) =
TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?;
table_page.update_tuple(tuple, rid.slot_num as u16)?;

page.write()
.unwrap()
.set_data(page_bytes_to_array(&TablePageCodec::encode(&table_page)));
Ok(())
}

pub fn update_tuple_meta(&self, meta: TupleMeta, rid: RecordId) -> BustubxResult<()> {
let page = self.buffer_pool.fetch_page(rid.page_id)?;
let (mut table_page, _) =
TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?;
Expand All @@ -121,21 +133,22 @@ impl TableHeap {
Ok(())
}

// FIXME
pub fn tuple(&self, rid: RecordId) -> BustubxResult<(TupleMeta, Tuple)> {
pub fn full_tuple(&self, rid: RecordId) -> BustubxResult<(TupleMeta, Tuple)> {
let page = self.buffer_pool.fetch_page(rid.page_id)?;
let (table_page, _) =
TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?;
let result = table_page.tuple(rid.slot_num as u16)?;
Ok(result)
}

pub fn tuple(&self, rid: RecordId) -> BustubxResult<Tuple> {
let (_meta, tuple) = self.full_tuple(rid)?;
Ok(tuple)
}

pub fn tuple_meta(&self, rid: RecordId) -> BustubxResult<TupleMeta> {
let page = self.buffer_pool.fetch_page(rid.page_id)?;
let (table_page, _) =
TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?;
let result = table_page.tuple_meta(rid.slot_num as u16)?;
Ok(result)
let (meta, _tuple) = self.full_tuple(rid)?;
Ok(meta)
}

pub fn get_first_rid(&self) -> BustubxResult<Option<RecordId>> {
Expand Down Expand Up @@ -199,7 +212,7 @@ impl TableIterator {
}
}

pub fn next(&mut self) -> BustubxResult<Option<(TupleMeta, Tuple)>> {
pub fn next(&mut self) -> BustubxResult<Option<(RecordId, Tuple)>> {
if self.ended {
return Ok(None);
}
Expand All @@ -212,7 +225,11 @@ impl TableIterator {
self.ended = true;
}
self.cursor = next_rid;
Ok(self.heap.tuple(self.cursor).ok())
Ok(self
.heap
.tuple(self.cursor)
.ok()
.map(|tuple| (self.cursor, tuple)))
} else {
Ok(None)
}
Expand All @@ -223,7 +240,11 @@ impl TableIterator {
Ok(None)
} else {
self.cursor = next_rid;
Ok(self.heap.tuple(self.cursor).ok())
Ok(self
.heap
.tuple(self.cursor)
.ok()
.map(|tuple| (self.cursor, tuple)))
}
} else {
Ok(None)
Expand All @@ -232,7 +253,11 @@ impl TableIterator {
Bound::Unbounded => {
if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? {
self.cursor = next_rid;
Ok(self.heap.tuple(self.cursor).ok())
Ok(self
.heap
.tuple(self.cursor)
.ok()
.map(|tuple| (self.cursor, tuple)))
} else {
Ok(None)
}
Expand All @@ -243,12 +268,20 @@ impl TableIterator {
match self.start_bound {
Bound::Included(rid) => {
self.cursor = rid;
Ok(Some(self.heap.tuple(rid).unwrap()))
Ok(self
.heap
.tuple(self.cursor)
.ok()
.map(|tuple| (self.cursor, tuple)))
}
Bound::Excluded(rid) => {
if let Some(next_rid) = self.heap.get_next_rid(rid)? {
self.cursor = next_rid;
Ok(self.heap.tuple(self.cursor).ok())
Ok(self
.heap
.tuple(self.cursor)
.ok()
.map(|tuple| (self.cursor, tuple)))
} else {
self.ended = true;
Ok(None)
Expand All @@ -257,7 +290,11 @@ impl TableIterator {
Bound::Unbounded => {
if let Some(first_rid) = self.heap.get_first_rid()? {
self.cursor = first_rid;
Ok(self.heap.tuple(self.cursor).ok())
Ok(self
.heap
.tuple(self.cursor)
.ok()
.map(|tuple| (self.cursor, tuple)))
} else {
self.ended = true;
Ok(None)
Expand Down Expand Up @@ -317,7 +354,7 @@ mod tests {
meta.insert_txn_id = 1;
meta.delete_txn_id = 2;
meta.is_deleted = true;
table_heap.update_tuple_meta(&meta, rid2).unwrap();
table_heap.update_tuple_meta(meta, rid2).unwrap();

let meta = table_heap.tuple_meta(rid2).unwrap();
assert_eq!(meta.insert_txn_id, 1);
Expand Down Expand Up @@ -372,15 +409,15 @@ mod tests {
)
.unwrap();

let (meta, tuple) = table_heap.tuple(rid1).unwrap();
let (meta, tuple) = table_heap.full_tuple(rid1).unwrap();
assert_eq!(meta, meta1);
assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);

let (meta, tuple) = table_heap.tuple(rid2).unwrap();
let (meta, tuple) = table_heap.full_tuple(rid2).unwrap();
assert_eq!(meta, meta2);
assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);

let (meta, tuple) = table_heap.tuple(rid3).unwrap();
let (meta, tuple) = table_heap.full_tuple(rid3).unwrap();
assert_eq!(meta, meta3);
assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);
}
Expand All @@ -404,7 +441,7 @@ mod tests {
delete_txn_id: 1,
is_deleted: false,
};
let _rid1 = table_heap
let rid1 = table_heap
.insert_tuple(
&meta1,
&Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]),
Expand All @@ -415,7 +452,7 @@ mod tests {
delete_txn_id: 2,
is_deleted: false,
};
let _rid2 = table_heap
let rid2 = table_heap
.insert_tuple(
&meta2,
&Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]),
Expand All @@ -426,7 +463,7 @@ mod tests {
delete_txn_id: 3,
is_deleted: false,
};
let _rid3 = table_heap
let rid3 = table_heap
.insert_tuple(
&meta3,
&Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]),
Expand All @@ -435,16 +472,16 @@ mod tests {

let mut iterator = TableIterator::new(table_heap.clone(), ..);

let (meta, tuple) = iterator.next().unwrap().unwrap();
assert_eq!(meta, meta1);
let (rid, tuple) = iterator.next().unwrap().unwrap();
assert_eq!(rid, rid1);
assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]);

let (meta, tuple) = iterator.next().unwrap().unwrap();
assert_eq!(meta, meta2);
let (rid, tuple) = iterator.next().unwrap().unwrap();
assert_eq!(rid, rid2);
assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]);

let (meta, tuple) = iterator.next().unwrap().unwrap();
assert_eq!(meta, meta3);
let (rid, tuple) = iterator.next().unwrap().unwrap();
assert_eq!(rid, rid3);
assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]);

assert!(iterator.next().unwrap().is_none());
Expand Down
6 changes: 5 additions & 1 deletion bustubx/src/storage/tuple.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use crate::catalog::SchemaRef;
use crate::catalog::{SchemaRef, EMPTY_SCHEMA_REF};
use crate::common::TableReference;
use crate::{catalog::Schema, common::ScalarValue, BustubxError, BustubxResult};
use std::cmp::Ordering;
use std::sync::Arc;

lazy_static::lazy_static! {
pub static ref EMPTY_TUPLE: Tuple = Tuple::empty(EMPTY_SCHEMA_REF.clone());
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Tuple {
pub schema: SchemaRef,
Expand Down
File renamed without changes.

0 comments on commit fefef00

Please sign in to comment.