From 873ee7beaf8ab240ab271ffc72f78bec9fc6a585 Mon Sep 17 00:00:00 2001 From: Runji Wang Date: Tue, 21 May 2024 20:37:56 +0800 Subject: [PATCH] fix clippy Signed-off-by: Runji Wang --- src/batch/benches/hash_agg.rs | 1 + src/expr/core/src/aggregate/def.rs | 12 ++++++-- src/expr/impl/benches/expr.rs | 30 +++++++------------ src/meta/src/stream/test_fragmenter.rs | 1 + src/sqlparser/tests/sqlparser_postgres.rs | 2 +- .../integration_tests/eowc_over_window.rs | 6 ++-- .../tests/integration_tests/over_window.rs | 10 +++---- 7 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/batch/benches/hash_agg.rs b/src/batch/benches/hash_agg.rs index a918541beea18..f37261f7563e4 100644 --- a/src/batch/benches/hash_agg.rs +++ b/src/batch/benches/hash_agg.rs @@ -49,6 +49,7 @@ fn create_agg_call( order_by: vec![], filter: None, direct_args: vec![], + udf: None, } } diff --git a/src/expr/core/src/aggregate/def.rs b/src/expr/core/src/aggregate/def.rs index 62bb647c69587..5ca07d0153196 100644 --- a/src/expr/core/src/aggregate/def.rs +++ b/src/expr/core/src/aggregate/def.rs @@ -497,9 +497,7 @@ impl AggArgs { val_indices: args.iter().map(|arg| arg.get_index() as usize).collect(), }) } -} -impl AggArgs { /// return the types of arguments. pub fn arg_types(&self) -> &[DataType] { &self.data_types @@ -510,3 +508,13 @@ impl AggArgs { &self.val_indices } } + +impl FromIterator<(DataType, usize)> for AggArgs { + fn from_iter>(iter: T) -> Self { + let (data_types, val_indices): (Vec<_>, Vec<_>) = iter.into_iter().unzip(); + AggArgs { + data_types: data_types.into(), + val_indices: val_indices.into(), + } + } +} diff --git a/src/expr/impl/benches/expr.rs b/src/expr/impl/benches/expr.rs index e04bc9faee35e..21036d3649acb 100644 --- a/src/expr/impl/benches/expr.rs +++ b/src/expr/impl/benches/expr.rs @@ -26,7 +26,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use risingwave_common::array::*; use risingwave_common::types::test_utils::IntervalTestExt; use risingwave_common::types::*; -use risingwave_expr::aggregate::{build_append_only, AggArgs, AggCall, AggKind}; +use risingwave_expr::aggregate::{build_append_only, AggCall, AggKind}; use risingwave_expr::expr::*; use risingwave_expr::sig::FUNCTION_REGISTRY; use risingwave_pb::expr::expr_node::PbType; @@ -396,26 +396,17 @@ fn bench_expr(c: &mut Criterion) { } let agg = match build_append_only(&AggCall { kind: sig.name.as_aggregate(), - args: match sig.inputs_type.as_slice() { - [] => AggArgs::None, - [t] => AggArgs::Unary(t.as_exact().clone(), input_index_for_type(t.as_exact())), - [t1, t2] => AggArgs::Binary( - [t1.as_exact().clone(), t2.as_exact().clone()], - [ - input_index_for_type(t1.as_exact()), - input_index_for_type(t2.as_exact()), - ], - ), - _ => { - println!("todo: {sig:?}"); - continue; - } - }, + args: sig + .inputs_type + .iter() + .map(|t| (t.as_exact().clone(), input_index_for_type(t.as_exact()))) + .collect(), return_type: sig.ret_type.as_exact().clone(), column_orders: vec![], filter: None, distinct: false, direct_args: vec![], + user_defined: None, }) { Ok(agg) => agg, Err(e) => { @@ -433,9 +424,10 @@ fn bench_expr(c: &mut Criterion) { _ => unreachable!(), }; c.bench_function(&format!("{sig:?}"), |bencher| { - bencher - .to_async(FuturesExecutor) - .iter(|| async { agg.update(&mut agg.create_state(), &input).await.unwrap() }) + bencher.to_async(FuturesExecutor).iter(|| async { + let mut state = agg.create_state().unwrap(); + agg.update(&mut state, &input).await.unwrap() + }) }); } } diff --git a/src/meta/src/stream/test_fragmenter.rs b/src/meta/src/stream/test_fragmenter.rs index c0fc385ad96fb..838527b32382b 100644 --- a/src/meta/src/stream/test_fragmenter.rs +++ b/src/meta/src/stream/test_fragmenter.rs @@ -74,6 +74,7 @@ fn make_sum_aggcall(idx: u32) -> AggCall { order_by: vec![], filter: None, direct_args: vec![], + udf: None, } } diff --git a/src/sqlparser/tests/sqlparser_postgres.rs b/src/sqlparser/tests/sqlparser_postgres.rs index dd2020d5d3a9d..59d6c9d6d82a1 100644 --- a/src/sqlparser/tests/sqlparser_postgres.rs +++ b/src/sqlparser/tests/sqlparser_postgres.rs @@ -910,7 +910,7 @@ fn parse_create_aggregate() { or_replace: true, name: ObjectName(vec![Ident::new_unchecked("sum")]), args: vec![OperateFunctionArg::unnamed(DataType::Int)], - returns: Some(DataType::BigInt), + returns: DataType::BigInt, append_only: true, params: CreateFunctionBody { language: Some("python".into()), diff --git a/src/stream/tests/integration_tests/eowc_over_window.rs b/src/stream/tests/integration_tests/eowc_over_window.rs index cc674e556ab5e..72e3dade4c993 100644 --- a/src/stream/tests/integration_tests/eowc_over_window.rs +++ b/src/stream/tests/integration_tests/eowc_over_window.rs @@ -86,14 +86,14 @@ async fn test_over_window() { // lag(x, 1) WindowFuncCall { kind: WindowFuncKind::Aggregate(AggKind::FirstValue), - args: AggArgs::Unary(DataType::Int32, 3), + args: AggArgs::from_iter([(DataType::Int32, 3)]), return_type: DataType::Int32, frame: Frame::rows(FrameBound::Preceding(1), FrameBound::Preceding(1)), }, // lead(x, 1) WindowFuncCall { kind: WindowFuncKind::Aggregate(AggKind::FirstValue), - args: AggArgs::Unary(DataType::Int32, 3), + args: AggArgs::from_iter([(DataType::Int32, 3)]), return_type: DataType::Int32, frame: Frame::rows(FrameBound::Following(1), FrameBound::Following(1)), }, @@ -186,7 +186,7 @@ async fn test_over_window_aggregate() { let store = MemoryStateStore::new(); let calls = vec![WindowFuncCall { kind: WindowFuncKind::Aggregate(AggKind::Sum), - args: AggArgs::Unary(DataType::Int32, 3), + args: AggArgs::from_iter([(DataType::Int32, 3)]), return_type: DataType::Int64, frame: Frame::rows(FrameBound::Preceding(1), FrameBound::Following(1)), }]; diff --git a/src/stream/tests/integration_tests/over_window.rs b/src/stream/tests/integration_tests/over_window.rs index 0be8e1848e9cd..f1ebba752b167 100644 --- a/src/stream/tests/integration_tests/over_window.rs +++ b/src/stream/tests/integration_tests/over_window.rs @@ -105,14 +105,14 @@ async fn test_over_window_lag_lead_append_only() { // lag(x, 1) WindowFuncCall { kind: WindowFuncKind::Aggregate(AggKind::FirstValue), - args: AggArgs::Unary(DataType::Int32, 3), + args: AggArgs::from_iter([(DataType::Int32, 3)]), return_type: DataType::Int32, frame: Frame::rows(FrameBound::Preceding(1), FrameBound::Preceding(1)), }, // lead(x, 1) WindowFuncCall { kind: WindowFuncKind::Aggregate(AggKind::FirstValue), - args: AggArgs::Unary(DataType::Int32, 3), + args: AggArgs::from_iter([(DataType::Int32, 3)]), return_type: DataType::Int32, frame: Frame::rows(FrameBound::Following(1), FrameBound::Following(1)), }, @@ -216,14 +216,14 @@ async fn test_over_window_lag_lead_with_updates() { // lag(x, 1) WindowFuncCall { kind: WindowFuncKind::Aggregate(AggKind::FirstValue), - args: AggArgs::Unary(DataType::Int32, 3), + args: AggArgs::from_iter([(DataType::Int32, 3)]), return_type: DataType::Int32, frame: Frame::rows(FrameBound::Preceding(1), FrameBound::Preceding(1)), }, // lead(x, 1) WindowFuncCall { kind: WindowFuncKind::Aggregate(AggKind::FirstValue), - args: AggArgs::Unary(DataType::Int32, 3), + args: AggArgs::from_iter([(DataType::Int32, 3)]), return_type: DataType::Int32, frame: Frame::rows(FrameBound::Following(1), FrameBound::Following(1)), }, @@ -391,7 +391,7 @@ async fn test_over_window_sum() { // ) WindowFuncCall { kind: WindowFuncKind::Aggregate(AggKind::Sum), - args: AggArgs::Unary(DataType::Int32, 3), + args: AggArgs::from_iter([(DataType::Int32, 3)]), return_type: DataType::Int64, frame: Frame::rows_with_exclusion( FrameBound::Preceding(1),