From 87fd3c416479b560a8ec52682d43dac9f539f402 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Sun, 10 May 2020 22:09:24 -0700 Subject: [PATCH] dask repartition (#12) --- fugue_dask/execution_engine.py | 1 + fugue_test/builtin_suite.py | 4 ++-- setup.py | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/fugue_dask/execution_engine.py b/fugue_dask/execution_engine.py index f33cc616..2066bffa 100644 --- a/fugue_dask/execution_engine.py +++ b/fugue_dask/execution_engine.py @@ -126,6 +126,7 @@ def _map(pdf: Any) -> pd.DataFrame: pdf = self.repartition(df, partition_spec) result = pdf.native.map_partitions(_map, meta=output_schema.pandas_dtype) else: + df = self.repartition(df, PartitionSpec(num=partition_spec.num_partitions)) result = DASK_UTILS.safe_groupby_apply( df.native, partition_spec.partition_by, diff --git a/fugue_test/builtin_suite.py b/fugue_test/builtin_suite.py index 34e46409..a74fdce4 100644 --- a/fugue_test/builtin_suite.py +++ b/fugue_test/builtin_suite.py @@ -33,8 +33,8 @@ def dag(self) -> "DagTester": def test_create_show(self): with self.dag() as dag: - dag.df([[0]], "a:int").persist(123).partition(num=2).show() - dag.df(ArrayDataFrame([[0]], "a:int")).persist(456).broadcast().show() + dag.df([[0]], "a:int").persist().partition(num=2).show() + dag.df(ArrayDataFrame([[0]], "a:int")).persist().broadcast().show() def test_transform(self): with self.dag() as dag: diff --git a/setup.py b/setup.py index 2106ad5c..633e6a47 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,6 @@ from setuptools import setup, find_packages -VERSION = "0.1.5" +VERSION = "0.1.6" with open("README.md") as f: LONG_DESCRIPTION = f.read()