Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: r concurrent handler #1295

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions R/extendr-wrappers.R
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ test_wrong_call_pl_lit <- function(robj) .Call(wrap__test_wrong_call_pl_lit, rob

test_robj_to_rchoice <- function(robj) .Call(wrap__test_robj_to_rchoice, robj)

test_threadcom_stack_size <- function() .Call(wrap__test_threadcom_stack_size)

concat_lf <- function(l, rechunk, parallel, to_supertypes) .Call(wrap__concat_lf, l, rechunk, parallel, to_supertypes)

concat_lf_diagonal <- function(l, rechunk, parallel, to_supertypes) .Call(wrap__concat_lf_diagonal, l, rechunk, parallel, to_supertypes)
Expand Down
2 changes: 1 addition & 1 deletion man/DataFrame_class.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/DataFrame_to_data_frame.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/DataFrame_to_list.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/LazyFrame_class.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/S3_as.data.frame.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/S3_as.vector.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/S3_as_arrow_table.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 12 additions & 2 deletions man/S3_as_nanoarrow_array_stream.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/S3_as_record_batch_reader.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions man/S3_infer_nanoarrow_schema.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/S3_knit_print.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/Series_class.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/Series_to_r.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

109 changes: 48 additions & 61 deletions src/rust/src/concurrent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,60 +89,51 @@ fn serve_r(rfsig: RFnSignature) -> Result<RFnOutput, Box<dyn std::error::Error>>
// which could call R from any spawned thread by polars. This function is a bridge between multithraedded polars
// and mostly single threaded only R
pub fn collect_with_r_func_support(lazy_df: pl::LazyFrame) -> RResult<RPolarsDataFrame> {
let new_df = if ThreadCom::try_from_global(&CONFIG).is_ok() {
#[cfg(feature = "rpolars_debug_print")]
println!("in collect: concurrent handler already started");
lazy_df.collect().map_err(polars_to_rpolars_err)
} else {
#[cfg(feature = "rpolars_debug_print")]
println!("in collect: starting a concurrent handler");
#[cfg(feature = "rpolars_debug_print")]
println!("in collect: concurrent handler done");
concurrent_handler(
// closure 1: spawned by main thread
// tc is a ThreadCom which any child thread can use to submit R jobs to main thread
move |tc| {
// get return value
let retval = lazy_df.collect();

#[cfg(feature = "rpolars_debug_print")]
println!("in collect: concurrent handler done");
concurrent_handler(
// closure 1: spawned by main thread
// tc is a ThreadCom which any child thread can use to submit R jobs to main thread
move |tc| {
// get return value
let retval = lazy_df.collect();
// drop the last two ThreadCom clones, signals to main/R-serving thread to shut down.
ThreadCom::kill_global(&CONFIG);
drop(tc);

// drop the last two ThreadCom clones, signals to main/R-serving thread to shut down.
ThreadCom::kill_global(&CONFIG);
drop(tc);
retval
},
// closure 2: how to serve polars worker R job request in main thread
serve_r,
//CONFIG is "global variable" where any new thread can request a clone of ThreadCom to establish contact with main thread
&CONFIG,
)
.map_err(|err| {
//THIS AN SIDE EFFECT, CLEAN UP THREADCOM IN CASE OF AN R ERROR.
ThreadCom::kill_global(&CONFIG);

retval
},
// closure 2: how to serve polars worker R job request in main thread
serve_r,
//CONFIG is "global variable" where any new thread can request a clone of ThreadCom to establish contact with main thread
&CONFIG,
)
.map_err(|err| RPolarsErr::new().plain(err.to_string()))?
.map_err(polars_to_rpolars_err)
};

//wrap ok
Ok(RPolarsDataFrame(new_df?))
// This is the error mapped
RPolarsErr::new().plain(err.to_string())
})? // propagate any R error
.map_err(polars_to_rpolars_err) // map any polars error
.map(RPolarsDataFrame)
}

pub fn profile_with_r_func_support(
lazy_df: pl::LazyFrame,
) -> RResult<(RPolarsDataFrame, RPolarsDataFrame)> {
if ThreadCom::try_from_global(&CONFIG).is_ok() {
lazy_df.profile()
} else {
concurrent_handler(
move |tc| {
let retval = lazy_df.profile();
ThreadCom::kill_global(&CONFIG);
drop(tc);
retval
},
serve_r,
&CONFIG,
)
.map_err(|err| RPolarsErr::new().plain(err.to_string()))?
}
concurrent_handler(
move |tc| {
let retval = lazy_df.profile();
ThreadCom::kill_global(&CONFIG);
drop(tc);
retval
},
serve_r,
&CONFIG,
)
.map_err(|err| RPolarsErr::new().plain(err.to_string()))?
.map_err(polars_to_rpolars_err)
.map(|(result_df, profile_df)| (RPolarsDataFrame(result_df), RPolarsDataFrame(profile_df)))
}
Expand All @@ -151,21 +142,17 @@ pub fn fetch_with_r_func_support(
lazy_df: pl::LazyFrame,
n_rows: usize,
) -> RResult<RPolarsDataFrame> {
if ThreadCom::try_from_global(&CONFIG).is_ok() {
lazy_df.fetch(n_rows)
} else {
concurrent_handler(
move |tc| {
let retval = lazy_df.fetch(n_rows);
ThreadCom::kill_global(&CONFIG);
drop(tc);
retval
},
serve_r,
&CONFIG,
)
.map_err(|err| RPolarsErr::new().plain(err.to_string()))?
}
concurrent_handler(
move |tc| {
let retval = lazy_df.fetch(n_rows);
ThreadCom::kill_global(&CONFIG);
drop(tc);
retval
},
serve_r,
&CONFIG,
)
.map_err(|err| RPolarsErr::new().plain(err.to_string()))?
.map_err(polars_to_rpolars_err)
.map(RPolarsDataFrame)
}
2 changes: 1 addition & 1 deletion src/rust/src/lazy/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1980,7 +1980,7 @@ impl RPolarsExpr {
let par_fn = ParRObj(lambda);
let f = move |s: pl::Series| {
let thread_com = ThreadCom::try_from_global(&CONFIG)
.expect("polars was thread could not initiate ThreadCommunication to R");
.expect("polars thread could not initiate ThreadCom(munication) to R");
thread_com.send(RFnSignature::FnSeriesToSeries(par_fn.clone(), s));
let s = thread_com.recv().unwrap_series();
Ok(Some(s))
Expand Down
13 changes: 10 additions & 3 deletions src/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,16 @@ pub use polars_core;
pub use smartstring;

use crate::concurrent::{RFnOutput, RFnSignature};
use crate::utils::extendr_concurrent::{InitCell, ThreadCom};
type ThreadComStorage = InitCell<std::sync::RwLock<Option<ThreadCom<RFnSignature, RFnOutput>>>>;
static CONFIG: ThreadComStorage = InitCell::new();
use crate::utils::extendr_concurrent::ThreadCom;

use once_cell::sync::Lazy;
use std::sync::RwLock;

//TODO verify reallocation of Vec<ThreadCom> does not mess something up. with_capcity(99)
// ensure at least no reallication before 99 levels of nested r-polars queries
// which is quite unlikely.
static CONFIG: Lazy<RwLock<Vec<ThreadCom<RFnSignature, RFnOutput>>>> =
Lazy::new(|| RwLock::new(Vec::with_capacity(99)));
pub use crate::rbackground::RBGPOOL;

// Macro to generate exports
Expand Down
7 changes: 7 additions & 0 deletions src/rust/src/rlib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,12 @@ fn test_robj_to_rchoice(robj: Robj) -> RResult<String> {
robj_to_rchoice(robj)
}

#[extendr]
fn test_threadcom_stack_size() -> i32 {
// can be used to check if threadcom are cleaned up after use
ThreadCom::get_global_threadcom_stack_size(&CONFIG) as i32
}

#[extendr]
fn fold(acc: Robj, lambda: Robj, exprs: Robj) -> RResult<RPolarsExpr> {
let par_fn = ParRObj(lambda);
Expand Down Expand Up @@ -427,4 +433,5 @@ extendr_module! {
fn test_robj_to_expr;
fn test_wrong_call_pl_lit;
fn test_robj_to_rchoice;
fn test_threadcom_stack_size;
}
Loading