From f5ad8f42f21d209b3a455cf1a2e2a2d454c56902 Mon Sep 17 00:00:00 2001 From: Daniel Harvey Date: Wed, 22 Nov 2023 11:25:50 +0000 Subject: [PATCH] Return bytes directly --- Cargo.lock | 1 + crates/ndc-sqlserver/src/query.rs | 7 +- ...on__aggregate_count_albums_plus_field.snap | 18 ++--- crates/query-engine/execution/Cargo.toml | 2 +- .../query-engine/execution/src/execution.rs | 80 +++++++------------ 5 files changed, 47 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1eb139c4..81126c30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1732,6 +1732,7 @@ version = "0.1.0" dependencies = [ "bb8", "bb8-tiberius", + "bytes", "ndc-sdk", "prometheus", "query-engine-sql", diff --git a/crates/ndc-sqlserver/src/query.rs b/crates/ndc-sqlserver/src/query.rs index 4c162bf5..87c380e3 100644 --- a/crates/ndc-sqlserver/src/query.rs +++ b/crates/ndc-sqlserver/src/query.rs @@ -38,7 +38,6 @@ pub async fn query( // assuming query succeeded, increment counter state.metrics.record_successful_query(); - // TODO: return raw JSON Ok(result) } .instrument(info_span!("Execute query")) @@ -73,7 +72,7 @@ async fn execute_query( ) -> Result, connector::QueryError> { execution::mssql_execute(&state.mssql_pool, &state.metrics, plan) .await - .map(JsonResponse::Value) + .map(JsonResponse::Serialized) .map_err(|err| match err { execution::Error::Query(err) => { tracing::error!("{}", err); @@ -83,5 +82,9 @@ async fn execute_query( tracing::error!("{}", err); connector::QueryError::Other(err.into()) } + execution::Error::TiberiusError(err) => { + tracing::error!("{}", err); + connector::QueryError::Other(err.into()) + } }) } diff --git a/crates/ndc-sqlserver/tests/snapshots/query_tests__aggregation__aggregate_count_albums_plus_field.snap b/crates/ndc-sqlserver/tests/snapshots/query_tests__aggregation__aggregate_count_albums_plus_field.snap index b22307e2..0e29e48d 100644 --- a/crates/ndc-sqlserver/tests/snapshots/query_tests__aggregation__aggregate_count_albums_plus_field.snap +++ b/crates/ndc-sqlserver/tests/snapshots/query_tests__aggregation__aggregate_count_albums_plus_field.snap @@ -4,14 +4,6 @@ expression: result --- [ { - "aggregates": { - "how_many_albums": 347, - "how_many_artist_ids": 347, - "how_many_distinct_artist_ids": 204, - "min_artist_id": 1, - "max_artist_id": 275, - "avg_artist_id": 121 - }, "rows": [ { "Title": "Let There Be Rock" @@ -28,6 +20,14 @@ expression: result { "Title": "Warner 25 Anos" } - ] + ], + "aggregates": { + "how_many_albums": 347, + "how_many_artist_ids": 347, + "how_many_distinct_artist_ids": 204, + "min_artist_id": 1, + "max_artist_id": 275, + "avg_artist_id": 121 + } } ] diff --git a/crates/query-engine/execution/Cargo.toml b/crates/query-engine/execution/Cargo.toml index dd3af48e..89c636f4 100644 --- a/crates/query-engine/execution/Cargo.toml +++ b/crates/query-engine/execution/Cargo.toml @@ -11,7 +11,7 @@ ndc-sdk = { git = "https://github.com/hasura/ndc-hub.git", rev = "fb03873", pack tiberius = { version = "0.12.2", default-features = false, features = ["rustls"] } bb8 = "0.8.1" bb8-tiberius = "0.15.0" - +bytes = "1.5.0" prometheus = "0.13.3" serde_json = "1.0.108" tokio-stream = "0.1.14" diff --git a/crates/query-engine/execution/src/execution.rs b/crates/query-engine/execution/src/execution.rs index bc0fc48d..b8c52088 100644 --- a/crates/query-engine/execution/src/execution.rs +++ b/crates/query-engine/execution/src/execution.rs @@ -1,7 +1,7 @@ //! Execute an execution plan against the database. 
use crate::metrics; -use ndc_sdk::models; +use bytes::{BufMut, Bytes, BytesMut}; use query_engine_sql::sql; use serde_json; use std::collections::BTreeMap; @@ -14,7 +14,7 @@ pub async fn mssql_execute( mssql_pool: &bb8::Pool, metrics: &metrics::Metrics, plan: sql::execution_plan::ExecutionPlan, -) -> Result { +) -> Result { let query = plan.query(); tracing::info!( @@ -33,64 +33,48 @@ pub async fn mssql_execute( let mut connection = acquisition_timer.complete_with(connection_result)?; let query_timer = metrics.time_query_execution(); - let rows_result = execute_query(&mut connection, plan) + let bytes_result = execute_queries(&mut connection, plan) .instrument(info_span!("Database request")) .await; - let rows = query_timer.complete_with(rows_result)?; - - // Hack a response from the query results. See the 'response_hack' for more details. - let response = rows_to_response(rows); - - // tracing::info!( - // "Query response: {}", - // serde_json::to_string(&response).unwrap() - // ); - - Ok(response) + query_timer.complete_with(bytes_result) } -async fn execute_query( +async fn execute_queries( connection: &mut bb8::PooledConnection<'_, bb8_tiberius::ConnectionManager>, plan: sql::execution_plan::ExecutionPlan, -) -> Result, Error> { +) -> Result { let query = plan.query(); - // run the query on each set of variables. The result is a vector of rows each - // element in the vector is the result of running the query on one set of variables. - let rows: Vec = match plan.variables { + // this buffer represents the JSON response + let mut buffer = BytesMut::new(); + buffer.put(&[b'['][..]); // we start by opening the array + match plan.variables { None => { let empty_map = BTreeMap::new(); - let rows = execute_mssql_query(connection, &query, &empty_map).await?; - vec![rows] + execute_query(connection, &query, &empty_map, &mut buffer).await?; } Some(variable_sets) => { - let mut sets_of_rows = vec![]; - for vars in &variable_sets { - let rows = execute_mssql_query(connection, &query, vars).await?; - sets_of_rows.push(rows); + let mut i = variable_sets.iter(); + if let Some(first) = i.next() { + execute_query(connection, &query, first, &mut buffer).await?; + for vars in i { + buffer.put(&[b','][..]); // each result, except the first, is prefixed by a ',' + execute_query(connection, &query, vars, &mut buffer).await?; + } } - sets_of_rows } - }; - Ok(rows) -} - -/// Take the sqlserver results and return them as a QueryResponse. -fn rows_to_response(results: Vec) -> models::QueryResponse { - let rowsets = results - .into_iter() - .map(|raw_rowset| serde_json::from_value(raw_rowset).unwrap()) - .collect(); - - models::QueryResponse(rowsets) + } + buffer.put(&[b']'][..]); // we end by closing the array + Ok(buffer.freeze()) } /// Execute the query on one set of variables. -async fn execute_mssql_query( +async fn execute_query( connection: &mut bb8::PooledConnection<'_, bb8_tiberius::ConnectionManager>, query: &sql::string::SQL, variables: &BTreeMap, -) -> Result { + buffer: &mut (impl BufMut + Send), +) -> Result<(), Error> { // let's do a query to check everything is ok let query_text = query.sql.as_str(); @@ -136,9 +120,6 @@ async fn execute_mssql_query( // go! 
let mut stream = mssql_query.query(connection).await.unwrap(); - // collect big lump of json here - let mut result_str = String::new(); - // stream it out and collect it here: while let Some(item) = stream.try_next().await.unwrap() { match item { @@ -148,19 +129,20 @@ async fn execute_mssql_query( } // ...concatenate these QueryItem::Row(row) => { - let item = row.get(0).unwrap(); - result_str.push_str(item) + let item: &[u8] = row + .try_get(0) + .map_err(Error::TiberiusError) + .map(|item: Option<&str>| item.unwrap().as_bytes())?; + buffer.put(item); } } } - // once we're happy this is stable, we should stream the JSON string straight out - let json_value = serde_json::from_str(&result_str).unwrap(); - - Ok(json_value) + Ok(()) } pub enum Error { Query(String), ConnectionPool(bb8::RunError), + TiberiusError(tiberius::error::Error), }
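
Note (commentary, not part of the patch): the heart of this change is that query results are no longer parsed into serde_json::Value and re-serialized; the JSON text the database already returns is appended straight into a bytes buffer and handed back untouched via JsonResponse::Serialized. Below is a minimal standalone sketch of that buffer-assembly pattern, assuming the bytes 1.5 API as used in the patch; assemble_response and its rowset inputs are hypothetical stand-ins for what execute_queries actually reads from the tiberius row stream.

use bytes::{BufMut, Bytes, BytesMut};

/// Concatenate pre-serialized JSON rowsets into one top-level JSON array
/// without deserializing them, mirroring the shape of execute_queries.
fn assemble_response(rowsets: &[&str]) -> Bytes {
    let mut buffer = BytesMut::new();
    buffer.put_u8(b'['); // open the top-level array
    for (i, rowset) in rowsets.iter().enumerate() {
        if i > 0 {
            buffer.put_u8(b','); // every rowset after the first is prefixed by a comma
        }
        buffer.put(rowset.as_bytes()); // append the already-serialized rowset verbatim
    }
    buffer.put_u8(b']'); // close the array
    buffer.freeze() // immutable Bytes, ready to wrap in JsonResponse::Serialized
}

fn main() {
    // One rowset per set of variables; here a single hard-coded example.
    let rowsets = [r#"{"rows":[{"Title":"Let There Be Rock"}]}"#];
    let response = assemble_response(&rowsets);
    println!("{}", String::from_utf8_lossy(&response));
    // prints: [{"rows":[{"Title":"Let There Be Rock"}]}]
}

Skipping the serde_json::Value round-trip avoids building an intermediate document for every rowset; the trade-off is that the response is only valid JSON if each rowset string is, which the connector now trusts the database to guarantee instead of re-checking it with serde_json.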