From fefef00341680f61b22e2a30fcb3670d32219d23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=9E=97=E4=BC=9F?= Date: Thu, 29 Feb 2024 19:41:34 +0800 Subject: [PATCH] Implement physical update --- .../src/execution/physical_plan/index_scan.rs | 3 +- bustubx/src/execution/physical_plan/insert.rs | 3 +- bustubx/src/execution/physical_plan/update.rs | 37 +++++++- bustubx/src/execution/physical_plan/values.rs | 3 +- bustubx/src/storage/mod.rs | 2 +- bustubx/src/storage/page/table_page.rs | 25 ++++- bustubx/src/storage/table_heap.rs | 93 +++++++++++++------ bustubx/src/storage/tuple.rs | 6 +- .../slt/{update.slt => _update.slt} | 0 9 files changed, 130 insertions(+), 42 deletions(-) rename tests/sqllogictest/slt/{update.slt => _update.slt} (100%) diff --git a/bustubx/src/execution/physical_plan/index_scan.rs b/bustubx/src/execution/physical_plan/index_scan.rs index 6922b7f..3066c8d 100644 --- a/bustubx/src/execution/physical_plan/index_scan.rs +++ b/bustubx/src/execution/physical_plan/index_scan.rs @@ -56,8 +56,7 @@ impl VolcanoExecutor for PhysicalIndexScan { }; let table_heap = context.catalog.table_heap(&self.table_ref)?; if let Some(rid) = iterator.next()? { - let (_, tuple) = table_heap.tuple(rid)?; - Ok(Some(tuple)) + Ok(Some(table_heap.tuple(rid)?)) } else { Ok(None) } diff --git a/bustubx/src/execution/physical_plan/insert.rs b/bustubx/src/execution/physical_plan/insert.rs index 8770d10..11dc492 100644 --- a/bustubx/src/execution/physical_plan/insert.rs +++ b/bustubx/src/execution/physical_plan/insert.rs @@ -54,8 +54,7 @@ impl VolcanoExecutor for PhysicalInsert { return if self.insert_rows.load(Ordering::SeqCst) == 0 { Ok(None) } else { - let insert_rows = self.insert_rows.load(Ordering::SeqCst); - self.insert_rows.store(0, Ordering::SeqCst); + let insert_rows = self.insert_rows.swap(0, Ordering::SeqCst); Ok(Some(Tuple::new( self.output_schema(), vec![ScalarValue::Int32(Some(insert_rows as i32))], diff --git a/bustubx/src/execution/physical_plan/update.rs b/bustubx/src/execution/physical_plan/update.rs index 3c3239a..1717ddc 100644 --- a/bustubx/src/execution/physical_plan/update.rs +++ b/bustubx/src/execution/physical_plan/update.rs @@ -1,7 +1,8 @@ use crate::catalog::{SchemaRef, UPDATE_OUTPUT_SCHEMA_REF}; -use crate::common::TableReference; +use crate::common::{ScalarValue, TableReference}; use crate::execution::{ExecutionContext, VolcanoExecutor}; -use crate::expression::Expr; +use crate::expression::{Expr, ExprTrait}; +use crate::storage::{TableIterator, EMPTY_TUPLE}; use crate::{BustubxResult, Tuple}; use std::collections::HashMap; use std::sync::atomic::{AtomicU32, Ordering}; @@ -40,8 +41,36 @@ impl VolcanoExecutor for PhysicalUpdate { } fn next(&self, context: &mut ExecutionContext) -> BustubxResult> { - // TODO implementation - Ok(None) + // TODO may scan index + let table_heap = context.catalog.table_heap(&self.table)?; + let mut table_iterator = TableIterator::new(table_heap.clone(), ..); + loop { + if let Some((rid, mut tuple)) = table_iterator.next()? { + if let Some(selection) = &self.selection { + if !selection.evaluate(&tuple)?.as_boolean()?.unwrap_or(false) { + continue; + } + } + // update tuple data + for (col_name, value_expr) in self.assignments.iter() { + let new_value = value_expr.evaluate(&EMPTY_TUPLE)?; + let index = tuple.schema.index_of(None, &col_name)?; + tuple.data[index] = new_value; + } + table_heap.update_tuple(rid, tuple)?; + self.update_rows.fetch_add(1, Ordering::SeqCst); + } else { + return if self.update_rows.load(Ordering::SeqCst) == 0 { + Ok(None) + } else { + let update_rows = self.update_rows.swap(0, Ordering::SeqCst); + Ok(Some(Tuple::new( + self.output_schema(), + vec![ScalarValue::Int32(Some(update_rows as i32))], + ))) + }; + } + } } fn output_schema(&self) -> SchemaRef { diff --git a/bustubx/src/execution/physical_plan/values.rs b/bustubx/src/execution/physical_plan/values.rs index 58c8f9f..b51384e 100644 --- a/bustubx/src/execution/physical_plan/values.rs +++ b/bustubx/src/execution/physical_plan/values.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use crate::catalog::SchemaRef; use crate::common::ScalarValue; use crate::expression::{Expr, ExprTrait}; +use crate::storage::EMPTY_TUPLE; use crate::{ catalog::Schema, execution::{ExecutionContext, VolcanoExecutor}, @@ -35,7 +36,7 @@ impl VolcanoExecutor for PhysicalValues { if cursor < self.rows.len() { let values = self.rows[cursor] .iter() - .map(|e| e.evaluate(&Tuple::empty(Arc::new(Schema::empty())))) + .map(|e| e.evaluate(&EMPTY_TUPLE)) .collect::>>()?; debug_assert_eq!(self.schema.column_count(), values.len()); diff --git a/bustubx/src/storage/mod.rs b/bustubx/src/storage/mod.rs index 421e7a5..58b7c55 100644 --- a/bustubx/src/storage/mod.rs +++ b/bustubx/src/storage/mod.rs @@ -8,4 +8,4 @@ mod tuple; pub use disk_manager::DiskManager; pub use page::*; pub use table_heap::{TableHeap, TableIterator}; -pub use tuple::Tuple; +pub use tuple::*; diff --git a/bustubx/src/storage/page/table_page.rs b/bustubx/src/storage/page/table_page.rs index e134cca..0fe7aad 100644 --- a/bustubx/src/storage/page/table_page.rs +++ b/bustubx/src/storage/page/table_page.rs @@ -152,7 +152,7 @@ impl TablePage { Ok(tuple_id) } - pub fn update_tuple_meta(&mut self, meta: &TupleMeta, slot_num: u16) -> BustubxResult<()> { + pub fn update_tuple_meta(&mut self, meta: TupleMeta, slot_num: u16) -> BustubxResult<()> { if slot_num >= self.header.num_tuples { return Err(BustubxError::Storage(format!( "tuple_id {} out of range", @@ -163,7 +163,26 @@ impl TablePage { self.header.num_deleted_tuples += 1; } - self.header.tuple_infos[slot_num as usize].meta = *meta; + self.header.tuple_infos[slot_num as usize].meta = meta; + Ok(()) + } + + pub fn update_tuple(&mut self, tuple: Tuple, slot_num: u16) -> BustubxResult<()> { + if slot_num >= self.header.num_tuples { + return Err(BustubxError::Storage(format!( + "tuple_id {} out of range", + slot_num + ))); + } + let offset = self.header.tuple_infos[slot_num as usize].offset as usize; + let size = self.header.tuple_infos[slot_num as usize].size as usize; + let tuple_bytes = TupleCodec::encode(&tuple); + if tuple_bytes.len() == size { + self.data[offset..(offset + size)].copy_from_slice(&tuple_bytes); + } else { + // need move other tuples + todo!() + } Ok(()) } @@ -283,7 +302,7 @@ mod tests { tuple_meta.delete_txn_id = 1; tuple_meta.insert_txn_id = 2; - table_page.update_tuple_meta(&tuple_meta, 0).unwrap(); + table_page.update_tuple_meta(tuple_meta, 0).unwrap(); let tuple_meta = table_page.tuple_meta(0).unwrap(); assert!(tuple_meta.is_deleted); assert_eq!(tuple_meta.delete_txn_id, 1); diff --git a/bustubx/src/storage/table_heap.rs b/bustubx/src/storage/table_heap.rs index 52a3f17..b3c0e36 100644 --- a/bustubx/src/storage/table_heap.rs +++ b/bustubx/src/storage/table_heap.rs @@ -109,7 +109,19 @@ impl TableHeap { Ok(RecordId::new(last_page_id, slot_id as u32)) } - pub fn update_tuple_meta(&self, meta: &TupleMeta, rid: RecordId) -> BustubxResult<()> { + pub fn update_tuple(&self, rid: RecordId, tuple: Tuple) -> BustubxResult<()> { + let page = self.buffer_pool.fetch_page(rid.page_id)?; + let (mut table_page, _) = + TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?; + table_page.update_tuple(tuple, rid.slot_num as u16)?; + + page.write() + .unwrap() + .set_data(page_bytes_to_array(&TablePageCodec::encode(&table_page))); + Ok(()) + } + + pub fn update_tuple_meta(&self, meta: TupleMeta, rid: RecordId) -> BustubxResult<()> { let page = self.buffer_pool.fetch_page(rid.page_id)?; let (mut table_page, _) = TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?; @@ -121,8 +133,7 @@ impl TableHeap { Ok(()) } - // FIXME - pub fn tuple(&self, rid: RecordId) -> BustubxResult<(TupleMeta, Tuple)> { + pub fn full_tuple(&self, rid: RecordId) -> BustubxResult<(TupleMeta, Tuple)> { let page = self.buffer_pool.fetch_page(rid.page_id)?; let (table_page, _) = TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?; @@ -130,12 +141,14 @@ impl TableHeap { Ok(result) } + pub fn tuple(&self, rid: RecordId) -> BustubxResult { + let (_meta, tuple) = self.full_tuple(rid)?; + Ok(tuple) + } + pub fn tuple_meta(&self, rid: RecordId) -> BustubxResult { - let page = self.buffer_pool.fetch_page(rid.page_id)?; - let (table_page, _) = - TablePageCodec::decode(page.read().unwrap().data(), self.schema.clone())?; - let result = table_page.tuple_meta(rid.slot_num as u16)?; - Ok(result) + let (meta, _tuple) = self.full_tuple(rid)?; + Ok(meta) } pub fn get_first_rid(&self) -> BustubxResult> { @@ -199,7 +212,7 @@ impl TableIterator { } } - pub fn next(&mut self) -> BustubxResult> { + pub fn next(&mut self) -> BustubxResult> { if self.ended { return Ok(None); } @@ -212,7 +225,11 @@ impl TableIterator { self.ended = true; } self.cursor = next_rid; - Ok(self.heap.tuple(self.cursor).ok()) + Ok(self + .heap + .tuple(self.cursor) + .ok() + .map(|tuple| (self.cursor, tuple))) } else { Ok(None) } @@ -223,7 +240,11 @@ impl TableIterator { Ok(None) } else { self.cursor = next_rid; - Ok(self.heap.tuple(self.cursor).ok()) + Ok(self + .heap + .tuple(self.cursor) + .ok() + .map(|tuple| (self.cursor, tuple))) } } else { Ok(None) @@ -232,7 +253,11 @@ impl TableIterator { Bound::Unbounded => { if let Some(next_rid) = self.heap.get_next_rid(self.cursor)? { self.cursor = next_rid; - Ok(self.heap.tuple(self.cursor).ok()) + Ok(self + .heap + .tuple(self.cursor) + .ok() + .map(|tuple| (self.cursor, tuple))) } else { Ok(None) } @@ -243,12 +268,20 @@ impl TableIterator { match self.start_bound { Bound::Included(rid) => { self.cursor = rid; - Ok(Some(self.heap.tuple(rid).unwrap())) + Ok(self + .heap + .tuple(self.cursor) + .ok() + .map(|tuple| (self.cursor, tuple))) } Bound::Excluded(rid) => { if let Some(next_rid) = self.heap.get_next_rid(rid)? { self.cursor = next_rid; - Ok(self.heap.tuple(self.cursor).ok()) + Ok(self + .heap + .tuple(self.cursor) + .ok() + .map(|tuple| (self.cursor, tuple))) } else { self.ended = true; Ok(None) @@ -257,7 +290,11 @@ impl TableIterator { Bound::Unbounded => { if let Some(first_rid) = self.heap.get_first_rid()? { self.cursor = first_rid; - Ok(self.heap.tuple(self.cursor).ok()) + Ok(self + .heap + .tuple(self.cursor) + .ok() + .map(|tuple| (self.cursor, tuple))) } else { self.ended = true; Ok(None) @@ -317,7 +354,7 @@ mod tests { meta.insert_txn_id = 1; meta.delete_txn_id = 2; meta.is_deleted = true; - table_heap.update_tuple_meta(&meta, rid2).unwrap(); + table_heap.update_tuple_meta(meta, rid2).unwrap(); let meta = table_heap.tuple_meta(rid2).unwrap(); assert_eq!(meta.insert_txn_id, 1); @@ -372,15 +409,15 @@ mod tests { ) .unwrap(); - let (meta, tuple) = table_heap.tuple(rid1).unwrap(); + let (meta, tuple) = table_heap.full_tuple(rid1).unwrap(); assert_eq!(meta, meta1); assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]); - let (meta, tuple) = table_heap.tuple(rid2).unwrap(); + let (meta, tuple) = table_heap.full_tuple(rid2).unwrap(); assert_eq!(meta, meta2); assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]); - let (meta, tuple) = table_heap.tuple(rid3).unwrap(); + let (meta, tuple) = table_heap.full_tuple(rid3).unwrap(); assert_eq!(meta, meta3); assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]); } @@ -404,7 +441,7 @@ mod tests { delete_txn_id: 1, is_deleted: false, }; - let _rid1 = table_heap + let rid1 = table_heap .insert_tuple( &meta1, &Tuple::new(schema.clone(), vec![1i8.into(), 1i16.into()]), @@ -415,7 +452,7 @@ mod tests { delete_txn_id: 2, is_deleted: false, }; - let _rid2 = table_heap + let rid2 = table_heap .insert_tuple( &meta2, &Tuple::new(schema.clone(), vec![2i8.into(), 2i16.into()]), @@ -426,7 +463,7 @@ mod tests { delete_txn_id: 3, is_deleted: false, }; - let _rid3 = table_heap + let rid3 = table_heap .insert_tuple( &meta3, &Tuple::new(schema.clone(), vec![3i8.into(), 3i16.into()]), @@ -435,16 +472,16 @@ mod tests { let mut iterator = TableIterator::new(table_heap.clone(), ..); - let (meta, tuple) = iterator.next().unwrap().unwrap(); - assert_eq!(meta, meta1); + let (rid, tuple) = iterator.next().unwrap().unwrap(); + assert_eq!(rid, rid1); assert_eq!(tuple.data, vec![1i8.into(), 1i16.into()]); - let (meta, tuple) = iterator.next().unwrap().unwrap(); - assert_eq!(meta, meta2); + let (rid, tuple) = iterator.next().unwrap().unwrap(); + assert_eq!(rid, rid2); assert_eq!(tuple.data, vec![2i8.into(), 2i16.into()]); - let (meta, tuple) = iterator.next().unwrap().unwrap(); - assert_eq!(meta, meta3); + let (rid, tuple) = iterator.next().unwrap().unwrap(); + assert_eq!(rid, rid3); assert_eq!(tuple.data, vec![3i8.into(), 3i16.into()]); assert!(iterator.next().unwrap().is_none()); diff --git a/bustubx/src/storage/tuple.rs b/bustubx/src/storage/tuple.rs index 588d9ea..bc94873 100644 --- a/bustubx/src/storage/tuple.rs +++ b/bustubx/src/storage/tuple.rs @@ -1,9 +1,13 @@ -use crate::catalog::SchemaRef; +use crate::catalog::{SchemaRef, EMPTY_SCHEMA_REF}; use crate::common::TableReference; use crate::{catalog::Schema, common::ScalarValue, BustubxError, BustubxResult}; use std::cmp::Ordering; use std::sync::Arc; +lazy_static::lazy_static! { + pub static ref EMPTY_TUPLE: Tuple = Tuple::empty(EMPTY_SCHEMA_REF.clone()); +} + #[derive(Debug, Clone, Eq, PartialEq)] pub struct Tuple { pub schema: SchemaRef, diff --git a/tests/sqllogictest/slt/update.slt b/tests/sqllogictest/slt/_update.slt similarity index 100% rename from tests/sqllogictest/slt/update.slt rename to tests/sqllogictest/slt/_update.slt