From 619d986bbe7ff969fda82c11b7d72b98e366c60e Mon Sep 17 00:00:00 2001 From: sorhawell Date: Sun, 8 Oct 2023 17:17:12 +0300 Subject: [PATCH 1/4] fix: not drop first queue member if drop handler --- src/rust/src/rbackground.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/rust/src/rbackground.rs b/src/rust/src/rbackground.rs index 9c1f1a2cc..234cfc1f4 100644 --- a/src/rust/src/rbackground.rs +++ b/src/rust/src/rbackground.rs @@ -320,10 +320,18 @@ impl InnerRBackgroundPool { fn release_handler(&mut self, mut handle: RBackgroundHandler) -> RResult<()> { match (handle.proc.try_wait()?, self.queue.pop_front()) { - // 1a: too many active handlers. Kill it ! - (_, _) if self.active > self.cap => { + // 1a-a: too many active handlers. Kill it ! + Put back first queue member in queue + (_, Some(first_queue_member)) if self.active > self.cap => { #[cfg(feature = "rpolars_debug_print")] - println!("1a - too many kill it!"); + println!("1a-a - "); + self.queue.push_front(first_queue_member); + self.destroy_handler(handle) + } + + // 1a-b: too many active handlers and not needed. Kill it !
+ (_, None) if self.active > self.cap => { + #[cfg(feature = "rpolars_debug_print")] + println!("1a-b - "); self.destroy_handler(handle) } @@ -520,10 +528,7 @@ pub fn set_global_rpool_cap(c: Robj) -> RResult<()> { #[extendr] pub fn get_global_rpool_cap() -> RResult { let pool_guard = RBGPOOL.0.lock()?; - Ok(list!( - active = pool_guard.active, - capacity = pool_guard.cap - )) + Ok(list!(active = pool_guard.active, capacity = pool_guard.cap)) } #[extendr] From 378f9c0cb5bb56de3e8ba12bd29608463bcdbac6 Mon Sep 17 00:00:00 2001 From: sorhawell Date: Sun, 8 Oct 2023 17:29:16 +0300 Subject: [PATCH 2/4] add test to check queue member is not lost again --- tests/testthat/test-rbackground.R | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/testthat/test-rbackground.R b/tests/testthat/test-rbackground.R index 8e2a866c9..df9efe54d 100644 --- a/tests/testthat/test-rbackground.R +++ b/tests/testthat/test-rbackground.R @@ -85,3 +85,24 @@ test_that("rpool errors", { expect_true(endsWith(ctx$PlainErrorMessage,"rpool_active cannot be set directly")) }) + +test_that("reduce cap and active while jobs in queue",{ + pl$set_options(rpool_cap = 3) + l_expr = lapply(1:5,\(i) { + pl$lit(i)$map(\(x) {Sys.sleep(.4); -i}, in_background = TRUE)$alias(paste0("lit_",i)) + }) + lf = pl$LazyFrame()$select(l_expr) + handle = lf$collect(collect_in_background = TRUE) + Sys.sleep(.2) + pl$set_options(rpool_cap = 2) + Sys.sleep(.1) + pl$set_options(rpool_cap = 1) + df = handle$join() + + expect_identical( + df$to_list(), + list(lit_1 = -1L, lit_2 = -2L, lit_3 = -3L, lit_4 = -4L, lit_5 = -5L) + ) + + pl$reset_options() +} ) From ddb8b2056539d2594d7ae6f0f112a520df1e585a Mon Sep 17 00:00:00 2001 From: sorhawell Date: Sun, 8 Oct 2023 22:35:13 +0300 Subject: [PATCH 3/4] skip background process test if not env CI='true' --- tests/testthat/test-rbackground.R | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/testthat/test-rbackground.R 
b/tests/testthat/test-rbackground.R index df9efe54d..a412e8402 100644 --- a/tests/testthat/test-rbackground.R +++ b/tests/testthat/test-rbackground.R @@ -1,6 +1,7 @@ lf = pl$LazyFrame(data.frame(x = 1:10, y = 11:20)) test_that("Test collecting LazyFrame in background", { + skip_if_not(Sys.getenv("CI") == 'true') compute = lf$select(pl$col("x") * pl$col("y")) res_bg = compute$collect_in_background()$join() expect_equal(res_bg$to_data_frame(), compute$collect()$to_data_frame()) @@ -11,6 +12,7 @@ test_that("Test collecting LazyFrame in background", { }) test_that("Test using $map() in background", { + skip_if_not(Sys.getenv("CI") == 'true') # change capacity pl$set_options(rpool_cap = 0) expect_equal(pl$options$rpool_cap, 0) @@ -63,6 +65,7 @@ test_that("Test using $map() in background", { test_that("reset rpool_cap", { + skip_if_not(Sys.getenv("CI") == 'true') pl$reset_options() orig = pl$options$rpool_cap pl$set_options(rpool_cap = orig + 1) @@ -73,7 +76,7 @@ test_that("reset rpool_cap", { test_that("rpool errors", { - + skip_if_not(Sys.getenv("CI") == 'true') ctx = pl$set_options(rpool_cap = c(1, 2)) |> get_err_ctx() expect_identical(ctx$BadArgument, "rpool_cap") expect_true(startsWith(ctx$TypeMismatch,"i64")) @@ -87,6 +90,8 @@ test_that("rpool errors", { }) test_that("reduce cap and active while jobs in queue",{ + skip_if_not(Sys.getenv("CI") == 'true') + pl$set_options(rpool_cap = 0) pl$set_options(rpool_cap = 3) l_expr = lapply(1:5,\(i) { pl$lit(i)$map(\(x) {Sys.sleep(.4); -i}, in_background = TRUE)$alias(paste0("lit_",i)) From a66b6a92fa49d6c2677f2736ee1883300cdfe047 Mon Sep 17 00:00:00 2001 From: eitsupi Date: Mon, 9 Oct 2023 08:18:30 +0000 Subject: [PATCH 4/4] auto formatting --- tests/testthat/test-rbackground.R | 33 ++++++++++++++++++------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/tests/testthat/test-rbackground.R b/tests/testthat/test-rbackground.R index a412e8402..8c6c229ee 100644 --- a/tests/testthat/test-rbackground.R +++ 
b/tests/testthat/test-rbackground.R @@ -1,7 +1,7 @@ lf = pl$LazyFrame(data.frame(x = 1:10, y = 11:20)) test_that("Test collecting LazyFrame in background", { - skip_if_not(Sys.getenv("CI") == 'true') + skip_if_not(Sys.getenv("CI") == "true") compute = lf$select(pl$col("x") * pl$col("y")) res_bg = compute$collect_in_background()$join() expect_equal(res_bg$to_data_frame(), compute$collect()$to_data_frame()) @@ -12,7 +12,7 @@ test_that("Test collecting LazyFrame in background", { }) test_that("Test using $map() in background", { - skip_if_not(Sys.getenv("CI") == 'true') + skip_if_not(Sys.getenv("CI") == "true") # change capacity pl$set_options(rpool_cap = 0) expect_equal(pl$options$rpool_cap, 0) @@ -65,7 +65,7 @@ test_that("Test using $map() in background", { test_that("reset rpool_cap", { - skip_if_not(Sys.getenv("CI") == 'true') + skip_if_not(Sys.getenv("CI") == "true") pl$reset_options() orig = pl$options$rpool_cap pl$set_options(rpool_cap = orig + 1) @@ -76,25 +76,30 @@ test_that("reset rpool_cap", { test_that("rpool errors", { - skip_if_not(Sys.getenv("CI") == 'true') - ctx = pl$set_options(rpool_cap = c(1, 2)) |> get_err_ctx() + skip_if_not(Sys.getenv("CI") == "true") + ctx = pl$set_options(rpool_cap = c(1, 2)) |> get_err_ctx() expect_identical(ctx$BadArgument, "rpool_cap") - expect_true(startsWith(ctx$TypeMismatch,"i64")) + expect_true(startsWith(ctx$TypeMismatch, "i64")) ctx = pl$set_options(rpool_cap = -1) |> get_err_ctx() expect_identical(ctx$ValueOutOfScope, "cannot be less than zero") - ctx = {polars_optenv$rpool_active <- 0} |> get_err_ctx() - expect_true(endsWith(ctx$PlainErrorMessage,"rpool_active cannot be set directly")) - + ctx = + { + polars_optenv$rpool_active = 0 + } |> get_err_ctx() + expect_true(endsWith(ctx$PlainErrorMessage, "rpool_active cannot be set directly")) }) -test_that("reduce cap and active while jobs in queue",{ - skip_if_not(Sys.getenv("CI") == 'true') +test_that("reduce cap and active while jobs in queue", { + 
skip_if_not(Sys.getenv("CI") == "true") pl$set_options(rpool_cap = 0) pl$set_options(rpool_cap = 3) - l_expr = lapply(1:5,\(i) { - pl$lit(i)$map(\(x) {Sys.sleep(.4); -i}, in_background = TRUE)$alias(paste0("lit_",i)) + l_expr = lapply(1:5, \(i) { + pl$lit(i)$map(\(x) { + Sys.sleep(.4) + -i + }, in_background = TRUE)$alias(paste0("lit_", i)) }) lf = pl$LazyFrame()$select(l_expr) handle = lf$collect(collect_in_background = TRUE) @@ -110,4 +115,4 @@ test_that("reduce cap and active while jobs in queue",{ ) pl$reset_options() -} ) +})