refactor: use temp table to refactor materialized cte #16900

Merged · 18 commits · Dec 4, 2024
22 changes: 6 additions & 16 deletions src/query/catalog/src/table_context.rs
@@ -29,7 +29,6 @@ use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_exception::ResultExt;
use databend_common_expression::BlockThresholds;
- use databend_common_expression::DataBlock;
use databend_common_expression::Expr;
use databend_common_expression::FunctionContext;
use databend_common_expression::Scalar;
@@ -78,8 +77,6 @@ use crate::runtime_filter_info::RuntimeFilterReady;
use crate::statistics::data_cache_statistics::DataCacheMetrics;
use crate::table::Table;

- pub type MaterializedCtesBlocks = Arc<RwLock<HashMap<(usize, usize), Arc<RwLock<Vec<DataBlock>>>>>>;

pub struct ContextError;

#[derive(Debug)]
@@ -264,6 +261,8 @@ pub trait TableContext: Send + Sync {
async fn get_table(&self, catalog: &str, database: &str, table: &str)
-> Result<Arc<dyn Table>>;

+ fn evict_table_from_cache(&self, catalog: &str, database: &str, table: &str) -> Result<()>;

async fn get_table_with_batch(
&self,
catalog: &str,
@@ -281,19 +280,6 @@
max_files: Option<usize>,
) -> Result<FilteredCopyFiles>;

- fn set_materialized_cte(
- &self,
- idx: (usize, usize),
- mem_table: Arc<RwLock<Vec<DataBlock>>>,
- ) -> Result<()>;

- fn get_materialized_cte(
- &self,
- idx: (usize, usize),
- ) -> Result<Option<Arc<RwLock<Vec<DataBlock>>>>>;

- fn get_materialized_ctes(&self) -> MaterializedCtesBlocks;

fn add_segment_location(&self, segment_loc: Location) -> Result<()>;

fn clear_segment_locations(&self) -> Result<()>;
@@ -389,6 +375,10 @@
fn get_shared_settings(&self) -> Arc<Settings>;

fn get_runtime(&self) -> Result<Arc<Runtime>>;

+ fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str);

+ async fn drop_m_cte_temp_table(&self) -> Result<()>;
}

pub type AbortChecker = Arc<dyn CheckAbort + Send + Sync>;
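The two new methods replace the deleted `MaterializedCtesBlocks` registry: instead of shuttling `DataBlock`s through shared locks, each materialized CTE is persisted to a temp table that the query context must track and eventually drop. A minimal sketch of the bookkeeping those signatures imply — the types here are illustrative rather than Databend's, and the real `drop_m_cte_temp_table` is async because dropping a table does catalog I/O:

```rust
use std::sync::Mutex;

// Illustrative stand-in for the registry behind the two new trait methods.
#[derive(Default)]
struct MCteTempTables {
    tables: Mutex<Vec<(String, String)>>, // (database, table) pairs
}

impl MCteTempTables {
    // Record a temp table created for a materialized CTE.
    fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str) {
        self.tables
            .lock()
            .unwrap()
            .push((database_name.to_string(), table_name.to_string()));
    }

    // Drop everything recorded; draining the list makes a second call a no-op.
    fn drop_m_cte_temp_table(&self) -> Result<(), String> {
        for (db, table) in self.tables.lock().unwrap().drain(..) {
            println!("DROP TABLE {db}.{table}"); // stand-in for the real drop
        }
        Ok(())
    }
}
```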
4 changes: 1 addition & 3 deletions src/query/pipeline/sources/src/async_source.rs
@@ -115,9 +115,7 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {
match self.inner.generate().await? {
None => self.is_finish = true,
Some(data_block) => {
- // Don't need to record the scan progress of `MaterializedCteSource`
- // Because it reads data from memory.
- if !data_block.is_empty() && self.name() != "MaterializedCteSource" {
+ if !data_block.is_empty() {
let progress_values = ProgressValues {
rows: data_block.num_rows(),
bytes: data_block.memory_size(),
4 changes: 1 addition & 3 deletions src/query/pipeline/sources/src/prefetch_async_source.rs
@@ -116,9 +116,7 @@ impl<T: 'static + PrefetchAsyncSource> Processor for PrefetchAsyncSourcer<T> {
match self.inner.generate().await? {
None => self.is_inner_finish = true,
Some(data_block) => {
- // Don't need to record the scan progress of `MaterializedCteSource`
- // Because it reads data from memory.
- if !data_block.is_empty() && self.name() != "MaterializedCteSource" {
+ if !data_block.is_empty() {
let progress_values = ProgressValues {
rows: data_block.num_rows(),
bytes: data_block.memory_size(),
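Both sources lose the same special case: with `MaterializedCteSource` gone, CTE results arrive through an ordinary scan of the temp table, so every non-empty block is counted uniformly instead of being exempted by a stringly-typed name check. A toy model of the simplification, with illustrative types:

```rust
// Before: progress was skipped for one source by comparing its name at
// runtime. After: the guard reduces to "is the block non-empty".
struct Block {
    rows: usize,
}

fn should_record(block: &Block, source_name: &str, old_behavior: bool) -> bool {
    if old_behavior {
        block.rows > 0 && source_name != "MaterializedCteSource"
    } else {
        block.rows > 0
    }
}
```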
15 changes: 14 additions & 1 deletion src/query/service/src/interpreters/interpreter.rs
@@ -27,6 +27,7 @@ use databend_common_base::base::short_sql;
use databend_common_base::runtime::profile::get_statistics_desc;
use databend_common_base::runtime::profile::ProfileDesc;
use databend_common_base::runtime::profile::ProfileStatisticsName;
+ use databend_common_base::runtime::GlobalIORuntime;
use databend_common_catalog::query_kind::QueryKind;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
@@ -230,7 +231,9 @@
) -> Result<(Plan, PlanExtras, AcquireQueueGuard)> {
let mut planner = Planner::new_with_query_executor(
ctx.clone(),
- Arc::new(ServiceQueryExecutor::new(ctx.clone())),
+ Arc::new(ServiceQueryExecutor::new(QueryContext::create_from(
+ ctx.clone(),
+ ))),
);

// Parse the SQL query and extract additional information.
@@ -316,6 +319,8 @@ pub fn on_execution_finished(info: &ExecutionInfo, query_ctx: Arc<QueryContext>)
})?;
}

+ hook_clear_m_cte_temp_table(&query_ctx)?;

hook_vacuum_temp_files(&query_ctx)?;

hook_disk_temp_dir(&query_ctx)?;
@@ -358,3 +363,11 @@ fn need_acquire_lock(ctx: Arc<QueryContext>, stmt: &Statement) -> bool {
_ => false,
}
}

+ fn hook_clear_m_cte_temp_table(query_ctx: &Arc<QueryContext>) -> Result<()> {
+ let _ = GlobalIORuntime::instance().block_on(async move {
+ query_ctx.drop_m_cte_temp_table().await?;
+ Ok(())
+ });
+ Ok(())
+ }
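`hook_clear_m_cte_temp_table` bridges sync and async: `on_execution_finished` is synchronous, so the hook blocks on the drop via `GlobalIORuntime` and discards the outcome with `let _ =`, ensuring a failed temp-table drop never masks the query's own result. A self-contained sketch of the same pattern, assuming Tokio in place of Databend's `GlobalIORuntime`:

```rust
// Best-effort async cleanup from a synchronous hook (tokio is an assumption
// here; Databend blocks on its own global I/O runtime instead).
async fn drop_temp_tables() -> Result<(), String> {
    // ... issue the DROP TABLE calls here ...
    Ok(())
}

fn hook_clear_temp_tables() {
    let rt = tokio::runtime::Runtime::new().expect("failed to build runtime");
    // `let _ =` mirrors the hook above: a cleanup error is deliberately
    // ignored so it cannot override the query's real result.
    let _ = rt.block_on(drop_temp_tables());
}
```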
73 changes: 0 additions & 73 deletions src/query/service/src/pipelines/builders/builder_join.rs
@@ -19,19 +19,14 @@ use databend_common_exception::Result;
use databend_common_pipeline_core::processors::ProcessorPtr;
use databend_common_pipeline_sinks::Sinker;
use databend_common_sql::executor::physical_plans::HashJoin;
- use databend_common_sql::executor::physical_plans::MaterializedCte;
use databend_common_sql::executor::physical_plans::RangeJoin;
use databend_common_sql::executor::PhysicalPlan;
- use databend_common_sql::ColumnBinding;
- use databend_common_sql::IndexType;

use crate::pipelines::processors::transforms::range_join::RangeJoinState;
use crate::pipelines::processors::transforms::range_join::TransformRangeJoinLeft;
use crate::pipelines::processors::transforms::range_join::TransformRangeJoinRight;
use crate::pipelines::processors::transforms::HashJoinBuildState;
use crate::pipelines::processors::transforms::HashJoinProbeState;
- use crate::pipelines::processors::transforms::MaterializedCteSink;
- use crate::pipelines::processors::transforms::MaterializedCteState;
use crate::pipelines::processors::transforms::TransformHashJoinBuild;
use crate::pipelines::processors::transforms::TransformHashJoinProbe;
use crate::pipelines::processors::HashJoinDesc;
@@ -77,8 +72,6 @@ impl PipelineBuilder {
right_side_context,
self.main_pipeline.get_scopes(),
);
- right_side_builder.cte_state = self.cte_state.clone();
- right_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
right_side_builder.hash_join_states = self.hash_join_states.clone();

let mut right_res = right_side_builder.finalize(&range_join.right)?;
@@ -148,8 +141,6 @@
build_side_context,
self.main_pipeline.get_scopes(),
);
- build_side_builder.cte_state = self.cte_state.clone();
- build_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
build_side_builder.hash_join_states = self.hash_join_states.clone();
let mut build_res = build_side_builder.finalize(build)?;

@@ -224,68 +215,4 @@

Ok(())
}

- pub(crate) fn build_materialized_cte(
- &mut self,
- materialized_cte: &MaterializedCte,
- ) -> Result<()> {
- self.cte_scan_offsets.insert(
- materialized_cte.cte_idx,
- materialized_cte.cte_scan_offset.clone(),
- );
- self.expand_materialized_side_pipeline(
- &materialized_cte.right,
- materialized_cte.cte_idx,
- &materialized_cte.materialized_output_columns,
- )?;
- self.build_pipeline(&materialized_cte.left)
- }
-
- fn expand_materialized_side_pipeline(
- &mut self,
- materialized_side: &PhysicalPlan,
- cte_idx: IndexType,
- materialized_output_columns: &[ColumnBinding],
- ) -> Result<()> {
- let materialized_side_ctx = QueryContext::create_from(self.ctx.clone());
- let state = Arc::new(MaterializedCteState::new(self.ctx.clone()));
- self.cte_state.insert(cte_idx, state.clone());
- let mut materialized_side_builder = PipelineBuilder::create(
- self.func_ctx.clone(),
- self.settings.clone(),
- materialized_side_ctx,
- self.main_pipeline.get_scopes(),
- );
- materialized_side_builder.cte_state = self.cte_state.clone();
- materialized_side_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
- materialized_side_builder.hash_join_states = self.hash_join_states.clone();
- let mut materialized_side_pipeline =
- materialized_side_builder.finalize(materialized_side)?;
- assert!(
- materialized_side_pipeline
- .main_pipeline
- .is_pulling_pipeline()?
- );
-
- PipelineBuilder::build_result_projection(
- &self.func_ctx,
- materialized_side.output_schema()?,
- materialized_output_columns,
- &mut materialized_side_pipeline.main_pipeline,
- false,
- )?;
-
- materialized_side_pipeline.main_pipeline.add_sink(|input| {
- let transform = Sinker::<MaterializedCteSink>::create(
- input,
- MaterializedCteSink::create(self.ctx.clone(), cte_idx, state.clone())?,
- );
- Ok(ProcessorPtr::create(transform))
- })?;
- self.pipelines
- .push(materialized_side_pipeline.main_pipeline.finalize());
- self.pipelines
- .extend(materialized_side_pipeline.sources_pipelines);
- Ok(())
- }
}
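Everything removed above existed to run the CTE subplan as a second pipeline draining into shared memory, which every downstream scan then had to locate through `cte_state` by index. A simplified sketch of that deleted rendezvous (stand-in types, not the real `DataBlock` machinery) shows what a temp table now provides for free, since the storage layer already handles write-once/read-many sharing:

```rust
use std::sync::{Arc, Mutex};

type Block = Vec<u8>; // stand-in for DataBlock

// Shared state the deleted code threaded through every PipelineBuilder.
#[derive(Default)]
struct CteState {
    blocks: Mutex<Vec<Block>>,
}

impl CteState {
    fn sink(&self, block: Block) {
        self.blocks.lock().unwrap().push(block); // MaterializedCteSink's role
    }

    fn scan(&self) -> Vec<Block> {
        self.blocks.lock().unwrap().clone() // MaterializedCteSource's role
    }
}

fn main() {
    let state = Arc::new(CteState::default());
    state.sink(vec![1, 2, 3]); // the side pipeline writes once...
    assert_eq!(state.scan().len(), 1); // ...and each CTE scan reads it back
}
```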
22 changes: 0 additions & 22 deletions src/query/service/src/pipelines/builders/builder_scan.rs
@@ -24,14 +24,12 @@ use databend_common_sql::evaluator::BlockOperator;
use databend_common_sql::evaluator::CompoundBlockOperator;
use databend_common_sql::executor::physical_plans::CacheScan;
use databend_common_sql::executor::physical_plans::ConstantTableScan;
- use databend_common_sql::executor::physical_plans::CteScan;
use databend_common_sql::executor::physical_plans::ExpressionScan;
use databend_common_sql::executor::physical_plans::TableScan;
use databend_common_sql::plans::CacheSource;

use crate::pipelines::processors::transforms::CacheSourceState;
use crate::pipelines::processors::transforms::HashJoinCacheState;
- use crate::pipelines::processors::transforms::MaterializedCteSource;
use crate::pipelines::processors::transforms::TransformAddInternalColumns;
use crate::pipelines::processors::transforms::TransformCacheScan;
use crate::pipelines::processors::transforms::TransformExpressionScan;
@@ -76,26 +74,6 @@ impl PipelineBuilder {
Ok(())
}

- pub(crate) fn build_cte_scan(&mut self, cte_scan: &CteScan) -> Result<()> {
- let max_threads = self.settings.get_max_threads()?;
- self.main_pipeline.add_source(
- |output| {
- MaterializedCteSource::create(
- self.ctx.clone(),
- output,
- cte_scan.cte_idx,
- self.cte_state.get(&cte_scan.cte_idx.0).unwrap().clone(),
- cte_scan.offsets.clone(),
- self.cte_scan_offsets
- .get(&cte_scan.cte_idx.0)
- .unwrap()
- .clone(),
- )
- },
- max_threads as usize,
- )
- }

pub(crate) fn build_constant_table_scan(&mut self, scan: &ConstantTableScan) -> Result<()> {
self.main_pipeline.add_source(
|output| {
2 changes: 0 additions & 2 deletions src/query/service/src/pipelines/builders/builder_union_all.rs
@@ -78,8 +78,6 @@ impl PipelineBuilder {
union_ctx,
self.main_pipeline.get_scopes(),
);
- pipeline_builder.cte_state = self.cte_state.clone();
- pipeline_builder.cte_scan_offsets = self.cte_scan_offsets.clone();
pipeline_builder.hash_join_states = self.hash_join_states.clone();

let mut build_res = pipeline_builder.finalize(input)?;
13 changes: 0 additions & 13 deletions src/query/service/src/pipelines/pipeline_builder.rs
@@ -25,12 +25,10 @@ use databend_common_pipeline_core::processors::PlanScopeGuard;
use databend_common_pipeline_core::Pipeline;
use databend_common_settings::Settings;
use databend_common_sql::executor::PhysicalPlan;
- use databend_common_sql::IndexType;

use super::PipelineBuilderData;
use crate::interpreters::CreateTableInterpreter;
use crate::pipelines::processors::transforms::HashJoinBuildState;
- use crate::pipelines::processors::transforms::MaterializedCteState;
use crate::pipelines::processors::HashJoinState;
use crate::pipelines::PipelineBuildResult;
use crate::servers::flight::v1::exchange::DefaultExchangeInjector;
@@ -49,11 +47,6 @@
pub merge_into_probe_data_fields: Option<Vec<DataField>>,
pub join_state: Option<Arc<HashJoinBuildState>>,

- // The cte state of each materialized cte.
- pub cte_state: HashMap<IndexType, Arc<MaterializedCteState>>,
- // The column offsets used by cte scan
- pub cte_scan_offsets: HashMap<IndexType, Vec<usize>>,

pub(crate) exchange_injector: Arc<dyn ExchangeInjector>,

pub hash_join_states: HashMap<usize, Arc<HashJoinState>>,
@@ -78,8 +71,6 @@
pipelines: vec![],
main_pipeline: Pipeline::with_scopes(scopes),
exchange_injector: DefaultExchangeInjector::create(),
- cte_state: HashMap::new(),
- cte_scan_offsets: HashMap::new(),
merge_into_probe_data_fields: None,
join_state: None,
hash_join_states: HashMap::new(),
@@ -162,7 +153,6 @@

match plan {
PhysicalPlan::TableScan(scan) => self.build_table_scan(scan),
- PhysicalPlan::CteScan(scan) => self.build_cte_scan(scan),
PhysicalPlan::ConstantTableScan(scan) => self.build_constant_table_scan(scan),
PhysicalPlan::Filter(filter) => self.build_filter(filter),
PhysicalPlan::EvalScalar(eval_scalar) => self.build_eval_scalar(eval_scalar),
@@ -189,9 +179,6 @@
"Invalid physical plan with PhysicalPlan::Exchange",
)),
PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join),
- PhysicalPlan::MaterializedCte(materialized_cte) => {
- self.build_materialized_cte(materialized_cte)
- }
PhysicalPlan::CacheScan(cache_scan) => self.build_cache_scan(cache_scan),
PhysicalPlan::ExpressionScan(expression_scan) => {
self.build_expression_scan(expression_scan)
4 changes: 0 additions & 4 deletions src/query/service/src/pipelines/processors/transforms/mod.rs
@@ -28,7 +28,6 @@ mod transform_dictionary;
mod transform_expression_scan;
mod transform_filter;
mod transform_limit;
- mod transform_materialized_cte;
mod transform_merge_block;
mod transform_null_if;
mod transform_recursive_cte_scan;
@@ -55,9 +54,6 @@ pub use transform_create_sets::TransformCreateSets;
pub use transform_expression_scan::TransformExpressionScan;
pub use transform_filter::TransformFilter;
pub use transform_limit::TransformLimit;
- pub use transform_materialized_cte::MaterializedCteSink;
- pub use transform_materialized_cte::MaterializedCteSource;
- pub use transform_materialized_cte::MaterializedCteState;
pub use transform_merge_block::TransformMergeBlock;
pub use transform_null_if::TransformNullIf;
pub use transform_recursive_cte_scan::TransformRecursiveCteScan;