add support for pl$concat(<LazyFrame>, ...) + add to_supertypes auto casting (#407)

Co-authored-by: Etienne Bacher <[email protected]>
sorhawell and etiennebacher authored Oct 9, 2023
1 parent 2d2aaa7 commit ec81870
Showing 24 changed files with 761 additions and 154 deletions.
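A minimal sketch of the new behaviour this commit describes, assuming the `to_supertypes` argument and LazyFrame support shown in the diff below:

lf1 = pl$DataFrame(x = 1L)$lazy()  # column x is i32
lf2 = pl$DataFrame(x = 1.0)$lazy() # column x is f64
# first input is lazy, so the result stays a LazyFrame; i32 is cast up to f64
out = pl$concat(lf1, lf2, to_supertypes = TRUE)
out$collect()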
1 change: 1 addition & 0 deletions NEWS.md
@@ -11,6 +11,7 @@

## What's changed

- `pl$concat()` now also supports `Series`, `Expr` and `LazyFrame` (#407).
- New method `$unnest()` for `LazyFrame` (#397).
- New method `$sample()` for `DataFrame` (#399).
- New method `$meta$tree_format()` to display an `Expr` as a tree (#401).
87 changes: 84 additions & 3 deletions R/dataframe__frame.R
@@ -133,7 +133,6 @@ DataFrame
#'
#' # custom schema
#' pl$DataFrame(iris, schema = list(Sepal.Length = pl$Float32, Species = pl$Utf8))

pl$DataFrame = function(..., make_names_unique = TRUE, schema = NULL) {
largs = unpack_list(...)

@@ -181,9 +180,9 @@ pl$DataFrame = function(..., make_names_unique = TRUE, schema = NULL) {
names(largs) = keys
lapply(seq_along(largs), \(x) {
varname = keys[x]
out <- pl$lit(largs[[x]])
out = pl$lit(largs[[x]])
if (!is.null(schema) && varname %in% names(schema)) {
out <- out$cast(schema[[varname]], strict = TRUE)
out = out$cast(schema[[varname]], strict = TRUE)
}
out$alias(varname)
}) |>
@@ -1033,6 +1032,88 @@ DataFrame_first = function() {
self$lazy()$first()$collect()
}


#' @title Number of chunks of the Series in a DataFrame
#' @description
#' Number of chunks (memory allocations) for all or first Series in a DataFrame.
#' @details
#' A DataFrame is a vector of Series. Each Series in rust-polars is a wrapper
#' around a ChunkedArray, which is like a virtual contiguous vector physically
#' backed by an ordered set of chunks. Each chunk of values has a contiguous
#' memory layout and is an arrow array. Arrow arrays are a fast, thread-safe and
#' cross-platform memory layout.
#'
#' In R, combining with `c()` or `rbind()` requires immediate vector re-allocation
#' to place vector values in contiguous memory. This is slow and memory-consuming,
#' which is why repeatedly appending to a vector in R is discouraged.
#'
#' In polars, when we concatenate or append to Series or DataFrame, the
#' re-allocation can be avoided or delayed by simply appending chunks to each
#' individual Series. However, if chunks become many and small or are misaligned
#' across Series, this can hurt the performance of subsequent operations.
#'
#' In most places in the polars API where chunking could occur, the user typically
#' has to actively opt out of rechunking by setting the argument `rechunk = FALSE`.
#'
#' @keywords DataFrame
#' @param strategy Either `"all"` or `"first"`. `"first"` only returns chunks
#' for the first Series.
#' @return A numeric vector of chunk counts, one per Series (a single count if `strategy = "first"`).
#' @seealso [`<DataFrame>$rechunk()`][DataFrame_rechunk]
#' @examples
#' # create DataFrame with misaligned chunks
#' df = pl$concat(
#' 1:10, # single chunk
#' pl$concat(1:5, 1:5, rechunk = FALSE, how = "vertical")$rename("b"), # two chunks
#' how = "horizontal"
#' )
#' df
#' df$n_chunks()
#'
#' # rechunk a chunked DataFrame
#' df$rechunk()$n_chunks()
#'
#' # rechunk is not an in-place operation
#' df$n_chunks()
#'
#' # The following toy example emulates the Series "chunkiness" in R. Here it is an
#' # S3-classed list of same-type vectors, with all relevant S3 generics implemented
#' # so that it behaves as if it were a regular vector.
#' "+.chunked_vector" = \(x, y) structure(list(unlist(x) + unlist(y)), class = "chunked_vector")
#' print.chunked_vector = \(x, ...) print(unlist(x), ...)
#' c.chunked_vector = \(...) {
#' structure(do.call(c, lapply(list(...), unclass)), class = "chunked_vector")
#' }
#' rechunk = \(x) structure(unlist(x), class = "chunked_vector")
#' x = structure(list(1:4, 5L), class = "chunked_vector")
#' x
#' x + 5:1
#' lapply(x, tracemem) # trace chunks to verify no re-allocation
#' z = c(x, x)
#' z # looks like a plain vector
#' lapply(z, tracemem) # memory allocations in z are the same as in x
#' str(z)
#' z = rechunk(z)
#' str(z)
DataFrame_n_chunks = function(strategy = "all") {
.pr$DataFrame$n_chunks(self, strategy) |>
unwrap("in n_chunks():")
}
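A short, hedged sketch of the `strategy` argument, which the example above does not exercise; the expected chunk counts are assumptions, not outputs taken from this diff:

df = pl$concat(
  1:10, # single chunk
  pl$concat(1:5, 1:5, rechunk = FALSE, how = "vertical")$rename("b"), # two chunks
  how = "horizontal"
)
df$n_chunks(strategy = "all")   # expected: one count per Series, e.g. 1 2
df$n_chunks(strategy = "first") # expected: only the first Series' count, e.g. 1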


#' @title Rechunk a DataFrame
#' @description Rechunking re-allocates any "chunked" memory allocations to
#' speed up e.g. vectorized operations.
#' @inherit DataFrame_n_chunks details examples
#'
#' @keywords DataFrame
#' @return A DataFrame
#' @seealso [`<DataFrame>$n_chunks()`][DataFrame_n_chunks]
DataFrame_rechunk = function() {
.pr$DataFrame$rechunk(self)
}


#' @title Get the last row of the DataFrame.
#' @keywords DataFrame
#' @return A DataFrame with one row.
13 changes: 10 additions & 3 deletions R/error__rpolarserr.R
@@ -53,12 +53,19 @@ bad_robj = function(r) {
.pr$RPolarsErr$new()$bad_robj(r)
}

Err_plain = function(x) {
Err(.pr$RPolarsErr$new()$plain(x))
Err_plain = function(...) {
Err(.pr$RPolarsErr$new()$plain(paste(..., collapse = " ")))
}

# shorthand for extracting an error context in unit testing; raises an error if not an RPolarsErr
get_err_ctx = \(x) unwrap_err(result(x))$contexts()
get_err_ctx = \(x, select = NULL) {
ctx = unwrap_err(result(x))$contexts()
if (is.null(select)) {
ctx
} else {
ctx[[match.arg(select, names(ctx))]]
}
}
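A hedged usage sketch of the widened helpers above; the context name "BadArgument" and the exact error triggered are assumptions for illustration, not taken from this diff:

# Err_plain() now pastes several pieces into one plain error message
Err_plain("in pl$concat():", "mixing eager and lazy inputs is not allowed")

# get_err_ctx() can select a single context by (partially matched) name
df = pl$DataFrame(a = 1)
get_err_ctx(pl$concat(df, df$lazy()))                # all contexts of the raised RPolarsErr
get_err_ctx(pl$concat(df, df$lazy()), "BadArgument") # only the selected context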


# wrapper to return Result
3 changes: 1 addition & 2 deletions R/expr__meta.R
@@ -166,9 +166,8 @@ ExprMeta_is_regex_projection = function() {
#' @examples
#' my_expr = (pl$col("foo") * pl$col("bar"))$sum()$over(pl$col("ham")) / 2
#' my_expr$meta$tree_format()

ExprMeta_tree_format = function(return_as_string = FALSE) {
out <- .pr$Expr$meta_tree_format(self) |>
out = .pr$Expr$meta_tree_format(self) |>
unwrap("in $tree_format():")
if (isTRUE(return_as_string)) {
out
30 changes: 18 additions & 12 deletions R/extendr-wrappers.R
@@ -11,18 +11,6 @@
#' @useDynLib polars, .registration = TRUE
NULL

rlazy_csv_reader <- function(path, sep, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype, low_memory, comment_char, quote_char, null_values, infer_schema_length, skip_rows_after_header, encoding, row_count_name, row_count_offset, parse_dates) .Call(wrap__rlazy_csv_reader, path, sep, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype, low_memory, comment_char, quote_char, null_values, infer_schema_length, skip_rows_after_header, encoding, row_count_name, row_count_offset, parse_dates)

import_arrow_ipc <- function(path, n_rows, cache, rechunk, row_name, row_count, memmap) .Call(wrap__import_arrow_ipc, path, n_rows, cache, rechunk, row_name, row_count, memmap)

new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory)

concat_df <- function(vdf) .Call(wrap__concat_df, vdf)

hor_concat_df <- function(dfs) .Call(wrap__hor_concat_df, dfs)

diag_concat_df <- function(dfs) .Call(wrap__diag_concat_df, dfs)

min_exprs <- function(exprs) .Call(wrap__min_exprs, exprs)

max_exprs <- function(exprs) .Call(wrap__max_exprs, exprs)
@@ -75,6 +63,20 @@ test_wrong_call_pl_lit <- function(robj) .Call(wrap__test_wrong_call_pl_lit, rob

polars_features <- function() .Call(wrap__polars_features)

concat_lf <- function(l, rechunk, parallel, to_supertypes) .Call(wrap__concat_lf, l, rechunk, parallel, to_supertypes)

diag_concat_lf <- function(l, rechunk, parallel) .Call(wrap__diag_concat_lf, l, rechunk, parallel)

hor_concat_df <- function(l) .Call(wrap__hor_concat_df, l)

concat_series <- function(l, rechunk, to_supertypes) .Call(wrap__concat_series, l, rechunk, to_supertypes)

rlazy_csv_reader <- function(path, sep, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype, low_memory, comment_char, quote_char, null_values, infer_schema_length, skip_rows_after_header, encoding, row_count_name, row_count_offset, parse_dates) .Call(wrap__rlazy_csv_reader, path, sep, has_header, ignore_errors, skip_rows, n_rows, cache, overwrite_dtype, low_memory, comment_char, quote_char, null_values, infer_schema_length, skip_rows_after_header, encoding, row_count_name, row_count_offset, parse_dates)

import_arrow_ipc <- function(path, n_rows, cache, rechunk, row_name, row_count, memmap) .Call(wrap__import_arrow_ipc, path, n_rows, cache, rechunk, row_name, row_count, memmap)

new_from_parquet <- function(path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory) .Call(wrap__new_from_parquet, path, n_rows, cache, parallel, rechunk, row_name, row_count, low_memory)

test_rpolarserr <- function() .Call(wrap__test_rpolarserr)

setup_renv <- function() .Call(wrap__setup_renv)
@@ -111,6 +113,10 @@ DataFrame <- new.env(parent = emptyenv())

DataFrame$shape <- function() .Call(wrap__DataFrame__shape, self)

DataFrame$n_chunks <- function(strategy) .Call(wrap__DataFrame__n_chunks, self, strategy)

DataFrame$rechunk <- function() .Call(wrap__DataFrame__rechunk, self)

DataFrame$clone_see_me_macro <- function() .Call(wrap__DataFrame__clone_see_me_macro, self)

DataFrame$default <- function() .Call(wrap__DataFrame__default)
139 changes: 110 additions & 29 deletions R/functions__eager.R
@@ -1,13 +1,24 @@
#' Concat polars objects
#' @name pl_concat
#' @param l list of DataFrame, or Series, LazyFrame or Expr
#' @param rechunk perform a rechunk at last
#' @param how choice of bind direction "vertical"(rbind) "horizontal"(cbind) "diagonal" diagonally
#' @param parallel BOOL default TRUE, only used for LazyFrames
#' @param ... Either individual unpacked arguments or arguments wrapped in list(). Arguments
#' can be eager (DataFrame, Series, R vectors) or lazy (LazyFrame, Expr).
#' The first element determines the output of `$concat()`: if the first element
#' is lazy, a LazyFrame is returned; otherwise, a DataFrame is returned (note
#' that if the first element is eager, all other elements have to be eager to
#' avoid an implicit collect).
#' @param rechunk If `TRUE` (default), rechunk the result at the end.
#' @param how Bind direction. Can be "vertical" (like `rbind()`), "horizontal"
#' (like `cbind()`), or "diagonal".
#' @param parallel Only used for LazyFrames. If `TRUE` (default), lazy
#' computations may be executed in parallel.
#' @param to_supertypes If `TRUE` (default is `FALSE`), cast columns to their shared
#' supertype, if any. For example, if we try to vertically concatenate two columns
#' of types `i32` and `f64`, the column of type `i32` will be cast to `f64`
#' beforehand. This argument is equivalent to the "_relaxed" operations in Python polars.
#'
#' @details
#' Categorical columns/Series must have been constructed while global string cache enabled
#' [`pl$enable_string_cache()`][pl_enable_string_cache]
#' Categorical columns/Series must have been constructed while the global string
#' cache was enabled. See [`pl$enable_string_cache()`][pl_enable_string_cache].
#'
#'
#' @return DataFrame, Series, LazyFrame or Expr
@@ -23,7 +34,6 @@
#' })
#' pl$concat(l_ver, how = "vertical")
#'
#'
#' # horizontal
#' l_hor = lapply(1:10, function(i) {
#' l_internal = list(
@@ -34,44 +44,116 @@
#' pl$DataFrame(l_internal)
#' })
#' pl$concat(l_hor, how = "horizontal")
#'
#' # diagonal
#' pl$concat(l_hor, how = "diagonal")
#'
#' # if two columns don't share the same type, concat() will error unless we use
#' # `to_supertypes = TRUE`:
#' test = pl$DataFrame(x = 1L) # i32
#' test2 = pl$DataFrame(x = 1.0) #f64
#'
#' pl$concat(test, test2, to_supertypes = TRUE)
pl$concat = function(
l, # list of DataFrames or Series or lazyFrames or expr
..., # list of DataFrames or Series or lazyFrames or expr
rechunk = TRUE,
how = c("vertical", "horizontal", "diagonal"),
parallel = TRUE # not used yet
) {
parallel = TRUE,
to_supertypes = FALSE) {
# unpack arg list
l = unpack_list(..., skip_classes = "data.frame")

# nothing becomes NULL
if (length(l) == 0L) {
return(NULL)
}

## Check inputs
how = match.arg(how[1L], c("vertical", "horizontal", "diagonal"))
how_args = c("vertical", "horizontal", "diagonal") # , "vertical_relaxed", "diagonal_relaxed")

how = match.arg(how[1L], how_args) |>
result() |>
unwrap("in pl$concat()")

# dispatch on item class and how
first = l[[1L]]
result = pcase(
inherits(first, "DataFrame"),
eager = !inherits(first, "LazyFrame")
args_modified = names(as.list(sys.call()[-1L]))

# check not using any mixing of types which could lead to implicit collect
if (eager) {
for (i in seq_along(l)) {
if (inherits(l[[i]], c("LazyFrame", "Expr"))) {
.pr$RPolarsErr$new()$
plain("tip: explicitly collect lazy inputs first, e.g. pl$concat(dataframe, lazyframe$collect())")$
plain("LazyFrame or Expr not allowed if first arg is a DataFrame, to avoid implicit collect")$
bad_robj(l[[i]])$
bad_arg(paste("of those to concatenate, number", i)) |>
Err() |>
unwrap("in pl$concat()")
}
}
}

# dispatch on item class and how
Result_out = pcase(
how == "vertical" && (inherits(first, "Series") || is.vector(first)),
{
vdf = l_to_vdf(l)
pcase(
how == "vertical", concat_df(vdf),
how == "diagonal", diag_concat_df(vdf),
how == "horizontal", hor_concat_df(vdf),
or_else = stopf("Internal error")
)
if (any(args_modified %in% c("parallel"))) {
warning(
"in pl$concat(): argument `parallel` is not used when concatenating Series",
call. = FALSE
)
}
concat_series(l, rechunk, to_supertypes)
},
inherits(first, "Series"),
how == "vertical",
concat_lf(l, rechunk, parallel, to_supertypes),
how == "diagonal",
{
stopf("not implemented Series")
if (any(args_modified %in% c("to_supertypes"))) {
warning(
"Argument `to_supertypes` is not used when how=='diagonal'",
call. = FALSE
)
}
diag_concat_lf(l, rechunk, parallel)
},
inherits(first, "Expr"),
how == "horizontal" && !eager,
{
stopf("not implemented Expr")
Err_plain(
"how=='horizontal' is not supported for lazy (first element is LazyFrame).",
"Try e.g. <LazyFrame>$join() to get a lazy join, or pl$concat(lf1$collect(), lf2, lf3)",
"to get an eager horizontal concatenation."
)
},
how == "horizontal",
{
if (any(args_modified %in% c("parallel", "to_supertypes"))) {
warning(
"Arguments `parallel`, `rechunk`, and `to_supertypes` are not used when how=='horizontal'",
call. = FALSE
)
}
hor_concat_df(l)
},

# TODO implement Series, Expr, Lazy etc
or_else = stopf(paste0("type of first list element: '", class(first), "' is not supported"))
or_else = Err_plain("internal error:", how, "not handled")
)

unwrap(result)
# convert back from lazy if eager
and_then(Result_out, \(x) {
pcase(
# run-time assertion for future changes
inherits(x, "DataFrame") && !eager, Err_plain("internal logical error in pl$concat()"),

# must collect as in rust side only lazy concat is implemented. Eager inputs are wrapped in
# lazy and then collected again. This does not mean any user input is collected.
inherits(x, "LazyFrame") && eager, Ok(x$collect()),
or_else = Ok(x)
)
}) |>
unwrap("in pl$concat()")
}
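A hedged sketch of the dispatch above: the class of the first element decides the output, and lazy inputs after an eager first element raise an error instead of being collected implicitly (the classes shown are expectations, not verified outputs):

df = pl$DataFrame(x = 1:3)
lf = df$lazy()
class(pl$concat(lf, lf))           # expected "LazyFrame": result stays lazy
class(pl$concat(df, lf$collect())) # expected "DataFrame": all inputs eager
# pl$concat(df, lf)                # expected error: LazyFrame not allowed after an eager first element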


@@ -136,8 +218,7 @@ pl$date_range = function(
name = NULL, # : str | None = None,
time_unit = "us",
time_zone = NULL, # : str | None = None
explode = TRUE
) {
explode = TRUE) {
if (missing(end)) {
end = start
interval = "1h"