From 31e820d929300b7821b1456128d091f4a1fdecc1 Mon Sep 17 00:00:00 2001 From: Brandon Martin Date: Tue, 10 Dec 2024 18:13:55 -0700 Subject: [PATCH] Use execute_query for introspection --- Cargo.lock | 4 +- crates/configuration/Cargo.toml | 3 +- crates/configuration/src/error.rs | 6 ++ crates/configuration/src/version1.rs | 71 ++++++++++------------ crates/ndc-sqlserver/src/connector.rs | 9 +++ crates/query-engine/execution/src/query.rs | 2 +- 6 files changed, 54 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 01b472ea..cdaba66a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -1320,8 +1320,10 @@ dependencies = [ "bb8-tiberius", "ndc-models", "prometheus", + "query-engine-execution", "query-engine-metadata", "query-engine-metrics", + "query-engine-sql", "schemars", "serde", "serde_json", diff --git a/crates/configuration/Cargo.toml b/crates/configuration/Cargo.toml index 7c722d33..7b405d71 100644 --- a/crates/configuration/Cargo.toml +++ b/crates/configuration/Cargo.toml @@ -10,7 +10,8 @@ workspace = true ndc-models = { workspace = true } query-engine-metadata = { path = "../query-engine/metadata" } query-engine-metrics = { path = "../query-engine/metrics" } - +query-engine-execution = { path = "../query-engine/execution" } +query-engine-sql = { path = "../query-engine/sql" } schemars = { version = "0.8.16", features = ["smol_str", "preserve_order"] } serde = "1.0.198" diff --git a/crates/configuration/src/error.rs b/crates/configuration/src/error.rs index 1458a645..4e5a403c 100644 --- a/crates/configuration/src/error.rs +++ b/crates/configuration/src/error.rs @@ -34,6 +34,12 @@ pub enum Error { #[error("Error creating connection pool while introspecting the database: {0}")] ConnectionPoolError(#[from] bb8_tiberius::Error), + #[error("Failed to get connection from pool: {0}")] + GetConnectionFromPool(#[from] bb8::RunError), + + #[error("JSON deserialization error: {0}")] + JsonDeserializationError(String), + // error while parsing stored procedure introspection #[error("Error parsing stored procedure introspection: {0}")] StoredProcedureIntrospectionError(serde_json::Error), diff --git a/crates/configuration/src/version1.rs b/crates/configuration/src/version1.rs index 43f90e0b..2f9a21ea 100644 --- a/crates/configuration/src/version1.rs +++ b/crates/configuration/src/version1.rs @@ -7,12 +7,13 @@ use crate::secret::Secret; use crate::{uri, ConnectionUri}; use ndc_models::{AggregateFunctionName, CollectionName, ComparisonOperatorName, FieldName}; +use query_engine_execution::query::execute_query; use query_engine_metadata::metadata; use query_engine_metadata::metadata::stored_procedures::{ StoredProcedureArgumentInfo, StoredProcedureInfo, StoredProcedures, }; use query_engine_metadata::metadata::{database, Nullable}; - +use query_engine_sql::sql::{ast::RawSql,string::SQL}; use query_engine_metrics::metrics; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -20,7 +21,6 @@ use std::collections::BTreeMap; use std::collections::BTreeSet; use thiserror::Error; -use tiberius::Query; // TODO(KC): Move the `table_configuration.sql` to the `static` folder present // in the root of this repo. @@ -156,22 +156,6 @@ async fn create_mssql_pool( bb8::Pool::builder().max_size(2).build(mgr).await } -async fn select_first_row( - mssql_pool: &bb8::Pool, - query: &str, -) -> tiberius::Row { - let mut connection = mssql_pool.get().await.unwrap(); - - // let's do a query to check everything is ok - let select = Query::new(query); - - // go! - let stream = select.query(&mut connection).await.unwrap(); - - // Nothing is fetched, the first result set starts. - stream.into_row().await.unwrap().unwrap() -} - // get_stored_procedures fetches the stored procedures from the database and returns them as a // vector of introspection::IntrospectStoredProcedure. async fn configure_stored_procedures( @@ -181,12 +165,17 @@ async fn configure_stored_procedures( ) -> Result { match config_options { Some(config_options) => { - let stored_procedures_row = - select_first_row(mssql_pool, STORED_PROCS_CONFIGURATION_QUERY).await; - let introspected_stored_procedures: Vec = - serde_json::from_str(stored_procedures_row.get(0).unwrap()) - .map_err(Error::StoredProcedureIntrospectionError)?; - let new_stored_procedures = get_stored_procedures(introspected_stored_procedures); + let mut connection = mssql_pool + .get() + .await.map_err(Error::GetConnectionFromPool)?; + // Let's do some stored procedures introspection + let mut stored_procs_query = SQL::new(); + RawSql::RawText(STORED_PROCS_CONFIGURATION_QUERY.to_string()).to_sql(&mut stored_procs_query); + let mut stored_procs_rows = Vec::new(); + execute_query(&mut connection, &stored_procs_query, &BTreeMap::new(), &mut stored_procs_rows).await.unwrap(); + let introspected_stored_procedures: Vec = serde_json::from_slice(&stored_procs_rows) + .map_err(|e| Error::JsonDeserializationError(e.to_string()))?; + let new_stored_procedures = get_stored_procedures(introspected_stored_procedures); // traverse the new stored procedures and add them to the existing stored procedures let mut merged_stored_procedures = existing_stored_procedures.0.clone(); @@ -233,26 +222,32 @@ pub async fn configure( .await .map_err(Error::ConnectionPoolError)?; - let mut metadata = query_engine_metadata::metadata::Metadata::default(); + let mut connection = mssql_pool + .get() + .await.map_err(Error::GetConnectionFromPool)?; + + // Let's do some table introspection + let mut table_query = SQL::new(); + RawSql::RawText(TABLE_CONFIGURATION_QUERY.to_string()).to_sql(&mut table_query); + let mut table_rows = Vec::new(); + execute_query(&mut connection, &table_query, &BTreeMap::new(), &mut table_rows).await.unwrap(); + let tables: Vec = serde_json::from_slice(&table_rows) + .map_err(|e| Error::JsonDeserializationError(e.to_string()))?; + + // Let's do some types introspection + let mut types_query = SQL::new(); + RawSql::RawText(TYPES_QUERY.to_string()).to_sql(&mut types_query); + let mut types_rows = Vec::new(); + execute_query(&mut connection, &types_query, &BTreeMap::new(), &mut types_rows).await.unwrap(); + let type_names: Vec = serde_json::from_slice(&types_rows) + .map_err(|e| Error::JsonDeserializationError(e.to_string()))?; + let mut metadata = query_engine_metadata::metadata::Metadata::default(); metadata.native_queries = configuration.metadata.native_queries.clone(); metadata.native_mutations = configuration.metadata.native_mutations.clone(); - - let tables_row = select_first_row(&mssql_pool, TABLE_CONFIGURATION_QUERY).await; - - let tables: Vec = - serde_json::from_str(tables_row.get(0).unwrap()).unwrap(); - metadata.tables = get_tables_info(tables); - - let types_row = select_first_row(&mssql_pool, TYPES_QUERY).await; - - let type_names: Vec = serde_json::from_str(types_row.get(0).unwrap()).unwrap(); - metadata.comparison_operators = get_comparison_operators(&type_names); - metadata.aggregate_functions = get_aggregate_functions(&type_names); - metadata.stored_procedures = configure_stored_procedures( &mssql_pool, configuration.metadata.stored_procedures.clone(), diff --git a/crates/ndc-sqlserver/src/connector.rs b/crates/ndc-sqlserver/src/connector.rs index f1e23f0e..3eb3caf0 100644 --- a/crates/ndc-sqlserver/src/connector.rs +++ b/crates/ndc-sqlserver/src/connector.rs @@ -108,6 +108,15 @@ impl connector::ConnectorSetup for SQLServerSetu configuration::Error::ConnectionPoolError(inner) => { std::io::Error::new(std::io::ErrorKind::Other, inner.to_string()).into() } + configuration::Error::GetConnectionFromPool(inner) => { + std::io::Error::new(std::io::ErrorKind::Other, inner.to_string()).into() + } + configuration::Error::JsonDeserializationError(inner) => { + connector::ParseError::from(std::io::Error::new( + std::io::ErrorKind::Other, + inner.to_string(), + )) + } configuration::Error::StoredProcedureIntrospectionError(inner) => { connector::ParseError::from(std::io::Error::new( std::io::ErrorKind::Other, diff --git a/crates/query-engine/execution/src/query.rs b/crates/query-engine/execution/src/query.rs index 47d49171..0b51d026 100644 --- a/crates/query-engine/execution/src/query.rs +++ b/crates/query-engine/execution/src/query.rs @@ -71,7 +71,7 @@ async fn execute_queries( } /// Execute the query on one set of variables. -pub(crate) async fn execute_query( +pub async fn execute_query( connection: &mut bb8::PooledConnection<'_, bb8_tiberius::ConnectionManager>, query: &sql::string::SQL, variables: &BTreeMap,