Skip to content

Commit

Permalink
chore(deps): replace backoff with backon
Browse files Browse the repository at this point in the history
Replace the `backoff` dependency with `backon`. The former one is no
longer maintained and is also pulling the `instant` crate, which has
been marked as unmaintained by RUSTSEC.

Prior to this commit the public API of kube-rs exposed a trait defined
by the `backoff` crate. This commits introduces a new trait defined by
kube-rs, which wraps the `backon` trait.

Fixes #1635

Signed-off-by: Flavio Castelli <[email protected]>
  • Loading branch information
flavio committed Nov 29, 2024
1 parent 3ee4ae5 commit 53cc42e
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 108 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ assert-json-diff = "2.0.2"
async-broadcast = "0.7.0"
async-stream = "0.3.5"
async-trait = "0.1.64"
backoff = "0.4.0"
backon = "1.3"
base64 = "0.22.1"
bytes = "1.1.0"
chrono = { version = "0.4.34", default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ tower-http = { workspace = true, features = ["trace", "decompression-gzip"] }
hyper = { workspace = true, features = ["client", "http1"] }
hyper-util = { workspace = true, features = ["client-legacy", "http1", "tokio"] }
thiserror.workspace = true
backoff.workspace = true
backon.workspace = true
clap = { version = "4.0", default-features = false, features = ["std", "cargo", "derive"] }
edit = "0.1.3"
tokio-stream = { version = "0.1.9", features = ["net"] }
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ json-patch.workspace = true
jsonptr.workspace = true
serde_json.workspace = true
thiserror.workspace = true
backoff.workspace = true
backon.workspace = true
async-trait.workspace = true
hashbrown.workspace = true
k8s-openapi.workspace = true
Expand Down
5 changes: 3 additions & 2 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ use crate::{
ObjectRef,
},
scheduler::{debounced_scheduler, ScheduleRequest},
utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
utils::{
trystream_try_via, Backoff, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt,
},
watcher::{self, metadata_watcher, watcher, DefaultBackoff},
};
use backoff::backoff::Backoff;
use educe::Educe;
use futures::{
channel,
Expand Down
62 changes: 29 additions & 33 deletions kube-runtime/src/utils/backoff_reset_timer.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
use std::time::{Duration, Instant};

use backoff::{backoff::Backoff, Clock, SystemClock};
pub trait Backoff: Iterator<Item = Duration> + Send + Sync + Unpin {
/// Resets the internal state to the initial value.
fn reset(&mut self);
}

impl<B: Backoff + ?Sized> Backoff for Box<B> {
fn reset(&mut self) {
let this: &mut B = self;
this.reset()
}
}

/// A [`Backoff`] wrapper that resets after a fixed duration has elapsed.
pub struct ResetTimerBackoff<B, C = SystemClock> {
pub struct ResetTimerBackoff<B: Backoff> {
backoff: B,
clock: C,
last_backoff: Option<Instant>,
reset_duration: Duration,
}

impl<B: Backoff> ResetTimerBackoff<B> {
pub fn new(backoff: B, reset_duration: Duration) -> Self {
Self::new_with_custom_clock(backoff, reset_duration, SystemClock {})
}
}

impl<B: Backoff, C: Clock> ResetTimerBackoff<B, C> {
fn new_with_custom_clock(backoff: B, reset_duration: Duration, clock: C) -> Self {
Self {
backoff,
clock,
last_backoff: None,
reset_duration,
}
}
}

impl<B: Backoff, C: Clock> Backoff for ResetTimerBackoff<B, C> {
fn next_backoff(&mut self) -> Option<Duration> {
impl<B: Backoff> Iterator for ResetTimerBackoff<B> {
type Item = Duration;

fn next(&mut self) -> Option<Duration> {
if let Some(last_backoff) = self.last_backoff {
if self.clock.now() > last_backoff + self.reset_duration {
if tokio::time::Instant::now().into_std() > last_backoff + self.reset_duration {
tracing::debug!(
?last_backoff,
reset_duration = ?self.reset_duration,
Expand All @@ -39,48 +43,40 @@ impl<B: Backoff, C: Clock> Backoff for ResetTimerBackoff<B, C> {
self.backoff.reset();
}
}
self.last_backoff = Some(self.clock.now());
self.backoff.next_backoff()
self.last_backoff = Some(tokio::time::Instant::now().into_std());
self.backoff.next()
}
}

impl<B: Backoff> Backoff for ResetTimerBackoff<B> {
fn reset(&mut self) {
// Do not even bother trying to reset here, since `next_backoff` will take care of this when the timer expires.
self.backoff.reset();
}
}

#[cfg(test)]
mod tests {
use backoff::{backoff::Backoff, Clock};
use tokio::time::advance;

use super::ResetTimerBackoff;
use crate::utils::stream_backoff::tests::LinearBackoff;
use std::time::{Duration, Instant};
use std::time::Duration;

#[tokio::test]
async fn should_reset_when_timer_expires() {
tokio::time::pause();
let mut backoff = ResetTimerBackoff::new_with_custom_clock(
let mut backoff = ResetTimerBackoff::new(
LinearBackoff::new(Duration::from_secs(2)),
Duration::from_secs(60),
TokioClock,
);
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
advance(Duration::from_secs(40)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(4)));
assert_eq!(backoff.next(), Some(Duration::from_secs(4)));
advance(Duration::from_secs(40)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(6)));
assert_eq!(backoff.next(), Some(Duration::from_secs(6)));
advance(Duration::from_secs(80)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
advance(Duration::from_secs(80)).await;
assert_eq!(backoff.next_backoff(), Some(Duration::from_secs(2)));
}

struct TokioClock;

impl Clock for TokioClock {
fn now(&self) -> Instant {
tokio::time::Instant::now().into_std()
}
assert_eq!(backoff.next(), Some(Duration::from_secs(2)));
}
}
2 changes: 1 addition & 1 deletion kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod reflect;
mod stream_backoff;
mod watch_ext;

pub use backoff_reset_timer::ResetTimerBackoff;
pub use backoff_reset_timer::{Backoff, ResetTimerBackoff};
pub use event_decode::EventDecode;
pub use event_modify::EventModify;
pub use predicate::{predicates, Predicate, PredicateFilter};
Expand Down
78 changes: 66 additions & 12 deletions kube-runtime/src/utils/stream_backoff.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::{future::Future, pin::Pin, task::Poll};

use backoff::backoff::Backoff;
use futures::{Stream, TryStream};
use pin_project::pin_project;
use tokio::time::{sleep, Instant, Sleep};

use crate::utils::Backoff;

/// Applies a [`Backoff`] policy to a [`Stream`]
///
/// After any [`Err`] is emitted, the stream is paused for [`Backoff::next_backoff`]. The
Expand Down Expand Up @@ -71,7 +72,7 @@ impl<S: TryStream, B: Backoff> Stream for StreamBackoff<S, B> {
let next_item = this.stream.try_poll_next(cx);
match &next_item {
Poll::Ready(Some(Err(_))) => {
if let Some(backoff_duration) = this.backoff.next_backoff() {
if let Some(backoff_duration) = this.backoff.next() {
let backoff_sleep = sleep(backoff_duration);
tracing::debug!(
deadline = ?backoff_sleep.deadline(),
Expand All @@ -98,16 +99,54 @@ impl<S: TryStream, B: Backoff> Stream for StreamBackoff<S, B> {
pub(crate) mod tests {
use std::{pin::pin, task::Poll, time::Duration};

use crate::utils::Backoff;

use super::StreamBackoff;
use backoff::backoff::Backoff;
use backon::BackoffBuilder;
use futures::{channel::mpsc, poll, stream, StreamExt};

pub struct ConstantBackoff {
inner: backon::ConstantBackoff,
delay: Duration,
max_times: usize,
}

impl ConstantBackoff {
pub fn new(delay: Duration, max_times: usize) -> Self {
Self {
inner: backon::ConstantBuilder::default()
.with_delay(delay)
.with_max_times(max_times)
.build(),
delay,
max_times,
}
}
}

impl Iterator for ConstantBackoff {
type Item = Duration;

fn next(&mut self) -> Option<Duration> {
self.inner.next()
}
}

impl Backoff for ConstantBackoff {
fn reset(&mut self) {
self.inner = backon::ConstantBuilder::default()
.with_delay(self.delay)
.with_max_times(self.max_times)
.build();
}
}

#[tokio::test]
async fn stream_should_back_off() {
tokio::time::pause();
let tick = Duration::from_secs(1);
let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]);
let mut rx = pin!(StreamBackoff::new(rx, backoff::backoff::Constant::new(tick)));
let mut rx = pin!(StreamBackoff::new(rx, ConstantBackoff::new(tick, 10)));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0))));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1))));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2))));
Expand Down Expand Up @@ -149,16 +188,27 @@ pub(crate) mod tests {
#[tokio::test]
async fn backoff_should_close_when_requested() {
assert_eq!(
StreamBackoff::new(
stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]),
backoff::backoff::Stop {}
)
.collect::<Vec<_>>()
.await,
StreamBackoff::new(stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]), StoppedBackoff {})
.collect::<Vec<_>>()
.await,
vec![Ok(0), Ok(1), Err(2)]
);
}

