Skip to content

Commit

Permalink
Fix pop_queue_message_into_pending
Browse files Browse the repository at this point in the history
  • Loading branch information
co42 committed Oct 25, 2023
1 parent c3fbe25 commit 0c2a50c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
13 changes: 13 additions & 0 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ where
let next_msg_poll = scheduler
.as_mut()
.hold_unless(|msg| !slots.contains_key(msg))
// .hold_unless(|msg| *this.is_ready_to_execute && !slots.contains_key(msg))
.poll_next_unpin(cx);
match next_msg_poll {
Poll::Ready(Some(msg)) => {
Expand Down Expand Up @@ -444,6 +445,18 @@ mod tests {
advance(Duration::from_secs(3)).await;
assert!(poll!(runner.as_mut()).is_pending());

// Send the third message again and check it's ran
sched_tx
.send(ScheduleRequest {
message: 3,
run_at: Instant::now(),
})
.await
.unwrap();
advance(Duration::from_secs(3)).await;
assert!(poll!(runner.as_mut()).is_pending());
assert_eq!(*count.lock().unwrap(), 4);

let (mut sched_tx, sched_rx) = mpsc::unbounded();
let mut runner = Box::pin(
Runner::new(scheduler(sched_rx), 1, |_| {
Expand Down
3 changes: 3 additions & 0 deletions kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
pub fn pop_queue_message_into_pending(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(msg)) = self.queue.poll_expired(cx) {
let msg = msg.into_inner();
self.scheduled.remove_entry(&msg).expect(
"Expired message was popped from the Scheduler queue, but was not in the metadata map",
);
self.pending.insert(msg);
}
}
Expand Down

0 comments on commit 0c2a50c

Please sign in to comment.