Support information_schema.schemas table
lewiszlw committed Feb 27, 2024
1 parent 3fcb9da commit 304a4fc
Showing 7 changed files with 185 additions and 118 deletions.
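As a hedged illustration of what this commit enables (not part of the diff itself): schema metadata now persists in a new system table, information_schema.schemas, whose two varchar columns record the catalog name and the schema name. Using only APIs that appear elsewhere in this commit (Database::new_temp, Database::run, Catalog::create_schema), a round trip might look like the sketch below; the expected rows are inferred from the diff, not captured output.

    // Hedged sketch, assuming Database::new_temp and db.run behave as
    // they do in the updated tests and in load_schemas below.
    let mut db = Database::new_temp().unwrap();
    db.catalog.create_schema("analytics").unwrap();
    let rows = db.run("select * from information_schema.schemas").unwrap();
    // Expected contents (catalog, schema):
    //   ("bustubx", "public")     -- created at startup if missing
    //   ("bustubx", "analytics")  -- appended by create_schema above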
73 changes: 46 additions & 27 deletions bustubx/src/catalog/catalog.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;

use crate::catalog::{
SchemaRef, COLUMNS_SCHMEA, INFORMATION_SCHEMA_COLUMNS, INFORMATION_SCHEMA_NAME,
INFORMATION_SCHEMA_TABLES, TABLES_SCHMEA,
INFORMATION_SCHEMA_SCHEMAS, INFORMATION_SCHEMA_TABLES, SCHEMAS_SCHMEA, TABLES_SCHMEA,
};
use crate::common::TableReference;
use crate::storage::{TupleMeta, BPLUS_INTERNAL_PAGE_MAX_SIZE, BPLUS_LEAF_PAGE_MAX_SIZE};
@@ -17,11 +17,13 @@ use crate::{
pub static DEFAULT_CATALOG_NAME: &str = "bustubx";
pub static DEFAULT_SCHEMA_NAME: &str = "public";

#[derive(Debug)]
pub struct Catalog {
pub schemas: HashMap<String, CatalogSchema>,
pub buffer_pool: Arc<BufferPoolManager>,
}

#[derive(Debug)]
pub struct CatalogSchema {
pub name: String,
pub tables: HashMap<String, CatalogTable>,
@@ -36,6 +38,7 @@ impl CatalogSchema {
}
}

#[derive(Debug)]
pub struct CatalogTable {
pub name: String,
pub table: Arc<TableHeap>,
@@ -54,29 +57,50 @@ impl CatalogTable {

impl Catalog {
pub fn new(buffer_pool: Arc<BufferPoolManager>) -> Self {
// TODO should load from disk
let mut schemas = HashMap::new();
schemas.insert(
DEFAULT_SCHEMA_NAME.to_string(),
CatalogSchema {
name: DEFAULT_SCHEMA_NAME.to_string(),
tables: HashMap::new(),
},
);
Self {
schemas,
schemas: HashMap::new(),
buffer_pool,
}
}

pub fn create_schema(&mut self, scheme_name: String) -> BustubxResult<()> {
if self.schemas.contains_key(&scheme_name) {
pub fn create_schema(&mut self, schema_name: impl Into<String>) -> BustubxResult<()> {
let schema_name = schema_name.into();
if self.schemas.contains_key(&schema_name) {
return Err(BustubxError::Storage(
"Cannot create duplicated schema".to_string(),
));
}
self.schemas
.insert(scheme_name.clone(), CatalogSchema::new(scheme_name));
.insert(schema_name.clone(), CatalogSchema::new(schema_name.clone()));

// update system table
let Some(information_schema) = self.schemas.get_mut(INFORMATION_SCHEMA_NAME) else {
return Err(BustubxError::Internal(
"catalog schema information_schema not created yet".to_string(),
));
};
let Some(schemas_table) = information_schema
.tables
.get_mut(INFORMATION_SCHEMA_SCHEMAS)
else {
return Err(BustubxError::Internal(
"table information_schema.schemas not created yet".to_string(),
));
};

let tuple_meta = TupleMeta {
insert_txn_id: 0,
delete_txn_id: 0,
is_deleted: false,
};
let tuple = Tuple::new(
SCHEMAS_SCHMEA.clone(),
vec![
DEFAULT_CATALOG_NAME.to_string().into(),
schema_name.clone().into(),
],
);
schemas_table.table.insert_tuple(&tuple_meta, &tuple)?;
Ok(())
}

@@ -317,13 +341,10 @@ impl Catalog
#[cfg(test)]
mod tests {
use std::sync::Arc;
use tempfile::TempDir;

use crate::common::TableReference;
use crate::{
buffer::BufferPoolManager,
catalog::{Column, DataType, Schema},
storage::DiskManager,
Database,
};

@@ -364,36 +385,34 @@ mod tests {

#[test]
pub fn test_catalog_create_index() {
let temp_dir = TempDir::new().unwrap();
let temp_path = temp_dir.path().join("test.db");

let disk_manager = DiskManager::try_new(temp_path).unwrap();
let buffer_pool = Arc::new(BufferPoolManager::new(1000, Arc::new(disk_manager)));
let mut catalog = super::Catalog::new(buffer_pool);
let mut db = Database::new_temp().unwrap();

let table_ref = TableReference::bare("test_table1");
let schema = Arc::new(Schema::new(vec![
Column::new("a", DataType::Int8, true),
Column::new("b", DataType::Int16, true),
Column::new("c", DataType::Int32, true),
]));
let _ = catalog.create_table(table_ref.clone(), schema.clone());
let _ = db.catalog.create_table(table_ref.clone(), schema.clone());

let index_name1 = "test_index1".to_string();
let key_schema1 = schema.project(&[0, 2]).unwrap();
let index1 = catalog
let index1 = db
.catalog
.create_index(index_name1.clone(), &table_ref, key_schema1.clone())
.unwrap();
assert_eq!(index1.key_schema, key_schema1);

let index_name2 = "test_index2".to_string();
let key_schema2 = schema.project(&[1]).unwrap();
let index2 = catalog
let index2 = db
.catalog
.create_index(index_name2.clone(), &table_ref, key_schema2.clone())
.unwrap();
assert_eq!(index2.key_schema, key_schema2);

let index3 = catalog
let index3 = db
.catalog
.index(&table_ref, index_name1.as_str())
.unwrap()
.unwrap();
178 changes: 107 additions & 71 deletions bustubx/src/catalog/information.rs
@@ -1,6 +1,8 @@
use crate::buffer::{AtomicPageId, PageId, INVALID_PAGE_ID};
use crate::catalog::catalog::{CatalogSchema, CatalogTable};
use crate::catalog::{Catalog, Column, DataType, Schema, SchemaRef, DEFAULT_CATALOG_NAME};
use crate::catalog::{
Catalog, Column, DataType, Schema, SchemaRef, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME,
};
use crate::common::{ScalarValue, TableReference};
use crate::storage::codec::TablePageCodec;
use crate::storage::TableHeap;
@@ -9,10 +11,16 @@ use std::collections::HashMap;
use std::sync::Arc;

pub static INFORMATION_SCHEMA_NAME: &str = "information_schema";
pub static INFORMATION_SCHEMA_SCHEMAS: &str = "schemas";
pub static INFORMATION_SCHEMA_TABLES: &str = "tables";
pub static INFORMATION_SCHEMA_COLUMNS: &str = "columns";

lazy_static::lazy_static! {
pub static ref SCHEMAS_SCHMEA: SchemaRef = Arc::new(Schema::new(vec![
Column::new("catalog", DataType::Varchar(None), false),
Column::new("schema", DataType::Varchar(None), false),
]));

pub static ref TABLES_SCHMEA: SchemaRef = Arc::new(Schema::new(vec![
Column::new("table_catalog", DataType::Varchar(None), false),
Column::new("table_schema", DataType::Varchar(None), false),
@@ -45,10 +53,104 @@ lazy_static::lazy_static! {

pub fn load_catalog_data(db: &mut Database) -> BustubxResult<()> {
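// Editorial note (ordering inferred from this commit, not the author's
// comment): load_information_schema must run first because create_schema
// now appends a row to information_schema.schemas, and load_schemas must
// precede create_default_schema_if_not_exists so that on restart the
// persisted "public" schema is already in memory and is not re-created,
// which would append a duplicate row on every boot.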
load_information_schema(&mut db.catalog)?;
load_schemas(db)?;
create_default_schema_if_not_exists(&mut db.catalog)?;
load_user_tables(db)?;
Ok(())
}

fn create_default_schema_if_not_exists(catalog: &mut Catalog) -> BustubxResult<()> {
if !catalog.schemas.contains_key(DEFAULT_SCHEMA_NAME) {
catalog.create_schema(DEFAULT_SCHEMA_NAME)?;
}
Ok(())
}

fn load_information_schema(catalog: &mut Catalog) -> BustubxResult<()> {
let meta = catalog.buffer_pool.disk_manager.meta.read().unwrap();
let information_schema_schemas_first_page_id = meta.information_schema_schemas_first_page_id;
let information_schema_tables_first_page_id = meta.information_schema_tables_first_page_id;
let information_schema_columns_first_page_id = meta.information_schema_columns_first_page_id;
drop(meta);

// load last page id
let information_schema_schemas_last_page_id = load_table_last_page_id(
catalog,
information_schema_schemas_first_page_id,
SCHEMAS_SCHMEA.clone(),
)?;
let information_schema_tables_last_page_id = load_table_last_page_id(
catalog,
information_schema_tables_first_page_id,
TABLES_SCHMEA.clone(),
)?;
let information_schema_columns_last_page_id = load_table_last_page_id(
catalog,
information_schema_columns_first_page_id,
COLUMNS_SCHMEA.clone(),
)?;

let mut information_schema = CatalogSchema::new(INFORMATION_SCHEMA_NAME);

let schemas_table = TableHeap {
schema: SCHEMAS_SCHMEA.clone(),
buffer_pool: catalog.buffer_pool.clone(),
first_page_id: AtomicPageId::new(information_schema_schemas_first_page_id),
last_page_id: AtomicPageId::new(information_schema_schemas_last_page_id),
};
information_schema.tables.insert(
INFORMATION_SCHEMA_SCHEMAS.to_string(),
CatalogTable::new(INFORMATION_SCHEMA_SCHEMAS, Arc::new(schemas_table)),
);

let tables_table = TableHeap {
schema: TABLES_SCHMEA.clone(),
buffer_pool: catalog.buffer_pool.clone(),
first_page_id: AtomicPageId::new(information_schema_tables_first_page_id),
last_page_id: AtomicPageId::new(information_schema_tables_last_page_id),
};
information_schema.tables.insert(
INFORMATION_SCHEMA_TABLES.to_string(),
CatalogTable::new(INFORMATION_SCHEMA_TABLES, Arc::new(tables_table)),
);

let columns_table = TableHeap {
schema: COLUMNS_SCHMEA.clone(),
buffer_pool: catalog.buffer_pool.clone(),
first_page_id: AtomicPageId::new(information_schema_columns_first_page_id),
last_page_id: AtomicPageId::new(information_schema_columns_last_page_id),
};
information_schema.tables.insert(
INFORMATION_SCHEMA_COLUMNS.to_string(),
CatalogTable::new(INFORMATION_SCHEMA_COLUMNS, Arc::new(columns_table)),
);

catalog.load_schema(INFORMATION_SCHEMA_NAME, information_schema);
Ok(())
}

fn load_schemas(db: &mut Database) -> BustubxResult<()> {
let schema_tuples = db.run(&format!(
"select * from {}.{}",
INFORMATION_SCHEMA_NAME, INFORMATION_SCHEMA_SCHEMAS
))?;
for schema_tuple in schema_tuples.into_iter() {
let error = Err(BustubxError::Internal(format!(
"Failed to decode schema tuple: {:?}",
schema_tuple,
)));
let ScalarValue::Varchar(Some(_catalog)) = schema_tuple.value(0)? else {
return error;
};
let ScalarValue::Varchar(Some(schema_name)) = schema_tuple.value(1)? else {
return error;
};
db.catalog
.load_schema(schema_name, CatalogSchema::new(schema_name));
}
Ok(())
}
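The split between create_schema and load_schema is worth spelling out (my reading of the diff, not the author's note): create_schema is the write path, registering the schema in memory and then appending a row to the system table, while load_schema is the read path used during boot, rebuilding only the in-memory entry from a row that already exists. A hedged sketch of the contract:

    // Behavior inferred from call sites in this diff, not the full source.
    catalog.create_schema("analytics")?; // in-memory entry + new system-table row
    catalog.load_schema("analytics", CatalogSchema::new("analytics")); // in-memory only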

fn load_user_tables(db: &mut Database) -> BustubxResult<()> {
let table_tuples = db.run(&format!(
"select * from {}.{}",
@@ -97,86 +199,20 @@ fn load_user_tables(db: &mut Database) -> BustubxResult<()> {
}
let schema = Arc::new(Schema::new(columns));

let Some(catalog_schema) = db.catalog.schemas.get_mut(catalog.as_str()) else {
return Err(BustubxError::Storage(format!(
"catalog schema {} not created yet",
catalog
)));
};
let table_heap = TableHeap {
schema: schema.clone(),
buffer_pool: db.buffer_pool.clone(),
first_page_id: AtomicPageId::new(*first_page_id),
last_page_id: AtomicPageId::new(*last_page_id),
};
let catalog_table = CatalogTable {
name: table_name.clone(),
table: Arc::new(table_heap),
indexes: HashMap::new(),
};
catalog_schema
.tables
.insert(table_name.clone(), catalog_table);
db.catalog.load_table(
TableReference::full(catalog, table_schema, table_name),
CatalogTable::new(table_name, Arc::new(table_heap)),
)?;
}
Ok(())
}

fn load_information_schema(catalog: &mut Catalog) -> BustubxResult<()> {
let meta = catalog.buffer_pool.disk_manager.meta.read().unwrap();
let information_schema_tables_first_page_id = meta.information_schema_tables_first_page_id;
let information_schema_columns_first_page_id = meta.information_schema_columns_first_page_id;
drop(meta);

// load last page id
let information_schema_tables_last_page_id = load_table_last_page_id(
catalog,
information_schema_tables_first_page_id,
TABLES_SCHMEA.clone(),
)?;
let information_schema_columns_last_page_id = load_table_last_page_id(
catalog,
information_schema_columns_first_page_id,
COLUMNS_SCHMEA.clone(),
)?;

let mut information_schema = CatalogSchema::new(INFORMATION_SCHEMA_NAME);

let tables_table = TableHeap {
schema: TABLES_SCHMEA.clone(),
buffer_pool: catalog.buffer_pool.clone(),
first_page_id: AtomicPageId::new(information_schema_tables_first_page_id),
last_page_id: AtomicPageId::new(information_schema_tables_last_page_id),
};
let catalog_table = CatalogTable {
name: INFORMATION_SCHEMA_TABLES.to_string(),
table: Arc::new(tables_table),
indexes: HashMap::new(),
};
information_schema
.tables
.insert(INFORMATION_SCHEMA_TABLES.to_string(), catalog_table);

let columns_table = TableHeap {
schema: COLUMNS_SCHMEA.clone(),
buffer_pool: catalog.buffer_pool.clone(),
first_page_id: AtomicPageId::new(information_schema_columns_first_page_id),
last_page_id: AtomicPageId::new(information_schema_columns_last_page_id),
};
let catalog_table = CatalogTable {
name: INFORMATION_SCHEMA_TABLES.to_string(),
table: Arc::new(columns_table),
indexes: HashMap::new(),
};
information_schema
.tables
.insert(INFORMATION_SCHEMA_COLUMNS.to_string(), catalog_table);

catalog.load_schema(INFORMATION_SCHEMA_NAME, information_schema);

// TODO load schemas
Ok(())
}

fn load_table_last_page_id(
catalog: &mut Catalog,
first_page_id: PageId,
1 change: 0 additions & 1 deletion bustubx/src/planner/physical_planner/physical_planner.rs
@@ -96,7 +96,6 @@ impl PhysicalPlanner<'_> {
.get(table_ref.table())
{
if !catalog_table.indexes.is_empty() {
println!("LWZTEST create index scan");
PhysicalPlan::IndexScan(PhysicalIndexScan::new(
table_ref.clone(),
catalog_table.indexes.keys().next().unwrap().clone(),
(The remaining 4 changed files are not shown.)