Skip to content

Commit

Permalink
Add information_schema.indexes
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 28, 2024
1 parent a35408f commit d0b784b
Show file tree
Hide file tree
Showing 14 changed files with 241 additions and 56 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
- [x] Buffer Pool
- [x] Table Heap
- [x] System Metadata (information_schema)
- [ ] Parallel Execution
- [x] B+ Tree Index
- [ ] Parallel Execution
- [ ] Two Phase Locking
- [ ] Multi-Version Concurrency Control
- [ ] Crash Recovery

Expand All @@ -36,6 +37,7 @@ RUST_LOG=info,bustubx=debug cargo run --bin bustubx-cli
- [Fedomn/sqlrs](https://github.com/Fedomn/sqlrs) and [blogs](https://frankma.me/categories/sqlrs/)
- [KipData/KipSQL](https://github.com/KipData/KipSQL)
- [talent-plan/tinysql](https://github.com/talent-plan/tinysql)
- [arrow-datafusion](https://github.com/apache/arrow-datafusion)
- [CMU 15-445课程笔记-zhenghe](https://zhenghe.gitbook.io/open-courses/cmu-15-445-645-database-systems/relational-data-model)
- [CMU15-445 22Fall通关记录 - 知乎](https://www.zhihu.com/column/c_1605901992903004160)
- [B+ Tree Visualization](https://www.cs.usfca.edu/~galles/visualization/BPlusTree.html)
74 changes: 68 additions & 6 deletions bustubx/src/catalog/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;

use crate::catalog::{
SchemaRef, COLUMNS_SCHMEA, INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_NAME,
INFORMATION_SCHEMA_SCHEMAS, INFORMATION_SCHEMA_TABLES, SCHEMAS_SCHMEA, TABLES_SCHMEA,
key_schema_to_varchar, SchemaRef, COLUMNS_SCHMEA, INDEXES_SCHMEA, INFORMATION_SCHEMA_COLUMNS,
INFORMATION_SCHEMA_INDEXES, INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_SCHEMAS,
INFORMATION_SCHEMA_TABLES, SCHEMAS_SCHMEA, TABLES_SCHMEA,
};
use crate::common::TableReference;
use crate::storage::{BPLUS_INTERNAL_PAGE_MAX_SIZE, BPLUS_LEAF_PAGE_MAX_SIZE, EMPTY_TUPLE_META};
Expand Down Expand Up @@ -246,6 +247,10 @@ impl Catalog {
table_ref: &TableReference,
key_schema: SchemaRef,
) -> BustubxResult<Arc<BPlusTreeIndex>> {
let catalog_name = table_ref
.catalog()
.unwrap_or(DEFAULT_CATALOG_NAME)
.to_string();
let catalog_schema_name = table_ref
.schema()
.unwrap_or(DEFAULT_SCHEMA_NAME)
Expand Down Expand Up @@ -273,12 +278,45 @@ impl Catalog {
let b_plus_tree_index = Arc::new(BPlusTreeIndex::new(
key_schema.clone(),
self.buffer_pool.clone(),
BPLUS_LEAF_PAGE_MAX_SIZE as u32,
BPLUS_INTERNAL_PAGE_MAX_SIZE as u32,
BPLUS_LEAF_PAGE_MAX_SIZE as u32,
));
catalog_table
.indexes
.insert(index_name, b_plus_tree_index.clone());
.insert(index_name.clone(), b_plus_tree_index.clone());

// update system table
let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
return Err(BustubxError::Internal(
"catalog schema information_schema not created yet".to_string(),
));
};
let Some(indexes_table) = information_schema
.tables
.get_mut(INFORMATION_SCHEMA_INDEXES)
else {
return Err(BustubxError::Internal(
"table information_schema.indexes not created yet".to_string(),
));
};

let tuple = Tuple::new(
INDEXES_SCHMEA.clone(),
vec![
catalog_name.clone().into(),
catalog_schema_name.clone().into(),
table_name.clone().into(),
index_name.clone().into(),
key_schema_to_varchar(&b_plus_tree_index.key_schema).into(),
b_plus_tree_index.internal_max_size.into(),
b_plus_tree_index.leaf_max_size.into(),
b_plus_tree_index.root_page_id.load(Ordering::SeqCst).into(),
],
);
println!("LWZTEST tuple: {:?}", tuple);
indexes_table
.table
.insert_tuple(&EMPTY_TUPLE_META, &tuple)?;

Ok(b_plus_tree_index)
}
Expand Down Expand Up @@ -329,6 +367,30 @@ impl Catalog {
catalog_schema.tables.insert(table_name, table);
Ok(())
}

/// Registers an already-built B+ tree index on an existing catalog table.
///
/// Used when re-opening a database: the index pages already exist on disk,
/// so this only wires the in-memory handle into the catalog.
/// NOTE(review): only the schema part of `table_ref` is consulted; the
/// catalog part is ignored — confirm this matches `load_table`.
///
/// Returns a `Storage` error if the schema or table has not been loaded yet.
pub fn load_index(
    &mut self,
    table_ref: TableReference,
    index_name: impl Into<String>,
    index: Arc<BPlusTreeIndex>,
) -> BustubxResult<()> {
    let schema_name = table_ref.schema().unwrap_or(DEFAULT_SCHEMA_NAME);
    let table_name = table_ref.table();
    let catalog_schema = self.schemas.get_mut(schema_name).ok_or_else(|| {
        BustubxError::Storage(format!(
            "catalog schema {} not created yet",
            schema_name
        ))
    })?;
    let catalog_table = catalog_schema.tables.get_mut(table_name).ok_or_else(|| {
        BustubxError::Storage(format!(
            "catalog table {} not created yet",
            table_name
        ))
    })?;
    catalog_table.indexes.insert(index_name.into(), index);
    Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -389,15 +451,15 @@ mod tests {
let _ = db.catalog.create_table(table_ref.clone(), schema.clone());

let index_name1 = "test_index1".to_string();
let key_schema1 = schema.project(&[0, 2]).unwrap();
let key_schema1 = Arc::new(schema.project(&[0, 2]).unwrap());
let index1 = db
.catalog
.create_index(index_name1.clone(), &table_ref, key_schema1.clone())
.unwrap();
assert_eq!(index1.key_schema, key_schema1);

let index_name2 = "test_index2".to_string();
let key_schema2 = schema.project(&[1]).unwrap();
let key_schema2 = Arc::new(schema.project(&[1]).unwrap());
let index2 = db
.catalog
.create_index(index_name2.clone(), &table_ref, key_schema2.clone())
Expand Down
108 changes: 108 additions & 0 deletions bustubx/src/catalog/information.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ use crate::storage::codec::TablePageCodec;
use crate::storage::TableHeap;
use crate::{BustubxError, BustubxResult, Database};

use crate::storage::index::BPlusTreeIndex;
use std::sync::Arc;

pub static INFORMATION_SCHEMA_NAME: &str = "information_schema";
pub static INFORMATION_SCHEMA_SCHEMAS: &str = "schemas";
pub static INFORMATION_SCHEMA_TABLES: &str = "tables";
pub static INFORMATION_SCHEMA_COLUMNS: &str = "columns";
pub static INFORMATION_SCHEMA_INDEXES: &str = "indexes";

lazy_static::lazy_static! {
pub static ref SCHEMAS_SCHMEA: SchemaRef = Arc::new(Schema::new(vec![
Expand All @@ -36,13 +38,25 @@ lazy_static::lazy_static! {
Column::new("data_type", DataType::Varchar(None), false),
Column::new("nullable", DataType::Boolean, false),
]));

pub static ref INDEXES_SCHMEA: SchemaRef = Arc::new(Schema::new(vec![
Column::new("table_catalog", DataType::Varchar(None), false),
Column::new("table_schema", DataType::Varchar(None), false),
Column::new("table_name", DataType::Varchar(None), false),
Column::new("index_name", DataType::Varchar(None), false),
Column::new("key_schema", DataType::Varchar(None), false),
Column::new("internal_max_size", DataType::UInt32, false),
Column::new("leaf_max_size", DataType::UInt32, false),
Column::new("root_page_id", DataType::UInt32, false),
]));
}

/// Bootstraps the in-memory catalog from data persisted on disk.
///
/// Load order matters: the information_schema system tables must be loaded
/// first because every later step reads from them via SQL queries; user
/// indexes are loaded last because they resolve their key schemas against
/// the user tables loaded in the previous step.
pub fn load_catalog_data(db: &mut Database) -> BustubxResult<()> {
    // System tables (schemas/tables/columns/indexes) themselves.
    load_information_schema(&mut db.catalog)?;
    // User-created schemas recorded in information_schema.schemas.
    load_schemas(db)?;
    // Guarantee the default schema exists even on a fresh database file.
    create_default_schema_if_not_exists(&mut db.catalog)?;
    // User tables first, then the indexes built on top of them.
    load_user_tables(db)?;
    load_user_indexes(db)?;
    Ok(())
}

Expand All @@ -58,6 +72,7 @@ fn load_information_schema(catalog: &mut Catalog) -> BustubxResult<()> {
let information_schema_schemas_first_page_id = meta.information_schema_schemas_first_page_id;
let information_schema_tables_first_page_id = meta.information_schema_tables_first_page_id;
let information_schema_columns_first_page_id = meta.information_schema_columns_first_page_id;
let information_schema_indexes_first_page_id = meta.information_schema_indexes_first_page_id;
drop(meta);

// load last page id
Expand All @@ -76,6 +91,11 @@ fn load_information_schema(catalog: &mut Catalog) -> BustubxResult<()> {
information_schema_columns_first_page_id,
COLUMNS_SCHMEA.clone(),
)?;
let information_schema_indexes_last_page_id = load_table_last_page_id(
catalog,
information_schema_indexes_first_page_id,
INDEXES_SCHMEA.clone(),
)?;

let mut information_schema = CatalogSchema::new(INFORMATION_SCHEMA_NAME);

Expand Down Expand Up @@ -112,6 +132,17 @@ fn load_information_schema(catalog: &mut Catalog) -> BustubxResult<()> {
CatalogTable::new(INFORMATION_SCHEMA_COLUMNS, Arc::new(columns_table)),
);

let indexes_table = TableHeap {
schema: INDEXES_SCHMEA.clone(),
buffer_pool: catalog.buffer_pool.clone(),
first_page_id: AtomicPageId::new(information_schema_indexes_first_page_id),
last_page_id: AtomicPageId::new(information_schema_indexes_last_page_id),
};
information_schema.tables.insert(
INFORMATION_SCHEMA_INDEXES.to_string(),
CatalogTable::new(INFORMATION_SCHEMA_INDEXES, Arc::new(indexes_table)),
);

catalog.load_schema(INFORMATION_SCHEMA_NAME, information_schema);
Ok(())
}
Expand Down Expand Up @@ -200,6 +231,61 @@ fn load_user_tables(db: &mut Database) -> BustubxResult<()> {
Ok(())
}

/// Rebuilds user-created B+ tree indexes from `information_schema.indexes`.
///
/// Each row stores the owning table reference, the index name, the key
/// schema (a comma-separated column-name list), the tree fan-out limits and
/// the persisted root page id; the index is re-attached to that root page,
/// no rebuild is performed.
///
/// Returns an `Internal` error when a row cannot be decoded, or propagates
/// errors from looking up the owning table / resolving the key schema.
fn load_user_indexes(db: &mut Database) -> BustubxResult<()> {
    let index_tuples = db.run(&format!(
        "select * from {}.{}",
        INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_INDEXES
    ))?;
    for index_tuple in index_tuples.into_iter() {
        // Build the decode error lazily: constructing it eagerly (as before)
        // formatted the whole tuple into a String on every row, even when
        // decoding succeeds.
        let decode_err =
            || BustubxError::Internal(format!("Failed to decode index tuple: {:?}", index_tuple));
        // Column positions must match INDEXES_SCHMEA's declaration order.
        let ScalarValue::Varchar(Some(catalog_name)) = index_tuple.value(0)? else {
            return Err(decode_err());
        };
        let ScalarValue::Varchar(Some(table_schema_name)) = index_tuple.value(1)? else {
            return Err(decode_err());
        };
        let ScalarValue::Varchar(Some(table_name)) = index_tuple.value(2)? else {
            return Err(decode_err());
        };
        let ScalarValue::Varchar(Some(index_name)) = index_tuple.value(3)? else {
            return Err(decode_err());
        };
        let ScalarValue::Varchar(Some(key_schema_str)) = index_tuple.value(4)? else {
            return Err(decode_err());
        };
        let ScalarValue::UInt32(Some(internal_max_size)) = index_tuple.value(5)? else {
            return Err(decode_err());
        };
        let ScalarValue::UInt32(Some(leaf_max_size)) = index_tuple.value(6)? else {
            return Err(decode_err());
        };
        let ScalarValue::UInt32(Some(root_page_id)) = index_tuple.value(7)? else {
            return Err(decode_err());
        };

        let table_ref = TableReference::full(catalog_name, table_schema_name, table_name);
        // The key schema is persisted as column names; resolve them against
        // the owning table's schema (loaded earlier by load_user_tables).
        let table_schema = db.catalog.table_heap(&table_ref)?.schema.clone();
        let key_schema = Arc::new(parse_key_schema_from_varchar(
            key_schema_str.as_str(),
            table_schema,
        )?);

        let b_plus_tree_index = BPlusTreeIndex {
            key_schema,
            buffer_pool: db.buffer_pool.clone(),
            internal_max_size: *internal_max_size,
            leaf_max_size: *leaf_max_size,
            root_page_id: AtomicPageId::new(*root_page_id),
        };
        db.catalog
            .load_index(table_ref, index_name, Arc::new(b_plus_tree_index))?;
    }
    Ok(())
}

fn load_table_last_page_id(
catalog: &mut Catalog,
first_page_id: PageId,
Expand All @@ -217,3 +303,25 @@ fn load_table_last_page_id(
}
}
}

/// Serializes an index key schema as a comma-separated list of its column
/// names (e.g. "a, b") for storage in `information_schema.indexes`; the
/// inverse operation is `parse_key_schema_from_varchar`.
pub fn key_schema_to_varchar(key_schema: &Schema) -> String {
    let mut names: Vec<&str> = Vec::with_capacity(key_schema.columns.len());
    for column in key_schema.columns.iter() {
        names.push(column.name.as_str());
    }
    names.join(", ")
}

/// Parses a comma-separated column-name list (as produced by
/// `key_schema_to_varchar`) back into a key schema by projecting the owning
/// table's schema onto the named columns, preserving list order.
///
/// Returns an error if any listed name is not a column of `table_schema`.
fn parse_key_schema_from_varchar(varchar: &str, table_schema: SchemaRef) -> BustubxResult<Schema> {
    // `split` already yields an iterator; the previous version chained a
    // redundant `.into_iter()` and collected an intermediate Vec<&str>
    // before mapping to indices. A char pattern also avoids substring search.
    let indices = varchar
        .split(',')
        .map(|name| table_schema.index_of(None, name.trim()))
        .collect::<BustubxResult<Vec<usize>>>()?;
    table_schema.project(&indices)
}
6 changes: 3 additions & 3 deletions bustubx/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ impl Schema {
Ok(Self { columns })
}

pub fn project(&self, indices: &[usize]) -> BustubxResult<SchemaRef> {
pub fn project(&self, indices: &[usize]) -> BustubxResult<Schema> {
let new_columns = indices
.iter()
.map(|i| self.column_with_index(*i))
.collect::<BustubxResult<Vec<ColumnRef>>>()?;
Ok(Arc::new(Schema {
Ok(Schema {
columns: new_columns,
}))
})
}

pub fn column_with_name(
Expand Down
3 changes: 2 additions & 1 deletion bustubx/src/execution/physical_plan/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
storage::Tuple,
BustubxError, BustubxResult,
};
use std::sync::Arc;

#[derive(Debug, derive_new::new)]
pub struct PhysicalCreateIndex {
Expand All @@ -32,7 +33,7 @@ impl VolcanoExecutor for PhysicalCreateIndex {
}
}
}
let key_schema = self.table_schema.project(&key_indices)?;
let key_schema = Arc::new(self.table_schema.project(&key_indices)?);
context
.catalog
.create_index(self.name.clone(), &self.table, key_schema)?;
Expand Down
1 change: 1 addition & 0 deletions bustubx/src/execution/physical_plan/index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ impl VolcanoExecutor for PhysicalIndexScan {
}

fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
println!("LWZTEST index_scan");
let mut guard = self.iterator.lock().unwrap();
let Some(iterator) = &mut *guard else {
return Err(BustubxError::Execution(
Expand Down
1 change: 1 addition & 0 deletions bustubx/src/execution/physical_plan/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ impl VolcanoExecutor for PhysicalSeqScan {
}

fn next(&self, _context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
println!("LWZTEST seq_scan");
let Some(iterator) = &mut *self.iterator.lock().unwrap() else {
return Err(BustubxError::Execution(
"table iterator not created".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion bustubx/src/planner/logical_planner/plan_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl<'a> LogicalPlanner<'a> {
.map(|name| table_schema.index_of(Some(&table), name.as_str()))
.collect::<BustubxResult<Vec<usize>>>()?;

table_schema.project(&indices)?
Arc::new(table_schema.project(&indices)?)
};

Ok(LogicalPlan::Insert(Insert {
Expand Down
7 changes: 7 additions & 0 deletions bustubx/src/storage/codec/meta_page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ impl MetaPageCodec {
bytes.extend(CommonCodec::encode_u32(
page.information_schema_columns_first_page_id,
));
bytes.extend(CommonCodec::encode_u32(
page.information_schema_indexes_first_page_id,
));
bytes
}

Expand All @@ -40,6 +43,9 @@ impl MetaPageCodec {
let (information_schema_columns_first_page_id, offset) =
CommonCodec::decode_u32(left_bytes)?;
left_bytes = &left_bytes[offset..];
let (information_schema_indexes_first_page_id, offset) =
CommonCodec::decode_u32(left_bytes)?;
left_bytes = &left_bytes[offset..];

Ok((
MetaPage {
Expand All @@ -49,6 +55,7 @@ impl MetaPageCodec {
information_schema_schemas_first_page_id,
information_schema_tables_first_page_id,
information_schema_columns_first_page_id,
information_schema_indexes_first_page_id,
},
bytes.len() - left_bytes.len(),
))
Expand Down
Loading

0 comments on commit d0b784b

Please sign in to comment.