Skip to content

Commit

Permalink
Use execute_query for introspection (#155)
Browse files Browse the repository at this point in the history
<!-- The PR description should answer 2 (maybe 3) important questions:
-->

### What
The configuration introspection was using `select_first_row` which was
not properly streaming all results and had a limits which caused
introspections to fail on larger sets of tables.

<!-- What is this PR trying to accomplish (and why, if it's not
obvious)? -->

### How
This removes `select_first_row` and uses `execute_query` to fetch all
introspection data.

<!-- How is it trying to accomplish it (what are the implementation
steps)? -->

---------

Co-authored-by: Daniel Harvey <[email protected]>
  • Loading branch information
codedmart and danieljharvey authored Dec 11, 2024
1 parent 025960a commit 5d9bfce
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 38 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ workspace = true
ndc-models = { workspace = true }
query-engine-metadata = { path = "../query-engine/metadata" }
query-engine-metrics = { path = "../query-engine/metrics" }

query-engine-execution = { path = "../query-engine/execution" }
query-engine-sql = { path = "../query-engine/sql" }

schemars = { version = "0.8.16", features = ["smol_str", "preserve_order"] }
serde = "1.0.198"
Expand Down
9 changes: 9 additions & 0 deletions crates/configuration/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ pub enum Error {
#[error("Error creating connection pool while introspecting the database: {0}")]
ConnectionPoolError(#[from] bb8_tiberius::Error),

#[error("Failed to get connection from pool: {0}")]
GetConnectionFromPool(#[from] bb8::RunError<bb8_tiberius::Error>),

#[error("JSON deserialization error: {0}")]
JsonDeserializationError(String),

#[error("Failed to execute introspection query: {0}")]
IntrospectionQueryExecutionError(String),

// error while parsing stored procedure introspection
#[error("Error parsing stored procedure introspection: {0}")]
StoredProcedureIntrospectionError(serde_json::Error),
Expand Down
92 changes: 56 additions & 36 deletions crates/configuration/src/version1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ use crate::secret::Secret;
use crate::{uri, ConnectionUri};

use ndc_models::{AggregateFunctionName, CollectionName, ComparisonOperatorName, FieldName};
use query_engine_execution::query::execute_query;
use query_engine_metadata::metadata;
use query_engine_metadata::metadata::stored_procedures::{
StoredProcedureArgumentInfo, StoredProcedureInfo, StoredProcedures,
};
use query_engine_metadata::metadata::{database, Nullable};

use query_engine_metrics::metrics;
use query_engine_sql::sql::{ast::RawSql, string::SQL};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::collections::BTreeSet;

use thiserror::Error;
use tiberius::Query;

// TODO(KC): Move the `table_configuration.sql` to the `static` folder present
// in the root of this repo.
Expand Down Expand Up @@ -156,22 +156,6 @@ async fn create_mssql_pool(
bb8::Pool::builder().max_size(2).build(mgr).await
}

async fn select_first_row(
mssql_pool: &bb8::Pool<bb8_tiberius::ConnectionManager>,
query: &str,
) -> tiberius::Row {
let mut connection = mssql_pool.get().await.unwrap();

// let's do a query to check everything is ok
let select = Query::new(query);

// go!
let stream = select.query(&mut connection).await.unwrap();

// Nothing is fetched, the first result set starts.
stream.into_row().await.unwrap().unwrap()
}

// get_stored_procedures fetches the stored procedures from the database and returns them as a
// vector of introspection::IntrospectStoredProcedure.
async fn configure_stored_procedures(
Expand All @@ -181,11 +165,26 @@ async fn configure_stored_procedures(
) -> Result<StoredProcedures, Error> {
match config_options {
Some(config_options) => {
let stored_procedures_row =
select_first_row(mssql_pool, STORED_PROCS_CONFIGURATION_QUERY).await;
let mut connection = mssql_pool
.get()
.await
.map_err(Error::GetConnectionFromPool)?;
// Let's do some stored procedures introspection
let mut stored_procs_query = SQL::new();
RawSql::RawText(STORED_PROCS_CONFIGURATION_QUERY.to_string())
.to_sql(&mut stored_procs_query);
let mut stored_procs_rows = Vec::new();
execute_query(
&mut connection,
&stored_procs_query,
&BTreeMap::new(),
&mut stored_procs_rows,
)
.await
.map_err(|e| Error::IntrospectionQueryExecutionError(format!("{:?}", e)))?;
let introspected_stored_procedures: Vec<introspection::IntrospectStoredProcedure> =
serde_json::from_str(stored_procedures_row.get(0).unwrap())
.map_err(Error::StoredProcedureIntrospectionError)?;
serde_json::from_slice(&stored_procs_rows)
.map_err(|e| Error::JsonDeserializationError(e.to_string()))?;
let new_stored_procedures = get_stored_procedures(introspected_stored_procedures);

// traverse the new stored procedures and add them to the existing stored procedures
Expand Down Expand Up @@ -233,26 +232,47 @@ pub async fn configure(
.await
.map_err(Error::ConnectionPoolError)?;

let mut metadata = query_engine_metadata::metadata::Metadata::default();
let mut connection = mssql_pool
.get()
.await
.map_err(Error::GetConnectionFromPool)?;

// Let's do some table introspection
let mut table_query = SQL::new();
RawSql::RawText(TABLE_CONFIGURATION_QUERY.to_string()).to_sql(&mut table_query);
let mut table_rows = Vec::new();
execute_query(
&mut connection,
&table_query,
&BTreeMap::new(),
&mut table_rows,
)
.await
.map_err(|e| Error::IntrospectionQueryExecutionError(format!("{:?}", e)))?;
let tables: Vec<introspection::IntrospectionTable> = serde_json::from_slice(&table_rows)
.map_err(|e| Error::JsonDeserializationError(e.to_string()))?;

// Let's do some types introspection
let mut types_query = SQL::new();
RawSql::RawText(TYPES_QUERY.to_string()).to_sql(&mut types_query);
let mut types_rows = Vec::new();
execute_query(
&mut connection,
&types_query,
&BTreeMap::new(),
&mut types_rows,
)
.await
.map_err(|e| Error::IntrospectionQueryExecutionError(format!("{:?}", e)))?;
let type_names: Vec<TypeItem> = serde_json::from_slice(&types_rows)
.map_err(|e| Error::JsonDeserializationError(e.to_string()))?;

let mut metadata = query_engine_metadata::metadata::Metadata::default();
metadata.native_queries = configuration.metadata.native_queries.clone();
metadata.native_mutations = configuration.metadata.native_mutations.clone();

let tables_row = select_first_row(&mssql_pool, TABLE_CONFIGURATION_QUERY).await;

let tables: Vec<introspection::IntrospectionTable> =
serde_json::from_str(tables_row.get(0).unwrap()).unwrap();

metadata.tables = get_tables_info(tables);

let types_row = select_first_row(&mssql_pool, TYPES_QUERY).await;

let type_names: Vec<TypeItem> = serde_json::from_str(types_row.get(0).unwrap()).unwrap();

metadata.comparison_operators = get_comparison_operators(&type_names);

metadata.aggregate_functions = get_aggregate_functions(&type_names);

metadata.stored_procedures = configure_stored_procedures(
&mssql_pool,
configuration.metadata.stored_procedures.clone(),
Expand Down
12 changes: 12 additions & 0 deletions crates/ndc-sqlserver/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ impl<Env: Environment + Send + Sync> connector::ConnectorSetup for SQLServerSetu
configuration::Error::ConnectionPoolError(inner) => {
std::io::Error::new(std::io::ErrorKind::Other, inner.to_string()).into()
}
configuration::Error::GetConnectionFromPool(inner) => {
std::io::Error::new(std::io::ErrorKind::Other, inner.to_string()).into()
}
configuration::Error::JsonDeserializationError(inner) => connector::ParseError::from(
std::io::Error::new(std::io::ErrorKind::Other, inner.to_string()),
),
configuration::Error::IntrospectionQueryExecutionError(inner) => {
connector::ParseError::from(std::io::Error::new(
std::io::ErrorKind::Other,
inner.to_string(),
))
}
configuration::Error::StoredProcedureIntrospectionError(inner) => {
connector::ParseError::from(std::io::Error::new(
std::io::ErrorKind::Other,
Expand Down
2 changes: 1 addition & 1 deletion crates/query-engine/execution/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async fn execute_queries(
}

/// Execute the query on one set of variables.
pub(crate) async fn execute_query(
pub async fn execute_query(
connection: &mut bb8::PooledConnection<'_, bb8_tiberius::ConnectionManager>,
query: &sql::string::SQL,
variables: &BTreeMap<ndc_models::VariableName, serde_json::Value>,
Expand Down

0 comments on commit 5d9bfce

Please sign in to comment.