Skip to content

Commit

Permalink
Implement merge limit
Browse files Browse the repository at this point in the history
  • Loading branch information
lewiszlw committed Feb 4, 2024
1 parent d5d70a2 commit f62bdfe
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 9 deletions.
2 changes: 1 addition & 1 deletion bustubx/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use tempfile::TempDir;

use crate::buffer::TABLE_HEAP_BUFFER_POOL_SIZE;
use crate::error::{BustubxError, BustubxResult};
use crate::optimizer::LogicalOptimizer;
use crate::planner::logical_plan::LogicalPlan;
use crate::planner::PhysicalPlanner;
use crate::{
Expand All @@ -12,7 +13,6 @@ use crate::{
planner::{LogicalPlanner, PlannerContext},
storage::{DiskManager, Tuple},
};
use crate::optimizer::LogicalOptimizer;

pub struct Database {
disk_manager: Arc<DiskManager>,
Expand Down
4 changes: 2 additions & 2 deletions bustubx/src/optimizer/logical_optimizer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::error::BustubxResult;
use crate::optimizer::rule::{EliminateLimit, PushDownLimit};
use crate::optimizer::rule::{EliminateLimit, MergeLimit, PushDownLimit};

Check warning on line 2 in bustubx/src/optimizer/logical_optimizer.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused import: `PushDownLimit`
use crate::planner::logical_plan::LogicalPlan;
use std::sync::Arc;

Expand Down Expand Up @@ -38,7 +38,7 @@ pub struct LogicalOptimizer {
impl LogicalOptimizer {
pub fn new() -> Self {
let rules: Vec<Arc<dyn LogicalOptimizerRule + Sync + Send>> =
vec![Arc::new(EliminateLimit {})];
vec![Arc::new(EliminateLimit {}), Arc::new(MergeLimit {})];

Self {
rules,
Expand Down
1 change: 1 addition & 0 deletions bustubx/src/optimizer/rule/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ mod tests {
fn build_optimizer() -> LogicalOptimizer {
LogicalOptimizer::with_rules(vec![Arc::new(EliminateLimit)])
}

#[test]
fn eliminate_limit() {
let mut db = Database::new_temp().unwrap();
Expand Down
91 changes: 91 additions & 0 deletions bustubx/src/optimizer/rule/merge_limit.rs
Original file line number Diff line number Diff line change
@@ -1 +1,92 @@
use crate::optimizer::logical_optimizer::ApplyOrder;
use crate::optimizer::LogicalOptimizerRule;
use crate::planner::logical_plan::{Limit, LogicalPlan};
use crate::BustubxResult;
use std::cmp::min;
use std::sync::Arc;

pub struct MergeLimit;

impl LogicalOptimizerRule for MergeLimit {
fn try_optimize(&self, plan: &LogicalPlan) -> BustubxResult<Option<LogicalPlan>> {
let LogicalPlan::Limit(parent) = plan else {
return Ok(None);
};

if let LogicalPlan::Limit(child) = &*parent.input {
let new_limit = match (parent.limit, child.limit) {
(Some(parent_limit), Some(child_limit)) => {
Some(min(parent_limit, child_limit.saturating_sub(parent.offset)))
}
(Some(parent_limit), None) => Some(parent_limit),
(None, Some(child_limit)) => Some(child_limit.saturating_sub(parent.offset)),
(None, None) => None,
};
let plan = LogicalPlan::Limit(Limit {
limit: new_limit,
offset: child.offset + parent.offset,
input: Arc::new((*child.input).clone()),
});
return self
.try_optimize(&plan)
.map(|opt_plan| opt_plan.or_else(|| Some(plan)));
} else {
Ok(None)
}
}

fn name(&self) -> &str {
"MergeLimit"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}
}

#[cfg(test)]
mod tests {
use crate::catalog::EMPTY_SCHEMA_REF;
use crate::optimizer::rule::MergeLimit;
use crate::optimizer::LogicalOptimizer;
use crate::planner::logical_plan::{EmptyRelation, Limit, LogicalPlan};
use std::sync::Arc;

fn build_optimizer() -> LogicalOptimizer {
LogicalOptimizer::with_rules(vec![Arc::new(MergeLimit)])
}

#[test]
fn merge_limit() {
let plan = LogicalPlan::Limit(Limit {
limit: Some(10),
offset: 0,
input: Arc::new(LogicalPlan::Limit(Limit {
limit: Some(1000),
offset: 0,
input: Arc::new(LogicalPlan::Limit(Limit {
limit: None,
offset: 10,
input: Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: EMPTY_SCHEMA_REF.clone(),
})),
})),
})),
});
let optimized_plan = build_optimizer().optimize(&plan).unwrap();

if let LogicalPlan::Limit(Limit {
limit,
offset,
input,
}) = optimized_plan
{
assert_eq!(limit, Some(10));
assert_eq!(offset, 10);
assert!(matches!(input.as_ref(), LogicalPlan::EmptyRelation(_)));
} else {
panic!("the first node should be limit");
}
}
}
12 changes: 6 additions & 6 deletions bustubx/src/planner/logical_planner/plan_create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ impl<'a> LogicalPlanner<'a> {
table_name: &sqlparser::ast::ObjectName,
columns: &Vec<sqlparser::ast::OrderByExpr>,
) -> BustubxResult<LogicalPlan> {
let index_name = index_name
.0
.get(0)
.map_or(Err(BustubxError::Plan("".to_string())), |ident| {
Ok(ident.value.clone())
})?;
let index_name = index_name.0.get(0).map_or(
Err(BustubxError::Plan(format!(
"Index name {index_name} is not expected"
))),
|ident| Ok(ident.value.clone()),
)?;
let table = self.bind_table_name(table_name)?;
let mut columns_expr = vec![];
for col in columns.iter() {
Expand Down

0 comments on commit f62bdfe

Please sign in to comment.