feat: partition table query optimize (#1594)
## Rationale
Close #1441 

## Detailed Changes
### TLDR
The performance issue with IN-list queries is due to the extra overhead
of bloom-filter-like directory lookups when scanning each SST file for
rows. The solution is to create a separate predicate for each partition,
containing only the keys relevant to that partition. Since the current
partition filter only supports `BinaryExpr(Column, operator, Literal)`
and non-negated `InList` expressions, this solution addresses only those
specific cases.

### Changes
1. During the scan building process, when identifying the partitions for
a query, we create a `PartitionedFilterKeyIndex` variable to store the
predicate key indices for each expression.
2. In the `compute_partition_for_keys_group` function, we use a
`HashMap<partition_id, HashMap<filter_index, BTreeSet<key_index>>>` to
record, for each group, the indices of the keys involved in partition
computation (see the sketch after this list).
3. In the `partitioned_predicates` function, we construct the final
predicate for each partition.
4. In `resolve_partitioned_scan_internal`, we generate a separate request
for each partition.
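
As a rough sketch, the index from step 2 can be pictured with the
following alias (illustrative only; the real `PartitionedFilterKeyIndex`
lives in `table_engine::partition::rule::df_adapter`):

```rust
use std::collections::{BTreeSet, HashMap};

// partition_id -> (filter_index -> key indices routed to this partition).
type PartitionedFilterKeyIndex = HashMap<usize, HashMap<usize, BTreeSet<usize>>>;
```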

e.g.
conditions:
1. table schema: col_ts, col1, col2, where col1 and col2 are both key
   columns, and the table has two partitions
2. sql: select * from table where col1 = '33' and col2 in ("aa", "bb",
   "cc", "dd")

partition expectations:
   yield two predicates
      p0: col1 = '33' and col2 in ("aa", "bb", "cc");
      p1: col1 = '33' and col2 in ("dd")
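
A minimal sketch of how this example would flow through the new
`partitioned_predicates` function (its signature appears in the
`src/partition_table_engine/src/lib.rs` diff below); `build_predicate` is
a hypothetical stand-in for the planner's predicate construction, and the
key-index contents are hand-built for illustration:

```rust
use std::{collections::BTreeSet, sync::Arc};

// Hypothetical helper; exprs: [col1 = '33', col2 IN ("aa", "bb", "cc", "dd")].
let predicate: Arc<Predicate> = build_predicate();
let partitions = vec![0, 1];

// Filter #1 (the IN list) kept elements 0..=2 for partition 0 and
// element 3 for partition 1.
let mut key_indices = PartitionedFilterKeyIndex::new();
key_indices.entry(0).or_default().insert(1, BTreeSet::from([0, 1, 2]));
key_indices.entry(1).or_default().insert(1, BTreeSet::from([3]));

let preds = partitioned_predicates(predicate, &partitions, &mut key_indices)?;
// preds[0] => col1 = '33' AND col2 IN ("aa", "bb", "cc")
// preds[1] => col1 = '33' AND col2 IN ("dd")
```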

### Other issues discovered
When an IN list has fewer than three elements, the `Expr` is rewritten
into nested `BinaryExpr`s, which bypasses the `FilterExtractor`.

e.g.
SQL: select * from table where col1 in ("aa", "bb") and col2 in
(1,2,3,4,5...1000)
Since ("aa", "bb") has fewer than three elements, the col1 key filter is
not included in partition computation, which interrupts the partitioning
process in the `get_candidate_partition_keys_groups` function, as
`contains_empty_filter` is set to true.
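
For reference, a minimal sketch of what that rewrite looks like using
DataFusion's expression builders (`col`, `lit`, and the `Expr` methods are
DataFusion APIs; that the small IN list surfaces as OR-ed equalities is an
assumption consistent with the behavior described above):

```rust
use datafusion::logical_expr::{col, lit, Expr};

// As written in the query: col1 IN ("aa", "bb").
let original: Expr = col("col1").in_list(vec![lit("aa"), lit("bb")], false);

// Roughly what the extractor sees instead: col1 = "aa" OR col1 = "bb",
// i.e. nested BinaryExprs rather than an InList.
let rewritten: Expr = col("col1").eq(lit("aa")).or(col("col1").eq(lit("bb")));
```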


## Test Plan
1. UT: test_partitioned_predicate
2. Manual test.

---------

Co-authored-by: jiacai2050 <[email protected]>
zealchen and jiacai2050 authored Nov 25, 2024
1 parent 2176524 commit e2970b1
Showing 15 changed files with 418 additions and 62 deletions.
@@ -99,24 +99,24 @@ UInt64(16367588166920223437),Timestamp(1651737067000),String("horaedb9"),Int32(0
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";

plan_type,plan,
String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=xx\n ScanTable: table=__partition_table_t_1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),
String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:1, metrics=[\npartition_table_t:\n __partition_table_t_1:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_1:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_1, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name = Utf8(\"ceresdb0\")], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),


-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
-- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");

plan_type,plan,
String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=xx\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb1\"), Utf8(\"ceresdb2\"), Utf8(\"ceresdb3\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),
String("Plan with Metrics"),String("ResolvedPartitionedScan: pushdown_continue:false, partition_count:3, metrics=[\npartition_table_t:\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n __partition_table_t_x:\n poll_duration=xxs\n total_duration=xxs\n wait_duration=xxs\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb2\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb0\"), Utf8(\"ceresdb4\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n\n__partition_table_t_x:\nCoalescePartitionsExec, metrics=[output_rows=0, elapsed_compute=xxs]\n ScanTable: table=__partition_table_t_x, parallelism=8, priority=Low, partition_count=UnknownPartitioning(8), metrics=[\nPredicate { exprs:[name IN ([Utf8(\"ceresdb1\"), Utf8(\"ceresdb3\")])], time_range:TimeRange { inclusive_start: Timestamp(-9223372036854775808), exclusive_end: Timestamp(9223372036854775807) } }\nscan_table:\n do_merge_sort=true\n iter_num=1\n merge_iter_0:\n init_duration=xxs\n num_memtables=0\n num_ssts=0\n scan_count=1\n scan_duration=xxs\n times_fetch_row_from_multiple=0\n times_fetch_rows_from_one=0\n total_rows_fetch_from_one=0\n scan_memtable_n, fetched_columns:[tsid,t,name,id,value]:\n=0]\n=0]\n"),


ALTER TABLE partition_table_t ADD COLUMN (b string);
4 changes: 2 additions & 2 deletions integration_tests/cases/env/cluster/ddl/partition_table.sql
@@ -57,15 +57,15 @@ SELECT * from partition_table_t where name in ("horaedb5", "horaedb6", "horaedb7
-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
EXPLAIN ANALYZE SELECT * from partition_table_t where name = "ceresdb0";

-- SQLNESS REPLACE duration=\d+.?\d*(µ|m|n) duration=xx
-- SQLNESS REPLACE compute=\d+.?\d*(µ|m|n) compute=xx
-- SQLNESS REPLACE __partition_table_t_\d __partition_table_t_x
-- SQLNESS REPLACE time=\d+.?\d*(µ|m|n) time=xx
-- SQLNESS REPLACE metrics=\[.*?s\] metrics=xx
-- SQLNESS REPLACE elapsed_compute=\d+.?\d*(µ|m|n) elapsed_compute=xx
-- SQLNESS REPLACE scan_memtable_\d+ scan_memtable_n
EXPLAIN ANALYZE SELECT * from partition_table_t where name in ("ceresdb0", "ceresdb1", "ceresdb2", "ceresdb3", "ceresdb4");

5 changes: 4 additions & 1 deletion src/df_engine_extensions/src/dist_sql_query/physical_plan.rs
@@ -47,7 +47,7 @@ use datafusion::{
};
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt};
use runtime::Priority;
use table_engine::{remote::model::TableIdentifier, table::ReadRequest};
use table_engine::{predicate::Predicate, remote::model::TableIdentifier, table::ReadRequest};
use trace_metric::{collector::FormatCollectorVisitor, MetricsCollector, TraceMetricWhenDrop};

use crate::dist_sql_query::{RemotePhysicalPlanExecutor, RemoteTaskContext, TableScanContext};
@@ -62,13 +62,15 @@ pub struct UnresolvedPartitionedScan {
pub table_scan_ctx: TableScanContext,
pub metrics_collector: MetricsCollector,
pub priority: Priority,
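// Per-partition predicates produced by partition pruning; None when all
// sub-table scans fall back to the shared predicate in table_scan_ctx.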
pub predicates: Option<Vec<Predicate>>,
}

impl UnresolvedPartitionedScan {
pub fn new(
table_name: &str,
sub_tables: Vec<TableIdentifier>,
read_request: ReadRequest,
predicates: Option<Vec<Predicate>>,
) -> Self {
let metrics_collector = MetricsCollector::new(table_name.to_string());
let table_scan_ctx = TableScanContext {
Expand All @@ -83,6 +85,7 @@ impl UnresolvedPartitionedScan {
table_scan_ctx,
metrics_collector,
priority: read_request.priority,
predicates,
}
}
}
16 changes: 14 additions & 2 deletions src/df_engine_extensions/src/dist_sql_query/resolver.rs
@@ -135,10 +135,22 @@ impl Resolver {
let sub_tables = unresolved.sub_tables.clone();
let remote_plans = sub_tables
.into_iter()
.map(|table| {
.enumerate()
.map(|(idx, table)| {
let plan = Arc::new(UnresolvedSubTableScan {
table: table.clone(),
table_scan_ctx: unresolved.table_scan_ctx.clone(),
table_scan_ctx: if let Some(ref predicates) = unresolved.predicates {
// Each partition has its own predicate, so build a
// separate ctx for each partition.
let mut ctx = unresolved.table_scan_ctx.clone();
// Overwrite the old predicate (the one from before partition
// computation) with the optimized per-partition predicate.
ctx.predicate = Arc::new(predicates[idx].clone());
ctx
} else {
unresolved.table_scan_ctx.clone()
},
});
let sub_metrics_collect = metrics_collector.span(table.table.clone());

2 changes: 2 additions & 0 deletions src/df_engine_extensions/src/dist_sql_query/test_util.rs
@@ -313,6 +313,7 @@ impl TestContext {
"test",
sub_tables,
self.request.clone(),
None,
));

let filter: Arc<dyn ExecutionPlan> =
@@ -364,6 +365,7 @@ impl TestContext {
"test",
self.sub_table_groups[0].clone(),
self.request.clone(),
None,
));

self.build_aggr_plan_with_input(unresolved_scan)
49 changes: 46 additions & 3 deletions src/partition_table_engine/src/lib.rs
@@ -27,14 +27,17 @@ use std::sync::Arc;

use analytic_engine::TableOptions;
use async_trait::async_trait;
use datafusion::logical_expr::expr::{Expr, InList};
use generic_error::BoxError;
use snafu::{OptionExt, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use table_engine::{
engine::{
CloseShardRequest, CloseTableRequest, CreateTableParams, CreateTableRequest,
DropTableRequest, OpenShardRequest, OpenShardResult, OpenTableRequest, Result, TableEngine,
Unexpected, UnexpectedNoCause,
DropTableRequest, InvalidPartitionContext, OpenShardRequest, OpenShardResult,
OpenTableRequest, Result, TableEngine, Unexpected, UnexpectedNoCause,
},
partition::rule::df_adapter::PartitionedFilterKeyIndex,
predicate::Predicate,
remote::RemoteEngineRef,
table::TableRef,
PARTITION_TABLE_ENGINE_TYPE,
@@ -110,3 +113,43 @@ impl TableEngine for PartitionTableEngine {
vec![Ok("".to_string())]
}
}

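/// Build one `Predicate` per partition: for each non-negated `InList`
/// filter, retain only the list elements whose key indices were recorded
/// for that partition in `partitioned_key_indices`.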
pub fn partitioned_predicates(
predicate: Arc<Predicate>,
partitions: &[usize],
partitioned_key_indices: &mut PartitionedFilterKeyIndex,
) -> Result<Vec<Predicate>> {
ensure!(
partitions.len() == partitioned_key_indices.keys().len(),
InvalidPartitionContext {
msg: format!(
"partitions length:{}, partitioned_key_indices length: {}",
partitions.len(),
partitioned_key_indices.keys().len()
)
}
);
let mut predicates = vec![(*predicate).clone(); partitions.len()];
for (idx, predicate) in predicates.iter_mut().enumerate() {
let partition = partitions[idx];
if let Some(filter_indices) = partitioned_key_indices.get(&partition) {
let exprs = predicate.mut_exprs();
for (filter_idx, key_indices) in filter_indices {
if let Expr::InList(InList {
list,
negated: false,
..
}) = &mut exprs[*filter_idx]
{
let mut idx = 0;
list.retain(|_| {
let should_keep = key_indices.contains(&idx);
idx += 1;
should_keep
});
}
}
}
}
Ok(predicates)
}
8 changes: 4 additions & 4 deletions src/partition_table_engine/src/partition.rs
@@ -33,8 +33,8 @@ use table_engine::{
partition::{
format_sub_partition_table_name,
rule::{
df_adapter::DfPartitionRuleAdapter, PartitionedRow, PartitionedRows,
PartitionedRowsIter,
df_adapter::{DfPartitionRuleAdapter, PartitionedFilterKeyIndex},
PartitionedRow, PartitionedRows, PartitionedRowsIter,
},
PartitionInfo,
},
@@ -289,14 +289,14 @@ impl Table for PartitionTableImpl {
.context(CreatePartitionRule)?
}
};

let mut partitioned_key_indices = PartitionedFilterKeyIndex::new();
// Evaluate expr and locate partition.
let partitions = {
let _locate_timer = PARTITION_TABLE_PARTITIONED_READ_DURATION_HISTOGRAM
.with_label_values(&["locate"])
.start_timer();
df_partition_rule
.locate_partitions_for_read(request.predicate.exprs())
.locate_partitions_for_read(request.predicate.exprs(), &mut partitioned_key_indices)
.box_err()
.context(LocatePartitions)?
};
