Skip to content

Commit

Permalink
chore: Avoid unnecessary k8s::Client creations (#295)
Browse files Browse the repository at this point in the history
* chore: Avoid unnecessary k8s::Client creations

* changelog

* Use expect

* Fix clippy
  • Loading branch information
sbernauer authored May 28, 2024
1 parent 439c248 commit 5e8d4d8
Show file tree
Hide file tree
Showing 19 changed files with 174 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,8 @@ impl ResourceRequests {
/// Validates the struct [`ResourceRequests`] by comparing the required
/// resources to the available ones in the current cluster. `object_name`
/// should be `stack` or `demo`.
pub async fn validate_cluster_size(&self, object_name: &str) -> Result<()> {
let kube_client = Client::new().await.context(KubeClientCreateSnafu)?;
let cluster_info = kube_client
.get_cluster_info()
.await
.context(ClusterInfoSnafu)?;
pub async fn validate_cluster_size(&self, client: &Client, object_name: &str) -> Result<()> {
let cluster_info = client.get_cluster_info().await.context(ClusterInfoSnafu)?;

let stack_cpu =
CpuQuantity::try_from(&self.cpu).context(ParseCpuResourceRequirementsSnafu)?;
Expand Down
8 changes: 4 additions & 4 deletions rust/stackable-cockpit/src/platform/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use kube::{core::DynamicObject, ResourceExt};
use serde::Serialize;
use snafu::{OptionExt, ResultExt, Snafu};

use crate::utils::k8s;
use crate::utils::k8s::{self, Client};

pub type Result<T, E = Error> = std::result::Result<T, E>;

Expand Down Expand Up @@ -34,7 +34,7 @@ impl Display for Credentials {
/// and/or `password_key` are not found or the product does not provide
/// any credentials.
pub async fn get(
kube_client: &k8s::Client,
client: &Client,
product_name: &str,
stacklet: &DynamicObject,
) -> Result<Option<Credentials>> {
Expand All @@ -56,7 +56,7 @@ pub async fn get(
.as_str()
.context(NoSecretSnafu)?;

kube_client
client
.get_credentials_from_secret(
secret_name,
&stacklet.namespace().unwrap(),
Expand All @@ -71,7 +71,7 @@ pub async fn get(
.as_str()
.context(NoSecretSnafu)?;

kube_client
client
.get_credentials_from_secret(
secret_name,
&stacklet.namespace().unwrap(),
Expand Down
36 changes: 27 additions & 9 deletions rust/stackable-cockpit/src/platform/demo/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ use crate::{
release::ReleaseList,
stack::{self, StackInstallParameters, StackList},
},
utils::params::{
IntoParameters, IntoParametersError, Parameter, RawParameter, RawParameterParseError,
utils::{
k8s::Client,
params::{
IntoParameters, IntoParametersError, Parameter, RawParameter, RawParameterParseError,
},
},
xfer::{self, Client},
xfer,
};

pub type RawDemoParameterParseError = RawParameterParseError;
Expand Down Expand Up @@ -94,7 +97,11 @@ impl DemoSpec {
/// - Does the demo support to be installed in the requested namespace?
/// - Does the cluster have enough resources available to run this demo?
#[instrument(skip_all)]
pub async fn check_prerequisites(&self, product_namespace: &str) -> Result<(), Error> {
pub async fn check_prerequisites(
&self,
client: &Client,
product_namespace: &str,
) -> Result<(), Error> {
debug!("Checking prerequisites before installing demo");

// Returns an error if the demo doesn't support to be installed in the
Expand All @@ -109,7 +116,10 @@ impl DemoSpec {
// Checks if the available cluster resources are sufficient to deploy
// the demo.
if let Some(resource_requests) = &self.resource_requests {
if let Err(err) = resource_requests.validate_cluster_size("demo").await {
if let Err(err) = resource_requests
.validate_cluster_size(client, "demo")
.await
{
match err {
ResourceRequestsError::ValidationErrors { errors } => {
for error in errors {
Expand All @@ -129,15 +139,16 @@ impl DemoSpec {
stack_list: StackList,
release_list: ReleaseList,
install_parameters: DemoInstallParameters,
transfer_client: &Client,
client: &Client,
transfer_client: &xfer::Client,
) -> Result<(), Error> {
// Get the stack spec based on the name defined in the demo spec
let stack = stack_list.get(&self.stack).context(NoSuchStackSnafu {
name: self.stack.clone(),
})?;

// Check demo prerequisites
self.check_prerequisites(&install_parameters.product_namespace)
self.check_prerequisites(client, &install_parameters.product_namespace)
.await?;

let stack_install_parameters = StackInstallParameters {
Expand All @@ -151,19 +162,25 @@ impl DemoSpec {
};

stack
.install(release_list, stack_install_parameters, transfer_client)
.install(
release_list,
stack_install_parameters,
client,
transfer_client,
)
.await
.context(InstallStackSnafu)?;

// Install demo manifests
self.prepare_manifests(install_parameters, transfer_client)
self.prepare_manifests(install_parameters, client, transfer_client)
.await
}

#[instrument(skip_all)]
async fn prepare_manifests(
&self,
install_params: DemoInstallParameters,
client: &Client,
transfer_client: &xfer::Client,
) -> Result<(), Error> {
info!("Installing demo manifests");
Expand All @@ -179,6 +196,7 @@ impl DemoSpec {
&params,
&install_params.product_namespace,
install_params.labels,
client,
transfer_client,
)
.await
Expand Down
9 changes: 4 additions & 5 deletions rust/stackable-cockpit/src/platform/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
common::manifest::ManifestSpec,
helm,
utils::{
k8s,
k8s::{self, Client},
path::{IntoPathOrUrl, PathOrUrlParseError},
},
xfer::{
Expand Down Expand Up @@ -61,20 +61,19 @@ pub enum Error {
}

pub trait InstallManifestsExt {
// TODO (Techassi): This step shouldn't care about templating the manifests nor fecthing them from remote
// TODO (Techassi): This step shouldn't care about templating the manifests nor fetching them from remote
#[instrument(skip_all)]
#[allow(async_fn_in_trait)]
async fn install_manifests(
manifests: &[ManifestSpec],
parameters: &HashMap<String, String>,
product_namespace: &str,
labels: Labels,
client: &Client,
transfer_client: &xfer::Client,
) -> Result<(), Error> {
debug!("Installing demo / stack manifests");

let kube_client = k8s::Client::new().await.context(CreateKubeClientSnafu)?;

for manifest in manifests {
match manifest {
ManifestSpec::HelmChart(helm_file) => {
Expand Down Expand Up @@ -137,7 +136,7 @@ pub trait InstallManifestsExt {
.await
.context(FileTransferSnafu)?;

kube_client
client
.deploy_manifests(&manifests, product_namespace, labels.clone())
.await
.context(DeployManifestSnafu)?
Expand Down
8 changes: 3 additions & 5 deletions rust/stackable-cockpit/src/platform/namespace.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use snafu::{ResultExt, Snafu};
use snafu::Snafu;

use crate::utils::k8s;
use crate::utils::k8s::{self, Client};

#[derive(Debug, Snafu)]
pub enum Error {
Expand All @@ -13,9 +13,7 @@ pub enum Error {

/// Creates a namespace with `name` if needed (not already present in the
/// cluster).
pub async fn create_if_needed(name: String) -> Result<(), Error> {
let client = k8s::Client::new().await.context(KubeClientCreateSnafu)?;

pub async fn create_if_needed(client: &Client, name: String) -> Result<(), Error> {
client
.create_namespace_if_needed(name)
.await
Expand Down
33 changes: 14 additions & 19 deletions rust/stackable-cockpit/src/platform/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use kube::{api::ListParams, ResourceExt};
use snafu::{OptionExt, ResultExt, Snafu};
use tracing::{debug, warn};

use crate::utils::k8s::{self, ListParamsExt};
use crate::utils::k8s::{self, Client, ListParamsExt};

#[derive(Debug, Snafu)]
pub enum Error {
Expand All @@ -40,15 +40,15 @@ pub enum Error {
}

pub async fn get_endpoints(
kube_client: &k8s::Client,
client: &Client,
product_name: &str,
object_name: &str,
object_namespace: &str,
) -> Result<IndexMap<String, String>, Error> {
let list_params =
ListParams::from_product(product_name, Some(object_name), k8s::ProductLabel::Name);

let listeners = kube_client
let listeners = client
.list_listeners(Some(object_namespace), &list_params)
.await;
let listeners = match listeners {
Expand Down Expand Up @@ -92,13 +92,13 @@ pub async fn get_endpoints(
return Ok(endpoints);
}

let services = kube_client
let services = client
.list_services(Some(object_namespace), &list_params)
.await
.context(KubeClientFetchSnafu)?;

for service in services {
match get_endpoint_urls(kube_client, &service, object_name).await {
match get_endpoint_urls(client, &service, object_name).await {
Ok(urls) => endpoints.extend(urls),
Err(err) => warn!(
"Failed to get endpoint_urls of service {service_name}: {err}",
Expand All @@ -111,7 +111,7 @@ pub async fn get_endpoints(
}

pub async fn get_endpoint_urls(
kube_client: &k8s::Client,
client: &Client,
service: &Service,
referenced_object_name: &str,
) -> Result<IndexMap<String, String>, Error> {
Expand All @@ -128,7 +128,7 @@ pub async fn get_endpoint_urls(
let endpoints = match service_spec.type_.as_deref() {
Some("NodePort") => {
get_endpoint_urls_for_nodeport(
kube_client,
client,
&service_name,
service_spec,
&service_namespace,
Expand All @@ -152,13 +152,13 @@ pub async fn get_endpoint_urls(
}

pub async fn get_endpoint_urls_for_nodeport(
kube_client: &k8s::Client,
client: &Client,
service_name: &str,
service_spec: &ServiceSpec,
service_namespace: &str,
referenced_object_name: &str,
) -> Result<IndexMap<String, String>, Error> {
let endpoints = kube_client
let endpoints = client
.get_endpoints(service_namespace, service_name)
.await
.context(KubeClientFetchSnafu)?;
Expand Down Expand Up @@ -191,7 +191,7 @@ pub async fn get_endpoint_urls_for_nodeport(
}
};

let node_ip = get_node_ip(kube_client, node_name).await?;
let node_ip = get_node_ip(client, node_name).await?;

let mut endpoints = IndexMap::new();
for service_port in service_spec.ports.iter().flatten() {
Expand Down Expand Up @@ -265,8 +265,8 @@ pub async fn get_endpoint_urls_for_loadbalancer(
Ok(endpoints)
}

async fn get_node_ip(kube_client: &k8s::Client, node_name: &str) -> Result<String, Error> {
let node_name_ip_mapping = get_node_name_ip_mapping(kube_client).await?;
async fn get_node_ip(client: &Client, node_name: &str) -> Result<String, Error> {
let node_name_ip_mapping = get_node_name_ip_mapping(client).await?;

match node_name_ip_mapping.get(node_name) {
Some(node_ip) => Ok(node_ip.to_string()),
Expand All @@ -276,13 +276,8 @@ async fn get_node_ip(kube_client: &k8s::Client, node_name: &str) -> Result<Strin

// TODO(sbernauer): Add caching. Not going to do so now, as listener-op
// will replace this code entirely anyway.
async fn get_node_name_ip_mapping(
kube_client: &k8s::Client,
) -> Result<HashMap<String, String>, Error> {
let nodes = kube_client
.list_nodes()
.await
.context(KubeClientFetchSnafu)?;
async fn get_node_name_ip_mapping(client: &Client) -> Result<HashMap<String, String>, Error> {
let nodes = client.list_nodes().await.context(KubeClientFetchSnafu)?;

let mut result = HashMap::new();
for node in nodes {
Expand Down
Loading

0 comments on commit 5e8d4d8

Please sign in to comment.