Skip to content

Commit

Permalink
Optimize the Spark and Dask `take` functions
Browse files Browse the repository at this point in the history
  • Loading branch information
goodwanghan authored Jan 6, 2024
1 parent 29f105d commit cc1dbe7
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 0 deletions.
5 changes: 5 additions & 0 deletions fugue_dask/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,11 @@ def _partition_take(partition, n, presort):
).head(n)

else:
if len(_presort.keys()) == 0 and n == 1:
return d.drop_duplicates(
subset=partition_spec.partition_by, ignore_index=True, keep="first"
)

d = (
d.groupby(partition_spec.partition_by, dropna=False)
.apply(_partition_take, n=n, presort=_presort, meta=meta)
Expand Down
3 changes: 3 additions & 0 deletions fugue_spark/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,9 @@ def _presort_to_col(_col: str, _asc: bool) -> Any:

# If partition exists
else:
if len(_presort.keys()) == 0 and n == 1:
return d.dropDuplicates(subset=partition_spec.partition_by)

w = Window.partitionBy([col(x) for x in partition_spec.partition_by])

if len(_presort.keys()) > 0:
Expand Down

0 comments on commit cc1dbe7

Please sign in to comment.