Skip to content

Commit

Permalink
Index.drop_duplicates() (#458)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Dec 5, 2023
1 parent e323341 commit 55b5b54
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 15 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ bench/shakespeare.txt
*.sw?
.DS_STORE
\.tox/
.idea/
.ipynb_checkpoints/
16 changes: 8 additions & 8 deletions dask_expr/_reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ def _lower(self):
# Convert back to Series if necessary
if is_series_like(self._meta):
shuffled = shuffled[shuffled.columns[0]]
elif is_index_like(self._meta):
shuffled = shuffled.index

# Blockwise aggregate
result = Aggregate(
Expand Down Expand Up @@ -499,16 +501,14 @@ def split_by(self):
def _meta(self):
    """Build the output meta by applying ``chunk`` to the input meta.

    The input is ``meta_nonempty(self.frame._meta)`` and the call receives
    the same keyword arguments as ``chunk_kwargs``.
    """
    sample = meta_nonempty(self.frame._meta)
    return self.chunk(sample, **self.chunk_kwargs)

def _subset_kwargs(self):
    """Return the ``subset`` keyword argument when the frame can accept it.

    Returns
    -------
    dict
        ``{"subset": self.subset}`` when ``self.frame._meta`` is
        dataframe-like, otherwise an empty dict.
    """
    # Gate on "is a DataFrame" rather than "is not a Series": an Index meta is
    # not series-like either, and Index.drop_duplicates has no ``subset``.
    if is_dataframe_like(self.frame._meta):
        return {"subset": self.subset}
    return {}

@property
def chunk_kwargs(self):
    """Keyword arguments passed to ``chunk``, chosen by the frame's type.

    Returns
    -------
    dict
        Contains ``subset`` only when ``self.frame._meta`` is dataframe-like,
        and ``ignore_index`` only when ``PANDAS_GE_200`` is set and the meta
        is not index-like.
    """
    out = {}
    # ``subset`` only makes sense for DataFrames (column selection).
    if is_dataframe_like(self.frame._meta):
        out["subset"] = self.subset
    # ``ignore_index`` is gated on the pandas version flag and is never
    # forwarded for an Index, which has no index of its own to reset.
    if PANDAS_GE_200 and not is_index_like(self.frame._meta):
        out["ignore_index"] = self.ignore_index
    return out

def _simplify_up(self, parent):
if self.subset is not None and isinstance(parent, Projection):
Expand Down
4 changes: 4 additions & 0 deletions dask_expr/_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,10 @@ def _lower(self):
)
return SortIndexBlockwise(index_set)

@property
def npartitions(self):
    """Partition count, taken as the number of entries in ``new_divisions``."""
    divisions = self.new_divisions
    return len(divisions)


class _SetPartitionsPreSetIndex(Blockwise):
_parameters = ["frame", "new_divisions", "ascending", "na_position"]
Expand Down
18 changes: 11 additions & 7 deletions dask_expr/tests/test_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
@pytest.fixture
def pdf():
    """Yield a 100-row frame with unique ``x`` and repeated values in ``y``."""
    frame = lib.DataFrame({"x": range(100)})
    # Not unique; duplicates span different partitions
    frame["y"] = frame.x // 7
    yield frame


Expand Down Expand Up @@ -949,17 +949,21 @@ def test_drop_duplicates(df, pdf, split_out):
pdf.drop_duplicates(ignore_index=True),
check_index=split_out is not True,
)
assert_eq(
df.drop_duplicates(subset=["x"], split_out=split_out),
pdf.drop_duplicates(subset=["x"]),
)
assert_eq(
df.drop_duplicates(subset=["y"], split_out=split_out),
pdf.drop_duplicates(subset=["y"]),
)
assert_eq(
df.x.drop_duplicates(split_out=split_out),
pdf.x.drop_duplicates(),
df.y.drop_duplicates(split_out=split_out),
pdf.y.drop_duplicates(),
)

actual = df.set_index("y").index.drop_duplicates(split_out=split_out)
if split_out is True:
actual = actual.compute().sort_values() # shuffle is unordered
assert_eq(
actual,
pdf.set_index("y").index.drop_duplicates(),
)

with pytest.raises(KeyError, match="'a'"):
Expand Down

0 comments on commit 55b5b54

Please sign in to comment.