Skip to content

Commit

Permalink
Add is_null to scalar value and try_merge to tuple
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Jan 30, 2024
1 parent ce8d8d3 commit ee1c0f9
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 33 deletions.
10 changes: 10 additions & 0 deletions bustubx/src/common/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ impl ScalarValue {
ScalarValue::Int64(_) => DataType::Int64,
}
}

pub fn is_null(&self) -> bool {
match self {
ScalarValue::Boolean(v) => v.is_none(),
ScalarValue::Int8(v) => v.is_none(),
ScalarValue::Int16(v) => v.is_none(),
ScalarValue::Int32(v) => v.is_none(),
ScalarValue::Int64(v) => v.is_none(),
}
}
}

impl std::fmt::Display for ScalarValue {
Expand Down
19 changes: 7 additions & 12 deletions bustubx/src/planner/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,14 @@ impl Expr {
}
}

pub fn evaluate_join(
&self,
left_tuple: &Tuple,
left_schema: &Schema,
right_tuple: &Tuple,
right_schema: &Schema,
) -> ScalarValue {
pub fn evaluate_join(&self, left_tuple: &Tuple, right_tuple: &Tuple) -> ScalarValue {
// combine left and right tuple, left and right schema
let tuple = Tuple::from_tuples(vec![
(left_tuple.clone(), left_schema.clone()),
(right_tuple.clone(), right_schema.clone()),
]);
let schema = Schema::try_merge(vec![left_schema.clone(), right_schema.clone()]).unwrap();
let tuple = Tuple::try_merge(vec![left_tuple.clone(), right_tuple.clone()]).unwrap();
let schema = Schema::try_merge(vec![
left_tuple.schema.as_ref().clone(),
right_tuple.schema.as_ref().clone(),
])
.unwrap();
self.evaluate(Some(&tuple), Some(&schema))
}
}
17 changes: 3 additions & 14 deletions bustubx/src/planner/physical_plan/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,28 +71,17 @@ impl VolcanoExecutor for PhysicalNestedLoopJoin {
// save latest left_next_result before return
*self.left_tuple.lock().unwrap() = Some(left_tuple.clone());

return Some(Tuple::from_tuples(vec![
(left_tuple, self.left_input.output_schema()),
(right_tuple, self.right_input.output_schema()),
]));
return Some(Tuple::try_merge(vec![left_tuple, right_tuple]).unwrap());
} else {
let condition = self.condition.clone().unwrap();
let evaluate_res = condition.evaluate_join(
&left_tuple,
&self.left_input.output_schema(),
&right_tuple,
&self.right_input.output_schema(),
);
let evaluate_res = condition.evaluate_join(&left_tuple, &right_tuple);
// TODO support left/right join after null support added
if let ScalarValue::Boolean(Some(v)) = evaluate_res {
if v {
// save latest left_next_result before return
*self.left_tuple.lock().unwrap() = Some(left_tuple.clone());

return Some(Tuple::from_tuples(vec![
(left_tuple, self.left_input.output_schema()),
(right_tuple, self.right_input.output_schema()),
]));
return Some(Tuple::try_merge(vec![left_tuple, right_tuple]).unwrap());
}
} else {
panic!("nested loop join condition should be boolean")
Expand Down
13 changes: 6 additions & 7 deletions bustubx/src/storage/tuple.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::catalog::{ColumnRef, SchemaRef};
use crate::{catalog::Schema, common::config::TransactionId, common::ScalarValue};
use crate::{catalog::Schema, common::config::TransactionId, common::ScalarValue, BustubxResult};
use std::sync::Arc;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -40,18 +40,17 @@ impl Tuple {
Self { schema, data }
}

// TODO add unit test to make sure this still works if tuple format changes
pub fn from_tuples(tuples: Vec<(Tuple, Schema)>) -> Self {
pub fn try_merge(tuples: impl IntoIterator<Item = Self>) -> BustubxResult<Self> {
let mut data = vec![];
let mut merged_schema = Schema::empty();
for (tuple, schema) in tuples {
for tuple in tuples {
data.extend(tuple.data);
merged_schema = Schema::try_merge(vec![merged_schema, schema]).unwrap();
merged_schema = Schema::try_merge(vec![merged_schema, tuple.schema.as_ref().clone()])?;
}
Self {
Ok(Self {
schema: Arc::new(merged_schema),
data,
}
})
}

pub fn is_zero(&self) -> bool {
Expand Down

0 comments on commit ee1c0f9

Please sign in to comment.