From e2970b1171523b182b36dc67e642641c47db078f Mon Sep 17 00:00:00 2001
From: MianChen <283559115@qq.com>
Date: Mon, 25 Nov 2024 03:35:48 -0600
Subject: [PATCH] feat: partition table query optimize (#1594)

## Rationale
Close #1441

## Detailed Changes

### TLDR
The performance issue with IN-list queries comes from the extra overhead of bloom-filter-like directory lookups when scanning each SST file for rows. The solution is to build a separate predicate for each partition, containing only the keys relevant to that partition. Since the current partition filter only supports BinaryExpr(Column, operator, Literal) and non-negated InList expressions, this optimization covers only those specific cases.

### Changes
1. During scan building, when identifying the partitions for a query, we create a PartitionedFilterKeyIndex variable to store the predicate key indices for each expression.
2. In the compute_partition_for_keys_group function, we use a `HashMap<PartitionId, HashMap<FilterIndex, BTreeSet<KeyIndex>>>` to record the indices of the keys involved in the partition computation for each group.
3. In the partitioned_predicates function, we construct the final predicate for each partition.
4. In resolve_partitioned_scan_internal, we generate a separate request for each partition.

e.g. conditions:
1. table schema: col_ts, col1, col2, in which col1 and col2 are both keys, and the table has two partitions
2. sql: select * from table where col1 = '33' and col2 in ("aa", "bb", "cc", "dd")

partition expectations: yield two predicates
p0: col1 = '33' and col2 in ("aa", "bb", "cc");
p1: col1 = '33' and col2 in ("dd")

### Other issues discovered
When an IN list has fewer than three elements, the Expr is rewritten into nested BinaryExprs, which bypasses the FilterExtractor.
e.g. SQL: select * from table where col1 in ("aa", "bb") and col2 in (1,2,3,4,5...1000)
Since ("aa", "bb") has fewer than three elements, the col1 key filter is not included in the partition computation, which interrupts the partitioning process in the get_candidate_partition_keys_groups function, as contains_empty_filter is set to true.

## Test Plan
1. UT: test_partitioned_predicate
2. Manual test.
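To illustrate the pruning described in Changes 1-3, here is a minimal, self-contained sketch of the idea: the per-partition key indices recorded in a PartitionedFilterKeyIndex are used to keep only the IN-list values that belong to each partition. The type aliases mirror the ones introduced by this patch, while `prune_for_partition` and the plain `Vec<&str>` standing in for an IN list are hypothetical simplifications for illustration, not the actual API:

```rust
use std::collections::{BTreeSet, HashMap};

// Aliases mirroring the patch: partition id -> (filter index -> kept key indices).
type PartitionId = usize;
type FilterIndex = usize;
type KeyIndex = usize;
type PartitionedFilterKeyIndex = HashMap<PartitionId, HashMap<FilterIndex, BTreeSet<KeyIndex>>>;

// Keep, for each filter, only the IN-list values whose indices were recorded
// for `partition`. `filters` is a simplified stand-in for a predicate's IN lists.
fn prune_for_partition(
    filters: &mut [Vec<&str>],
    partition: PartitionId,
    index: &PartitionedFilterKeyIndex,
) {
    if let Some(per_filter) = index.get(&partition) {
        for (filter_idx, key_indices) in per_filter {
            let mut i = 0;
            filters[*filter_idx].retain(|_| {
                let keep = key_indices.contains(&i);
                i += 1;
                keep
            });
        }
    }
}

fn main() {
    // col2 in ("aa", "bb", "cc", "dd"): suppose keys 0..=2 hash to partition 0
    // and key 3 hashes to partition 1, as in the example above.
    let mut index = PartitionedFilterKeyIndex::new();
    index.entry(0).or_default().insert(0, BTreeSet::from([0, 1, 2]));
    index.entry(1).or_default().insert(0, BTreeSet::from([3]));

    let mut p0 = vec![vec!["aa", "bb", "cc", "dd"]];
    prune_for_partition(&mut p0, 0, &index);
    assert_eq!(p0[0], ["aa", "bb", "cc"]); // predicate p0

    let mut p1 = vec![vec!["aa", "bb", "cc", "dd"]];
    prune_for_partition(&mut p1, 1, &index);
    assert_eq!(p1[0], ["dd"]); // predicate p1
}
```

This mirrors the `list.retain` loop in partitioned_predicates below: each partition starts from a clone of the full predicate and drops the IN-list values that cannot hash to it.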
--------- Co-authored-by: jiacai2050 --- .../env/cluster/ddl/partition_table.result | 8 +- .../cases/env/cluster/ddl/partition_table.sql | 4 +- .../src/dist_sql_query/physical_plan.rs | 5 +- .../src/dist_sql_query/resolver.rs | 16 +- .../src/dist_sql_query/test_util.rs | 2 + src/partition_table_engine/src/lib.rs | 49 +++- src/partition_table_engine/src/partition.rs | 8 +- .../src/scan_builder.rs | 225 +++++++++++++++++- src/table_engine/src/engine.rs | 3 + .../partition/rule/df_adapter/extractor.rs | 22 +- .../src/partition/rule/df_adapter/mod.rs | 32 ++- src/table_engine/src/partition/rule/key.rs | 79 ++++-- src/table_engine/src/partition/rule/mod.rs | 12 +- src/table_engine/src/partition/rule/random.rs | 11 +- src/table_engine/src/predicate.rs | 4 + 15 files changed, 418 insertions(+), 62 deletions(-) diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result index 980a7bc1e1..d576d93bd7 100644 --- a/integration_tests/cases/env/cluster/ddl/partition_table.result +++ b/integration_tests/cases/env/cluster/ddl/partition_table.result @@ -99,24 +99,24 @@ UInt64(16367588166920223437),Timestamp(1651737067000),String("horaedb9"),Int32(0 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx --- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx +-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx -- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0"; plan_type,plan, -String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=xx\n ScanTable: table=__partition_table_t_1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"), +String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n __partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"), -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx --- SQLNESS REPLACE 
metrics=\[.*?s\] metrics=xx +-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx -- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4"); plan_type,plan, -String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"), +String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb2\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb1\"), Utf8(\"ceresdb3\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: 
Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"), ALTER TABLE partition_table_t ADD COLUMN (b string); diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.sql b/integration_tests/cases/env/cluster/ddl/partition_table.sql index e1f32de515..f06dee2ea8 100644 --- a/integration_tests/cases/env/cluster/ddl/partition_table.sql +++ b/integration_tests/cases/env/cluster/ddl/partition_table.sql @@ -57,7 +57,7 @@ SELECT * from partition_table_t where name in ("horaedb5", "horaedb6", "horaedb7 -- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx --- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx +-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx -- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0"; @@ -65,7 +65,7 @@ EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0"; -- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx -- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x -- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx --- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx +-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx -- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4"); diff --git a/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs b/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs index dd430f520d..55692f258b 100644 --- a/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs +++ b/src/df_engine_extensions/src/dist_sql_query/physical_plan.rs @@ -47,7 +47,7 @@ use datafusion::{ }; use futures::{future::BoxFuture, FutureExt, Stream, StreamExt}; use runtime::Priority; -use table_engine::{remote::model::TableIdentifier, table::ReadRequest}; +use table_engine::{predicate::Predicate, remote::model::TableIdentifier, table::ReadRequest}; use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector, TraceMetricWhenDrop}; use crate::dist_sql_query::{RemotePhysicalPlanExecutor, RemoteTaskContext, TableScanContext}; @@ -62,6 +62,7 @@ pub struct UnresolvedPartitionedScan { pub table_scan_ctx: TableScanContext, pub metrics_collector: MetricsCollector, pub priority: Priority, + pub predicates: Option<Vec<Predicate>>, } impl UnresolvedPartitionedScan { @@ -69,6 +70,7 @@ impl UnresolvedPartitionedScan { table_name: &str, sub_tables: Vec<TableIdentifier>, read_request: ReadRequest, + predicates: Option<Vec<Predicate>>, ) -> Self { let metrics_collector = MetricsCollector::new(table_name.to_string()); let table_scan_ctx = TableScanContext { @@ -83,6 +85,7 @@ impl UnresolvedPartitionedScan { table_scan_ctx, metrics_collector, priority: read_request.priority, + predicates, } } } diff --git a/src/df_engine_extensions/src/dist_sql_query/resolver.rs b/src/df_engine_extensions/src/dist_sql_query/resolver.rs index c3724b9d95..5fd3430a88 100644 --- a/src/df_engine_extensions/src/dist_sql_query/resolver.rs +++ b/src/df_engine_extensions/src/dist_sql_query/resolver.rs @@ -135,10 +135,22 @@ impl Resolver { let sub_tables = unresolved.sub_tables.clone(); let
remote_plans = sub_tables .into_iter() - .map(|table| { + .enumerate() + .map(|(idx, table)| { let plan = Arc::new(UnresolvedSubTableScan { table: table.clone(), - table_scan_ctx: unresolved.table_scan_ctx.clone(), + table_scan_ctx: if let Some(ref predicates) = unresolved.predicates { + // Each partition has a different predicate, so we build a + // separate ctx for each partition. + let mut ctx = unresolved.table_scan_ctx.clone(); + // Overwrite the old predicate (the one computed before + // partition pruning) with the optimized predicate. + ctx.predicate = Arc::new(predicates[idx].clone()); + ctx + } else { + unresolved.table_scan_ctx.clone() + }, }); let sub_metrics_collect = metrics_collector.span(table.table.clone()); diff --git a/src/df_engine_extensions/src/dist_sql_query/test_util.rs b/src/df_engine_extensions/src/dist_sql_query/test_util.rs index c42f9e3862..873c7a2214 100644 --- a/src/df_engine_extensions/src/dist_sql_query/test_util.rs +++ b/src/df_engine_extensions/src/dist_sql_query/test_util.rs @@ -313,6 +313,7 @@ impl TestContext { "test", sub_tables, self.request.clone(), + None, )); let filter: Arc = @@ -364,6 +365,7 @@ impl TestContext { "test", self.sub_table_groups[0].clone(), self.request.clone(), + None, )); self.build_aggr_plan_with_input(unresolved_scan) diff --git a/src/partition_table_engine/src/lib.rs b/src/partition_table_engine/src/lib.rs index 209049770a..ee0b460c30 100644 --- a/src/partition_table_engine/src/lib.rs +++ b/src/partition_table_engine/src/lib.rs @@ -27,14 +27,17 @@ use std::sync::Arc; use analytic_engine::TableOptions; use async_trait::async_trait; +use datafusion::logical_expr::expr::{Expr, InList}; use generic_error::BoxError; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use table_engine::{ engine::{ CloseShardRequest, CloseTableRequest, CreateTableParams, CreateTableRequest, - DropTableRequest, OpenShardRequest, OpenShardResult, OpenTableRequest, Result, TableEngine, - Unexpected, UnexpectedNoCause, + DropTableRequest, InvalidPartitionContext, OpenShardRequest, OpenShardResult, + OpenTableRequest, Result, TableEngine, Unexpected, UnexpectedNoCause, }, + partition::rule::df_adapter::PartitionedFilterKeyIndex, + predicate::Predicate, remote::RemoteEngineRef, table::TableRef, PARTITION_TABLE_ENGINE_TYPE, @@ -110,3 +113,43 @@ impl TableEngine for PartitionTableEngine { vec![Ok("".to_string())] } } + +pub fn partitioned_predicates( + predicate: Arc<Predicate>, + partitions: &[usize], + partitioned_key_indices: &mut PartitionedFilterKeyIndex, +) -> Result<Vec<Predicate>> { + ensure!( + partitions.len() == partitioned_key_indices.keys().len(), + InvalidPartitionContext { + msg: format!( + "partitions length:{}, partitioned_key_indices length: {}", + partitions.len(), + partitioned_key_indices.keys().len() + ) + } + ); + let mut predicates = vec![(*predicate).clone(); partitions.len()]; + for (idx, predicate) in predicates.iter_mut().enumerate() { + let partition = partitions[idx]; + if let Some(filter_indices) = partitioned_key_indices.get(&partition) { + let exprs = predicate.mut_exprs(); + for (filter_idx, key_indices) in filter_indices { + if let Expr::InList(InList { + list, + negated: false, + ..
+ }) = &mut exprs[*filter_idx] + { + // Keep only the IN-list values whose indices were recorded + // for this partition. + let mut idx = 0; + list.retain(|_| { + let should_keep = key_indices.contains(&idx); + idx += 1; + should_keep + }); + } + } + } + } + Ok(predicates) +} diff --git a/src/partition_table_engine/src/partition.rs b/src/partition_table_engine/src/partition.rs index 440722164a..2eb0a6a32f 100644 --- a/src/partition_table_engine/src/partition.rs +++ b/src/partition_table_engine/src/partition.rs @@ -33,8 +33,8 @@ use table_engine::{ partition::{ format_sub_partition_table_name, rule::{ - df_adapter::DfPartitionRuleAdapter, PartitionedRow, PartitionedRows, - PartitionedRowsIter, + df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex}, + PartitionedRow, PartitionedRows, PartitionedRowsIter, }, PartitionInfo, }, @@ -289,14 +289,14 @@ impl Table for PartitionTableImpl { .context(CreatePartitionRule)? } }; - + let mut partitioned_key_indices = PartitionedFilterKeyIndex::new(); // Evaluate expr and locate partition. let partitions = { let _locate_timer = PARTITION_TABLE_PARTITIONED_READ_DURATION_HISTOGRAM .with_label_values(&["locate"]) .start_timer(); df_partition_rule - .locate_partitions_for_read(request.predicate.exprs()) + .locate_partitions_for_read(request.predicate.exprs(), &mut partitioned_key_indices) .box_err() .context(LocatePartitions)? }; diff --git a/src/partition_table_engine/src/scan_builder.rs b/src/partition_table_engine/src/scan_builder.rs index 27281c0a52..25d080d3ff 100644 --- a/src/partition_table_engine/src/scan_builder.rs +++ b/src/partition_table_engine/src/scan_builder.rs @@ -27,13 +27,16 @@ use datafusion::{ use df_engine_extensions::dist_sql_query::physical_plan::UnresolvedPartitionedScan; use table_engine::{ partition::{ - format_sub_partition_table_name, rule::df_adapter::DfPartitionRuleAdapter, PartitionInfo, + format_sub_partition_table_name, + rule::df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex}, + PartitionInfo, }, provider::TableScanBuilder, remote::model::TableIdentifier, table::ReadRequest, }; +use crate::partitioned_predicates; #[derive(Debug)] pub struct PartitionedTableScanBuilder { table_name: String, @@ -61,13 +64,13 @@ impl PartitionedTableScanBuilder { &self, table_name: &str, partition_info: &PartitionInfo, - partitions: Vec<usize>, + partitions: &[usize], ) -> Vec<TableIdentifier> { let definitions = partition_info.get_definitions(); partitions - .into_iter() + .iter() .map(|p| { - let partition_name = &definitions[p].name; + let partition_name = &definitions[*p].name; TableIdentifier { catalog: self.catalog_name.clone(), schema: self.schema_name.clone(), @@ -89,18 +92,226 @@ impl TableScanBuilder for PartitionedTableScanBuilder { DataFusionError::Internal(format!("failed to build partition rule, err:{e}")) })?; + let mut partitioned_key_indices = PartitionedFilterKeyIndex::new(); // Evaluate expr and locate partition.
let partitions = df_partition_rule - .locate_partitions_for_read(request.predicate.exprs()) + .locate_partitions_for_read(request.predicate.exprs(), &mut partitioned_key_indices) .map_err(|e| { DataFusionError::Internal(format!("failed to locate partition for read, err:{e}")) })?; + let sub_tables = - self.get_sub_table_idents(&self.table_name, &self.partition_info, partitions); + self.get_sub_table_idents(&self.table_name, &self.partition_info, &partitions); + + let predicates = if partitioned_key_indices.len() == partitions.len() { + Some( + partitioned_predicates( + request.predicate.clone(), + &partitions, + &mut partitioned_key_indices, + ) + .map_err(|e| { + DataFusionError::Internal(format!("partition predicates failed, err:{e}")) + })?, + ) + } else { + // Since FilterExtractor::extract only covers some specific expr + // cases, partitioned_key_indices.len() could be 0. + // In that case all partition requests share the same predicate. + None + }; // Build plan. - let plan = UnresolvedPartitionedScan::new(&self.table_name, sub_tables, request); + let plan = + UnresolvedPartitionedScan::new(&self.table_name, sub_tables, request, predicates); Ok(Arc::new(plan)) } } + +#[cfg(test)] +mod tests { + use common_types::{column_schema::Builder as ColBuilder, datum::DatumKind, schema::Builder}; + use datafusion::logical_expr::{binary_expr, in_list, Expr, Operator}; + use table_engine::{ + partition::{ + rule::df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex}, + KeyPartitionInfo, PartitionDefinition, PartitionInfo, + }, + predicate::PredicateBuilder, + }; + + use crate::partitioned_predicates; + + #[test] + fn test_partitioned_predicate() { + // conditions: + // 1) table schema: col_ts, col1, col2, in which col1 and col2 are both keys, + // and with two partitions + // 2) sql: select * from table where col1 = '33' and col2 in ("aa", "bb", + // "cc", "dd") + // partition expectations: + // 1) the query fits in two partitions + // 2) yield two predicates, p0: col1 = '33' and col2 in ("aa", "bb", "cc"); + // p1: col1 = '33' and col2 in ("dd") + let definitions = vec![ + PartitionDefinition { + name: "p1".to_string(), + origin_name: None, + }, + PartitionDefinition { + name: "p2".to_string(), + origin_name: None, + }, + ]; + + let partition_info = PartitionInfo::Key(KeyPartitionInfo { + version: 0, + definitions, + partition_key: vec!["col1".to_string(), "col2".to_string()], + linear: false, + }); + + let schema = { + let builder = Builder::new(); + let col_ts = ColBuilder::new("col_ts".to_string(), DatumKind::Timestamp) + .build() + .expect("ts"); + let col1 = ColBuilder::new("col1".to_string(), DatumKind::String) + .build() + .expect("should succeed to build column schema"); + let col2 = ColBuilder::new("col2".to_string(), DatumKind::String) + .build() + .expect("should succeed to build column schema"); + let col3 = ColBuilder::new("col3".to_string(), DatumKind::String) + .build() + .expect("should succeed to build column schema"); + builder + .auto_increment_column_id(true) + .add_key_column(col_ts) + .unwrap() + .add_key_column(col1) + .unwrap() + .add_key_column(col2) + .unwrap() + .add_key_column(col3) + .unwrap() + .primary_key_indexes(vec![1, 2]) + .build() + .unwrap() + }; + + let df_partition_rule = DfPartitionRuleAdapter::new(partition_info, &schema).unwrap(); + + let exprs = vec![ + binary_expr( + Expr::Column("col1".into()), + Operator::Eq, + Expr::Literal("33".into()), + ), + in_list( + Expr::Column("col2".into()), + vec![ + Expr::Literal("aa".into()), + Expr::Literal("bb".into()), +
Expr::Literal("cc".into()), + Expr::Literal("dd".into()), + ], + false, + ), + in_list( + Expr::Column("col3".into()), + vec![ + Expr::Literal("1".into()), + Expr::Literal("2".into()), + Expr::Literal("3".into()), + Expr::Literal("4".into()), + ], + false, + ), + ]; + let mut partitioned_key_indices = PartitionedFilterKeyIndex::new(); + let partitions = df_partition_rule + .locate_partitions_for_read(&exprs, &mut partitioned_key_indices) + .unwrap(); + assert!(partitions.len() == 2); + assert!(partitioned_key_indices.len() == 2); + + let predicate = PredicateBuilder::default() + .add_pushdown_exprs(exprs.as_slice()) + .build(); + + let predicates = partitioned_predicates( + predicate, + partitions.as_slice(), + &mut partitioned_key_indices, + ); + assert!(predicates.is_ok()); + let predicates = predicates.unwrap(); + assert!(predicates.len() == 2); + + assert!(predicates[0].exprs().len() == 3); + assert!( + predicates[0].exprs()[0] + == binary_expr( + Expr::Column("col1".into()), + Operator::Eq, + Expr::Literal("33".into()) + ) + ); + assert!( + predicates[0].exprs()[1] + == in_list( + Expr::Column("col2".into()), + vec![ + Expr::Literal("aa".into()), + Expr::Literal("bb".into()), + Expr::Literal("cc".into()), + ], + false, + ) + ); + assert!( + predicates[0].exprs()[2] + == in_list( + Expr::Column("col3".into()), + vec![ + Expr::Literal("1".into()), + Expr::Literal("2".into()), + Expr::Literal("3".into()), + Expr::Literal("4".into()), + ], + false, + ) + ); + assert!( + predicates[1].exprs()[0] + == binary_expr( + Expr::Column("col1".into()), + Operator::Eq, + Expr::Literal("33".into()) + ) + ); + assert!( + predicates[1].exprs()[1] + == in_list( + Expr::Column("col2".into()), + vec![Expr::Literal("dd".into()),], + false, + ) + ); + assert!( + predicates[1].exprs()[2] + == in_list( + Expr::Column("col3".into()), + vec![ + Expr::Literal("1".into()), + Expr::Literal("2".into()), + Expr::Literal("3".into()), + Expr::Literal("4".into()), + ], + false, + ) + ); + } +} diff --git a/src/table_engine/src/engine.rs b/src/table_engine/src/engine.rs index a9ea133708..0f81e0c279 100644 --- a/src/table_engine/src/engine.rs +++ b/src/table_engine/src/engine.rs @@ -97,6 +97,9 @@ pub enum Error { msg: Option, source: GenericError, }, + + #[snafu(display("Invalid partiton context, err:{}", msg))] + InvalidPartitionContext { msg: String }, } define_result!(Error); diff --git a/src/table_engine/src/partition/rule/df_adapter/extractor.rs b/src/table_engine/src/partition/rule/df_adapter/extractor.rs index ff6c393d40..78007500c7 100644 --- a/src/table_engine/src/partition/rule/df_adapter/extractor.rs +++ b/src/table_engine/src/partition/rule/df_adapter/extractor.rs @@ -16,14 +16,16 @@ // under the License. //! Partition filter extractor - use std::collections::HashSet; use common_types::datum::Datum; use datafusion::logical_expr::{expr::InList, Expr, Operator}; use df_operator::visitor::find_columns_by_expr; -use crate::partition::rule::filter::{PartitionCondition, PartitionFilter}; +use crate::partition::rule::{ + df_adapter::IndexedPartitionFilter, + filter::{PartitionCondition, PartitionFilter}, +}; /// The datafusion filter exprs extractor /// @@ -36,13 +38,13 @@ use crate::partition::rule::filter::{PartitionCondition, PartitionFilter}; /// For example: [KeyRule] and [KeyExtractor]. /// If they are not related, [PartitionRule] may not take effect. 
pub trait FilterExtractor: Send + Sync + 'static { - fn extract(&self, filters: &[Expr], columns: &[String]) -> Vec<PartitionFilter>; + fn extract(&self, filters: &[Expr], columns: &[String]) -> IndexedPartitionFilter; } pub struct NoopExtractor; impl FilterExtractor for NoopExtractor { - fn extract(&self, _filters: &[Expr], _columns: &[String]) -> Vec<PartitionFilter> { + fn extract(&self, _filters: &[Expr], _columns: &[String]) -> IndexedPartitionFilter { vec![] } } @@ -50,13 +52,14 @@ impl FilterExtractor for NoopExtractor { pub struct KeyExtractor; impl FilterExtractor for KeyExtractor { - fn extract(&self, filters: &[Expr], columns: &[String]) -> Vec<PartitionFilter> { + fn extract(&self, filters: &[Expr], columns: &[String]) -> IndexedPartitionFilter { + // PartitionFilter indices may not be the same as the filter indices if filters.is_empty() { return Vec::default(); } let mut target = Vec::with_capacity(filters.len()); - for filter in filters { + for (index, filter) in filters.iter().enumerate() { // If no target columns included in `filter`, ignore this `filter`. let columns_in_filter = find_columns_by_expr(filter) .into_iter() @@ -78,7 +81,6 @@ impl FilterExtractor for KeyExtractor { // Finally, we try to convert `filter` to `PartitionFilter`. // We just support the simple situation: "colum = value" now. - // TODO: support "colum in [value list]". // TODO: we need to compare and check the datatype of column and value. // (Actually, there is type conversion on high-level, but when converted data // is overflow, it may take no effect). @@ -126,7 +128,7 @@ impl FilterExtractor for KeyExtractor { }; if let Some(pf) = partition_filter { - target.push(pf); + target.push((index, pf)); } } @@ -157,7 +159,7 @@ mod tests { column: "col1".to_string(), condition: PartitionCondition::Eq(Datum::Int32(42)), }; - assert_eq!(partition_filter.first().unwrap(), &expected); + assert_eq!(partition_filter.first().unwrap().1, expected); // Other expr will be rejected now. let rejected_expr = col("col1").gt(Expr::Literal(ScalarValue::Int32(Some(42)))); @@ -182,7 +184,7 @@ column: "col1".to_string(), condition: PartitionCondition::In(vec![Datum::Int32(42), Datum::Int32(38)]), }; - assert_eq!(partition_filter.first().unwrap(), &expected); + assert_eq!(partition_filter.first().unwrap().1, expected); } #[test] diff --git a/src/table_engine/src/partition/rule/df_adapter/mod.rs b/src/table_engine/src/partition/rule/df_adapter/mod.rs index 8e95f85674..1b5df6ca08 100644 --- a/src/table_engine/src/partition/rule/df_adapter/mod.rs +++ b/src/table_engine/src/partition/rule/df_adapter/mod.rs @@ -16,6 +16,7 @@ // under the License. //!
Partition rule datafusion adapter +use std::collections::{BTreeSet, HashMap}; use common_types::{row::RowGroup, schema::Schema}; use datafusion::logical_expr::Expr; @@ -23,14 +24,22 @@ use datafusion::logical_expr::Expr; use self::extractor::{KeyExtractor, NoopExtractor}; use crate::partition::{ rule::{ - df_adapter::extractor::FilterExtractorRef, factory::PartitionRuleFactory, PartitionRulePtr, - PartitionedRows, + df_adapter::extractor::FilterExtractorRef, factory::PartitionRuleFactory, + filter::PartitionFilter, PartitionRulePtr, PartitionedRows, }, BuildPartitionRule, PartitionInfo, Result, }; mod extractor; +pub type PartitionId = usize; // partition number (id) +pub type FilterIndex = usize; // filter (or expr) index regarding predicate.exprs() +pub type KeyIndex = usize; // key index regarding the IN-list expr +pub type FilterKeyIndex = HashMap<FilterIndex, BTreeSet<KeyIndex>>; +pub type PartitionedFilterKeyIndex = HashMap<PartitionId, FilterKeyIndex>; +pub type IndexedPartitionFilter = Vec<(usize, PartitionFilter)>; +pub type IndexedPartitionFilterRef<'a> = &'a [(usize, PartitionFilter)]; + /// Partition rule's adapter for datafusion pub struct DfPartitionRuleAdapter { /// Partition rule @@ -56,12 +65,17 @@ impl DfPartitionRuleAdapter { self.rule.location_partitions_for_write(row_group) } - pub fn locate_partitions_for_read(&self, filters: &[Expr]) -> Result<Vec<usize>> { + pub fn locate_partitions_for_read( + &self, + filters: &[Expr], + partitioned_key_indices: &mut PartitionedFilterKeyIndex, + ) -> Result<Vec<usize>> { // Extract partition filters from datafusion filters. let partition_filters = self.extractor.extract(filters, self.columns()); // Locate partitions from filters. - self.rule.locate_partitions_for_read(&partition_filters) + self.rule + .locate_partitions_for_read(&partition_filters, partitioned_key_indices) } fn create_extractor(partition_info: &PartitionInfo) -> Result<FilterExtractorRef> { @@ -116,8 +130,9 @@ mod tests { // Basic flow let key_rule_adapter = DfPartitionRuleAdapter::new(PartitionInfo::Key(ket_partition), &schema).unwrap(); + let mut partitioned_key_indices = PartitionedFilterKeyIndex::new(); let partitions = key_rule_adapter - .locate_partitions_for_read(&valid_filters_1) + .locate_partitions_for_read(&valid_filters_1, &mut partitioned_key_indices) .unwrap(); let partition_keys = [ @@ -132,7 +147,7 @@ // Conflict filter and empty partitions let partitions = key_rule_adapter - .locate_partitions_for_read(&valid_filters_2) + .locate_partitions_for_read(&valid_filters_2, &mut partitioned_key_indices) .unwrap(); assert!(partitions.is_empty()); @@ -161,12 +176,13 @@ let key_rule_adapter = DfPartitionRuleAdapter::new(PartitionInfo::Key(ket_partition), &schema).unwrap(); + let mut partitioned_key_indices = PartitionedFilterKeyIndex::new(); // Partitions located from invalid filters.
let partitions_1 = key_rule_adapter - .locate_partitions_for_read(&invalid_filters_1) + .locate_partitions_for_read(&invalid_filters_1, &mut partitioned_key_indices) .unwrap(); let partitions_2 = key_rule_adapter - .locate_partitions_for_read(&invalid_filters_2) + .locate_partitions_for_read(&invalid_filters_2, &mut partitioned_key_indices) .unwrap(); // Expected diff --git a/src/table_engine/src/partition/rule/key.rs b/src/table_engine/src/partition/rule/key.rs index 70a3038f6e..7692c80020 100644 --- a/src/table_engine/src/partition/rule/key.rs +++ b/src/table_engine/src/partition/rule/key.rs @@ -30,7 +30,9 @@ use snafu::OptionExt; use crate::partition::{ rule::{ - filter::PartitionCondition, PartitionFilter, PartitionRule, PartitionedRow, PartitionedRows, + df_adapter::{IndexedPartitionFilterRef, PartitionedFilterKeyIndex}, + filter::PartitionCondition, + PartitionFilter, PartitionRule, PartitionedRow, PartitionedRows, }, Internal, LocateWritePartition, Result, }; @@ -139,15 +141,29 @@ impl KeyRule { &self, group: &[usize], filters: &[PartitionFilter], - ) -> Result<BTreeSet<usize>> { - let mut partitions = BTreeSet::new(); + ) -> Result<PartitionedFilterKeyIndex> { + // Retrieve all the key DatumView instances along with their + // indices, i.e. their positions in the predicate's IN list. let expanded_group = expand_partition_keys_group(group, filters)?; - for partition_keys in expanded_group { - let partition = compute_partition(partition_keys.into_iter(), self.partition_num); - partitions.insert(partition); + + let mut partitioned_key_indices = PartitionedFilterKeyIndex::new(); + for indexed_partition_keys in expanded_group { + // batch all the keys for hash computation + let partition_keys = indexed_partition_keys.iter().map(|item| item.1.clone()); + let partition = compute_partition(partition_keys, self.partition_num); + + // collect the key indices related to each predicate expr in the + // target partition + let filter_inlist_indices = partitioned_key_indices.entry(partition).or_default(); + for (index, item) in indexed_partition_keys.iter().enumerate() { + filter_inlist_indices + .entry(group[index]) + .or_default() + .insert(item.0); + } } - Ok(partitions) + Ok(partitioned_key_indices) } #[inline] @@ -189,16 +205,25 @@ impl PartitionRule for KeyRule { Ok(PartitionedRows::Multiple(Box::new(iter))) } - fn locate_partitions_for_read(&self, filters: &[PartitionFilter]) -> Result<Vec<usize>> { + fn locate_partitions_for_read( + &self, + indexed_filters: IndexedPartitionFilterRef, + partitioned_key_indices: &mut PartitionedFilterKeyIndex, + ) -> Result<Vec<usize>> { // Filters are empty. - if filters.is_empty() { + if indexed_filters.is_empty() { return Ok(self.all_partitions()); } + let filters = indexed_filters + .iter() + .map(|(_idx, filter)| filter.clone()) + .collect::<Vec<_>>(); + // Group the filters by their columns. // If found invalid filter, return all partitions.
let candidate_partition_keys_groups = self - .get_candidate_partition_keys_groups(filters) + .get_candidate_partition_keys_groups(&filters) .map_err(|e| { error!("KeyRule locate partition for read, err:{}", e); }) @@ -208,11 +233,31 @@ } let (first_group, rest_groups) = candidate_partition_keys_groups.split_first().unwrap(); - let mut target_partitions = self.compute_partition_for_keys_group(first_group, filters)?; + let mut partitioned_key_indices_all = + self.compute_partition_for_keys_group(first_group, filters.as_slice())?; + let mut target_partitions: BTreeSet<usize> = + partitioned_key_indices_all.keys().copied().collect(); for group in rest_groups { // Same as above, if found invalid, return all partitions. - let partitions = match self.compute_partition_for_keys_group(group, filters) { - Ok(partitions) => partitions, + let partitions = match self.compute_partition_for_keys_group(group, filters.as_slice()) + { + Ok(partitioned_filter_key_index_rest) => { + for (partition_rest, filter_key_index_rest) in + &partitioned_filter_key_index_rest + { + // merge the key indices from the remaining groups. + let filter_key_index = partitioned_key_indices_all + .entry(*partition_rest) + .or_default(); + for item in filter_key_index_rest { + filter_key_index + .entry(*item.0) + .or_default() + .extend(item.1.iter()); + } + } + partitioned_filter_key_index_rest.keys().copied().collect() + } Err(e) => { error!("KeyRule locate partition for read, err:{}", e); return Ok(self.all_partitions()); @@ -225,6 +270,8 @@ .collect::<BTreeSet<_>>(); } + partitioned_key_indices.extend(partitioned_key_indices_all); + Ok(target_partitions.into_iter().collect()) } } @@ -232,7 +279,7 @@ fn expand_partition_keys_group<'a>( group: &[usize], filters: &'a [PartitionFilter], -) -> Result<impl Iterator<Item = Vec<DatumView<'a>>>> { +) -> Result<impl Iterator<Item = Vec<(usize, DatumView<'a>)>>> { let mut datum_by_columns = Vec::with_capacity(group.len()); for filter_idx in group { let filter = &filters[*filter_idx]; @@ -252,7 +299,7 @@ Ok(datum_by_columns .into_iter() - .map(|filters| filters.into_iter()) + .map(|filters| filters.into_iter().enumerate()) .multi_cartesian_product()) } @@ -546,7 +593,7 @@ mod tests { // Expanded group let expanded_group = expand_partition_keys_group(&group, &filters) .unwrap() - .map(|v| v.iter().map(|view| view.to_datum()).collect_vec()) + .map(|v| v.iter().map(|view| view.1.to_datum()).collect_vec()) .collect_vec(); let expected = vec![ vec![Datum::UInt32(1), Datum::UInt32(2), Datum::UInt32(3)], diff --git a/src/table_engine/src/partition/rule/mod.rs b/src/table_engine/src/partition/rule/mod.rs index a1c138f3b8..b17ec85f80 100644 --- a/src/table_engine/src/partition/rule/mod.rs +++ b/src/table_engine/src/partition/rule/mod.rs @@ -22,10 +22,12 @@ mod factory; mod filter; mod key; mod random; - use common_types::row::{Row, RowGroup}; -use self::filter::PartitionFilter; +use self::{ + df_adapter::{IndexedPartitionFilterRef, PartitionedFilterKeyIndex}, + filter::PartitionFilter, +}; use crate::partition::Result; /// The partitioned rows of the written requests. @@ -67,7 +69,11 @@ pub trait PartitionRule: Send + Sync + 'static { /// passed here. /// /// If unexpected filters still found, all partitions will be returned.
- fn locate_partitions_for_read(&self, filters: &[PartitionFilter]) -> Result<Vec<usize>>; + fn locate_partitions_for_read( + &self, + indexed_filters: IndexedPartitionFilterRef, + partitioned_key_indices: &mut PartitionedFilterKeyIndex, + ) -> Result<Vec<usize>>; } pub type PartitionRulePtr = Box<dyn PartitionRule>; diff --git a/src/table_engine/src/partition/rule/random.rs b/src/table_engine/src/partition/rule/random.rs index 0be8804f6f..d1e84a03f2 100644 --- a/src/table_engine/src/partition/rule/random.rs +++ b/src/table_engine/src/partition/rule/random.rs @@ -21,7 +21,10 @@ use common_types::row::RowGroup; use itertools::Itertools; use crate::partition::{ - rule::{filter::PartitionFilter, PartitionRule, PartitionedRows}, + rule::{ + df_adapter::{IndexedPartitionFilterRef, PartitionedFilterKeyIndex}, + PartitionRule, PartitionedRows, + }, Result, }; @@ -47,7 +50,11 @@ impl PartitionRule for RandomRule { }) } - fn locate_partitions_for_read(&self, _filters: &[PartitionFilter]) -> Result<Vec<usize>> { + fn locate_partitions_for_read( + &self, + _indexed_filters: IndexedPartitionFilterRef, + _partitioned_key_indices: &mut PartitionedFilterKeyIndex, + ) -> Result<Vec<usize>> { Ok((0..self.partition_num).collect_vec()) } } diff --git a/src/table_engine/src/predicate.rs b/src/table_engine/src/predicate.rs index 3a3294fcd9..37c09e9de2 100644 --- a/src/table_engine/src/predicate.rs +++ b/src/table_engine/src/predicate.rs @@ -104,6 +104,10 @@ impl Predicate { &self.exprs } + pub fn mut_exprs(&mut self) -> &mut [Expr] { + &mut self.exprs + } + pub fn time_range(&self) -> TimeRange { self.time_range }