Skip to content

Commit

Permalink
fix: only support one spanset temporarily (#106)
Browse files Browse the repository at this point in the history
* fix: only support one spanset temporarily

* chore: fix ut
  • Loading branch information
caibirdme authored Aug 14, 2024
1 parent 8a0786b commit 600f503
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 152 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/prcheck.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
run: cargo binstall -y cargo-sort
- name: run checkers
run: |
cargo sort -c
cargo sort -c -w
cargo fmt -- --check
cargo clippy -- -D warnings
cargo test
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ ordered-float = { version = "4.2.2" }
humantime = { version = "2.1.0" }
chrono = { version = "0.4.38", features = ["serde"] }
pretty_assertions = "1.4.0"
opentelemetry-proto = { version = "0.7.0", features = ["full"] }

[dependencies]
anyhow = "1.0.86"
Expand All @@ -45,7 +46,7 @@ logql = { path = "logql" }
moka = { version = "0.12.8", features = ["default", "sync"] }
opentelemetry = { version = "0.24.0", features = ["metrics"] }
opentelemetry-prometheus = "0.17.0"
opentelemetry-proto = { version = "0.7.0", features = ["full"] }
opentelemetry-proto = { workspace = true }
opentelemetry-semantic-conventions = { version = "0.16.0" }
opentelemetry_sdk = { version = "0.24.1", features = ["metrics"] }
ordered-float = { version = "4.2.2" }
Expand Down
4 changes: 2 additions & 2 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ rust-version = "1.76.0"
authors = ["caibirdme <[email protected]>"]

[dependencies]
chrono = {workspace=true}
anyhow = { version = "1.0.86"}
anyhow = { version = "1.0.86" }
chrono = { workspace = true }
5 changes: 2 additions & 3 deletions logql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ rust-version = "1.76.0"
authors = ["caibirdme <[email protected]>"]

[dependencies]
nom = { workspace = true }
humantime-serde = { workspace = true }
itertools = { workspace = true }

nom = { workspace = true }

[dev-dependencies]
pretty_assertions = { workspace = true }
pretty_assertions = { workspace = true }
9 changes: 5 additions & 4 deletions sqlbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ authors = ["caibirdme <[email protected]>"]

[dependencies]
chrono ={ workspace = true }
ordered-float = {workspace = true}
common = {path = "../common"}
logql = {path = "../logql"}
traceql = {path = "../traceql"}
common = { path = "../common" }
itertools = { workspace = true }
logql = { path = "../logql" }
opentelemetry-proto = { workspace = true }
ordered-float = { workspace = true }
traceql = { path = "../traceql" }
75 changes: 52 additions & 23 deletions sqlbuilder/src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use super::builder::{
TableSchema,
};
use itertools::Itertools as _;
use opentelemetry_proto::tonic::trace::v1::status::StatusCode as PBStatusCode;
use traceql::{
ComparisonOperator, Expression, FieldExpr, FieldType, FieldValue,
IntrisincField, LogicalOperator, SpanSet,
IntrisincField, LogicalOperator, SpanSet, StatusCode,
};

#[allow(dead_code)]
enum SubQuery<T: TableSchema, C: QueryConverter> {
Basic(QueryPlan<T, C>),
And(Box<SubQuery<T, C>>, Box<SubQuery<T, C>>),
Expand All @@ -31,7 +33,7 @@ where
{
match expr {
Expression::SpanSet(spanset) => {
let selection = spanset_to_qp(spanset);
let selection = spanset_to_selection(spanset);
let mut qp = QueryPlan::new(
converter.clone(),
schema.clone(),
Expand All @@ -49,26 +51,19 @@ where
qp.projection = vec![schema.trace_key().to_string()];
SubQuery::Basic(qp)
}
Expression::Logical(left, op, right) => {
let l =
Self::new(converter.clone(), left, schema.clone(), spans);
let r =
Self::new(converter.clone(), right, schema.clone(), spans);
match op {
LogicalOperator::And => {
SubQuery::And(Box::new(l), Box::new(r))
}
LogicalOperator::Or => {
SubQuery::Or(Box::new(l), Box::new(r))
}
}
Expression::Logical(_, _, _) => {
unimplemented!("logical expression")
}
}
}
fn as_sql(&self) -> String {
match self {
SubQuery::Basic(qp) => {
format!("sub.{} IN ({})", qp.schema.trace_key(), qp.as_sql())
format!(
"sub.{} GLOBAL IN ({})",
qp.schema.trace_key(),
qp.as_sql()
)
}
SubQuery::And(l, r) => {
let l_sql = l.as_sql();
Expand All @@ -84,7 +79,7 @@ where
}
}

fn spanset_to_qp(spanset: &SpanSet) -> Selection {
fn spanset_to_selection(spanset: &SpanSet) -> Selection {
match spanset {
SpanSet::Expr(expr) => {
// expand unscoped into (resource or span)
Expand All @@ -98,16 +93,16 @@ fn spanset_to_qp(spanset: &SpanSet) -> Selection {
operator: expr.operator,
});
return Selection::LogicalOr(
Box::new(spanset_to_qp(&left)),
Box::new(spanset_to_qp(&right)),
Box::new(spanset_to_selection(&left)),
Box::new(spanset_to_selection(&right)),
);
}
let c = field_expr_to_condition(expr);
Selection::Unit(c)
}
SpanSet::Logical(left, op, right) => {
let l = spanset_to_qp(left);
let r = spanset_to_qp(right);
let l = spanset_to_selection(left);
let r = spanset_to_selection(right);
match op {
LogicalOperator::And => {
Selection::LogicalAnd(Box::new(l), Box::new(r))
Expand Down Expand Up @@ -167,12 +162,22 @@ fn construct_condition(
}
}

fn convert_status_code(s: StatusCode) -> PBStatusCode {
match s {
StatusCode::Err => PBStatusCode::Error,
StatusCode::Ok => PBStatusCode::Ok,
StatusCode::Unset => PBStatusCode::Unset,
}
}

fn field_expr_to_condition(expr: &FieldExpr) -> Condition {
match &expr.kv {
FieldType::Intrinsic(intrisinc) => match intrisinc {
IntrisincField::Status(status) => construct_condition(
Column::Raw("StatusCode".to_string()),
PlaceValue::Integer((*status).into()),
PlaceValue::String(
convert_status_code(*status).as_str_name().to_string(),
),
expr.operator,
),
IntrisincField::Duraion(d) => construct_condition(
Expand Down Expand Up @@ -253,7 +258,7 @@ where
}
pub fn as_sql(&self) -> String {
let mut sql = format!(
"SELECT * FROM {} sp WHERE sp.{} IN (SELECT {} FROM (",
"SELECT * FROM {} sp WHERE sp.{} GLOBAL IN (SELECT {} FROM (",
self.schema.table(),
self.schema.span_id_key(),
self.schema.span_id_key(),
Expand All @@ -270,3 +275,27 @@ where
sql
}
}

pub fn single_spanset_query<T, C>(
spanset: &SpanSet,
schema: T,
projection: Vec<String>,
converter: C,
) -> String
where
T: TableSchema,
C: QueryConverter,
{
let selection = spanset_to_selection(spanset);
QueryPlan::new(
converter,
schema,
projection,
Some(selection),
vec![],
vec![],
vec![],
Some(500),
)
.as_sql()
}
82 changes: 51 additions & 31 deletions src/storage/ck/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use opentelemetry_proto::tonic::trace::v1::{
};
use reqwest::Client;
use serde_json::Value as JSONValue;
use sqlbuilder::{builder::TableSchema, trace::ComplexQuery};
use sqlbuilder::{builder::TableSchema, trace::single_spanset_query};
use std::collections::HashMap;
use traceql::*;
use tracing::error;
use tracing::{error, warn};

#[derive(Clone)]
pub struct CKTraceQuerier {
Expand Down Expand Up @@ -67,25 +67,40 @@ impl TraceStorage for CKTraceQuerier {
expr: &Expression,
_opt: QueryLimits,
) -> Result<Vec<SpanItem>> {
let converter = CKLogConverter::new(self.schema.clone(), true);
let sql =
ComplexQuery::new(expr, self.schema.clone(), converter).as_sql();
let mut results = vec![];
let rows =
send_query(self.client.clone(), self.ck_cfg.common.clone(), sql)
match expr {
Expression::Logical(_, _, _) => {
warn!("Search span does not support logical expression");
return Ok(vec![]);
}
Expression::SpanSet(sp) => {
let converter = CKLogConverter::new(self.schema.clone(), true);
let sql = single_spanset_query(
sp,
self.schema.clone(),
self.schema.projection(),
converter,
);
let mut results = vec![];
let rows = send_query(
self.client.clone(),
self.ck_cfg.common.clone(),
sql,
)
.await
.map_err(|e| {
error!("Query trace error: {:?}", e);
e
})?;
for row in rows {
let record = TraceRecord::try_from(row).map_err(|e| {
error!("Convert trace record error: {:?}", e);
e
})?;
results.push(record.into());
for row in rows {
let record = TraceRecord::try_from(row).map_err(|e| {
error!("Convert trace record error: {:?}", e);
e
})?;
results.push(record.into());
}
Ok(results)
}
}
Ok(results)
}
}

Expand Down Expand Up @@ -372,7 +387,7 @@ impl TableSchema for TraceTable {
mod tests {
use super::*;
use pretty_assertions::assert_eq;
use sqlparser::{dialect::AnsiDialect, parser::Parser};
use sqlparser::{dialect::ClickHouseDialect, parser::Parser};
use std::{fs, path::PathBuf};
use traceql::parse_traceql;

Expand All @@ -396,21 +411,26 @@ mod tests {
);
for (name, tc) in cases {
let expr = parse_traceql(&tc.input).unwrap();
let sql = ComplexQuery::new(
&expr,
schema.clone(),
CKLogConverter::new(schema.clone(), true),
)
.as_sql();
let actual_ast = Parser::parse_sql(&AnsiDialect {}, &sql).unwrap();
let expect_ast =
Parser::parse_sql(&AnsiDialect {}, &tc.expect).unwrap();
assert_eq!(
expect_ast[0].to_string(),
actual_ast[0].to_string(),
"case: {}",
name
);
if let Expression::SpanSet(sp) = expr {
let converter = CKLogConverter::new(schema.clone(), true);
let sql = single_spanset_query(
&sp,
schema.clone(),
schema.projection(),
converter,
);
let actual_ast =
Parser::parse_sql(&ClickHouseDialect {}, &sql).unwrap();
let expect_ast =
Parser::parse_sql(&ClickHouseDialect {}, &tc.expect)
.unwrap();
assert_eq!(
expect_ast[0].to_string(),
actual_ast[0].to_string(),
"case: {}",
name
);
}
}
}
}
Loading

0 comments on commit 600f503

Please sign in to comment.