Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 1, 2024
1 parent e44cd5f commit 52c65a4
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 33 deletions.
1 change: 1 addition & 0 deletions bustubx/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ impl PartialEq for Column {
}

impl Column {
// TODO set nullable
pub fn new(name: String, data_type: DataType) -> Self {
Self {
name,
Expand Down
37 changes: 35 additions & 2 deletions bustubx/src/catalog/schema.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::column::{Column, ColumnRef};
use crate::error::BustubxResult;
use crate::BustubxError;
use std::sync::Arc;

pub type SchemaRef = Arc<Schema>;
Expand Down Expand Up @@ -29,6 +30,24 @@ impl Schema {
Ok(Self { columns })
}

pub fn project(&self, indices: &[usize]) -> BustubxResult<SchemaRef> {
let new_columns = indices
.iter()
.map(|i| {
self.get_col_by_index(*i).ok_or_else(|| {
BustubxError::Plan(format!(
"project index {} out of bounds, max column count {}",
i,
self.columns.len(),
))
})
})
.collect::<BustubxResult<Vec<ColumnRef>>>()?;
Ok(Arc::new(Schema {
columns: new_columns,
}))
}

pub fn copy_schema(from: SchemaRef, key_attrs: &[u32]) -> Self {
let columns = key_attrs
.iter()
Expand All @@ -45,8 +64,22 @@ impl Schema {
self.columns.get(index).cloned()
}

pub fn get_index_by_name(&self, col_name: &String) -> Option<usize> {
self.columns.iter().position(|c| &c.name == col_name)
pub fn column_with_index(&self, index: usize) -> BustubxResult<ColumnRef> {
self.columns
.get(index)
.cloned()
.ok_or_else(|| BustubxError::Plan(format!("Unable to get column with index {index}")))
}

/// Find the index of the column with the given name.
pub fn index_of(&self, name: &str) -> BustubxResult<usize> {
let (idx, _) = self
.columns
.iter()
.enumerate()
.find(|(_, col)| &col.name == name)
.ok_or_else(|| BustubxError::Plan(format!("Unable to get column named \"{name}\"")))?;
Ok(idx)
}

pub fn fixed_len(&self) -> usize {
Expand Down
41 changes: 25 additions & 16 deletions bustubx/src/common/scalar.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::catalog::DataType;
use crate::BustubxResult;
use crate::{BustubxError, BustubxResult};

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalarValue {
Expand Down Expand Up @@ -59,21 +59,6 @@ impl ScalarValue {
}
}

pub fn from_sqlparser_value(value: &sqlparser::ast::Value, data_type: DataType) -> Self {
match value {
sqlparser::ast::Value::Number(v, _) => match data_type {
DataType::Int8 => Self::Int8(Some(v.parse::<i8>().unwrap())),
DataType::Int16 => Self::Int16(Some(v.parse::<i16>().unwrap())),
DataType::Int32 => Self::Int32(Some(v.parse::<i32>().unwrap())),
DataType::Int64 => Self::Int64(Some(v.parse::<i64>().unwrap())),
DataType::UInt64 => Self::UInt64(Some(v.parse::<u64>().unwrap())),
_ => panic!("Not implemented"),
},
sqlparser::ast::Value::Boolean(b) => ScalarValue::Boolean(Some(*b)),
_ => unreachable!(),
}
}

// TODO compare value with different data type
pub fn compare(&self, other: &Self) -> std::cmp::Ordering {
match self {
Expand Down Expand Up @@ -136,6 +121,30 @@ impl ScalarValue {
ScalarValue::UInt64(v) => v.is_none(),
}
}

/// Try to cast this value to a ScalarValue of type `data_type`
pub fn cast_to(&self, data_type: &DataType) -> BustubxResult<Self> {
match data_type {
DataType::Boolean => match self {
ScalarValue::Boolean(v) => Ok(ScalarValue::Boolean(v.clone())),
_ => Err(BustubxError::NotSupport(format!(
"Failed to cast {} to {} type",
self, data_type
))),
},
DataType::Int32 => match self {
ScalarValue::Int64(v) => Ok(ScalarValue::Int32(v.map(|v| v as i32))),
_ => Err(BustubxError::NotSupport(format!(
"Failed to cast {} to {} type",
self, data_type
))),
},
_ => Err(BustubxError::NotSupport(format!(
"Not support cast to {} type",
data_type
))),
}
}
}

impl std::fmt::Display for ScalarValue {
Expand Down
3 changes: 2 additions & 1 deletion bustubx/src/planner/logical_plan/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
#[derive(derive_new::new, Debug, Clone)]
pub struct Insert {
pub table: TableReference,
pub columns: Vec<String>,
pub table_schema: SchemaRef,
pub projected_schema: SchemaRef,
pub input: Arc<LogicalPlan>,
}
33 changes: 27 additions & 6 deletions bustubx/src/planner/logical_planner/plan_insert.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::BustubxResult;
use crate::{BustubxError, BustubxResult};
use std::sync::Arc;

use crate::planner::logical_plan::{Insert, LogicalPlan};
Expand All @@ -14,13 +14,34 @@ impl<'a> LogicalPlanner<'a> {
) -> BustubxResult<LogicalPlan> {
let values = self.plan_set_expr(source.body.as_ref())?;
let table = self.plan_table_name(table_name)?;
let columns = columns_ident
.iter()
.map(|ident| ident.value.clone())
.collect();
let table_schema = self
.context
.catalog
.get_table_by_name(table.table())
.map_or(
Err(BustubxError::Plan(format!("table {} not found", table))),
|info| Ok(info.schema.clone()),
)?;

let projected_schema = if columns_ident.is_empty() {
table_schema.clone()
} else {
let columns: Vec<String> = columns_ident
.iter()
.map(|ident| ident.value.clone())
.collect();
let indices = columns
.iter()
.map(|name| table_schema.index_of(name.as_str()))
.collect::<BustubxResult<Vec<usize>>>()?;
let projected_schema = table_schema.project(&indices)?;
projected_schema
};

Ok(LogicalPlan::Insert(Insert {
table,
columns,
table_schema,
projected_schema,
input: Arc::new(values),
}))
}
Expand Down
16 changes: 15 additions & 1 deletion bustubx/src/planner/logical_planner/plan_set_expr.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::catalog::{Column, Schema};
use crate::expression::{Alias, Expr, ExprTrait};
use crate::expression::{Alias, ColumnExpr, Expr, ExprTrait};
use crate::planner::logical_plan::JoinType;
use crate::planner::logical_plan::{
build_join_schema, project_schema, EmptyRelation, Filter, Join, LogicalPlan, Project,
Expand Down Expand Up @@ -42,6 +42,20 @@ impl LogicalPlanner<'_> {
expr: Box::new(self.plan_expr(expr)?),
}))
}
sqlparser::ast::SelectItem::Wildcard(_) => {
let all_columns = input
.schema()
.columns
.iter()
.map(|col| {
Expr::Column(ColumnExpr {
relation: None,
name: col.name.clone(),
})
})
.collect::<Vec<Expr>>();
exprs.extend(all_columns);
}
_ => {
return Err(BustubxError::Plan(format!(
"sqlparser select item {} not supported",
Expand Down
28 changes: 23 additions & 5 deletions bustubx/src/planner/physical_plan/insert.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::{atomic::AtomicU32, Arc};

use crate::catalog::{ColumnRef, SchemaRef};
use crate::catalog::SchemaRef;
use crate::common::table_ref::TableReference;
use crate::{
catalog::{Column, DataType, Schema},
Expand All @@ -15,16 +15,23 @@ use super::PhysicalPlan;
#[derive(Debug)]
pub struct PhysicalInsert {
pub table: TableReference,
pub columns: Vec<String>,
pub table_schema: SchemaRef,
pub projected_schema: SchemaRef,
pub input: Arc<PhysicalPlan>,

insert_rows: AtomicU32,
}
impl PhysicalInsert {
pub fn new(table: TableReference, columns: Vec<String>, input: Arc<PhysicalPlan>) -> Self {
pub fn new(
table: TableReference,
table_schema: SchemaRef,
projected_schema: SchemaRef,
input: Arc<PhysicalPlan>,
) -> Self {
Self {
table,
columns,
table_schema,
projected_schema,
input,
insert_rows: AtomicU32::new(0),
}
Expand Down Expand Up @@ -54,8 +61,19 @@ impl VolcanoExecutor for PhysicalInsert {
)));
}
}

let tuple = next_tuple.unwrap();

// cast values
let mut casted_data = vec![];
for (idx, value) in tuple.data.iter().enumerate() {
casted_data
.push(value.cast_to(&self.projected_schema.column_with_index(idx)?.data_type)?);
}
let tuple = Tuple {
schema: self.projected_schema.clone(),
data: casted_data,
};

// TODO update index if needed
let table_heap = &mut context
.catalog
Expand Down
6 changes: 4 additions & 2 deletions bustubx/src/planner/physical_planner/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,15 @@ pub fn build_plan(logical_plan: Arc<LogicalPlan>) -> PhysicalPlan {
)),
LogicalPlan::Insert(Insert {
table,
columns,
table_schema,
projected_schema,
input,
}) => {
let input_physical_plan = build_plan(input.clone());
PhysicalPlan::Insert(PhysicalInsert::new(
table.clone(),
columns.clone(),
table_schema.clone(),
projected_schema.clone(),
Arc::new(input_physical_plan),
))
}
Expand Down
4 changes: 4 additions & 0 deletions tests/sqllogictest/tests/sqllogictest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ fn sqllogictest() {
for file in test_files {
let db = BustubxDB::new();
let mut tester = sqllogictest::Runner::new(db);
println!(
"======== start to run file {} ========",
file.to_str().unwrap()
);
tester.run_file(file).unwrap();
}
}
Expand Down

0 comments on commit 52c65a4

Please sign in to comment.