From 6cc9dfbb81ff8f4185700d24873d34ab86973220 Mon Sep 17 00:00:00 2001 From: sorhawell Date: Sun, 10 Nov 2024 01:11:25 +0100 Subject: [PATCH 1/3] try_once_cell --- man/DataFrame_class.Rd | 2 +- man/DataFrame_to_data_frame.Rd | 2 +- man/DataFrame_to_list.Rd | 2 +- man/LazyFrame_class.Rd | 2 +- man/S3_as.data.frame.Rd | 2 +- man/S3_as.vector.Rd | 2 +- man/S3_as_arrow_table.Rd | 2 +- man/S3_as_nanoarrow_array_stream.Rd | 14 +++- man/S3_as_record_batch_reader.Rd | 2 +- man/S3_infer_nanoarrow_schema.Rd | 4 +- man/S3_knit_print.Rd | 2 +- man/Series_class.Rd | 2 +- man/Series_to_r.Rd | 2 +- src/rust/src/lib.rs | 11 +++- src/rust/src/utils/extendr_concurrent.rs | 81 +++++++++--------------- tests/testthat/_snaps/pkg-knitr.md | 6 +- tests/testthat/_snaps/pkg-knitr.new.md | 80 +++++++++++++++++++++++ 17 files changed, 145 insertions(+), 73 deletions(-) create mode 100644 tests/testthat/_snaps/pkg-knitr.new.md diff --git a/man/DataFrame_class.Rd b/man/DataFrame_class.Rd index 5b241516a..ab743e4ed 100644 --- a/man/DataFrame_class.Rd +++ b/man/DataFrame_class.Rd @@ -115,7 +115,7 @@ withr::with_timezone( ) \} ) -#> +#> withr::with_timezone( "America/New_York", diff --git a/man/DataFrame_to_data_frame.Rd b/man/DataFrame_to_data_frame.Rd index a166999f7..b0cf57ba3 100644 --- a/man/DataFrame_to_data_frame.Rd +++ b/man/DataFrame_to_data_frame.Rd @@ -58,7 +58,7 @@ withr::with_timezone( ) \} ) -#> +#> withr::with_timezone( "America/New_York", diff --git a/man/DataFrame_to_list.Rd b/man/DataFrame_to_list.Rd index 35f22b40e..dea830664 100644 --- a/man/DataFrame_to_list.Rd +++ b/man/DataFrame_to_list.Rd @@ -68,7 +68,7 @@ withr::with_timezone( ) \} ) -#> +#> withr::with_timezone( "America/New_York", diff --git a/man/LazyFrame_class.Rd b/man/LazyFrame_class.Rd index 37b1144fb..8bd25656c 100644 --- a/man/LazyFrame_class.Rd +++ b/man/LazyFrame_class.Rd @@ -85,7 +85,7 @@ withr::with_timezone( ) \} ) -#> +#> withr::with_timezone( "America/New_York", diff --git a/man/S3_as.data.frame.Rd 
b/man/S3_as.data.frame.Rd index 299859831..cc8585180 100644 --- a/man/S3_as.data.frame.Rd +++ b/man/S3_as.data.frame.Rd @@ -109,7 +109,7 @@ withr::with_timezone( ) \} ) -#> +#> withr::with_timezone( "America/New_York", diff --git a/man/S3_as.vector.Rd b/man/S3_as.vector.Rd index dea4207d7..fb8163b71 100644 --- a/man/S3_as.vector.Rd +++ b/man/S3_as.vector.Rd @@ -45,7 +45,7 @@ withr::with_timezone( ) \} ) -#> +#> withr::with_timezone( "America/New_York", diff --git a/man/S3_as_arrow_table.Rd b/man/S3_as_arrow_table.Rd index 4e9b3bfcc..a6919146a 100644 --- a/man/S3_as_arrow_table.Rd +++ b/man/S3_as_arrow_table.Rd @@ -4,7 +4,7 @@ \alias{as_arrow_table.RPolarsDataFrame} \title{Create a arrow Table from a Polars object} \usage{ -\method{as_arrow_table}{RPolarsDataFrame}(x, ..., compat_level = FALSE) +as_arrow_table.RPolarsDataFrame(x, ..., compat_level = FALSE) } \arguments{ \item{x}{\link[=DataFrame_class]{A Polars DataFrame}} diff --git a/man/S3_as_nanoarrow_array_stream.Rd b/man/S3_as_nanoarrow_array_stream.Rd index 6a0334929..45e69aa6b 100644 --- a/man/S3_as_nanoarrow_array_stream.Rd +++ b/man/S3_as_nanoarrow_array_stream.Rd @@ -5,9 +5,19 @@ \alias{as_nanoarrow_array_stream.RPolarsSeries} \title{Create a nanoarrow_array_stream from a Polars object} \usage{ -\method{as_nanoarrow_array_stream}{RPolarsDataFrame}(x, ..., schema = NULL, compat_level = FALSE) +as_nanoarrow_array_stream.RPolarsDataFrame( + x, + ..., + schema = NULL, + compat_level = FALSE +) -\method{as_nanoarrow_array_stream}{RPolarsSeries}(x, ..., schema = NULL, compat_level = FALSE) +as_nanoarrow_array_stream.RPolarsSeries( + x, + ..., + schema = NULL, + compat_level = FALSE +) } \arguments{ \item{x}{A polars object} diff --git a/man/S3_as_record_batch_reader.Rd b/man/S3_as_record_batch_reader.Rd index 1731f12de..5d22e8d56 100644 --- a/man/S3_as_record_batch_reader.Rd +++ b/man/S3_as_record_batch_reader.Rd @@ -4,7 +4,7 @@ \alias{as_record_batch_reader.RPolarsDataFrame} \title{Create a arrow 
RecordBatchReader from a Polars object} \usage{ -\method{as_record_batch_reader}{RPolarsDataFrame}(x, ..., compat_level = FALSE) +as_record_batch_reader.RPolarsDataFrame(x, ..., compat_level = FALSE) } \arguments{ \item{x}{\link[=DataFrame_class]{A Polars DataFrame}} diff --git a/man/S3_infer_nanoarrow_schema.Rd b/man/S3_infer_nanoarrow_schema.Rd index 00932ed8d..a0d21ae2a 100644 --- a/man/S3_infer_nanoarrow_schema.Rd +++ b/man/S3_infer_nanoarrow_schema.Rd @@ -5,9 +5,9 @@ \alias{infer_nanoarrow_schema.RPolarsSeries} \title{Infer nanoarrow schema from a Polars object} \usage{ -\method{infer_nanoarrow_schema}{RPolarsDataFrame}(x, ..., compat_level = FALSE) +infer_nanoarrow_schema.RPolarsDataFrame(x, ..., compat_level = FALSE) -\method{infer_nanoarrow_schema}{RPolarsSeries}(x, ..., compat_level = FALSE) +infer_nanoarrow_schema.RPolarsSeries(x, ..., compat_level = FALSE) } \arguments{ \item{x}{A polars object} diff --git a/man/S3_knit_print.Rd b/man/S3_knit_print.Rd index 1f46ad155..4cfa87bb2 100644 --- a/man/S3_knit_print.Rd +++ b/man/S3_knit_print.Rd @@ -4,7 +4,7 @@ \alias{knit_print.RPolarsDataFrame} \title{knit print polars DataFrame} \usage{ -\method{knit_print}{RPolarsDataFrame}(x, ...) +knit_print.RPolarsDataFrame(x, ...) 
} \arguments{ \item{x}{a polars DataFrame to knit_print} diff --git a/man/Series_class.Rd b/man/Series_class.Rd index bfb719b7d..73685d2cb 100644 --- a/man/Series_class.Rd +++ b/man/Series_class.Rd @@ -142,7 +142,7 @@ withr::with_timezone( ) \} ) -#> +#> withr::with_timezone( "America/New_York", diff --git a/man/Series_to_r.Rd b/man/Series_to_r.Rd index d77ee5fae..37f2763cd 100644 --- a/man/Series_to_r.Rd +++ b/man/Series_to_r.Rd @@ -61,7 +61,7 @@ withr::with_timezone( ) \} ) -#> +#> withr::with_timezone( "America/New_York", diff --git a/src/rust/src/lib.rs b/src/rust/src/lib.rs index 54a191ec1..899238b69 100644 --- a/src/rust/src/lib.rs +++ b/src/rust/src/lib.rs @@ -34,9 +34,14 @@ pub use polars_core; pub use smartstring; use crate::concurrent::{RFnOutput, RFnSignature}; -use crate::utils::extendr_concurrent::{InitCell, ThreadCom}; -type ThreadComStorage = InitCell>>>; -static CONFIG: ThreadComStorage = InitCell::new(); +use crate::utils::extendr_concurrent::ThreadCom; +type ThreadComStorage = Lazy>>>; + +use once_cell::sync::Lazy; +use std::sync::RwLock; + +static CONFIG: Lazy>>> = + Lazy::new(|| RwLock::new(None)); pub use crate::rbackground::RBGPOOL; // Macro to generate exports diff --git a/src/rust/src/utils/extendr_concurrent.rs b/src/rust/src/utils/extendr_concurrent.rs index 7ce95ab8d..e5f4f3be3 100644 --- a/src/rust/src/utils/extendr_concurrent.rs +++ b/src/rust/src/utils/extendr_concurrent.rs @@ -1,13 +1,12 @@ use extendr_api::prelude::*; //use std::sync::mpsc::{Receiver, Sender}; +use once_cell::sync::Lazy; use std::sync::RwLock; use std::thread; use flume::{Receiver, Sender}; -pub use state::InitCell; - //shamelessly make Robj send + sync //no crashes so far for the 'data'-SEXPS as Vectors, lists, pairlists //mainly for debug should not be used in production @@ -96,67 +95,45 @@ where .expect("thread failed recieve, likely a user interrupt") } - pub fn update_global(&self, conf: &InitCell>>>) - where - S: Send, - R: Send, - { - //set or update global 
thread_com - let conf_status = conf.set(RwLock::new(Some(self.clone()))); - //dbg!(conf_status); - if !conf_status { - let mut gtc = conf - .get() - .write() - .expect("failed to modify GLOBAL therad_com"); - *gtc = Some(self.clone()); - } + // Update the global state with the current ThreadCom instance + pub fn update_global(&self, conf: &Lazy>>>) { + let mut global = conf + .write() + .expect("Failed to acquire write lock on global ThreadCom"); + *global = Some(self.clone()); } - pub fn kill_global(conf: &InitCell>>>) { - let mut val = conf - .get() + // Clear the global state + pub fn kill_global(conf: &Lazy>>>) { + let mut global = conf .write() - .expect("another thread crashed while touching CONFIG"); - *val = None; + .expect("Failed to acquire write lock on global ThreadCom"); + *global = None; } - pub fn from_global(config: &InitCell>>>) -> Self - where - S: Send, - R: Send, - { - let thread_com = config - .get() + // Retrieve a copy of ThreadCom from the global state, panics if uninitialized + pub fn from_global(conf: &Lazy>>>) -> Self { + let global = conf .read() - .expect("failded to restore thread_com") + .expect("Failed to acquire read lock on global ThreadCom"); + global .as_ref() - .unwrap() - .clone(); - - thread_com + .expect("Global ThreadCom is uninitialized") + .clone() } + // Try to retrieve ThreadCom from the global state; returns an error if uninitialized pub fn try_from_global( - config: &InitCell>>>, - ) -> std::result::Result - where - S: Send, - R: Send, - { - let lock = config.try_get(); - let inner_lock = lock - .ok_or("internal error: global ThreadCom storage has not already been initialized")? 
+ conf: &Lazy>>>, + ) -> std::result::Result { + let global = conf .read() - .expect("internal error: RwLock was poisoned (some other thread used the RwLock but panicked)"); + .expect("Failed to acquire read lock on global ThreadCom"); - let opt_thread_com = inner_lock.as_ref(); - - let thread_com = opt_thread_com - .ok_or("Global ThreadCom storage is empty")? - .clone(); - - Ok(thread_com) + match global.as_ref() { + Some(thread_com) => Ok(thread_com.clone()), + None => Err("Global ThreadCom is empty".to_string()), + } } } @@ -180,7 +157,7 @@ pub fn concurrent_handler( f: F, //y: Y, i: I, - conf: &InitCell>>>, + conf: &Lazy>>>, ) -> std::result::Result> where F: FnOnce(ThreadCom) -> T + Send + 'static, diff --git a/tests/testthat/_snaps/pkg-knitr.md b/tests/testthat/_snaps/pkg-knitr.md index 9fbea81cd..f26cd47e2 100644 --- a/tests/testthat/_snaps/pkg-knitr.md +++ b/tests/testthat/_snaps/pkg-knitr.md @@ -24,7 +24,7 @@ white-space: pre-wrap; } - shape: (3, 2)
ab
i32str
1"a"
2"b"
3"c"
+ shape: (3, 2)
ab
i32str
1"a"
2"b"
3"c"
``` --- @@ -53,7 +53,7 @@ white-space: pre-wrap; } - shape: (3, 2)
ab
i32str
1"a"
2"b"
3"c"
+ shape: (3, 2)
ab
i32str
1"a"
2"b"
3"c"
``` --- @@ -74,7 +74,7 @@ white-space: pre-wrap; } - shape: (3, 2)
ab
i32str
1"a"
2"b"
3"c"
+ shape: (3, 2)
ab
i32str
1"a"
2"b"
3"c"
--- diff --git a/tests/testthat/_snaps/pkg-knitr.new.md b/tests/testthat/_snaps/pkg-knitr.new.md new file mode 100644 index 000000000..7caab78a9 --- /dev/null +++ b/tests/testthat/_snaps/pkg-knitr.new.md @@ -0,0 +1,80 @@ +# Snapshot test of knitr + + Code + .knit_file("dataframe.Rmd") + Output + --- + output: + github_document: + df_print: kable + html_preview: false + --- + + + ``` r + df = data.frame(a = 1:3, b = letters[1:3]) + as_polars_df(df) + ``` + + ``` + ## Warning in raw_block(x, "html", ...): raw_block() requires Pandoc >= 2.0.0 + ``` + + ```{=html} +
+ shape: (3, 2)
ab
i32str
1"a"
2"b"
3"c"
+ ``` + +--- + + Code + .knit_file("dataframe.Rmd") + Output + --- + output: + github_document: + df_print: kable + html_preview: false + --- + + + ``` r + df = data.frame(a = 1:3, b = letters[1:3]) + as_polars_df(df) + ``` + + ``` + ## Warning in raw_block(x, "html", ...): raw_block() requires Pandoc >= 2.0.0 + ``` + + ```{=html} +
+ shape: (3, 2)
ab
i32str
1"a"
2"b"
3"c"
+ ``` + +# to_html_table + + Code + to_html_table(mtcars, 3, 3) + Output + [1] "
\nshape: (32, 11)
mpgcarb
dbldbl
214
21.42
21.42
" + +--- + + Code + to_html_table(mtcars) + Output + [1] "
\nshape: (32, 11)
mpgcyldisphpdratwtqsecvsamgearcarb
dbldbldbldbldbldbldbldbldbldbldbl
2161601103.92.6216.460144
2161601103.92.87517.020144
22.84108933.852.3218.611141
21.462581103.083.21519.441031
18.783601753.153.4417.020032
18.162251052.763.4620.221031
14.383602453.213.5715.840034
24.44146.7623.693.19201042
22.84140.8953.923.1522.91042
19.26167.61233.923.4418.31044
17.86167.61233.923.4418.91044
16.48275.81803.074.0717.40033
17.38275.81803.073.7317.60033
15.28275.81803.073.78180033
10.484722052.935.2517.980034
10.4846021535.42417.820034
14.784402303.235.34517.420034
32.4478.7664.082.219.471141
30.4475.7524.931.61518.521142
33.9471.1654.221.83519.91141
21.54120.1973.72.46520.011031
15.583181502.763.5216.870032
15.283041503.153.43517.30032
13.383502453.733.8415.410034
19.284001753.083.84517.050032
27.3479664.081.93518.91141
264120.3914.432.1416.70152
30.4495.11133.771.51316.91152
15.883512644.223.1714.50154
19.761451753.622.7715.50156
1583013353.543.5714.60158
21.441211094.112.7818.61142
" + From 56db9a0b7afe0d3661625c80a7ef2896ac29bd0f Mon Sep 17 00:00:00 2001 From: Soren H Welling Date: Thu, 21 Nov 2024 23:19:31 +0100 Subject: [PATCH 2/3] fix concurrent bugs --- src/rust/src/concurrent.rs | 105 ++++++++++------------- src/rust/src/lib.rs | 8 +- src/rust/src/utils/extendr_concurrent.rs | 24 +++--- tests/testthat/test-concurrent.R | 61 +++++++++++++ 4 files changed, 123 insertions(+), 75 deletions(-) create mode 100644 tests/testthat/test-concurrent.R diff --git a/src/rust/src/concurrent.rs b/src/rust/src/concurrent.rs index 56a1bc01d..236f28c54 100644 --- a/src/rust/src/concurrent.rs +++ b/src/rust/src/concurrent.rs @@ -89,60 +89,45 @@ fn serve_r(rfsig: RFnSignature) -> Result> // which could call R from any spawned thread by polars. This function is a bridge between multithraedded polars // and mostly single threaded only R pub fn collect_with_r_func_support(lazy_df: pl::LazyFrame) -> RResult { - let new_df = if ThreadCom::try_from_global(&CONFIG).is_ok() { - #[cfg(feature = "rpolars_debug_print")] - println!("in collect: concurrent handler already started"); - lazy_df.collect().map_err(polars_to_rpolars_err) - } else { - #[cfg(feature = "rpolars_debug_print")] - println!("in collect: starting a concurrent handler"); + #[cfg(feature = "rpolars_debug_print")] + println!("in collect: concurrent handler done"); + concurrent_handler( + // closure 1: spawned by main thread + // tc is a ThreadCom which any child thread can use to submit R jobs to main thread + move |tc| { + // get return value + let retval = lazy_df.collect(); - #[cfg(feature = "rpolars_debug_print")] - println!("in collect: concurrent handler done"); - concurrent_handler( - // closure 1: spawned by main thread - // tc is a ThreadCom which any child thread can use to submit R jobs to main thread - move |tc| { - // get return value - let retval = lazy_df.collect(); + // drop the last two ThreadCom clones, signals to main/R-serving thread to shut down. 
+ ThreadCom::kill_global(&CONFIG); + drop(tc); - // drop the last two ThreadCom clones, signals to main/R-serving thread to shut down. - ThreadCom::kill_global(&CONFIG); - drop(tc); - - retval - }, - // closure 2: how to serve polars worker R job request in main thread - serve_r, - //CONFIG is "global variable" where any new thread can request a clone of ThreadCom to establish contact with main thread - &CONFIG, - ) - .map_err(|err| RPolarsErr::new().plain(err.to_string()))? - .map_err(polars_to_rpolars_err) - }; - - //wrap ok - Ok(RPolarsDataFrame(new_df?)) + retval + }, + // closure 2: how to serve polars worker R job request in main thread + serve_r, + //CONFIG is "global variable" where any new thread can request a clone of ThreadCom to establish contact with main thread + &CONFIG, + ) + .map_err(|err| RPolarsErr::new().plain(err.to_string()))? + .map_err(polars_to_rpolars_err) + .map(RPolarsDataFrame) } pub fn profile_with_r_func_support( lazy_df: pl::LazyFrame, ) -> RResult<(RPolarsDataFrame, RPolarsDataFrame)> { - if ThreadCom::try_from_global(&CONFIG).is_ok() { - lazy_df.profile() - } else { - concurrent_handler( - move |tc| { - let retval = lazy_df.profile(); - ThreadCom::kill_global(&CONFIG); - drop(tc); - retval - }, - serve_r, - &CONFIG, - ) - .map_err(|err| RPolarsErr::new().plain(err.to_string()))? - } + concurrent_handler( + move |tc| { + let retval = lazy_df.profile(); + ThreadCom::kill_global(&CONFIG); + drop(tc); + retval + }, + serve_r, + &CONFIG, + ) + .map_err(|err| RPolarsErr::new().plain(err.to_string()))? 
.map_err(polars_to_rpolars_err) .map(|(result_df, profile_df)| (RPolarsDataFrame(result_df), RPolarsDataFrame(profile_df))) } @@ -151,21 +136,17 @@ pub fn fetch_with_r_func_support( lazy_df: pl::LazyFrame, n_rows: usize, ) -> RResult { - if ThreadCom::try_from_global(&CONFIG).is_ok() { - lazy_df.fetch(n_rows) - } else { - concurrent_handler( - move |tc| { - let retval = lazy_df.fetch(n_rows); - ThreadCom::kill_global(&CONFIG); - drop(tc); - retval - }, - serve_r, - &CONFIG, - ) - .map_err(|err| RPolarsErr::new().plain(err.to_string()))? - } + concurrent_handler( + move |tc| { + let retval = lazy_df.fetch(n_rows); + ThreadCom::kill_global(&CONFIG); + drop(tc); + retval + }, + serve_r, + &CONFIG, + ) + .map_err(|err| RPolarsErr::new().plain(err.to_string()))? .map_err(polars_to_rpolars_err) .map(RPolarsDataFrame) } diff --git a/src/rust/src/lib.rs b/src/rust/src/lib.rs index 899238b69..b0de883cd 100644 --- a/src/rust/src/lib.rs +++ b/src/rust/src/lib.rs @@ -35,13 +35,15 @@ pub use smartstring; use crate::concurrent::{RFnOutput, RFnSignature}; use crate::utils::extendr_concurrent::ThreadCom; -type ThreadComStorage = Lazy>>>; use once_cell::sync::Lazy; use std::sync::RwLock; -static CONFIG: Lazy>>> = - Lazy::new(|| RwLock::new(None)); +//TODO verify reallocation of Vec does not mess something up. with_capacity(99) +// ensure at least no reallocation before 99 levels of nested r-polars queries +// which is quite unlikely. 
+static CONFIG: Lazy>>> = + Lazy::new(|| RwLock::new(Vec::with_capacity(99))); pub use crate::rbackground::RBGPOOL; // Macro to generate exports diff --git a/src/rust/src/utils/extendr_concurrent.rs b/src/rust/src/utils/extendr_concurrent.rs index e5f4f3be3..63f4a5a11 100644 --- a/src/rust/src/utils/extendr_concurrent.rs +++ b/src/rust/src/utils/extendr_concurrent.rs @@ -96,41 +96,45 @@ where } // Update the global state with the current ThreadCom instance - pub fn update_global(&self, conf: &Lazy>>>) { + pub fn update_global(&self, conf: &Lazy>>>) { let mut global = conf .write() .expect("Failed to acquire write lock on global ThreadCom"); - *global = Some(self.clone()); + global.push(self.clone()); + #[cfg(feature = "rpolars_debug_print")] + println!("threadcom stack is now {} levels deep.", global.len()) } // Clear the global state - pub fn kill_global(conf: &Lazy>>>) { + pub fn kill_global(conf: &Lazy>>>) { let mut global = conf .write() .expect("Failed to acquire write lock on global ThreadCom"); - *global = None; + let _old_tc = global + .pop() + .expect("at least one threadcom must have remained on stack"); } // Retrieve a copy of ThreadCom from the global state, panics if uninitialized - pub fn from_global(conf: &Lazy>>>) -> Self { + pub fn from_global(conf: &Lazy>>>) -> Self { let global = conf .read() .expect("Failed to acquire read lock on global ThreadCom"); global - .as_ref() + .last() .expect("Global ThreadCom is uninitialized") .clone() } - // Try to retrieve ThreadCom from the global state; returns an error if uninitialized + // Try to retrieve ThreadCom from the global state; returns an error if uninitialized. 
pub fn try_from_global( - conf: &Lazy>>>, + conf: &Lazy>>>, ) -> std::result::Result { let global = conf .read() .expect("Failed to acquire read lock on global ThreadCom"); - match global.as_ref() { + match global.last() { Some(thread_com) => Ok(thread_com.clone()), None => Err("Global ThreadCom is empty".to_string()), } @@ -157,7 +161,7 @@ pub fn concurrent_handler( f: F, //y: Y, i: I, - conf: &Lazy>>>, + conf: &Lazy>>>, ) -> std::result::Result> where F: FnOnce(ThreadCom) -> T + Send + 'static, diff --git a/tests/testthat/test-concurrent.R b/tests/testthat/test-concurrent.R new file mode 100644 index 000000000..cf39e6fe4 --- /dev/null +++ b/tests/testthat/test-concurrent.R @@ -0,0 +1,61 @@ +# library(polars) +# library(testthat) +# polars_code_completion_activate() + + +test_that("concurrent recover on error", { + # cause an error + expect_error({ + df = as_polars_df(iris)$ + select( + pl$col("Sepal.Length")$ + map_batches(\(s) stop("this is a test error")) + ) + }) + + # causing an error in one map_batches do not block the concurrent handler + # in R session thereafter + + # can achieve unity + df = as_polars_df(iris)$ + select( + pl$col("Sepal.Length")$ + map_batches(\(s) s) + ) + expect_identical( + df$to_data_frame(), + iris[, 1, drop = FALSE] + ) +}) + +test_that("nested r-polars queries works", { + # cause an error + # s = as_polars_df(iris)$to_series(0) + + # compute two columns with map batches, which will be handled in two + # threads by polars. Each thread will call R, and each thread call R + # and create a new sub rpolars-query and there by putting a new + # concurrent handler on the handler-stack. When sub-query completes it will + # pop the handler stack and allow other previous handler to continue. + # TODO implement some diagnostic rust function and verify these claims. 
+ + # happy path test of above claim + df = as_polars_df(iris)$ + select( + pl$col("Sepal.Length")$ + map_batches(\(s) { + # make a new rpolars query within a query + pl$DataFrame(s)$lazy()$collect()$to_series(0) * 2 + }), + pl$col("Sepal.Width")$ + map_batches(\(s) { + # make another nested rpolars query within a query + pl$DataFrame(s * 2)$lazy()$collect()$to_series(0) + }) + ) + + expect_identical( + df$to_data_frame(), + iris[, 1:2, drop = FALSE] * 2 + ) +}) From b7918a0f1180219ad8e2fd72ab8ae48e8e135cbc Mon Sep 17 00:00:00 2001 From: Soren H Welling Date: Sat, 23 Nov 2024 23:45:00 +0100 Subject: [PATCH 3/3] fix threadcom clean up + more unit tests --- R/extendr-wrappers.R | 2 + src/rust/src/concurrent.rs | 10 ++- src/rust/src/lazy/dsl.rs | 2 +- src/rust/src/rlib.rs | 7 ++ src/rust/src/utils/extendr_concurrent.rs | 11 ++- tests/testthat/test-concurrent.R | 102 +++++++++++++++++------ 6 files changed, 103 insertions(+), 31 deletions(-) diff --git a/R/extendr-wrappers.R b/R/extendr-wrappers.R index 22d82f063..fb0e253dd 100644 --- a/R/extendr-wrappers.R +++ b/R/extendr-wrappers.R @@ -84,6 +84,8 @@ test_wrong_call_pl_lit <- function(robj) .Call(wrap__test_wrong_call_pl_lit, rob test_robj_to_rchoice <- function(robj) .Call(wrap__test_robj_to_rchoice, robj) +test_threadcom_stack_size <- function() .Call(wrap__test_threadcom_stack_size) + concat_lf <- function(l, rechunk, parallel, to_supertypes) .Call(wrap__concat_lf, l, rechunk, parallel, to_supertypes) concat_lf_diagonal <- function(l, rechunk, parallel, to_supertypes) .Call(wrap__concat_lf_diagonal, l, rechunk, parallel, to_supertypes) diff --git a/src/rust/src/concurrent.rs b/src/rust/src/concurrent.rs index 236f28c54..b41ed6c22 100644 --- a/src/rust/src/concurrent.rs +++ b/src/rust/src/concurrent.rs @@ -109,8 +109,14 @@ pub fn collect_with_r_func_support(lazy_df: pl::LazyFrame) -> RResult RResult { robj_to_rchoice(robj) } +#[extendr] +fn test_threadcom_stack_size() -> i32 { + // can be used to check if threadcom 
are cleaned up after use + ThreadCom::get_global_threadcom_stack_size(&CONFIG) as i32 +} + #[extendr] fn fold(acc: Robj, lambda: Robj, exprs: Robj) -> RResult { let par_fn = ParRObj(lambda); @@ -427,4 +433,5 @@ extendr_module! { fn test_robj_to_expr; fn test_wrong_call_pl_lit; fn test_robj_to_rchoice; + fn test_threadcom_stack_size; } diff --git a/src/rust/src/utils/extendr_concurrent.rs b/src/rust/src/utils/extendr_concurrent.rs index 63f4a5a11..5f875d6d0 100644 --- a/src/rust/src/utils/extendr_concurrent.rs +++ b/src/rust/src/utils/extendr_concurrent.rs @@ -139,6 +139,14 @@ where None => Err("Global ThreadCom is empty".to_string()), } } + + // only for testing + pub fn get_global_threadcom_stack_size(conf: &Lazy>>>) -> usize { + let global = conf + .read() + .expect("Failed to acquire read lock on global ThreadCom"); + global.len() + } } // //debug threads @@ -202,7 +210,8 @@ where let answer = i(s); //handle requst with i closure let a = answer.map_err(|err| format!("user function raised an error: {:?} \n", err))?; - c_tx.send(a).unwrap(); + c_tx.send(a) + .expect("failed to result back to polars thread"); } else if let Err(recv_err) = any_new_msg { #[cfg(feature = "rpolars_debug_print")] dbg!(&recv_err); diff --git a/tests/testthat/test-concurrent.R b/tests/testthat/test-concurrent.R index cf39e6fe4..69fb34762 100644 --- a/tests/testthat/test-concurrent.R +++ b/tests/testthat/test-concurrent.R @@ -1,10 +1,9 @@ -# library(polars) -# library(testthat) -# polars_code_completion_activate() +test_that("R handler recovers from an error", { + # Get previous threadcom stacksize, for comparison. Should not increase. + threadcom_stack_size_before = polars:::test_threadcom_stack_size() -test_that("concurrent recover on error", { - # cause an error + # Cause an error in user function. This must be cleaned up. 
expect_error({ df = as_polars_df(iris)$ select( @@ -13,10 +12,14 @@ test_that("concurrent recover on error", { ) }) - # causing an error in one map_batches do not block the concurrent handler - # in R session thereafter + # Check R handler threadcom stack size is restored + expect_identical( + polars:::test_threadcom_stack_size(), + threadcom_stack_size_before + ) - # can achieve unity + # Causing an error in one map_batches does not block the concurrent handler + # in R session thereafter df = as_polars_df(iris)$ select( pl$col("Sepal.Length")$ @@ -28,34 +31,79 @@ test_that("concurrent recover on error", { ) }) -test_that("nested r-polars queries works", { - # cause an error - # s = as_polars_df(iris)$to_series(0) +test_that("nested r-polars queries with nested errors works", { - # compute two columns with map batches, which will be handled in two - # threads by polars. Each thread will call R, and each thread call R - # and create a new sub rpolars-query and there by putting a new - # concurrent handler on the handler-stack. When sub-query completes it will - # pop the handler stack and allow other previous handler to continue. - # TODO implement some diagnostic rust function and verify these claims. + # Get previous threadcom stacksize, for comparison. Should not increase. 
+ threadcom_stack_size_before = polars:::test_threadcom_stack_size() # happy path test of above claim df = as_polars_df(iris)$ select( - pl$col("Sepal.Length")$ - map_batches(\(s) { - # make a new rpolars query within a query - pl$DataFrame(s)$lazy()$collect()$to_series(0) * 2 - }), - pl$col("Sepal.Width")$ + pl$col("Sepal.Length")$ + map_batches(\(s) { + # make a new rpolars subquery within a query + pl$DataFrame(s)$lazy()$collect()$to_series(0) * 2 + }), + pl$col("Sepal.Width")$ + map_batches(\(s) { + + # make another sub-query with a polars err + pl$DataFrame(s * 2)$ + lazy()$ + select( + pl$col("this column is not there, will cause polars") + )$ + collect()$ + to_series(0) |> + polars:::result() |> + polars:::is_err() |> + stopifnot() # ensure there was a polars error + + # just return s * 2 + s * 2 + }) + ) + + expect_identical( + df$to_data_frame(), + iris[, 1:2, drop = FALSE] * 2 + ) + + + expect_identical( + polars:::test_threadcom_stack_size(), + threadcom_stack_size_before + ) +}) + +test_that("nested r-polars queries with r handling works", { + pdf = pl$ + LazyFrame(a = 1:5)$ + select( + pl$col("a")$ map_batches(\(s) { - # make another nested rpolars query within a query - pl$DataFrame(s * 2)$lazy()$collect()$to_series(0) + # while locking main R handler, start a new r-polars sub query + pl$DataFrame(s * 2)$lazy()$select( + # acquire R handler from sub query + # must use in_background = TRUE to avoid a dead-lock + pl$col("a")$map_batches(\(s) s / 2, in_background = TRUE)$alias("a1"), + pl$col("a")$map_elements(\(s) s / 2, in_background = TRUE)$alias("a2") + )$collect()$to_series(0) }) + )$ + collect() + + expect_identical( + pdf$to_data_frame(), + data.frame(a = (1:5) * 2 / 2) ) +}) +test_that("finally threadcom_stack_size should be zero", { + # if failing, something left an old ThreadCom on the stack. expect_identical( - df$to_data_frame(), - iris[, 1:2, drop = FALSE] * 2 + polars:::test_threadcom_stack_size(), + 0L ) }) +