Skip to content

Commit

Permalink
fix: not drop first queue member if drop handler (#416)
Browse files Browse the repository at this point in the history
Co-authored-by: eitsupi <[email protected]>
  • Loading branch information
sorhawell and eitsupi authored Oct 10, 2023
1 parent b64a700 commit 326c370
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 8 deletions.
14 changes: 11 additions & 3 deletions src/rust/src/rbackground.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,18 @@ impl InnerRBackgroundPool {

fn release_handler(&mut self, mut handle: RBackgroundHandler) -> RResult<()> {
match (handle.proc.try_wait()?, self.queue.pop_front()) {
// 1a: too many active handlers. Kill it !
(_, _) if self.active > self.cap => {
// 1a-a: too many active handlers. Kill it ! + Put back first queue member in queue
(_, Some(first_queue_member)) if self.active > self.cap => {
#[cfg(feature = "rpolars_debug_print")]
println!("1a - too many kill it!");
println!("1a-a - ");
self.queue.push_front(first_queue_member);
self.destroy_handler(handle)
}

// 1a-a: too many active handlers and not needed. Kill it !
(_, None) if self.active > self.cap => {
#[cfg(feature = "rpolars_debug_print")]
println!("1a-b - ");
self.destroy_handler(handle)
}

Expand Down
41 changes: 36 additions & 5 deletions tests/testthat/test-rbackground.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
lf = pl$LazyFrame(data.frame(x = 1:10, y = 11:20))

test_that("Test collecting LazyFrame in background", {
skip_if_not(Sys.getenv("CI") == "true")
compute = lf$select(pl$col("x") * pl$col("y"))
res_bg = compute$collect_in_background()$join()
expect_equal(res_bg$to_data_frame(), compute$collect()$to_data_frame())
Expand All @@ -11,6 +12,7 @@ test_that("Test collecting LazyFrame in background", {
})

test_that("Test using $map() in background", {
skip_if_not(Sys.getenv("CI") == "true")
# change capacity
pl$set_options(rpool_cap = 0)
expect_equal(pl$options$rpool_cap, 0)
Expand Down Expand Up @@ -63,6 +65,7 @@ test_that("Test using $map() in background", {


test_that("reset rpool_cap", {
skip_if_not(Sys.getenv("CI") == "true")
pl$reset_options()
orig = pl$options$rpool_cap
pl$set_options(rpool_cap = orig + 1)
Expand All @@ -73,15 +76,43 @@ test_that("reset rpool_cap", {


test_that("rpool errors", {

ctx = pl$set_options(rpool_cap = c(1, 2)) |> get_err_ctx()
skip_if_not(Sys.getenv("CI") == "true")
ctx = pl$set_options(rpool_cap = c(1, 2)) |> get_err_ctx()
expect_identical(ctx$BadArgument, "rpool_cap")
expect_true(startsWith(ctx$TypeMismatch,"i64"))
expect_true(startsWith(ctx$TypeMismatch, "i64"))

ctx = pl$set_options(rpool_cap = -1) |> get_err_ctx()
expect_identical(ctx$ValueOutOfScope, "cannot be less than zero")

ctx = {polars_optenv$rpool_active <- 0} |> get_err_ctx()
expect_true(endsWith(ctx$PlainErrorMessage,"rpool_active cannot be set directly"))
ctx =
{
polars_optenv$rpool_active = 0
} |> get_err_ctx()
expect_true(endsWith(ctx$PlainErrorMessage, "rpool_active cannot be set directly"))
})

test_that("reduce cap and active while jobs in queue", {
skip_if_not(Sys.getenv("CI") == "true")
pl$set_options(rpool_cap = 0)
pl$set_options(rpool_cap = 3)
l_expr = lapply(1:5, \(i) {
pl$lit(i)$map(\(x) {
Sys.sleep(.4)
-i
}, in_background = TRUE)$alias(paste0("lit_", i))
})
lf = pl$LazyFrame()$select(l_expr)
handle = lf$collect(collect_in_background = TRUE)
Sys.sleep(.2)
pl$set_options(rpool_cap = 2)
Sys.sleep(.1)
pl$set_options(rpool_cap = 1)
df = handle$join()

expect_identical(
df$to_list(),
list(lit_1 = -1L, lit_2 = -2L, lit_3 = -3L, lit_4 = -4L, lit_5 = -5L)
)

pl$reset_options()
})

0 comments on commit 326c370

Please sign in to comment.