Skip to content

Commit

Permalink
add approx_distinct implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist committed Oct 11, 2021
1 parent 90a4c84 commit 3bf646c
Show file tree
Hide file tree
Showing 11 changed files with 677 additions and 14 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ DataFusion also includes a simple command-line interactive SQL utility. See the
- [x] trim
- Miscellaneous/Boolean functions
- [x] nullif
- Approximation functions
- [x] approx_distinct
- Common date/time functions
- [ ] Basic date functions
- [ ] Basic time functions
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ enum AggregateFunction {
SUM = 2;
AVG = 3;
COUNT = 4;
APPROX_DISTINCT = 5;
}

message AggregateExprNode {
Expand Down
4 changes: 4 additions & 0 deletions ballista/rust/core/src/serde/logical_plan/to_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,9 @@ impl TryInto<protobuf::LogicalExprNode> for &Expr {
ref fun, ref args, ..
} => {
let aggr_function = match fun {
AggregateFunction::ApproxDistinct => {
protobuf::AggregateFunction::ApproxDistinct
}
AggregateFunction::Min => protobuf::AggregateFunction::Min,
AggregateFunction::Max => protobuf::AggregateFunction::Max,
AggregateFunction::Sum => protobuf::AggregateFunction::Sum,
Expand Down Expand Up @@ -1370,6 +1373,7 @@ impl From<&AggregateFunction> for protobuf::AggregateFunction {
AggregateFunction::Sum => Self::Sum,
AggregateFunction::Avg => Self::Avg,
AggregateFunction::Count => Self::Count,
AggregateFunction::ApproxDistinct => Self::ApproxDistinct,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions ballista/rust/core/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ impl From<protobuf::AggregateFunction> for AggregateFunction {
protobuf::AggregateFunction::Sum => AggregateFunction::Sum,
protobuf::AggregateFunction::Avg => AggregateFunction::Avg,
protobuf::AggregateFunction::Count => AggregateFunction::Count,
protobuf::AggregateFunction::ApproxDistinct => {
AggregateFunction::ApproxDistinct
}
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions datafusion/src/logical_plan/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1495,6 +1495,21 @@ pub fn random() -> Expr {
}
}

/// Returns the approximate number of distinct input values.
///
/// This function provides an approximation of `count(DISTINCT x)`.
/// Zero is returned if all input values are null.
///
/// This function should produce a standard error of 2.3%,
/// which is the standard deviation of the (approximately normal)
/// error distribution over all possible sets.
/// It does not guarantee an upper bound on the error for any specific input set.
///
/// # Arguments
///
/// * `expr` - the expression whose distinct values are to be estimated; it
///   becomes the sole argument of the resulting aggregate expression.
///
/// # Returns
///
/// An [`Expr::AggregateFunction`] using the `ApproxDistinct` aggregate.
pub fn approx_distinct(expr: Expr) -> Expr {
    Expr::AggregateFunction {
        // Must be `ApproxDistinct`: using any other variant (e.g. `Sum`)
        // would silently plan a different aggregate for this helper.
        fun: aggregates::AggregateFunction::ApproxDistinct,
        distinct: false,
        args: vec![expr],
    }
}

/// Create a convenience function representing a unary scalar function
macro_rules! unary_scalar_expr {
($ENUM:ident, $FUNC:ident) => {
Expand Down
22 changes: 11 additions & 11 deletions datafusion/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ pub use builder::{
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
pub use display::display_schema;
pub use expr::{
abs, acos, and, array, ascii, asin, atan, avg, binary_expr, bit_length, btrim, case,
ceil, character_length, chr, col, columnize_expr, combine_filters, concat, concat_ws,
cos, count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest,
exp, exprlist_to_fields, floor, in_list, initcap, left, length, lit,
lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min,
normalize_col, normalize_cols, now, octet_length, or, random, regexp_match,
regexp_replace, repeat, replace, replace_col, reverse, right, round, rpad, rtrim,
sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos,
substr, sum, tan, to_hex, translate, trim, trunc, unnormalize_col, unnormalize_cols,
upper, when, Column, Expr, ExprRewriter, ExpressionVisitor, Literal, Recursion,
RewriteRecursion,
abs, acos, and, approx_distinct, array, ascii, asin, atan, avg, binary_expr,
bit_length, btrim, case, ceil, character_length, chr, col, columnize_expr,
combine_filters, concat, concat_ws, cos, count, count_distinct, create_udaf,
create_udf, date_part, date_trunc, digest, exp, exprlist_to_fields, floor, in_list,
initcap, left, length, lit, lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim,
max, md5, min, normalize_col, normalize_cols, now, octet_length, or, random,
regexp_match, regexp_replace, repeat, replace, replace_col, reverse, right, round,
rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt,
starts_with, strpos, substr, sum, tan, to_hex, translate, trim, trunc,
unnormalize_col, unnormalize_cols, upper, when, Column, Expr, ExprRewriter,
ExpressionVisitor, Literal, Recursion, RewriteRecursion,
};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
Expand Down
14 changes: 12 additions & 2 deletions datafusion/src/physical_plan/aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ pub enum AggregateFunction {
Max,
/// avg
Avg,
/// Approximate aggregate function
ApproxDistinct,
}

impl fmt::Display for AggregateFunction {
Expand All @@ -77,6 +79,7 @@ impl FromStr for AggregateFunction {
"count" => AggregateFunction::Count,
"avg" => AggregateFunction::Avg,
"sum" => AggregateFunction::Sum,
"approx_distinct" => AggregateFunction::ApproxDistinct,
_ => {
return Err(DataFusionError::Plan(format!(
"There is no built-in function named {}",
Expand All @@ -96,7 +99,9 @@ pub fn return_type(fun: &AggregateFunction, arg_types: &[DataType]) -> Result<Da
data_types(arg_types, &signature(fun))?;

match fun {
AggregateFunction::Count => Ok(DataType::UInt64),
AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
Ok(DataType::UInt64)
}
AggregateFunction::Max | AggregateFunction::Min => Ok(arg_types[0].clone()),
AggregateFunction::Sum => sum_return_type(&arg_types[0]),
AggregateFunction::Avg => avg_return_type(&arg_types[0]),
Expand Down Expand Up @@ -149,6 +154,9 @@ pub fn create_aggregate_expr(
"SUM(DISTINCT) aggregations are not available".to_string(),
));
}
(AggregateFunction::ApproxDistinct, _) => Arc::new(
expressions::ApproxDistinct::new(arg, name, arg_types[0].clone()),
),
(AggregateFunction::Min, _) => {
Arc::new(expressions::Min::new(arg, name, return_type))
}
Expand Down Expand Up @@ -194,7 +202,9 @@ static DATES: &[DataType] = &[DataType::Date32, DataType::Date64];
pub fn signature(fun: &AggregateFunction) -> Signature {
// note: the physical expression must accept the type returned by this function or the execution panics.
match fun {
AggregateFunction::Count => Signature::any(1, Volatility::Immutable),
AggregateFunction::Count | AggregateFunction::ApproxDistinct => {
Signature::any(1, Volatility::Immutable)
}
AggregateFunction::Min | AggregateFunction::Max => {
let valid = STRINGS
.iter()
Expand Down
Loading

0 comments on commit 3bf646c

Please sign in to comment.