Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
goodwanghan authored Jan 7, 2024
1 parent cc1dbe7 commit 370c53d
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 3 deletions.
10 changes: 8 additions & 2 deletions fugue_dask/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,14 @@ def _partition_take(partition, n, presort):

else:
if len(_presort.keys()) == 0 and n == 1:
- return d.drop_duplicates(
- subset=partition_spec.partition_by, ignore_index=True, keep="first"
+ return DaskDataFrame(
+ d.drop_duplicates(
+ subset=partition_spec.partition_by,
+ ignore_index=True,
+ keep="first",
+ ),
+ df.schema,
+ type_safe=False,
)

d = (
Expand Down
4 changes: 3 additions & 1 deletion fugue_spark/execution_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,9 @@ def _presort_to_col(_col: str, _asc: bool) -> Any:
# If partition exists
else:
if len(_presort.keys()) == 0 and n == 1:
- return d.dropDuplicates(subset=partition_spec.partition_by)
+ return self._to_spark_df(
+ d.dropDuplicates(subset=partition_spec.partition_by), df.schema
+ )

w = Window.partitionBy([col(x) for x in partition_spec.partition_by])

Expand Down
29 changes: 29 additions & 0 deletions fugue_test/execution_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,35 @@ def test_take(self):
"a:str,b:int,c:long",
throw=True,
)
+ a = fa.as_fugue_engine_df(
+ e,
+ [
+ ["a", 2, 3],
+ [None, 4, 2],
+ [None, 2, 1],
+ ],
+ "a:str,b:int,c:long",
+ )
+ i = fa.take(a, n=1, partition="a", presort=None)
+ case1 = df_eq(
+ i,
+ [
+ ["a", 2, 3],
+ [None, 4, 2],
+ ],
+ "a:str,b:int,c:long",
+ throw=False,
+ )
+ case2 = df_eq(
+ i,
+ [
+ ["a", 2, 3],
+ [None, 2, 1],
+ ],
+ "a:str,b:int,c:long",
+ throw=False,
+ )
+ assert case1 or case2
raises(ValueError, lambda: fa.take(a, n=0.5, presort=None))

def test_sample_n(self):
Expand Down

0 comments on commit 370c53d

Please sign in to comment.