Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: merge into split #15644

Merged
merged 5 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ impl MergeIntoInterpreter {
}
}

log::info!("target build optimization is {}", target_build_optimization);

let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), meta_data).await?;

let table_name = table_name.clone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ impl PipelineBuilder {
PhysicalPlan::TableScan(scan) => match scan.table_index {
None | Some(databend_common_sql::DUMMY_TABLE_INDEX) => (false, false),
Some(table_index) => match need_reserve_block_info(self.ctx.clone(), table_index) {
(true, is_distributed) => (true, is_distributed),
// due to issue https://github.com/datafuselabs/databend/issues/15643,
// target build optimization of merge-into is disabled

//(true, is_distributed) => (true, is_distributed),
(true, is_distributed) => (false, is_distributed),
_ => (false, false),
},
},
Expand Down
17 changes: 2 additions & 15 deletions src/query/sql/src/planner/binder/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ use crate::IndexType;
use crate::ScalarBinder;
use crate::ScalarExpr;
use crate::Visibility;
use crate::DUMMY_COLUMN_INDEX;

#[derive(Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum MergeIntoType {
Expand Down Expand Up @@ -439,20 +438,8 @@ impl Binder {
.await?,
);
}
let mut split_idx = DUMMY_COLUMN_INDEX;
// find any target table column index for merge_into_split
for column in self
.metadata
.read()
.columns_by_table_index(table_index)
.iter()
{
if column.index() != row_id_index {
split_idx = column.index();
break;
}
}
assert!(split_idx != DUMMY_COLUMN_INDEX);

let split_idx = row_id_index;

Ok(MergeInto {
catalog: catalog_name.to_string(),
Expand Down
7 changes: 5 additions & 2 deletions src/query/sql/src/planner/optimizer/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ async fn optimize_merge_into(mut opt_ctx: OptimizerContext, plan: Box<MergeInto>
.table_ctx
.get_settings()
.get_enable_distributed_merge_into()?;
let mut new_columns_set = plan.columns_set.clone();
let new_columns_set = plan.columns_set.clone();
if change_join_order
&& matches!(plan.merge_type, MergeIntoType::FullOperation)
&& opt_ctx
Expand All @@ -461,7 +461,10 @@ async fn optimize_merge_into(mut opt_ctx: OptimizerContext, plan: Box<MergeInto>
== 0
&& flag
{
new_columns_set.remove(&plan.row_id_index);
// due to issue https://github.com/datafuselabs/databend/issues/15643,
// target build optimization of merge-into is disabled: here row_id column should be kept

// new_columns_set.remove(&plan.row_id_index);
opt_ctx.table_ctx.set_merge_into_join(MergeIntoJoin {
merge_into_join_type: MergeIntoJoinType::Left,
is_distributed: false,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
statement ok
create or replace database m_test;

statement ok
use m_test;

statement ok
create table t(a string, b string, c string, d string, k string);

statement ok
create table s(a string, b string, c string, d string, k string);

statement ok
insert into t(k) values('k');

statement ok
insert into s(k) values('k');


query II
merge into t using s on t.k = s.k when matched then update * when not matched then insert *;
----
0 1

query TTTTT
select * from t;
----
NULL NULL NULL NULL k
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ explain merge into target_build_optimization as t1 using source_optimization as
MergeInto:
target_table: default.default.target_build_optimization
├── distributed: false
├── target_build_optimization: true
├── target_build_optimization: false
├── can_try_update_column_only: true
├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)]
└── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
Expand Down Expand Up @@ -135,7 +135,7 @@ explain merge into target_build_optimization as t1 using source_optimization as
MergeInto:
target_table: default.default.target_build_optimization
├── distributed: false
├── target_build_optimization: true
├── target_build_optimization: false
├── can_try_update_column_only: true
├── matched update: [condition: None,update set a = t2.a (#0),b = t2.b (#1),c = t2.c (#2)]
└── unmatched insert: [condition: None,insert into (a,b,c) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL),CAST(c (#2) AS String NULL))]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ on t1.a = t2.a when matched then update set t1.b = t2.b when not matched then in
MergeInto:
target_table: default.default.column_only_optimization_target
├── distributed: false
├── target_build_optimization: true
├── target_build_optimization: false
├── can_try_update_column_only: true
├── matched update: [condition: None,update set b = t2.b (#1)]
└── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))]
Expand Down Expand Up @@ -211,7 +211,7 @@ on t1.a = t2.a when matched then update * when not matched then insert *;
MergeInto:
target_table: default.default.column_only_optimization_target
├── distributed: false
├── target_build_optimization: true
├── target_build_optimization: false
├── can_try_update_column_only: true
├── matched update: [condition: None,update set a = a (#0),b = b (#1)]
└── unmatched insert: [condition: None,insert into (a,b) values(CAST(a (#0) AS Int32 NULL),CAST(b (#1) AS String NULL))]
Expand Down
Loading