
Commit

Return bytes directly
danieljharvey committed Nov 22, 2023
1 parent f52423e commit f5ad8f4
Showing 5 changed files with 47 additions and 61 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Generated lockfile; diff not rendered by default.

7 changes: 5 additions & 2 deletions crates/ndc-sqlserver/src/query.rs
@@ -38,7 +38,6 @@ pub async fn query(
// assuming query succeeded, increment counter
state.metrics.record_successful_query();

// TODO: return raw JSON
Ok(result)
}
.instrument(info_span!("Execute query"))
@@ -73,7 +72,7 @@ async fn execute_query(
) -> Result<JsonResponse<models::QueryResponse>, connector::QueryError> {
execution::mssql_execute(&state.mssql_pool, &state.metrics, plan)
.await
.map(JsonResponse::Value)
.map(JsonResponse::Serialized)
.map_err(|err| match err {
execution::Error::Query(err) => {
tracing::error!("{}", err);
@@ -83,5 +82,9 @@ async fn execute_query(
tracing::error!("{}", err);
connector::QueryError::Other(err.into())
}
execution::Error::TiberiusError(err) => {
tracing::error!("{}", err);
connector::QueryError::Other(err.into())
}
})
}
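The change from JsonResponse::Value to JsonResponse::Serialized hands the SDK pre-serialized JSON bytes instead of a typed models::QueryResponse that would be serialized again. A minimal stand-in for the shape involved, as a sketch only (the two constructor names come from this diff; the real type is defined in ndc-sdk):

use bytes::Bytes;

// Illustrative sketch, not the ndc-sdk definition.
pub enum JsonResponse<T> {
    // A typed value that still needs to be serialized to JSON.
    Value(T),
    // JSON that has already been serialized, passed through as raw bytes.
    Serialized(Bytes),
}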
@@ -4,14 +4,6 @@ expression: result
---
[
{
"aggregates": {
"how_many_albums": 347,
"how_many_artist_ids": 347,
"how_many_distinct_artist_ids": 204,
"min_artist_id": 1,
"max_artist_id": 275,
"avg_artist_id": 121
},
"rows": [
{
"Title": "Let There Be Rock"
@@ -28,6 +20,14 @@
{
"Title": "Warner 25 Anos"
}
]
],
"aggregates": {
"how_many_albums": 347,
"how_many_artist_ids": 347,
"how_many_distinct_artist_ids": 204,
"min_artist_id": 1,
"max_artist_id": 275,
"avg_artist_id": 121
}
}
]
2 changes: 1 addition & 1 deletion crates/query-engine/execution/Cargo.toml
@@ -11,7 +11,7 @@ ndc-sdk = { git = "https://github.com/hasura/ndc-hub.git", rev = "fb03873", pack
tiberius = { version = "0.12.2", default-features = false, features = ["rustls"] }
bb8 = "0.8.1"
bb8-tiberius = "0.15.0"

bytes = "1.5.0"
prometheus = "0.13.3"
serde_json = "1.0.108"
tokio-stream = "0.1.14"
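The new bytes dependency supplies the buffer types the rewritten execution code uses: BytesMut is a growable byte buffer, BufMut::put appends to it, and freeze() converts it into an immutable Bytes value that can be returned without copying. A standalone sketch of that pattern (example code, not taken from this commit):

use bytes::{BufMut, Bytes, BytesMut};

// Join pre-serialized JSON fragments into one JSON array, mirroring how
// execute_queries below assembles the response for each set of variables.
fn assemble_json_array(fragments: &[&str]) -> Bytes {
    let mut buffer = BytesMut::new();
    buffer.put(&[b'['][..]);
    for (i, fragment) in fragments.iter().enumerate() {
        if i > 0 {
            buffer.put(&[b','][..]); // every fragment after the first is preceded by a comma
        }
        buffer.put(fragment.as_bytes());
    }
    buffer.put(&[b']'][..]);
    buffer.freeze()
}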
80 changes: 31 additions & 49 deletions crates/query-engine/execution/src/execution.rs
@@ -1,7 +1,7 @@
//! Execute an execution plan against the database.
use crate::metrics;
use ndc_sdk::models;
use bytes::{BufMut, Bytes, BytesMut};
use query_engine_sql::sql;
use serde_json;
use std::collections::BTreeMap;
@@ -14,7 +14,7 @@ pub async fn mssql_execute(
mssql_pool: &bb8::Pool<bb8_tiberius::ConnectionManager>,
metrics: &metrics::Metrics,
plan: sql::execution_plan::ExecutionPlan,
) -> Result<models::QueryResponse, Error> {
) -> Result<Bytes, Error> {
let query = plan.query();

tracing::info!(
@@ -33,64 +33,48 @@
let mut connection = acquisition_timer.complete_with(connection_result)?;

let query_timer = metrics.time_query_execution();
let rows_result = execute_query(&mut connection, plan)
let bytes_result = execute_queries(&mut connection, plan)
.instrument(info_span!("Database request"))
.await;
let rows = query_timer.complete_with(rows_result)?;

// Hack a response from the query results. See the 'response_hack' for more details.
let response = rows_to_response(rows);

// tracing::info!(
// "Query response: {}",
// serde_json::to_string(&response).unwrap()
// );

Ok(response)
query_timer.complete_with(bytes_result)
}

async fn execute_query(
async fn execute_queries(
connection: &mut bb8::PooledConnection<'_, bb8_tiberius::ConnectionManager>,
plan: sql::execution_plan::ExecutionPlan,
) -> Result<Vec<serde_json::Value>, Error> {
) -> Result<Bytes, Error> {
let query = plan.query();

// run the query on each set of variables. The result is a vector of rows; each
// element in the vector is the result of running the query on one set of variables.
let rows: Vec<serde_json::Value> = match plan.variables {
// this buffer represents the JSON response
let mut buffer = BytesMut::new();
buffer.put(&[b'['][..]); // we start by opening the array
match plan.variables {
None => {
let empty_map = BTreeMap::new();
let rows = execute_mssql_query(connection, &query, &empty_map).await?;
vec![rows]
execute_query(connection, &query, &empty_map, &mut buffer).await?;
}
Some(variable_sets) => {
let mut sets_of_rows = vec![];
for vars in &variable_sets {
let rows = execute_mssql_query(connection, &query, vars).await?;
sets_of_rows.push(rows);
let mut i = variable_sets.iter();
if let Some(first) = i.next() {
execute_query(connection, &query, first, &mut buffer).await?;
for vars in i {
buffer.put(&[b','][..]); // each result, except the first, is prefixed by a ','
execute_query(connection, &query, vars, &mut buffer).await?;
}
}
sets_of_rows
}
};
Ok(rows)
}

/// Take the sqlserver results and return them as a QueryResponse.
fn rows_to_response(results: Vec<serde_json::Value>) -> models::QueryResponse {
let rowsets = results
.into_iter()
.map(|raw_rowset| serde_json::from_value(raw_rowset).unwrap())
.collect();

models::QueryResponse(rowsets)
}
buffer.put(&[b']'][..]); // we end by closing the array
Ok(buffer.freeze())
}

/// Execute the query on one set of variables.
async fn execute_mssql_query(
async fn execute_query(
connection: &mut bb8::PooledConnection<'_, bb8_tiberius::ConnectionManager>,
query: &sql::string::SQL,
variables: &BTreeMap<String, serde_json::Value>,
) -> Result<serde_json::Value, Error> {
buffer: &mut (impl BufMut + Send),
) -> Result<(), Error> {
// let's do a query to check everything is ok
let query_text = query.sql.as_str();

@@ -136,9 +120,6 @@ async fn execute_mssql_query(
// go!
let mut stream = mssql_query.query(connection).await.unwrap();

// collect big lump of json here
let mut result_str = String::new();

// stream it out and collect it here:
while let Some(item) = stream.try_next().await.unwrap() {
match item {
@@ -148,19 +129,20 @@
}
// ...concatenate these
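// Note: SQL Server splits long FOR JSON output across multiple result rows, so each
// row carries only a fragment of the JSON document; the fragments are appended to the
// shared buffer below (this assumes the generated query uses FOR JSON).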
QueryItem::Row(row) => {
let item = row.get(0).unwrap();
result_str.push_str(item)
let item: &[u8] = row
.try_get(0)
.map_err(Error::TiberiusError)
.map(|item: Option<&str>| item.unwrap().as_bytes())?;
buffer.put(item);
}
}
}

// once we're happy this is stable, we should stream the JSON string straight out
let json_value = serde_json::from_str(&result_str).unwrap();

Ok(json_value)
Ok(())
}

pub enum Error {
Query(String),
ConnectionPool(bb8::RunError<bb8_tiberius::Error>),
TiberiusError(tiberius::error::Error),
}
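With mssql_execute returning raw bytes, nothing on this path parses the response any more. A hypothetical test helper (not part of this commit) that checks the assembled bytes are valid JSON and shaped like an array of rowsets:

// Hypothetical helper for tests: confirm the bytes parse as a JSON array.
fn assert_valid_rowset_array(bytes: &bytes::Bytes) {
    let value: serde_json::Value =
        serde_json::from_slice(bytes).expect("response should be valid JSON");
    assert!(value.is_array(), "response should be a JSON array of rowsets");
}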
