diff --git a/fugue_dask/execution_engine.py b/fugue_dask/execution_engine.py index 7dd0bae0..f1838a71 100644 --- a/fugue_dask/execution_engine.py +++ b/fugue_dask/execution_engine.py @@ -506,6 +506,17 @@ def _partition_take(partition, n, presort): ).head(n) else: + if len(_presort.keys()) == 0 and n == 1: + return DaskDataFrame( + d.drop_duplicates( + subset=partition_spec.partition_by, + ignore_index=True, + keep="first", + ), + df.schema, + type_safe=False, + ) + d = ( d.groupby(partition_spec.partition_by, dropna=False) .apply(_partition_take, n=n, presort=_presort, meta=meta) diff --git a/fugue_spark/execution_engine.py b/fugue_spark/execution_engine.py index 4d8b3b23..51d5c69c 100644 --- a/fugue_spark/execution_engine.py +++ b/fugue_spark/execution_engine.py @@ -674,6 +674,11 @@ def _presort_to_col(_col: str, _asc: bool) -> Any: # If partition exists else: + if len(_presort.keys()) == 0 and n == 1: + return self._to_spark_df( + d.dropDuplicates(subset=partition_spec.partition_by), df.schema + ) + w = Window.partitionBy([col(x) for x in partition_spec.partition_by]) if len(_presort.keys()) > 0: diff --git a/fugue_test/execution_suite.py b/fugue_test/execution_suite.py index f95ee9ed..a05e0d24 100644 --- a/fugue_test/execution_suite.py +++ b/fugue_test/execution_suite.py @@ -813,6 +813,46 @@ def test_take(self): "a:str,b:int,c:long", throw=True, ) + a = fa.as_fugue_engine_df( + e, + [ + ["a", 2, 3], + [None, 4, 2], + [None, 2, 1], + ], + "a:str,b:int,c:long", + ) + i = fa.take(a, n=1, partition="a", presort=None) + case1 = df_eq( + i, + [ + ["a", 2, 3], + [None, 4, 2], + ], + "a:str,b:int,c:long", + throw=False, + ) + case2 = df_eq( + i, + [ + ["a", 2, 3], + [None, 2, 1], + ], + "a:str,b:int,c:long", + throw=False, + ) + assert case1 or case2 + j = fa.take(a, n=2, partition="a", presort=None) + df_eq( + j, + [ + ["a", 2, 3], + [None, 4, 2], + [None, 2, 1], + ], + "a:str,b:int,c:long", + throw=True, + ) raises(ValueError, lambda: fa.take(a, n=0.5, presort=None)) def test_sample_n(self):