Skip to content

Commit

Permalink
Better docs for Controller::reconcile_on
Browse files Browse the repository at this point in the history
The unstable method currently suggests that this method can be used to help share a store with the reconciler.
This is actually nothing specific to `reconcile_on`, and you can do the same with the streams interface with `watches_stream`.

We made the `reconcile_on` right before `watches_stream` became a thing so this makes sense.

Have reworded the example to highlight that this has a better use-case with actually getting arbitrary third-party info,
and then mapping that to kubernetes objects.

First example that came to mind was using an IntervalStream with tokio and just cycle through a bunch of objects, but there may be a better example that does not pull in the extra dev dep.

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux committed Oct 7, 2023
1 parent 7ef858d commit 8022ddf
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 17 deletions.
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ tokio = { version = "1.14.0", features = ["full", "test-util"] }
rand = "0.8.0"
schemars = "0.8.6"
tracing-subscriber = "0.3.17"
tokio-stream = "0.1.14"

[dev-dependencies.k8s-openapi]
version = "0.20.0"
Expand Down
30 changes: 13 additions & 17 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1087,41 +1087,37 @@ where

/// Trigger the reconciliation process for a managed object `ObjectRef<K>` whenever `trigger` emits a value
///
/// For example, this can be used to watch resources once and use the stream to trigger reconciliation and also keep a cache of those objects.
/// That way it's possible to use this up to date cache instead of querying Kubernetes to access those resources
/// This can be used to inject reconciliations for specific objects from an external resource.
///
/// # Example:
///
/// ```no_run
/// # async {
/// # use futures::{StreamExt, TryStreamExt};
/// # use k8s_openapi::api::core::v1::{ConfigMap, Pod};
/// # use k8s_openapi::api::core::v1::{ConfigMap};
/// # use kube::api::Api;
/// # use kube::runtime::controller::Action;
/// # use kube::runtime::reflector::{ObjectRef, Store};
/// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt};
/// # use kube::runtime::watcher::Config;
/// # use kube::{Client, Error, ResourceExt};
/// # use tokio_stream::wrappers::IntervalStream;
/// # use std::future;
/// # use std::time::Duration;
/// # use std::sync::Arc;
/// #
/// # let client: Client = todo!();
/// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<Store<Pod>>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Store<Pod>>) -> Action { Action::await_change() }
/// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
/// #
/// // Store can be used in the reconciler instead of querying Kube
/// let (pod_store, writer) = reflector::store();
/// let pod_stream = watcher(Api::<Pod>::all(client.clone()), Config::default())
/// .default_backoff()
/// .reflect(writer)
/// .applied_objects()
/// // Map to the relevant `ObjectRef<K>` to reconcile
/// .map_ok(|pod| ObjectRef::new(&format!("{}-cm", pod.name_any())).within(&pod.namespace().unwrap()));
/// let ns = "external-configs".to_string();
/// let mut next_object = [ObjectRef::new("managed-cm1").within(&ns)].into_iter().cycle();
/// let interval = tokio::time::interval(Duration::from_secs(60)); // hit the object every minute
/// let external_stream = IntervalStream::new(interval).map(|_| Ok(next_object.next().unwrap()));
///
/// Controller::new(Api::<ConfigMap>::all(client), Config::default())
/// .reconcile_on(pod_stream)
/// // The store can be re-used between controllers and even inspected from the reconciler through [Context]
/// .run(reconcile, error_policy, Arc::new(pod_store))
/// Controller::new(Api::<ConfigMap>::namespaced(client, &ns), Config::default())
/// .reconcile_on(external_stream)
/// .run(reconcile, error_policy, Arc::new(()))
/// .for_each(|_| future::ready(()))
/// .await;
/// # };
Expand Down

0 comments on commit 8022ddf

Please sign in to comment.