From 326c370abc88f23e0e7aa611363da98b72617784 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Havelund=20Welling?= Date: Wed, 11 Oct 2023 01:06:48 +0300 Subject: [PATCH] fix: not drop first queue member if drop handler (#416) Co-authored-by: eitsupi --- src/rust/src/rbackground.rs | 14 ++++++++--- tests/testthat/test-rbackground.R | 41 +++++++++++++++++++++++++++---- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/src/rust/src/rbackground.rs b/src/rust/src/rbackground.rs index b3a69161f..234cfc1f4 100644 --- a/src/rust/src/rbackground.rs +++ b/src/rust/src/rbackground.rs @@ -320,10 +320,18 @@ impl InnerRBackgroundPool { fn release_handler(&mut self, mut handle: RBackgroundHandler) -> RResult<()> { match (handle.proc.try_wait()?, self.queue.pop_front()) { - // 1a: too many active handlers. Kill it ! - (_, _) if self.active > self.cap => { + // 1a-a: too many active handlers. Kill it ! + Put back first queue member in queue + (_, Some(first_queue_member)) if self.active > self.cap => { #[cfg(feature = "rpolars_debug_print")] - println!("1a - too many kill it!"); + println!("1a-a - "); + self.queue.push_front(first_queue_member); + self.destroy_handler(handle) + } + + // 1a-a: too many active handlers and not needed. Kill it ! + (_, None) if self.active > self.cap => { + #[cfg(feature = "rpolars_debug_print")] + println!("1a-b - "); self.destroy_handler(handle) } diff --git a/tests/testthat/test-rbackground.R b/tests/testthat/test-rbackground.R index 8e2a866c9..8c6c229ee 100644 --- a/tests/testthat/test-rbackground.R +++ b/tests/testthat/test-rbackground.R @@ -1,6 +1,7 @@ lf = pl$LazyFrame(data.frame(x = 1:10, y = 11:20)) test_that("Test collecting LazyFrame in background", { + skip_if_not(Sys.getenv("CI") == "true") compute = lf$select(pl$col("x") * pl$col("y")) res_bg = compute$collect_in_background()$join() expect_equal(res_bg$to_data_frame(), compute$collect()$to_data_frame()) @@ -11,6 +12,7 @@ test_that("Test collecting LazyFrame in background", { }) test_that("Test using $map() in background", { + skip_if_not(Sys.getenv("CI") == "true") # change capacity pl$set_options(rpool_cap = 0) expect_equal(pl$options$rpool_cap, 0) @@ -63,6 +65,7 @@ test_that("Test using $map() in background", { test_that("reset rpool_cap", { + skip_if_not(Sys.getenv("CI") == "true") pl$reset_options() orig = pl$options$rpool_cap pl$set_options(rpool_cap = orig + 1) @@ -73,15 +76,43 @@ test_that("reset rpool_cap", { test_that("rpool errors", { - - ctx = pl$set_options(rpool_cap = c(1, 2)) |> get_err_ctx() + skip_if_not(Sys.getenv("CI") == "true") + ctx = pl$set_options(rpool_cap = c(1, 2)) |> get_err_ctx() expect_identical(ctx$BadArgument, "rpool_cap") - expect_true(startsWith(ctx$TypeMismatch,"i64")) + expect_true(startsWith(ctx$TypeMismatch, "i64")) ctx = pl$set_options(rpool_cap = -1) |> get_err_ctx() expect_identical(ctx$ValueOutOfScope, "cannot be less than zero") - ctx = {polars_optenv$rpool_active <- 0} |> get_err_ctx() - expect_true(endsWith(ctx$PlainErrorMessage,"rpool_active cannot be set directly")) + ctx = + { + polars_optenv$rpool_active = 0 + } |> get_err_ctx() + expect_true(endsWith(ctx$PlainErrorMessage, "rpool_active cannot be set directly")) +}) + +test_that("reduce cap and active while jobs in queue", { + skip_if_not(Sys.getenv("CI") == "true") + pl$set_options(rpool_cap = 0) + pl$set_options(rpool_cap = 3) + l_expr = lapply(1:5, \(i) { + pl$lit(i)$map(\(x) { + Sys.sleep(.4) + -i + }, in_background = TRUE)$alias(paste0("lit_", i)) + }) + lf = pl$LazyFrame()$select(l_expr) + handle = lf$collect(collect_in_background = TRUE) + Sys.sleep(.2) + pl$set_options(rpool_cap = 2) + Sys.sleep(.1) + pl$set_options(rpool_cap = 1) + df = handle$join() + expect_identical( + df$to_list(), + list(lit_1 = -1L, lit_2 = -2L, lit_3 = -3L, lit_4 = -4L, lit_5 = -5L) + ) + + pl$reset_options() })