Skip to content

Commit

Permalink
fix: invalid objects don't stop the reconciliation (#482)
Browse files Browse the repository at this point in the history
  • Loading branch information
razvan authored Oct 23, 2024
1 parent e138c37 commit b28a0ba
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ All notable changes to this project will be documented in this file.
- BREAKING: The fields `connection` and `host` on `S3Connection` as well as `bucketName` on `S3Bucket`are now mandatory ([#472]).
- Fix `envOverrides` for SparkApplication and SparkHistoryServer ([#451]).
- Ensure SparkApplications can only create a single submit Job. Fix for #457 ([#460]).
- Invalid `SparkApplication`/`SparkHistoryServer` objects don't cause the operator to stop functioning ([#482]).

### Removed

Expand Down
42 changes: 32 additions & 10 deletions rust/operator-binary/src/history/history_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::history::operations::pdb::add_pdbs;
use crate::product_logging::{self, resolve_vector_aggregator_address};
use crate::Ctx;
use product_config::{types::PropertyNameKind, writer::to_java_properties_string};
use stackable_operator::kube::core::{error_boundary, DeserializeGuard};
use stackable_operator::{
builder::{
self,
Expand Down Expand Up @@ -196,6 +197,11 @@ pub enum Error {
AddVolumeMount {
source: builder::pod::container::Error,
},

#[snafu(display("SparkHistoryServer object is invalid"))]
InvalidSparkHistoryServer {
source: error_boundary::InvalidObject,
},
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -206,9 +212,18 @@ impl ReconcilerError for Error {
}
}
/// Reconciles the `SparkHistoryServer` cluster resources (services, config maps, stateful sets).
pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Action> {
pub async fn reconcile(
shs: Arc<DeserializeGuard<SparkHistoryServer>>,
ctx: Arc<Ctx>,
) -> Result<Action> {
tracing::info!("Starting reconcile history server");

let shs = shs
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidSparkHistoryServerSnafu)?;

let client = &ctx.client;

let mut cluster_resources = ClusterResources::new(
Expand Down Expand Up @@ -244,7 +259,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac

// Use a dedicated service account for history server pods.
let (serviceaccount, rolebinding) =
build_history_role_serviceaccount(&shs, &resolved_product_image.app_version_label)?;
build_history_role_serviceaccount(shs, &resolved_product_image.app_version_label)?;
let serviceaccount = cluster_resources
.add(client, serviceaccount)
.await
Expand All @@ -261,7 +276,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
.iter()
{
let service = build_service(
&shs,
shs,
&resolved_product_image.app_version_label,
role_name,
None,
Expand All @@ -273,7 +288,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac

for (rolegroup_name, rolegroup_config) in role_config.iter() {
let rgr = RoleGroupRef {
cluster: ObjectRef::from_obj(&*shs),
cluster: ObjectRef::from_obj(shs),
role: role_name.into(),
role_group: rolegroup_name.into(),
};
Expand All @@ -283,7 +298,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
.context(FailedToResolveConfigSnafu)?;

let service = build_service(
&shs,
shs,
&resolved_product_image.app_version_label,
role_name,
Some(&rgr),
Expand All @@ -294,7 +309,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
.context(ApplyServiceSnafu)?;

let config_map = build_config_map(
&shs,
shs,
rolegroup_config,
&merged_config,
&resolved_product_image.app_version_label,
Expand All @@ -308,7 +323,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
.context(ApplyConfigMapSnafu)?;

let sts = build_stateful_set(
&shs,
shs,
&resolved_product_image,
&rgr,
&log_dir,
Expand All @@ -324,7 +339,7 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
let role_config = &shs.spec.nodes.role_config;
add_pdbs(
&role_config.pod_disruption_budget,
&shs,
shs,
client,
&mut cluster_resources,
)
Expand All @@ -340,8 +355,15 @@ pub async fn reconcile(shs: Arc<SparkHistoryServer>, ctx: Arc<Ctx>) -> Result<Ac
Ok(Action::await_change())
}

pub fn error_policy(_obj: Arc<SparkHistoryServer>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
Action::requeue(*Duration::from_secs(5))
pub fn error_policy(
_obj: Arc<DeserializeGuard<SparkHistoryServer>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> Action {
match error {
Error::InvalidSparkHistoryServer { .. } => Action::await_change(),
_ => Action::requeue(*Duration::from_secs(5)),
}
}

#[allow(clippy::result_large_err)]
Expand Down
19 changes: 10 additions & 9 deletions rust/operator-binary/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use stackable_operator::cli::{Command, ProductOperatorRun};
use stackable_operator::k8s_openapi::api::apps::v1::StatefulSet;
use stackable_operator::k8s_openapi::api::core::v1::Pod;
use stackable_operator::k8s_openapi::api::core::v1::{ConfigMap, Service};
use stackable_operator::kube::core::DeserializeGuard;
use stackable_operator::kube::runtime::{controller::Controller, watcher};
use stackable_operator::logging::controller::report_controller_reconciled;
use stackable_operator::CustomResourceExt;
Expand Down Expand Up @@ -83,11 +84,11 @@ async fn main() -> anyhow::Result<()> {
product_config: product_config.load(&PRODUCT_CONFIG_PATHS)?,
};
let app_controller = Controller::new(
watch_namespace.get_api::<SparkApplication>(&client),
watch_namespace.get_api::<DeserializeGuard<SparkApplication>>(&client),
watcher::Config::default(),
)
.owns(
watch_namespace.get_api::<ConfigMap>(&client),
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
watcher::Config::default(),
)
.shutdown_on_signal()
Expand All @@ -106,12 +107,12 @@ async fn main() -> anyhow::Result<()> {
.instrument(info_span!("app_controller"));

let pod_driver_controller = Controller::new(
watch_namespace.get_api::<Pod>(&client),
watch_namespace.get_api::<DeserializeGuard<Pod>>(&client),
watcher::Config::default()
.labels(&format!("app.kubernetes.io/managed-by={OPERATOR_NAME}_{CONTROLLER_NAME},spark-role=driver")),
)
.owns(
watch_namespace.get_api::<Pod>(&client),
watch_namespace.get_api::<DeserializeGuard<Pod>>(&client),
watcher::Config::default(),
)
.shutdown_on_signal()
Expand All @@ -129,23 +130,23 @@ async fn main() -> anyhow::Result<()> {
product_config: product_config.load(&PRODUCT_CONFIG_PATHS)?,
};
let history_controller = Controller::new(
watch_namespace.get_api::<SparkHistoryServer>(&client),
watch_namespace.get_api::<DeserializeGuard<SparkHistoryServer>>(&client),
watcher::Config::default(),
)
.owns(
watch_namespace.get_api::<SparkHistoryServer>(&client),
watch_namespace.get_api::<DeserializeGuard<SparkHistoryServer>>(&client),
watcher::Config::default(),
)
.owns(
watch_namespace.get_api::<StatefulSet>(&client),
watch_namespace.get_api::<DeserializeGuard<StatefulSet>>(&client),
watcher::Config::default(),
)
.owns(
watch_namespace.get_api::<Service>(&client),
watch_namespace.get_api::<DeserializeGuard<Service>>(&client),
watcher::Config::default(),
)
.owns(
watch_namespace.get_api::<ConfigMap>(&client),
watch_namespace.get_api::<DeserializeGuard<ConfigMap>>(&client),
watcher::Config::default(),
)
.shutdown_on_signal()
Expand Down
24 changes: 20 additions & 4 deletions rust/operator-binary/src/pod_driver_controller.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use stackable_operator::{
client::Client, k8s_openapi::api::core::v1::Pod, kube::runtime::controller::Action,
client::Client,
k8s_openapi::api::core::v1::Pod,
kube::core::{error_boundary, DeserializeGuard},
kube::runtime::controller::Action,
time::Duration,
};
use stackable_spark_k8s_crd::{
Expand Down Expand Up @@ -35,6 +38,10 @@ pub enum Error {
source: stackable_operator::client::Error,
name: String,
},
#[snafu(display("Pod object is invalid"))]
InvalidPod {
source: error_boundary::InvalidObject,
},
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -45,9 +52,15 @@ impl ReconcilerError for Error {
}
}
/// Updates the status of the SparkApplication that started the pod.
pub async fn reconcile(pod: Arc<Pod>, client: Arc<Client>) -> Result<Action> {
pub async fn reconcile(pod: Arc<DeserializeGuard<Pod>>, client: Arc<Client>) -> Result<Action> {
tracing::info!("Starting reconcile driver pod");

let pod = pod
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidPodSnafu)?;

let pod_name = pod.metadata.name.as_ref().context(PodNameNotFoundSnafu)?;
let app_name = pod
.metadata
Expand Down Expand Up @@ -94,6 +107,9 @@ pub async fn reconcile(pod: Arc<Pod>, client: Arc<Client>) -> Result<Action> {
Ok(Action::await_change())
}

pub fn error_policy(_obj: Arc<Pod>, _error: &Error, _ctx: Arc<Client>) -> Action {
Action::requeue(*Duration::from_secs(5))
pub fn error_policy(_obj: Arc<DeserializeGuard<Pod>>, error: &Error, _ctx: Arc<Client>) -> Action {
match error {
Error::InvalidPod { .. } => Action::await_change(),
_ => Action::requeue(*Duration::from_secs(5)),
}
}
40 changes: 31 additions & 9 deletions rust/operator-binary/src/spark_k8s_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use stackable_operator::{
Resource,
},
kube::{
core::{error_boundary, DeserializeGuard},
runtime::{controller::Action, reflector::ObjectRef},
ResourceExt,
},
Expand Down Expand Up @@ -194,6 +195,11 @@ pub enum Error {
AddVolumeMount {
source: builder::pod::container::Error,
},

#[snafu(display("SparkApplication object is invalid"))]
InvalidSparkApplication {
source: error_boundary::InvalidObject,
},
}

type Result<T, E = Error> = std::result::Result<T, E>;
Expand All @@ -204,9 +210,18 @@ impl ReconcilerError for Error {
}
}

pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>) -> Result<Action> {
pub async fn reconcile(
spark_application: Arc<DeserializeGuard<SparkApplication>>,
ctx: Arc<Ctx>,
) -> Result<Action> {
tracing::info!("Starting reconcile");

let spark_application = spark_application
.0
.as_ref()
.map_err(error_boundary::InvalidObject::clone)
.context(InvalidSparkApplicationSnafu)?;

let client = &ctx.client;

if spark_application.k8s_job_has_been_created() {
Expand Down Expand Up @@ -269,7 +284,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
.context(InvalidProductConfigSnafu)?;

let (serviceaccount, rolebinding) =
build_spark_role_serviceaccount(&spark_application, &resolved_product_image)?;
build_spark_role_serviceaccount(spark_application, &resolved_product_image)?;
client
.apply_patch(CONTROLLER_NAME, &serviceaccount, &serviceaccount)
.await
Expand Down Expand Up @@ -305,7 +320,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
.and_then(|r| r.get(&"default".to_string()));

let driver_pod_template_config_map = pod_template_config_map(
&spark_application,
spark_application,
SparkApplicationRole::Driver,
&driver_config,
driver_product_config,
Expand Down Expand Up @@ -334,7 +349,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
.and_then(|r| r.get(&"default".to_string()));

let executor_pod_template_config_map = pod_template_config_map(
&spark_application,
spark_application,
SparkApplicationRole::Executor,
&executor_config,
executor_product_config,
Expand Down Expand Up @@ -372,7 +387,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
.and_then(|r| r.get(&"default".to_string()));

let submit_job_config_map = submit_job_config_map(
&spark_application,
spark_application,
submit_product_config,
&resolved_product_image,
)?;
Expand All @@ -386,7 +401,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
.context(ApplyApplicationSnafu)?;

let job = spark_job(
&spark_application,
spark_application,
&resolved_product_image,
&serviceaccount,
&env_vars,
Expand All @@ -406,7 +421,7 @@ pub async fn reconcile(spark_application: Arc<SparkApplication>, ctx: Arc<Ctx>)
client
.apply_patch_status(
CONTROLLER_NAME,
spark_application.as_ref(),
spark_application,
&SparkApplicationStatus {
phase: "Unknown".to_string(),
},
Expand Down Expand Up @@ -984,6 +999,13 @@ fn security_context() -> PodSecurityContext {
}
}

pub fn error_policy(_obj: Arc<SparkApplication>, _error: &Error, _ctx: Arc<Ctx>) -> Action {
Action::requeue(*Duration::from_secs(5))
pub fn error_policy(
_obj: Arc<DeserializeGuard<SparkApplication>>,
error: &Error,
_ctx: Arc<Ctx>,
) -> Action {
match error {
Error::InvalidSparkApplication { .. } => Action::await_change(),
_ => Action::requeue(*Duration::from_secs(5)),
}
}

0 comments on commit b28a0ba

Please sign in to comment.