
Merge pull request #131 from kvnkho/master
limit with partition by
kvnkho authored Jan 11, 2021
2 parents 250c5ab + 33391cd commit 9315ee0
Showing 9 changed files with 353 additions and 8 deletions.
35 changes: 33 additions & 2 deletions fugue/execution/execution_engine.py
@@ -404,9 +404,9 @@ def sample(
Sample dataframe by number of rows or by fraction
:param df: DataFrame
- :param n: number of rows to sample, one and only one of ``n`` and ``fact``
+ :param n: number of rows to sample, one and only one of ``n`` and ``frac``
must be set
- :param frac: fraction [0,1] to sample, one and only one of ``n`` and ``fact``
+ :param frac: fraction [0,1] to sample, one and only one of ``n`` and ``frac``
must be set
:param replace: whether replacement is allowed. With replacement,
there may be duplicated rows in the result, defaults to False
@@ -419,6 +419,37 @@ def sample(
"""
pass

@abstractmethod
def limit(
self,
df: DataFrame,
n: int,
presort: str,
na_position: str = "last",
partition_spec: PartitionSpec = EMPTY_PARTITION_SPEC,
metadata: Any = None,
) -> DataFrame: # pragma: no cover
"""
Get the first n rows of a DataFrame per partition. If a presort is defined,
it is applied before the limit; the ``presort`` argument overrides
``partition_spec.presort``. Fugue's presort follows the pandas convention of
explicitly placing NULLs first or last, which differs from the Spark and SQL
convention of treating NULLs as the smallest value.
:param df: DataFrame
:param n: number of rows to return
:param presort: presort expression similar to partition presort
:param na_position: position of null values during the presort.
can accept ``first`` or ``last``
:param partition_spec: PartitionSpec to apply the limit operation
:param metadata: dict-like object to add to the result dataframe,
defaults to None
:return: n rows of DataFrame per partition
:rtype: DataFrame
"""
pass

def zip(
self,
df1: DataFrame,
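
To make the NULL-ordering convention in the new ``limit`` docstring concrete, here is a minimal pandas sketch (not part of this diff; the column and values are made up) showing how ``na_position`` controls where NULLs land in a presort:

import numpy as np
import pandas as pd

df = pd.DataFrame({"a": [3.0, np.nan, 1.0, 2.0]})

# pandas: NULL placement is chosen explicitly via na_position
df.sort_values("a", ascending=False, na_position="last")   # 3.0, 2.0, 1.0, NaN
df.sort_values("a", ascending=False, na_position="first")  # NaN, 3.0, 2.0, 1.0

# Spark and SQL instead treat NULL as the smallest value, so a descending
# sort pushes NULLs to the end unless desc_nulls_first() is requested.
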
37 changes: 36 additions & 1 deletion fugue/execution/native_execution_engine.py
@@ -7,6 +7,7 @@
EMPTY_PARTITION_SPEC,
PartitionCursor,
PartitionSpec,
_parse_presort_exp,
)
from fugue.dataframe import (
DataFrame,
@@ -25,7 +26,7 @@
from qpd_pandas.engine import PandasUtils
from sqlalchemy import create_engine
from triad.collections import Schema
- from triad.collections.dict import ParamDict
+ from triad.collections.dict import ParamDict, IndexedOrderedDict
from triad.collections.fs import FileSystem
from triad.utils.assertion import assert_or_throw

@@ -279,6 +280,40 @@ def sample(
ValueError("one and only one of n and frac should be set"),
)
d = df.as_pandas().sample(n=n, frac=frac, replace=replace, random_state=seed)
return PandasDataFrame(d.reset_index(drop=True), df.schema, metadata)

def limit(
self,
df: DataFrame,
n: int,
presort: str,
na_position: str = "last",
partition_spec: PartitionSpec = EMPTY_PARTITION_SPEC,
metadata: Any = None,
) -> DataFrame:
assert_or_throw(
isinstance(n, int),
ValueError("n needs to be an integer"),
)
d = df.as_pandas()

# Use presort over partition_spec.presort if possible
if presort:
presort = _parse_presort_exp(presort)
_presort: IndexedOrderedDict = presort or partition_spec.presort

if len(_presort.keys()) > 0:
d = d.sort_values(
list(_presort.keys()),
ascending=list(_presort.values()),
na_position=na_position,
)

if len(partition_spec.partition_by) == 0:
d = d.head(n)
else:
d = d.groupby(by=partition_spec.partition_by, dropna=False).head(n)

return PandasDataFrame(
d.reset_index(drop=True), df.schema, metadata, pandas_df_wrapper=True
)
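
The NativeExecutionEngine implementation above boils down to one pandas idiom: sort the whole frame once, then take the first n rows per group with ``groupby(...).head(n)``. A standalone sketch of that pattern (the data and column names are made up, and the presort dict shape — column name mapped to an ascending flag — is inferred from how ``_presort`` is used above):

import pandas as pd

d = pd.DataFrame({"part": ["x", "x", "y", "y"], "val": [2, 1, 4, 3]})
presort = {"val": False}  # assumed shape: {column: ascending}

d = d.sort_values(
    list(presort.keys()),
    ascending=list(presort.values()),
    na_position="last",
)
# groupby(...).head keeps the presorted order within each group
top = d.groupby(by=["part"], dropna=False).head(1)
# expected: ("x", 2) and ("y", 4)
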
1 change: 1 addition & 0 deletions fugue/extensions/_builtins/__init__.py
@@ -13,6 +13,7 @@
DropColumns,
Dropna,
Fillna,
Limit,
Rename,
RunJoin,
RunSetOperation,
17 changes: 17 additions & 0 deletions fugue/extensions/_builtins/processors.py
@@ -233,6 +233,23 @@ def process(self, dfs: DataFrames) -> DataFrame:
)


class Limit(Processor):
def process(self, dfs: DataFrames) -> DataFrame:
assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input"))
# All _get_or operations convert float to int
n = self.params.get_or_none("n", int)
presort = self.params.get_or_none("presort", str)
na_position = self.params.get("na_position", "last")
partition_spec = self.partition_spec
return self.execution_engine.limit(
dfs[0],
n,
presort=presort,
na_position=na_position,
partition_spec=partition_spec,
)


class SaveAndUse(Processor):
def process(self, dfs: DataFrames) -> DataFrame:
assert_or_throw(len(dfs) == 1, FugueWorkflowError("not single input"))
33 changes: 33 additions & 0 deletions fugue/workflow/workflow.py
@@ -27,6 +27,7 @@
DropColumns,
Dropna,
Fillna,
Limit,
Load,
Rename,
RunJoin,
@@ -609,6 +610,38 @@ def sample(
df = self.workflow.process(self, using=Sample, params=params)
return self._to_self_type(df)

def limit(self: TDF, n: int, presort: str = None, na_position: str = "last") -> TDF:
"""
Get the first n rows of a DataFrame per partition. If a presort is defined,
it is applied before the limit; the ``presort`` argument overrides
``partition_spec.presort``.
:param n: number of rows to return
:param presort: presort expression similar to partition presort
:param na_position: position of null values during the presort.
can accept ``first`` or ``last``
:return: n rows of DataFrame per partition
"""
params: Dict[str, Any] = dict()
params["n"] = n
# Note float is converted to int with triad _get_or
assert_or_throw(
isinstance(n, int),
ValueError("n needs to be an integer"),
)
assert_or_throw(
na_position in ("first", "last"),
ValueError("na_position must be either 'first' or 'last'"),
)
params["na_position"] = na_position
if presort is not None:
params["presort"] = presort

df = self.workflow.process(
self, using=Limit, pre_partition=self.partition_spec, params=params
)
return self._to_self_type(df)

def weak_checkpoint(self: TDF, lazy: bool = False, **kwargs: Any) -> TDF:
"""Cache the dataframe in memory
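
A hypothetical end-to-end usage of the new ``limit`` workflow method above (the data, schema, and reliance on the default NativeExecutionEngine are illustrative; only the ``limit`` signature comes from this diff):

from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([["a", 2], ["a", 1], ["b", 3]], "grp:str,val:int")
    # top row per grp by val descending; NULLs, if any, sort last
    df.partition(by=["grp"]).limit(1, presort="val desc").show()

Because ``limit`` passes ``pre_partition=self.partition_spec`` to the processor, the preceding ``partition(by=...)`` call determines the per-partition grouping, while ``presort`` controls the ordering within each partition.
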
57 changes: 54 additions & 3 deletions fugue_dask/execution_engine.py
@@ -1,12 +1,13 @@
import logging
from typing import Any, Callable, List, Optional, Union

- import dask.dataframe as pd
+ import dask.dataframe as dd
from fugue._utils.io import load_df, save_df
from fugue.collections.partition import (
EMPTY_PARTITION_SPEC,
PartitionCursor,
PartitionSpec,
_parse_presort_exp,
)
from fugue.constants import KEYWORD_CORECOUNT, KEYWORD_ROWCOUNT
from fugue.dataframe import DataFrame, DataFrames, LocalDataFrame, PandasDataFrame
@@ -18,7 +19,7 @@
)
from qpd_dask import run_sql_on_dask
from triad.collections import Schema
- from triad.collections.dict import ParamDict
+ from triad.collections.dict import ParamDict, IndexedOrderedDict
from triad.collections.fs import FileSystem
from triad.utils.assertion import assert_or_throw
from triad.utils.hash import to_uuid
@@ -183,7 +184,7 @@ def map(
)
)

- def _map(pdf: Any) -> pd.DataFrame:
+ def _map(pdf: Any) -> dd.DataFrame:
if pdf.shape[0] == 0:
return PandasDataFrame([], output_schema).as_pandas()
if len(presort_keys) > 0:
@@ -355,6 +356,56 @@ def sample(
)
return DaskDataFrame(d, df.schema, metadata)

def limit(
self,
df: DataFrame,
n: int,
presort: str,
na_position: str = "last",
partition_spec: PartitionSpec = EMPTY_PARTITION_SPEC,
metadata: Any = None,
) -> DataFrame:
assert_or_throw(
isinstance(n, int),
ValueError("n needs to be an integer"),
)
d = self.to_df(df).native
meta = [(d[x].name, d[x].dtype) for x in d.columns]

if presort:
presort = _parse_presort_exp(presort)
# Use presort over partition_spec.presort if possible
_presort: IndexedOrderedDict = presort or partition_spec.presort

def _partition_limit(partition, n, presort):
if len(presort.keys()) > 0:
partition = partition.sort_values(
list(presort.keys()),
ascending=list(presort.values()),
na_position=na_position,
)
return partition.head(n)

if len(partition_spec.partition_by) == 0:
if len(_presort.keys()) == 0:
d = d.head(n)
else:
# Take the first n rows within each default dask partition
d = d.map_partitions(_partition_limit, n, _presort, meta=meta).compute()
# compute() materializes the candidates as pandas, so the global
# sort and head below run locally
d = d.sort_values(
list(_presort.keys()),
ascending=list(_presort.values()),
na_position=na_position,
).head(n)

else:
d = d.groupby(partition_spec.partition_by, dropna=False).apply(
_partition_limit, n=n, presort=_presort, meta=meta
)

return DaskDataFrame(d, df.schema, metadata)

def load_df(
self,
path: Union[str, List[str]],
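
The non-partitioned Dask branch above is a two-stage top-n: take the first n rows inside each Dask partition, gather those candidates with ``compute()``, then sort and take n globally in pandas. A standalone sketch of that pattern (data, column name, and partition count are made up):

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"val": [5, 1, 4, 2, 3]})
ddf = dd.from_pandas(pdf, npartitions=2)

def local_top(partition: pd.DataFrame, n: int) -> pd.DataFrame:
    return partition.sort_values("val").head(n)

# stage 1: per-partition candidates; meta describes the output frame
candidates = ddf.map_partitions(local_top, 2, meta=pdf).compute()
# stage 2: global winners among the candidates (now plain pandas)
top2 = candidates.sort_values("val").head(2)  # val == 1 and val == 2
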
63 changes: 61 additions & 2 deletions fugue_spark/execution_engine.py
@@ -9,6 +9,7 @@
EMPTY_PARTITION_SPEC,
PartitionCursor,
PartitionSpec,
_parse_presort_exp,
)
from fugue.constants import KEYWORD_ROWCOUNT
from fugue.dataframe import DataFrame, DataFrames, IterableDataFrame, LocalDataFrame
@@ -23,8 +24,9 @@
from pyspark import StorageLevel
from pyspark.rdd import RDD
from pyspark.sql import SparkSession
- from pyspark.sql.functions import PandasUDFType, broadcast, col, pandas_udf
- from triad.collections import ParamDict, Schema
+ from pyspark.sql.window import Window
+ from pyspark.sql.functions import PandasUDFType, broadcast, col, pandas_udf, row_number
+ from triad.collections import ParamDict, Schema, IndexedOrderedDict
from triad.collections.fs import FileSystem
from triad.utils.assertion import assert_arg_not_none, assert_or_throw
from triad.utils.hash import to_uuid
@@ -430,6 +432,63 @@ def sample(
)
return self.to_df(d, df.schema, metadata)

def limit(
self,
df: DataFrame,
n: int,
presort: str,
na_position: str = "last",
partition_spec: PartitionSpec = EMPTY_PARTITION_SPEC,
metadata: Any = None,
) -> DataFrame:
assert_or_throw(
isinstance(n, int),
ValueError("n needs to be an integer"),
)
d = self.to_df(df).native
nulls_last = bool(na_position == "last")

if presort:
presort = _parse_presort_exp(presort)
# Use presort over partition_spec.presort if possible
_presort: IndexedOrderedDict = presort or partition_spec.presort

def _presort_to_col(_col: str, _asc: bool) -> Any:
if nulls_last:
if _asc:
return col(_col).asc_nulls_last()
else:
return col(_col).desc_nulls_last()
else:
if _asc:
return col(_col).asc_nulls_first()
else:
return col(_col).desc_nulls_first()

# If no partition
if len(partition_spec.partition_by) == 0:
if len(_presort.keys()) > 0:
d = d.orderBy(
[_presort_to_col(_col, _presort[_col]) for _col in _presort.keys()]
)
d = d.limit(n)

# If partition exists
else:
w = Window.partitionBy([col(x) for x in partition_spec.partition_by])
if len(_presort.keys()) > 0:
w = w.orderBy(
[_presort_to_col(_col, _presort[_col]) for _col in _presort.keys()]
)

d = (
d.select(col("*"), row_number().over(w).alias("__row_number__"))
.filter(col("__row_number__") <= n)
.drop("__row_number__")
)

return self.to_df(d, df.schema, metadata)

def load_df(
self,
path: Union[str, List[str]],
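
The partitioned Spark branch is the standard row_number-over-window top-n-per-group pattern. A self-contained PySpark sketch of that pattern (the session setup, column names, and data are illustrative, not from this diff):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([("a", 2), ("a", 1), ("b", 3)], ["grp", "val"])

# rank rows within each grp by val descending, NULLs last
w = Window.partitionBy(col("grp")).orderBy(col("val").desc_nulls_last())
top1 = (
    sdf.select(col("*"), row_number().over(w).alias("__row_number__"))
    .filter(col("__row_number__") <= 1)
    .drop("__row_number__")
)
top1.show()  # one row per grp: ("a", 2) and ("b", 3)
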
