Skip to content

Commit

Permalink
Use execute_query for introspection
Browse files Browse the repository at this point in the history
  • Loading branch information
codedmart committed Dec 11, 2024
1 parent 205ecee commit 31e820d
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 41 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ workspace = true
ndc-models = { workspace = true }
query-engine-metadata = { path = "../query-engine/metadata" }
query-engine-metrics = { path = "../query-engine/metrics" }

query-engine-execution = { path = "../query-engine/execution" }
query-engine-sql = { path = "../query-engine/sql" }

schemars = { version = "0.8.16", features = ["smol_str", "preserve_order"] }
serde = "1.0.198"
Expand Down
6 changes: 6 additions & 0 deletions crates/configuration/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ pub enum Error {
#[error("Error creating connection pool while introspecting the database: {0}")]
ConnectionPoolError(#[from] bb8_tiberius::Error),

#[error("Failed to get connection from pool: {0}")]
GetConnectionFromPool(#[from] bb8::RunError<bb8_tiberius::Error>),

#[error("JSON deserialization error: {0}")]
JsonDeserializationError(String),

// error while parsing stored procedure introspection
#[error("Error parsing stored procedure introspection: {0}")]
StoredProcedureIntrospectionError(serde_json::Error),
Expand Down
71 changes: 33 additions & 38 deletions crates/configuration/src/version1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ use crate::secret::Secret;
use crate::{uri, ConnectionUri};

use ndc_models::{AggregateFunctionName, CollectionName, ComparisonOperatorName, FieldName};
use query_engine_execution::query::execute_query;
use query_engine_metadata::metadata;
use query_engine_metadata::metadata::stored_procedures::{
StoredProcedureArgumentInfo, StoredProcedureInfo, StoredProcedures,

Check warning on line 13 in crates/configuration/src/version1.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/ndc-sqlserver/ndc-sqlserver/crates/configuration/src/version1.rs
};
use query_engine_metadata::metadata::{database, Nullable};

use query_engine_sql::sql::{ast::RawSql,string::SQL};
use query_engine_metrics::metrics;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::collections::BTreeSet;

use thiserror::Error;
use tiberius::Query;

// TODO(KC): Move the `table_configuration.sql` to the `static` folder present
// in the root of this repo.
Expand Down Expand Up @@ -156,22 +156,6 @@ async fn create_mssql_pool(
bb8::Pool::builder().max_size(2).build(mgr).await
}

async fn select_first_row(
mssql_pool: &bb8::Pool<bb8_tiberius::ConnectionManager>,
query: &str,
) -> tiberius::Row {
let mut connection = mssql_pool.get().await.unwrap();

// let's do a query to check everything is ok
let select = Query::new(query);

// go!
let stream = select.query(&mut connection).await.unwrap();

// Nothing is fetched, the first result set starts.
stream.into_row().await.unwrap().unwrap()
}

// get_stored_procedures fetches the stored procedures from the database and returns them as a
// vector of introspection::IntrospectStoredProcedure.
async fn configure_stored_procedures(
Expand All @@ -181,12 +165,17 @@ async fn configure_stored_procedures(
) -> Result<StoredProcedures, Error> {
match config_options {
Some(config_options) => {

Check warning on line 167 in crates/configuration/src/version1.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/ndc-sqlserver/ndc-sqlserver/crates/configuration/src/version1.rs
let stored_procedures_row =
select_first_row(mssql_pool, STORED_PROCS_CONFIGURATION_QUERY).await;
let introspected_stored_procedures: Vec<introspection::IntrospectStoredProcedure> =
serde_json::from_str(stored_procedures_row.get(0).unwrap())
.map_err(Error::StoredProcedureIntrospectionError)?;
let new_stored_procedures = get_stored_procedures(introspected_stored_procedures);
let mut connection = mssql_pool
.get()
.await.map_err(Error::GetConnectionFromPool)?;
// Let's do some stored procedures introspection
let mut stored_procs_query = SQL::new();
RawSql::RawText(STORED_PROCS_CONFIGURATION_QUERY.to_string()).to_sql(&mut stored_procs_query);
let mut stored_procs_rows = Vec::new();
execute_query(&mut connection, &stored_procs_query, &BTreeMap::new(), &mut stored_procs_rows).await.unwrap();
let introspected_stored_procedures: Vec<introspection::IntrospectStoredProcedure> = serde_json::from_slice(&stored_procs_rows)
.map_err(|e| Error::JsonDeserializationError(e.to_string()))?;
let new_stored_procedures = get_stored_procedures(introspected_stored_procedures);

// traverse the new stored procedures and add them to the existing stored procedures
let mut merged_stored_procedures = existing_stored_procedures.0.clone();
Expand Down Expand Up @@ -233,26 +222,32 @@ pub async fn configure(
.await
.map_err(Error::ConnectionPoolError)?;

Check warning on line 224 in crates/configuration/src/version1.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/ndc-sqlserver/ndc-sqlserver/crates/configuration/src/version1.rs
let mut metadata = query_engine_metadata::metadata::Metadata::default();
let mut connection = mssql_pool
.get()
.await.map_err(Error::GetConnectionFromPool)?;

// Let's do some table introspection
let mut table_query = SQL::new();
RawSql::RawText(TABLE_CONFIGURATION_QUERY.to_string()).to_sql(&mut table_query);
let mut table_rows = Vec::new();

Check warning on line 232 in crates/configuration/src/version1.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/ndc-sqlserver/ndc-sqlserver/crates/configuration/src/version1.rs
execute_query(&mut connection, &table_query, &BTreeMap::new(), &mut table_rows).await.unwrap();
let tables: Vec<introspection::IntrospectionTable> = serde_json::from_slice(&table_rows)
.map_err(|e| Error::JsonDeserializationError(e.to_string()))?;

// Let's do some types introspection
let mut types_query = SQL::new();
RawSql::RawText(TYPES_QUERY.to_string()).to_sql(&mut types_query);
let mut types_rows = Vec::new();

Check warning on line 240 in crates/configuration/src/version1.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/ndc-sqlserver/ndc-sqlserver/crates/configuration/src/version1.rs
execute_query(&mut connection, &types_query, &BTreeMap::new(), &mut types_rows).await.unwrap();
let type_names: Vec<TypeItem> = serde_json::from_slice(&types_rows)
.map_err(|e| Error::JsonDeserializationError(e.to_string()))?;

let mut metadata = query_engine_metadata::metadata::Metadata::default();
metadata.native_queries = configuration.metadata.native_queries.clone();
metadata.native_mutations = configuration.metadata.native_mutations.clone();

let tables_row = select_first_row(&mssql_pool, TABLE_CONFIGURATION_QUERY).await;

let tables: Vec<introspection::IntrospectionTable> =
serde_json::from_str(tables_row.get(0).unwrap()).unwrap();

metadata.tables = get_tables_info(tables);

let types_row = select_first_row(&mssql_pool, TYPES_QUERY).await;

let type_names: Vec<TypeItem> = serde_json::from_str(types_row.get(0).unwrap()).unwrap();

metadata.comparison_operators = get_comparison_operators(&type_names);

metadata.aggregate_functions = get_aggregate_functions(&type_names);

metadata.stored_procedures = configure_stored_procedures(
&mssql_pool,
configuration.metadata.stored_procedures.clone(),
Expand Down
9 changes: 9 additions & 0 deletions crates/ndc-sqlserver/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ impl<Env: Environment + Send + Sync> connector::ConnectorSetup for SQLServerSetu
configuration::Error::ConnectionPoolError(inner) => {
std::io::Error::new(std::io::ErrorKind::Other, inner.to_string()).into()
}
configuration::Error::GetConnectionFromPool(inner) => {

Check warning on line 111 in crates/ndc-sqlserver/src/connector.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/ndc-sqlserver/ndc-sqlserver/crates/ndc-sqlserver/src/connector.rs
std::io::Error::new(std::io::ErrorKind::Other, inner.to_string()).into()
}
configuration::Error::JsonDeserializationError(inner) => {
connector::ParseError::from(std::io::Error::new(
std::io::ErrorKind::Other,
inner.to_string(),
))
}
configuration::Error::StoredProcedureIntrospectionError(inner) => {
connector::ParseError::from(std::io::Error::new(
std::io::ErrorKind::Other,
Expand Down
2 changes: 1 addition & 1 deletion crates/query-engine/execution/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async fn execute_queries(
}

/// Execute the query on one set of variables.
pub(crate) async fn execute_query(
pub async fn execute_query(
connection: &mut bb8::PooledConnection<'_, bb8_tiberius::ConnectionManager>,
query: &sql::string::SQL,
variables: &BTreeMap<ndc_models::VariableName, serde_json::Value>,
Expand Down

0 comments on commit 31e820d

Please sign in to comment.