Skip to content

Commit

Permalink
feat(service): schema mismatch disables service (#198)
Browse files Browse the repository at this point in the history
* feat: refactor State to hide inner vars, expose functionality via methods

Adds `State::event_db()` which returns an optional handle to the DB connection pool
Adds `State::schema_version_check()` which retuns the result of the schema version check query
Adds `State::is_schema_version_status(svs)` which compares the inner value with the argument
Adds `State::set_schema_version_status(svs)` which sets the inner value with the argument

* feat: add SchemaVersionValidation middleware

Add middleware type to check the State's schema version status variable, if a mismatch
is detected, a `503 Service unavailable` response is returned.

* fix: update GET /health/ready endpoint

* feat: update endpoint to use 'State::event_db()' method

* feat: add SchemaVersionValidation middleware to existing API endpoints

* fix: spelling
  • Loading branch information
saibatizoku authored Jan 3, 2024
1 parent 63df50d commit 930c2ce
Show file tree
Hide file tree
Showing 9 changed files with 193 additions and 56 deletions.
27 changes: 14 additions & 13 deletions catalyst-gateway/bin/src/service/api/health/ready_get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use poem_extensions::{
};

use crate::{
cli::Error,
event_db::error::Error as DBError,
service::common::responses::{
resp_2xx::NoContent,
resp_5xx::{server_error, ServerError, ServiceUnavailable},
Expand Down Expand Up @@ -48,23 +50,22 @@ pub(crate) type AllResponses = response! {
/// but unlikely)
/// * 503 Service Unavailable - Service is not ready, do not send other requests.
pub(crate) async fn endpoint(state: Data<&Arc<State>>) -> AllResponses {
match state.event_db.schema_version_check().await {
match state.schema_version_check().await {
Ok(_) => {
tracing::debug!("DB schema version status ok");
if let Ok(mut g) = state.schema_version_status.lock() {
*g = SchemaVersionStatus::Ok;
}
state.set_schema_version_status(SchemaVersionStatus::Ok);
T204(NoContent)
},
Err(crate::event_db::error::Error::TimedOut) => T503(ServiceUnavailable),
Err(err) => {
tracing::error!("DB schema version status mismatch");
if let Ok(mut g) = state.schema_version_status.lock() {
*g = SchemaVersionStatus::Mismatch;
T503(ServiceUnavailable)
} else {
T500(server_error!("{}", err.to_string()))
}
Err(Error::EventDb(DBError::MismatchedSchema { was, expected })) => {
tracing::error!(
expected = expected,
current = was,
"DB schema version status mismatch"
);
state.set_schema_version_status(SchemaVersionStatus::Mismatch);
T503(ServiceUnavailable)
},
Err(Error::EventDb(DBError::TimedOut)) => T503(ServiceUnavailable),
Err(err) => T500(server_error!("{}", err.to_string())),
}
}
66 changes: 37 additions & 29 deletions catalyst-gateway/bin/src/service/api/registration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use poem::web::Data;
use poem_extensions::{
response,
UniResponse::{T200, T404, T500},
UniResponse::{T200, T404, T500, T503},
};
use poem_openapi::{
param::{Path, Query},
Expand All @@ -13,17 +13,20 @@ use poem_openapi::{
};

use crate::{
service::common::{
objects::{
event_id::EventId, voter_registration::VoterRegistration,
voting_public_key::VotingPublicKey,
},
responses::{
resp_2xx::OK,
resp_4xx::NotFound,
resp_5xx::{server_error, ServerError},
service::{
common::{
objects::{
event_id::EventId, voter_registration::VoterRegistration,
voting_public_key::VotingPublicKey,
},
responses::{
resp_2xx::OK,
resp_4xx::NotFound,
resp_5xx::{server_error, ServerError, ServiceUnavailable},
},
tags::ApiTags,
},
tags::ApiTags,
utilities::middleware::schema_validation::schema_version_validation,
},
state::State,
};
Expand All @@ -36,7 +39,8 @@ impl RegistrationApi {
#[oai(
path = "/voter/:voting_key",
method = "get",
operation_id = "getVoterInfo"
operation_id = "getVoterInfo",
transform = "schema_version_validation"
)]
/// Voter's info
///
Expand Down Expand Up @@ -65,24 +69,28 @@ impl RegistrationApi {
200: OK<Json<VoterRegistration>>,
404: NotFound,
500: ServerError,
503: ServiceUnavailable,
} {
let voter = pool
.event_db
.get_voter(
&event_id.0.map(Into::into),
voting_key.0 .0,
*with_delegators,
)
.await;
match voter {
Ok(voter) => {
match voter.try_into() {
Ok(voter) => T200(OK(Json(voter))),
Err(err) => T500(server_error!("{}", err.to_string())),
}
},
Err(crate::event_db::error::Error::NotFound(_)) => T404(NotFound),
Err(err) => T500(server_error!("{}", err.to_string())),
if let Ok(event_db) = pool.event_db() {
let voter = event_db
.get_voter(
&event_id.0.map(Into::into),
voting_key.0 .0,
*with_delegators,
)
.await;
match voter {
Ok(voter) => {
match voter.try_into() {
Ok(voter) => T200(OK(Json(voter))),
Err(err) => T500(server_error!("{}", err.to_string())),
}
},
Err(crate::event_db::error::Error::NotFound(_)) => T404(NotFound),
Err(err) => T500(server_error!("{}", err.to_string())),
}
} else {
T503(ServiceUnavailable)
}
}
}
10 changes: 8 additions & 2 deletions catalyst-gateway/bin/src/service/api/v0/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ use std::sync::Arc;
use poem::web::Data;
use poem_openapi::{payload::Binary, OpenApi};

use crate::{service::common::tags::ApiTags, state::State};
use crate::{
service::{
common::tags::ApiTags, utilities::middleware::schema_validation::schema_version_validation,
},
state::State,
};

mod message_post;
mod plans_get;
Expand All @@ -25,7 +30,8 @@ impl V0Api {
#[oai(
path = "/vote/active/plans",
method = "get",
operation_id = "GetActivePlans"
operation_id = "GetActivePlans",
transform = "schema_version_validation"
)]
async fn plans_get(&self, state: Data<&Arc<State>>) -> plans_get::AllResponses {
plans_get::endpoint(state).await
Expand Down
14 changes: 10 additions & 4 deletions catalyst-gateway/bin/src/service/api/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ use poem::web::{Data, Path};
use poem_openapi::{param::Query, payload::Json, OpenApi};

use crate::{
service::common::{
objects::{account_votes::AccountId, fragments_batch::FragmentsBatch},
tags::ApiTags,
service::{
common::{
objects::{account_votes::AccountId, fragments_batch::FragmentsBatch},
tags::ApiTags,
},
utilities::middleware::schema_validation::schema_version_validation,
},
state::State,
};
Expand All @@ -25,7 +28,8 @@ impl V1Api {
#[oai(
path = "/votes/plan/account-votes/:account_id",
method = "get",
operation_id = "AccountVotes"
operation_id = "AccountVotes",
transform = "schema_version_validation"
)]
/// Get from all active vote plans, the index of the voted proposals
/// by th given account ID.
Expand All @@ -43,6 +47,7 @@ impl V1Api {
method = "post",
operation_id = "fragments",
tag = "ApiTags::Fragments",
transform = "schema_version_validation",
deprecated = true
)]
async fn fragments_post(
Expand All @@ -61,6 +66,7 @@ impl V1Api {
method = "get",
operation_id = "fragmentsStatuses",
tag = "ApiTags::Fragments",
transform = "schema_version_validation",
deprecated = true
)]
async fn fragments_statuses(
Expand Down
6 changes: 6 additions & 0 deletions catalyst-gateway/bin/src/service/common/responses/resp_5xx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,9 @@ impl ResponseError for ServerError {
///
/// #### NO DATA BODY IS RETURNED FOR THIS RESPONSE
pub(crate) struct ServiceUnavailable;

impl ResponseError for ServiceUnavailable {
fn status(&self) -> StatusCode {
StatusCode::SERVICE_UNAVAILABLE
}
}
3 changes: 3 additions & 0 deletions catalyst-gateway/bin/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub(crate) enum Error {
/// An IO error has occurred
#[error(transparent)]
Io(#[from] std::io::Error),
/// A mismatch in the expected EventDB schema version
#[error("expected schema version mismatch")]
SchemaVersionMismatch,
}

/// # Run Catalyst Gateway Service.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Custom POEM Middleware for this service.
pub(crate) mod schema_validation;
pub(crate) mod tracing_mw;
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//! Middleware to verify the status of the last DB schema version validation.
//!
//! If a mismatch is detected, the middleware returns an error with `ServiceUnavailable`
//! status code (503). Otherwise, the middleware calls and returns the wrapped endpoint's
//! response.
//!
//! This middleware checks the `State.schema_version_status` value, if it is Ok,
//! the wrapped endpoint is called and its response is returned.
use std::sync::Arc;

use poem::{web::Data, Endpoint, EndpointExt, Middleware, Request, Result};

use crate::{
service::common::responses::resp_5xx::ServiceUnavailable,
state::{SchemaVersionStatus, State},
};

/// A middleware that raises an error with `ServiceUnavailable` and 503 status code
/// if a DB schema version mismatch is found the existing `State`.
pub(crate) struct SchemaVersionValidation;

impl<E: Endpoint> Middleware<E> for SchemaVersionValidation {
type Output = SchemaVersionValidationImpl<E>;

fn transform(&self, ep: E) -> Self::Output {
SchemaVersionValidationImpl { ep }
}
}

/// The new endpoint type generated by the `SchemaVersionValidation`.
pub(crate) struct SchemaVersionValidationImpl<E> {
/// Endpoint wrapped by the middleware.
ep: E,
}

#[poem::async_trait]
impl<E: Endpoint> Endpoint for SchemaVersionValidationImpl<E> {
type Output = E::Output;

async fn call(&self, req: Request) -> Result<Self::Output> {
if let Some(state) = req.data::<Data<&Arc<State>>>() {
// Check if the inner schema version status is set to `Mismatch`,
// if so, return the `ServiceUnavailable` error, which implements
// `ResponseError`, with status code `503`.
// Otherwise, return the endpoint as usual.
if state.is_schema_version_status(&SchemaVersionStatus::Mismatch) {
return Err(ServiceUnavailable.into());
}
}
// Calls the endpoint with the request, and returns the response.
self.ep.call(req).await
}
}

/// A function that wraps an endpoint with the `SchemaVersionValidation`.
///
/// This function is convenient to use with `poem-openapi` [operation parameters](https://docs.rs/poem-openapi/latest/poem_openapi/attr.OpenApi.html#operation-parameters) via the
/// `transform` attribute.
pub(crate) fn schema_version_validation(ep: impl Endpoint) -> impl Endpoint {
ep.with(SchemaVersionValidation)
}
60 changes: 52 additions & 8 deletions catalyst-gateway/bin/src/state/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
//! Shared state used by all endpoints.
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};

use crate::{
cli::Error,
event_db::{establish_connection, queries::EventDbQueries},
service::Error as ServiceError,
};

/// The status of the expected DB schema version.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum SchemaVersionStatus {
/// The current DB schema version matches what is expected.
Ok,
Expand All @@ -23,10 +25,10 @@ pub(crate) struct State {
/// This is Private, it needs to be accessed with a function.
// event_db_handle: Arc<ArcSwap<Option<dyn EventDbQueries>>>,
// Private need to get it with a function.
pub(crate) event_db: Arc<dyn EventDbQueries>, /* This needs to be obsoleted, we want the DB
* to be able to be down. */
event_db: Arc<dyn EventDbQueries>, /* This needs to be obsoleted, we want the DB
* to be able to be down. */
/// Status of the last DB schema version check.
pub(crate) schema_version_status: Mutex<SchemaVersionStatus>,
schema_version_status: Mutex<SchemaVersionStatus>,
}

impl State {
Expand All @@ -49,8 +51,50 @@ impl State {
Ok(state)
}

// pub(crate) async fn event_db(&self) -> Option<Arc<dyn EventDbQueries>> {
//
//
// }
/// Get the reference to the database connection pool for `EventDB`.
pub(crate) fn event_db(&self) -> Result<Arc<dyn EventDbQueries>, Error> {
let guard = self.schema_version_status_lock();
match *guard {
SchemaVersionStatus::Ok => Ok(self.event_db.clone()),
SchemaVersionStatus::Mismatch => Err(ServiceError::SchemaVersionMismatch.into()),
}
}

/// Check the DB schema version matches the one expected by the service.
pub(crate) async fn schema_version_check(&self) -> Result<i32, Error> {
Ok(self.event_db.schema_version_check().await?)
}

/// Compare the `State`'s inner value with a given `&SchemaVersionStatus`, returns
/// `bool`.
pub(crate) fn is_schema_version_status(&self, svs: &SchemaVersionStatus) -> bool {
let guard = self.schema_version_status_lock();
&*guard == svs
}

/// Set the state's `SchemaVersionStatus`.
pub(crate) fn set_schema_version_status(&self, svs: SchemaVersionStatus) {
let mut guard = self.schema_version_status_lock();
tracing::debug!(
status = format!("{:?}", svs),
"db schema version status was set"
);
*guard = svs;
}

/// Get the `MutexGuard<SchemaVersionStatus>` from inner the variable.
///
/// Handle poisoned mutex by recovering the guard, and tracing the error.
fn schema_version_status_lock(&self) -> MutexGuard<SchemaVersionStatus> {
match self.schema_version_status.lock() {
Ok(guard) => guard,
Err(poisoned) => {
tracing::error!(
error = format!("{:?}", poisoned),
"recovering DB schema version status fom poisoned mutex"
);
poisoned.into_inner()
},
}
}
}

0 comments on commit 930c2ce

Please sign in to comment.