From 3fcb9da931255881e6d75e37ea5b3e91e1b7678a Mon Sep 17 00:00:00 2001 From: Linwei Zhang Date: Wed, 28 Feb 2024 00:56:41 +0800 Subject: [PATCH] Refactor catalog --- bustubx/src/buffer/buffer_pool.rs | 3 +- bustubx/src/catalog/catalog.rs | 306 ++++++++++++------ bustubx/src/catalog/information.rs | 42 ++- bustubx/src/common/mod.rs | 4 +- bustubx/src/common/table_ref.rs | 25 -- .../src/execution/physical_plan/index_scan.rs | 2 +- bustubx/src/lib.rs | 1 + .../physical_planner/physical_planner.rs | 15 +- bustubx/src/storage/codec/table_page.rs | 8 +- bustubx/src/storage/page/table_page.rs | 2 +- bustubx/src/transaction/lock_manager.rs | 60 ++++ bustubx/src/transaction/mod.rs | 9 + bustubx/src/transaction/transaction.rs | 1 + .../src/transaction/transaction_manager.rs | 23 ++ 14 files changed, 364 insertions(+), 137 deletions(-) create mode 100644 bustubx/src/transaction/lock_manager.rs create mode 100644 bustubx/src/transaction/mod.rs create mode 100644 bustubx/src/transaction/transaction.rs create mode 100644 bustubx/src/transaction/transaction_manager.rs diff --git a/bustubx/src/buffer/buffer_pool.rs b/bustubx/src/buffer/buffer_pool.rs index feff2fd..226e56c 100644 --- a/bustubx/src/buffer/buffer_pool.rs +++ b/bustubx/src/buffer/buffer_pool.rs @@ -58,8 +58,7 @@ impl BufferPoolManager { // 从磁盘分配一个页 let new_page_id = self.disk_manager.allocate_page().unwrap(); self.page_table.insert(new_page_id, frame_id); - let mut new_page = Page::new(new_page_id); - new_page.pin_count = 1; + let new_page = Page::new(new_page_id).with_pin_count(1u32); self.pool[frame_id].write().unwrap().replace(new_page); self.replacer.write().unwrap().record_access(frame_id)?; diff --git a/bustubx/src/catalog/catalog.rs b/bustubx/src/catalog/catalog.rs index df960da..8fdd9dd 100644 --- a/bustubx/src/catalog/catalog.rs +++ b/bustubx/src/catalog/catalog.rs @@ -1,11 +1,12 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use std::sync::atomic::Ordering; use std::sync::Arc; 
use crate::catalog::{ - SchemaRef, COLUMNS_SCHMEA, COLUMNS_TABLE_REF, TABLES_SCHMEA, TABLES_TABLE_REF, + SchemaRef, COLUMNS_SCHMEA, INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_NAME, + INFORMATION_SCHEMA_TABLES, TABLES_SCHMEA, }; -use crate::common::{FullTableRef, TableReference}; +use crate::common::TableReference; use crate::storage::{TupleMeta, BPLUS_INTERNAL_PAGE_MAX_SIZE, BPLUS_LEAF_PAGE_MAX_SIZE}; use crate::{ buffer::BufferPoolManager, @@ -16,34 +17,91 @@ use crate::{ pub static DEFAULT_CATALOG_NAME: &str = "bustubx"; pub static DEFAULT_SCHEMA_NAME: &str = "public"; -/// catalog, schema, table, index -pub type FullIndexRef = (String, String, String, String); - pub struct Catalog { - pub tables: HashMap>, - pub indexes: HashMap>, - pub table_indexes: HashMap>, + pub schemas: HashMap, pub buffer_pool: Arc, } -impl Catalog { - pub fn new(buffer_pool: Arc) -> Self { +pub struct CatalogSchema { + pub name: String, + pub tables: HashMap, +} + +impl CatalogSchema { + pub fn new(name: impl Into) -> Self { Self { + name: name.into(), tables: HashMap::new(), + } + } +} + +pub struct CatalogTable { + pub name: String, + pub table: Arc, + pub indexes: HashMap>, +} + +impl CatalogTable { + pub fn new(name: impl Into, table: Arc) -> Self { + Self { + name: name.into(), + table, indexes: HashMap::new(), - table_indexes: HashMap::new(), + } + } +} + +impl Catalog { + pub fn new(buffer_pool: Arc) -> Self { + // TODO should load from disk + let mut schemas = HashMap::new(); + schemas.insert( + DEFAULT_SCHEMA_NAME.to_string(), + CatalogSchema { + name: DEFAULT_SCHEMA_NAME.to_string(), + tables: HashMap::new(), + }, + ); + Self { + schemas, buffer_pool, } } + pub fn create_schema(&mut self, scheme_name: String) -> BustubxResult<()> { + if self.schemas.contains_key(&scheme_name) { + return Err(BustubxError::Storage( + "Cannot create duplicated schema".to_string(), + )); + } + self.schemas + .insert(scheme_name.clone(), CatalogSchema::new(scheme_name)); + Ok(()) + } + pub fn 
create_table( &mut self, table_ref: TableReference, schema: SchemaRef, ) -> BustubxResult> { - // TODO fail if database not created - let full_table_ref = table_ref.extend_to_full(); - if self.tables.contains_key(&full_table_ref) { + let catalog_name = table_ref + .catalog() + .unwrap_or(DEFAULT_CATALOG_NAME) + .to_string(); + let catalog_schema_name = table_ref + .schema() + .unwrap_or(DEFAULT_SCHEMA_NAME) + .to_string(); + let table_name = table_ref.table().to_string(); + + let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else { + return Err(BustubxError::Storage(format!( + "catalog schema {} not created yet", + catalog_schema_name + ))); + }; + if catalog_schema.tables.contains_key(table_ref.table()) { return Err(BustubxError::Storage( "Cannot create duplicated table".to_string(), )); @@ -52,18 +110,28 @@ impl Catalog { schema.clone(), self.buffer_pool.clone(), )?); - self.tables - .insert(full_table_ref.clone(), table_heap.clone()); - self.table_indexes - .insert(full_table_ref.clone(), HashSet::new()); + let catalog_table = CatalogTable { + name: table_name.clone(), + table: table_heap.clone(), + indexes: HashMap::new(), + }; + catalog_schema + .tables + .insert(table_name.clone(), catalog_table); // update system table - let tables_table = self - .tables - .get_mut(&TABLES_TABLE_REF.extend_to_full()) - .ok_or(BustubxError::Internal( - "Cannot find tables table".to_string(), - ))?; + let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else { + return Err(BustubxError::Internal( + "catalog schema information_schema not created yet".to_string(), + )); + }; + let Some(tables_table) = information_schema.tables.get_mut(INFORMATION_SCHEMA_TABLES) + else { + return Err(BustubxError::Internal( + "table information_schema.tables not created yet".to_string(), + )); + }; + let tuple_meta = TupleMeta { insert_txn_id: 0, delete_txn_id: 0, @@ -72,77 +140,87 @@ impl Catalog { let tuple = Tuple::new( TABLES_SCHMEA.clone(), vec![ - 
full_table_ref.0.clone().into(), - full_table_ref.1.clone().into(), - full_table_ref.2.clone().into(), + catalog_name.clone().into(), + catalog_schema_name.clone().into(), + table_name.clone().into(), (table_heap.first_page_id.load(Ordering::SeqCst)).into(), (table_heap.last_page_id.load(Ordering::SeqCst)).into(), ], ); - tables_table.insert_tuple(&tuple_meta, &tuple)?; + tables_table.table.insert_tuple(&tuple_meta, &tuple)?; - let columns_table = self + let Some(columns_table) = information_schema .tables - .get_mut(&COLUMNS_TABLE_REF.extend_to_full()) - .ok_or(BustubxError::Internal( - "Cannot find columns table".to_string(), - ))?; + .get_mut(INFORMATION_SCHEMA_COLUMNS) + else { + return Err(BustubxError::Internal( + "table information_schema.columns not created yet".to_string(), + )); + }; for col in schema.columns.iter() { let sql_type: sqlparser::ast::DataType = (&col.data_type).into(); let tuple = Tuple::new( COLUMNS_SCHMEA.clone(), vec![ - full_table_ref.0.clone().into(), - full_table_ref.1.clone().into(), - full_table_ref.2.clone().into(), + catalog_name.clone().into(), + catalog_schema_name.clone().into(), + table_name.clone().into(), col.name.clone().into(), format!("{sql_type}").into(), col.nullable.into(), ], ); - columns_table.insert_tuple(&tuple_meta, &tuple)?; + columns_table.table.insert_tuple(&tuple_meta, &tuple)?; } Ok(table_heap) } pub fn table_heap(&self, table_ref: &TableReference) -> BustubxResult> { - self.tables - .get(&table_ref.extend_to_full()) - .cloned() - .ok_or(BustubxError::Internal(format!( - "Not found the table {}", - table_ref - ))) + let catalog_schema_name = table_ref + .schema() + .unwrap_or(DEFAULT_SCHEMA_NAME) + .to_string(); + let table_name = table_ref.table().to_string(); + + let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else { + return Err(BustubxError::Storage(format!( + "catalog schema {} not created yet", + catalog_schema_name + ))); + }; + let Some(catalog_table) = 
catalog_schema.tables.get(&table_name) else { + return Err(BustubxError::Storage(format!( + "table {} not created yet", + table_name + ))); + }; + Ok(catalog_table.table.clone()) } pub fn table_indexes( &self, table_ref: &TableReference, ) -> BustubxResult>> { - let full_table_ref = table_ref.extend_to_full(); - if let Some(indexes) = self.table_indexes.get(&full_table_ref) { - indexes - .iter() - .map(|name| { - let full_index_ref = ( - full_table_ref.0.clone(), - full_table_ref.1.clone(), - full_table_ref.2.clone(), - name.clone(), - ); - self.indexes - .get(&full_index_ref) - .cloned() - .ok_or(BustubxError::Storage(format!( - "Index name {} should be valid", - name - ))) - }) - .collect::>>>() - } else { - Ok(vec![]) - } + let catalog_schema_name = table_ref + .schema() + .unwrap_or(DEFAULT_SCHEMA_NAME) + .to_string(); + let table_name = table_ref.table().to_string(); + + let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else { + return Err(BustubxError::Storage(format!( + "catalog schema {} not created yet", + catalog_schema_name + ))); + }; + let Some(catalog_table) = catalog_schema.tables.get(&table_name) else { + return Err(BustubxError::Storage(format!( + "table {} not created yet", + table_name + ))); + }; + Ok(catalog_table.indexes.values().cloned().collect()) } pub fn create_index( @@ -151,13 +229,29 @@ impl Catalog { table_ref: &TableReference, key_schema: SchemaRef, ) -> BustubxResult> { - let full_table_ref = table_ref.extend_to_full(); - let full_index_ref = ( - full_table_ref.0.clone(), - full_table_ref.1.clone(), - full_table_ref.2.clone(), - index_name.clone(), - ); + let catalog_schema_name = table_ref + .schema() + .unwrap_or(DEFAULT_SCHEMA_NAME) + .to_string(); + let table_name = table_ref.table().to_string(); + + let Some(catalog_schema) = self.schemas.get_mut(&catalog_schema_name) else { + return Err(BustubxError::Storage(format!( + "catalog schema {} not created yet", + catalog_schema_name + ))); + }; + let 
Some(catalog_table) = catalog_schema.tables.get_mut(&table_name) else { + return Err(BustubxError::Storage(format!( + "table {} not created yet", + table_name + ))); + }; + if catalog_table.indexes.contains_key(&index_name) { + return Err(BustubxError::Storage( + "Cannot create duplicated index".to_string(), + )); + } let b_plus_tree_index = Arc::new(BPlusTreeIndex::new( key_schema.clone(), @@ -165,27 +259,58 @@ impl Catalog { BPLUS_LEAF_PAGE_MAX_SIZE as u32, BPLUS_INTERNAL_PAGE_MAX_SIZE as u32, )); + catalog_table + .indexes + .insert(index_name, b_plus_tree_index.clone()); - self.indexes - .insert(full_index_ref.clone(), b_plus_tree_index.clone()); - if let Some(indexes) = self.table_indexes.get_mut(&full_table_ref) { - indexes.insert(index_name); - } else { - return Err(BustubxError::Storage( - "Cannot find table_indexes map".to_string(), - )); - } Ok(b_plus_tree_index) } - pub fn get_index_by_name( + pub fn index( &self, table_ref: &TableReference, index_name: &str, - ) -> Option> { - let (catalog, schema, table) = table_ref.extend_to_full(); - let full_index_ref = (catalog, schema, table, index_name.to_string()); - self.indexes.get(&full_index_ref).cloned() + ) -> BustubxResult>> { + let catalog_schema_name = table_ref + .schema() + .unwrap_or(DEFAULT_SCHEMA_NAME) + .to_string(); + let table_name = table_ref.table().to_string(); + + let Some(catalog_schema) = self.schemas.get(&catalog_schema_name) else { + return Err(BustubxError::Storage(format!( + "catalog schema {} not created yet", + catalog_schema_name + ))); + }; + let Some(catalog_table) = catalog_schema.tables.get(&table_name) else { + return Err(BustubxError::Storage(format!( + "table {} not created yet", + table_name + ))); + }; + Ok(catalog_table.indexes.get(index_name).cloned()) + } + + pub fn load_schema(&mut self, name: impl Into, schema: CatalogSchema) { + self.schemas.insert(name.into(), schema); + } + + pub fn load_table( + &mut self, + table_ref: TableReference, + table: CatalogTable, + ) -> 
BustubxResult<()> { + let catalog_schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME); + let table_name = table_ref.table().to_string(); + let Some(catalog_schema) = self.schemas.get_mut(catalog_schema_name) else { + return Err(BustubxError::Storage(format!( + "catalog schema {} not created yet", + catalog_schema_name + ))); + }; + catalog_schema.tables.insert(table_name, table); + Ok(()) } } @@ -269,7 +394,8 @@ mod tests { assert_eq!(index2.key_schema, key_schema2); let index3 = catalog - .get_index_by_name(&table_ref, index_name1.as_str()) + .index(&table_ref, index_name1.as_str()) + .unwrap() .unwrap(); assert_eq!(index3.key_schema, key_schema1); } diff --git a/bustubx/src/catalog/information.rs b/bustubx/src/catalog/information.rs index eb97aa8..437dd5a 100644 --- a/bustubx/src/catalog/information.rs +++ b/bustubx/src/catalog/information.rs @@ -1,9 +1,11 @@ use crate::buffer::{AtomicPageId, PageId, INVALID_PAGE_ID}; +use crate::catalog::catalog::{CatalogSchema, CatalogTable}; use crate::catalog::{Catalog, Column, DataType, Schema, SchemaRef, DEFAULT_CATALOG_NAME}; use crate::common::{ScalarValue, TableReference}; use crate::storage::codec::TablePageCodec; use crate::storage::TableHeap; use crate::{BustubxError, BustubxResult, Database}; +use std::collections::HashMap; use std::sync::Arc; pub static INFORMATION_SCHEMA_NAME: &str = "information_schema"; @@ -95,16 +97,26 @@ fn load_user_tables(db: &mut Database) -> BustubxResult<()> { } let schema = Arc::new(Schema::new(columns)); + let Some(catalog_schema) = db.catalog.schemas.get_mut(table_schema.as_str()) else { + return Err(BustubxError::Storage(format!( + "catalog schema {} not created yet", + table_schema + ))); + }; let table_heap = TableHeap { schema: schema.clone(), buffer_pool: db.buffer_pool.clone(), first_page_id: AtomicPageId::new(*first_page_id), last_page_id: AtomicPageId::new(*last_page_id), }; - let table_ref = TableReference::full(catalog, table_schema, table_name); - db.catalog + let 
catalog_table = CatalogTable { + name: table_name.clone(), + table: Arc::new(table_heap), + indexes: HashMap::new(), + }; + catalog_schema .tables - .insert(table_ref.extend_to_full(), Arc::new(table_heap)); + .insert(table_name.clone(), catalog_table); } Ok(()) } @@ -127,15 +139,22 @@ fn load_information_schema(catalog: &mut Catalog) -> BustubxResult<()> { COLUMNS_SCHMEA.clone(), )?; + let mut information_schema = CatalogSchema::new(INFORMATION_SCHEMA_NAME); + let tables_table = TableHeap { schema: TABLES_SCHMEA.clone(), buffer_pool: catalog.buffer_pool.clone(), first_page_id: AtomicPageId::new(information_schema_tables_first_page_id), last_page_id: AtomicPageId::new(information_schema_tables_last_page_id), }; - catalog + let catalog_table = CatalogTable { + name: INFORMATION_SCHEMA_TABLES.to_string(), + table: Arc::new(tables_table), + indexes: HashMap::new(), + }; + information_schema .tables - .insert(TABLES_TABLE_REF.extend_to_full(), Arc::new(tables_table)); + .insert(INFORMATION_SCHEMA_TABLES.to_string(), catalog_table); let columns_table = TableHeap { schema: COLUMNS_SCHMEA.clone(), buffer_pool: catalog.buffer_pool.clone(), first_page_id: AtomicPageId::new(information_schema_columns_first_page_id), last_page_id: AtomicPageId::new(information_schema_columns_last_page_id), }; - catalog + let catalog_table = CatalogTable { + name: INFORMATION_SCHEMA_COLUMNS.to_string(), + table: Arc::new(columns_table), + indexes: HashMap::new(), + }; + information_schema .tables - .insert(COLUMNS_TABLE_REF.extend_to_full(), Arc::new(columns_table)); + .insert(INFORMATION_SCHEMA_COLUMNS.to_string(), catalog_table); + + catalog.load_schema(INFORMATION_SCHEMA_NAME, information_schema); + + // TODO load schemas Ok(()) } diff --git a/bustubx/src/common/mod.rs b/bustubx/src/common/mod.rs index 3863f2b..ddfde8a 100644 --- a/bustubx/src/common/mod.rs +++ b/bustubx/src/common/mod.rs @@ -5,6 +5,4 @@ pub mod util; pub use 
bitmap::DynamicBitmap; pub use scalar::ScalarValue; -pub use table_ref::{FullTableRef, TableReference}; - -pub type TransactionId = u32; +pub use table_ref::TableReference; diff --git a/bustubx/src/common/table_ref.rs b/bustubx/src/common/table_ref.rs index 4e59e70..d6d7cb4 100644 --- a/bustubx/src/common/table_ref.rs +++ b/bustubx/src/common/table_ref.rs @@ -1,5 +1,3 @@ -use crate::catalog::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; - #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] pub enum TableReference { /// An unqualified table reference, e.g. "table" @@ -25,9 +23,6 @@ pub enum TableReference { }, } -/// catalog, schema, table -pub type FullTableRef = (String, String, String); - impl TableReference { pub fn bare(table: impl Into) -> Self { Self::Bare { @@ -91,26 +86,6 @@ impl TableReference { } } } - - pub fn extend_to_full(&self) -> FullTableRef { - match self { - TableReference::Bare { table } => ( - DEFAULT_CATALOG_NAME.to_string(), - DEFAULT_SCHEMA_NAME.to_string(), - table.clone(), - ), - TableReference::Partial { schema, table } => ( - DEFAULT_CATALOG_NAME.to_string(), - schema.clone(), - table.clone(), - ), - TableReference::Full { - catalog, - schema, - table, - } => (catalog.clone(), schema.clone(), table.clone()), - } - } } impl std::fmt::Display for TableReference { diff --git a/bustubx/src/execution/physical_plan/index_scan.rs b/bustubx/src/execution/physical_plan/index_scan.rs index 771b613..6922b7f 100644 --- a/bustubx/src/execution/physical_plan/index_scan.rs +++ b/bustubx/src/execution/physical_plan/index_scan.rs @@ -38,7 +38,7 @@ impl VolcanoExecutor for PhysicalIndexScan { fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> { let index = context .catalog - .get_index_by_name(&self.table_ref, &self.index_name) + .index(&self.table_ref, &self.index_name)? 
.unwrap(); *self.iterator.lock().unwrap() = Some(TreeIndexIterator::new( index, diff --git a/bustubx/src/lib.rs b/bustubx/src/lib.rs index 30f6b2c..b828d7b 100644 --- a/bustubx/src/lib.rs +++ b/bustubx/src/lib.rs @@ -10,6 +10,7 @@ mod optimizer; mod parser; mod planner; mod storage; +mod transaction; pub use common::util::pretty_format_tuples; pub use database::Database; diff --git a/bustubx/src/planner/physical_planner/physical_planner.rs b/bustubx/src/planner/physical_planner/physical_planner.rs index 5aa7312..ff4cec6 100644 --- a/bustubx/src/planner/physical_planner/physical_planner.rs +++ b/bustubx/src/planner/physical_planner/physical_planner.rs @@ -1,4 +1,4 @@ -use crate::catalog::{Catalog, Schema}; +use crate::catalog::{Catalog, Schema, DEFAULT_SCHEMA_NAME}; use std::sync::Arc; use crate::planner::logical_plan::{ @@ -87,12 +87,19 @@ impl PhysicalPlanner<'_> { limit: _, }) => { // TODO fix testing - if let Some(indexes) = self.catalog.table_indexes.get(&table_ref.extend_to_full()) { - if !indexes.is_empty() { + if let Some(catalog_table) = self + .catalog + .schemas + .get(table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME)) + .unwrap() + .tables + .get(table_ref.table()) + { + if !catalog_table.indexes.is_empty() { println!("LWZTEST create index scan"); PhysicalPlan::IndexScan(PhysicalIndexScan::new( table_ref.clone(), - indexes.iter().next().cloned().unwrap(), + catalog_table.indexes.keys().next().unwrap().clone(), table_schema.clone(), .., )) diff --git a/bustubx/src/storage/codec/table_page.rs b/bustubx/src/storage/codec/table_page.rs index 92f1df4..fafce19 100644 --- a/bustubx/src/storage/codec/table_page.rs +++ b/bustubx/src/storage/codec/table_page.rs @@ -86,8 +86,8 @@ impl TablePageHeaderTupleInfoCodec { let mut bytes = Vec::new(); bytes.extend(CommonCodec::encode_u16(tuple_info.offset)); bytes.extend(CommonCodec::encode_u16(tuple_info.size)); - bytes.extend(CommonCodec::encode_u32(tuple_info.meta.insert_txn_id)); - 
bytes.extend(CommonCodec::encode_u32(tuple_info.meta.delete_txn_id)); + bytes.extend(CommonCodec::encode_u64(tuple_info.meta.insert_txn_id)); + bytes.extend(CommonCodec::encode_u64(tuple_info.meta.delete_txn_id)); bytes.extend(CommonCodec::encode_bool(tuple_info.meta.is_deleted)); bytes } @@ -98,9 +98,9 @@ impl TablePageHeaderTupleInfoCodec { left_bytes = &left_bytes[offset..]; let (size, offset) = CommonCodec::decode_u16(left_bytes)?; left_bytes = &left_bytes[offset..]; - let (insert_txn_id, offset) = CommonCodec::decode_u32(left_bytes)?; + let (insert_txn_id, offset) = CommonCodec::decode_u64(left_bytes)?; left_bytes = &left_bytes[offset..]; - let (delete_txn_id, offset) = CommonCodec::decode_u32(left_bytes)?; + let (delete_txn_id, offset) = CommonCodec::decode_u64(left_bytes)?; left_bytes = &left_bytes[offset..]; let (is_deleted, offset) = CommonCodec::decode_bool(left_bytes)?; left_bytes = &left_bytes[offset..]; diff --git a/bustubx/src/storage/page/table_page.rs b/bustubx/src/storage/page/table_page.rs index ed71f64..66b65d5 100644 --- a/bustubx/src/storage/page/table_page.rs +++ b/bustubx/src/storage/page/table_page.rs @@ -1,7 +1,7 @@ use crate::buffer::{PageId, BUSTUBX_PAGE_SIZE, INVALID_PAGE_ID}; use crate::catalog::SchemaRef; -use crate::common::TransactionId; use crate::storage::codec::{TablePageHeaderCodec, TablePageHeaderTupleInfoCodec, TupleCodec}; +use crate::transaction::TransactionId; use crate::{BustubxError, BustubxResult, Tuple}; lazy_static::lazy_static! 
{ diff --git a/bustubx/src/transaction/lock_manager.rs b/bustubx/src/transaction/lock_manager.rs new file mode 100644 index 0000000..d70085b --- /dev/null +++ b/bustubx/src/transaction/lock_manager.rs @@ -0,0 +1,60 @@ +use crate::common::TableReference; +use crate::storage::RecordId; +use crate::transaction::{Transaction, TransactionId}; +use std::collections::HashMap; + +#[derive(Debug)] +pub enum LockMode { + Shared, + Exclusive, + IntentionShared, + IntentionExclusive, + SharedIntentionExclusive, +} + +pub struct LockRequest { + txn_id: TransactionId, + lock_mod: LockMode, + table_ref: TableReference, + rid: Option, + granted: bool, +} + +pub struct LockManager { + table_lock_map: HashMap>, + row_lock_map: HashMap>, +} + +impl LockManager { + pub fn lock_table(&self, txn: Transaction, mode: LockMode, table_ref: TableReference) -> bool { + todo!() + } + + pub fn unlock_table(&self, txn: Transaction, table_ref: TableReference) -> bool { + todo!() + } + + pub fn lock_row( + &self, + txn: Transaction, + mode: LockMode, + table_ref: TableReference, + rid: RecordId, + ) -> bool { + todo!() + } + + pub fn unlock_row( + &self, + txn: Transaction, + table_ref: TableReference, + rid: RecordId, + force: bool, + ) -> bool { + todo!() + } + + pub fn unlock_all(&self) { + todo!() + } +} diff --git a/bustubx/src/transaction/mod.rs b/bustubx/src/transaction/mod.rs new file mode 100644 index 0000000..2edfe1e --- /dev/null +++ b/bustubx/src/transaction/mod.rs @@ -0,0 +1,9 @@ +mod lock_manager; +mod transaction; +mod transaction_manager; + +pub type TransactionId = u64; + +pub use lock_manager::*; +pub use transaction::*; +pub use transaction_manager::*; diff --git a/bustubx/src/transaction/transaction.rs b/bustubx/src/transaction/transaction.rs new file mode 100644 index 0000000..8952d63 --- /dev/null +++ b/bustubx/src/transaction/transaction.rs @@ -0,0 +1 @@ +pub struct Transaction {} diff --git a/bustubx/src/transaction/transaction_manager.rs 
b/bustubx/src/transaction/transaction_manager.rs new file mode 100644 index 0000000..e5491fb --- /dev/null +++ b/bustubx/src/transaction/transaction_manager.rs @@ -0,0 +1,23 @@ +use crate::transaction::Transaction; + +pub enum IsolationLevel { + ReadUncommitted, + SnapshotIsolation, + Serializable, +} + +pub struct TransactionManager {} + +impl TransactionManager { + pub fn begin(&self, isolation_level: IsolationLevel) -> Transaction { + todo!() + } + + pub fn commit(&self, txn: Transaction) -> bool { + todo!() + } + + pub fn abort(&self, txn: Transaction) { + todo!() + } +}