struct StoppedBackoff;

impl Backoff for StoppedBackoff {
fn reset(&mut self) {}
}

impl Iterator for StoppedBackoff {
type Item = Duration;

fn next(&mut self) -> Option<Duration> {
None
}
}

/// Dynamic backoff policy that is still deterministic and testable
pub struct LinearBackoff {
interval: Duration,
Expand All @@ -174,12 +224,16 @@ pub(crate) mod tests {
}
}

impl Backoff for LinearBackoff {
fn next_backoff(&mut self) -> Option<Duration> {
impl Iterator for LinearBackoff {
type Item = Duration;

fn next(&mut self) -> Option<Duration> {
self.current_duration += self.interval;
Some(self.current_duration)
}
}

impl Backoff for LinearBackoff {
fn reset(&mut self) {
self.current_duration = Duration::ZERO
}
Expand Down
6 changes: 4 additions & 2 deletions kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use crate::{
};
use kube_client::Resource;

use crate::{reflector::store::Writer, utils::Reflect};
use crate::{
reflector::store::Writer,
utils::{Backoff, Reflect},
};

use crate::watcher::DefaultBackoff;
use backoff::backoff::Backoff;
use futures::{Stream, TryStream};

/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
Expand Down
Loading

0 comments on commit 53cc42e

Please sign in to comment.