Skip to content

Commit

Permalink
Add more checks
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 28, 2024
1 parent 5b724b9 commit 9aff00f
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 42 deletions.
25 changes: 18 additions & 7 deletions bustubx/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,43 @@ pub struct Schema {

impl Schema {
pub fn new(columns: Vec<Column>) -> Self {
Self {
columns: columns.into_iter().map(Arc::new).collect(),
Self::new_with_check(columns.into_iter().map(Arc::new).collect())
}

/// Builds a schema from column refs, asserting that no two columns are
/// ambiguous duplicates.
///
/// Two columns conflict when they share a name and either both resolve to
/// the same relation or both have no relation. A qualified column never
/// conflicts with an unqualified one.
///
/// # Panics
/// Panics (in all build profiles) when a duplicate column is found.
fn new_with_check(columns: Vec<ColumnRef>) -> Self {
    for (idx1, col1) in columns.iter().enumerate() {
        // Compare each unordered pair exactly once.
        for col2 in columns.iter().skip(idx1 + 1) {
            match (&col1.relation, &col2.relation) {
                (Some(rel1), Some(rel2)) => {
                    // `assert!` on the negated condition replaces the
                    // original `assert_ne!(cond, true)` anti-idiom and
                    // yields a readable panic message.
                    assert!(
                        !(rel1.resolved_eq(rel2) && col1.name == col2.name),
                        "duplicate column '{}' within the same relation",
                        col1.name
                    );
                }
                (None, None) => assert_ne!(
                    col1.name, col2.name,
                    "duplicate unqualified column name"
                ),
                // Mixed qualified/unqualified names are allowed to collide.
                (Some(_), None) | (None, Some(_)) => {}
            }
        }
    }
    Self { columns }
}

/// Returns a schema containing no columns.
pub fn empty() -> Self {
    Self {
        columns: Vec::new(),
    }
}

/// Merges the columns of several schemas into one, in iteration order.
///
/// Column-conflict checking is performed by `new_with_check`, which panics
/// on duplicates (so the old `TODO check column conflict` is resolved).
///
/// NOTE(review): the stripped diff left both the old `Ok(Self { columns })`
/// and the new checked constructor call in this span; only the post-commit
/// return is kept so the function has a single return path.
pub fn try_merge(schemas: impl IntoIterator<Item = Self>) -> BustubxResult<Self> {
    let mut columns = Vec::new();
    for schema in schemas {
        columns.extend(schema.columns);
    }
    Ok(Self::new_with_check(columns))
}

/// Produces a new schema containing only the columns at `indices`,
/// in the given order.
///
/// # Errors
/// Returns an error if any index is out of range (propagated from
/// `column_with_index`).
///
/// NOTE(review): the stripped diff left both the old direct `Schema { .. }`
/// construction and the new checked constructor call in this span; only the
/// post-commit return is kept.
pub fn project(&self, indices: &[usize]) -> BustubxResult<Schema> {
    let new_columns = indices
        .iter()
        .map(|i| self.column_with_index(*i))
        .collect::<BustubxResult<Vec<ColumnRef>>>()?;
    Ok(Schema::new_with_check(new_columns))
}

pub fn column_with_name(
Expand Down
15 changes: 9 additions & 6 deletions bustubx/src/execution/physical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,19 @@ impl VolcanoExecutor for PhysicalInsert {
/// Initializes the insert executor: initializes its input and resets the
/// inserted-row counter.
///
/// NOTE(review): the stripped diff left both the old fully-qualified
/// `store` call and the new short-form call in this span, storing the
/// counter twice; only the post-commit single store is kept (the commit
/// imports `Ordering` at the top of the file).
fn init(&self, context: &mut ExecutionContext) -> BustubxResult<()> {
    debug!("init insert executor");
    self.input.init(context)?;
    // Reset the running count of inserted rows for this execution.
    self.insert_rows.store(0, Ordering::SeqCst);
    Ok(())
}
fn next(&self, context: &mut ExecutionContext) -> BustubxResult<Option<Tuple>> {
loop {
let next_tuple = self.input.next(context)?;
if next_tuple.is_none() {
// only return insert_rows when input exhausted
return if self.insert_rows.load(std::sync::atomic::Ordering::SeqCst) == 0 {
return if self.insert_rows.load(Ordering::SeqCst) == 0 {
Ok(None)
} else {
let insert_rows = self.insert_rows.load(std::sync::atomic::Ordering::SeqCst);
self.insert_rows
.store(0, std::sync::atomic::Ordering::SeqCst);
let insert_rows = self.insert_rows.load(Ordering::SeqCst);
self.insert_rows.store(0, Ordering::SeqCst);
Ok(Some(Tuple::new(
self.output_schema(),
vec![ScalarValue::Int32(Some(insert_rows as i32))],
Expand Down Expand Up @@ -85,7 +83,12 @@ impl VolcanoExecutor for PhysicalInsert {
let indexes = context.catalog.table_indexes(&self.table)?;
for index in indexes {
if let Ok(key_tuple) = tuple.project_with_schema(index.key_schema.clone()) {
let root_page_id = index.root_page_id.load(Ordering::SeqCst);
index.insert(&key_tuple, rid)?;
let new_root_page_id = index.root_page_id.load(Ordering::SeqCst);
if new_root_page_id != root_page_id {
// TODO update system table
}
}
}

Expand Down
10 changes: 9 additions & 1 deletion bustubx/src/execution/physical_plan/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,15 @@ impl VolcanoExecutor for PhysicalValues {
.iter()
.map(|e| e.evaluate(&Tuple::empty(Arc::new(Schema::empty()))))
.collect::<BustubxResult<Vec<ScalarValue>>>()?;
Ok(Some(Tuple::new(self.output_schema(), values)))
debug_assert_eq!(self.schema.column_count(), values.len());

let casted_values = values
.iter()
.zip(self.schema.columns.iter())
.map(|(val, col)| val.cast_to(&col.data_type))
.collect::<BustubxResult<Vec<ScalarValue>>>()?;

Ok(Some(Tuple::new(self.output_schema(), casted_values)))
} else {
Ok(None)
}
Expand Down
13 changes: 10 additions & 3 deletions bustubx/src/planner/logical_planner/plan_insert.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::BustubxResult;
use std::sync::Arc;

use crate::planner::logical_plan::{Insert, LogicalPlan};
use crate::planner::logical_plan::{Insert, LogicalPlan, Values};

use super::LogicalPlanner;

Expand All @@ -12,7 +12,7 @@ impl<'a> LogicalPlanner<'a> {
columns_ident: &Vec<sqlparser::ast::Ident>,
source: &sqlparser::ast::Query,
) -> BustubxResult<LogicalPlan> {
let values = self.plan_set_expr(source.body.as_ref())?;
let mut input = self.plan_set_expr(source.body.as_ref())?;
let table = self.bind_table_name(table_name)?;
let table_schema = self.context.catalog.table_heap(&table)?.schema.clone();

Expand All @@ -31,11 +31,18 @@ impl<'a> LogicalPlanner<'a> {
Arc::new(table_schema.project(&indices)?)
};

if let LogicalPlan::Values(Values { values, .. }) = input {
input = LogicalPlan::Values(Values {
values,
schema: projected_schema.clone(),
})
}

Ok(LogicalPlan::Insert(Insert {
table,
table_schema,
projected_schema,
input: Arc::new(values),
input: Arc::new(input),
}))
}
}
20 changes: 2 additions & 18 deletions bustubx/src/planner/logical_planner/plan_set_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,26 +300,10 @@ impl LogicalPlanner<'_> {
}
result.push(record);
}
if result.is_empty() {
return Ok(LogicalPlan::Values(Values {
schema: Arc::new(Schema::empty()),
values: vec![],
}));
}

// parse schema
let first_row = &result[0];
let mut columns = vec![];
for (idx, item) in first_row.iter().enumerate() {
columns.push(Column::new(
idx.to_string(),
item.data_type(&EMPTY_SCHEMA_REF)?,
item.nullable(&EMPTY_SCHEMA_REF)?,
))
}

// schema will be replaced later
Ok(LogicalPlan::Values(Values {
schema: Arc::new(Schema::new(columns)),
schema: Arc::new(Schema::empty()),
values: result,
}))
}
Expand Down
13 changes: 6 additions & 7 deletions bustubx/src/storage/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ pub struct Tuple {
impl Tuple {
/// Creates a tuple from a schema and its column values.
///
/// Debug builds verify that the number of values matches the schema's
/// column count and that every value's runtime data type matches its
/// column's declared type. (The formerly commented-out "TODO enable"
/// type check is now live, so the dead commented copy is removed.)
pub fn new(schema: SchemaRef, data: Vec<ScalarValue>) -> Self {
    debug_assert_eq!(schema.columns.len(), data.len());
    // `.all(..)` replaces the original `.find(..).is_none()` — same
    // semantics (clippy `search_is_some`), clearer intent.
    debug_assert!(
        schema
            .columns
            .iter()
            .zip(data.iter())
            .all(|(col, val)| {
                ScalarValue::new_empty(col.data_type).data_type() == val.data_type()
            }),
        "tuple value type does not match its column's declared type"
    );
    Self { schema, data }
}

Expand Down

0 comments on commit 9aff00f

Please sign in to comment.