From eb5b7cc1bfbb884d6eeb5a5e9cd9dfe64af63b46 Mon Sep 17 00:00:00 2001 From: Han Wang Date: Fri, 18 Nov 2022 09:41:11 -0800 Subject: [PATCH] Improve Dataset display (#391) * Fix minor bugs * Update dataframe display --- Makefile | 7 ++ fugue/__init__.py | 11 ++- fugue/bag/bag.py | 71 +++++++++-------- fugue/dataframe/dataframe.py | 73 ++++++++++-------- fugue/dataset.py | 98 ++++++++++++++++-------- fugue/extensions/_builtins/outputters.py | 17 ++-- fugue_notebook/env.py | 69 +++++++++++------ fugue_sql/exceptions.py | 10 +++ tests/fugue/dataframe/test_dataframe.py | 2 + 9 files changed, 223 insertions(+), 135 deletions(-) create mode 100644 fugue_sql/exceptions.py diff --git a/Makefile b/Makefile index fca86acd..90ed1c41 100644 --- a/Makefile +++ b/Makefile @@ -74,6 +74,13 @@ jupyter: jupyter nbextension enable fugue_notebook --py jupyter notebook --port=8888 --ip=0.0.0.0 --no-browser --allow-root --NotebookApp.token='' --NotebookApp.password='' --NotebookApp.allow_origin='*' +lab: + mkdir -p tmp + pip install . + pip install fugue-jupyter + fugue-jupyter install startup + jupyter lab --port=8888 --ip=0.0.0.0 --no-browser --allow-root --NotebookApp.token='' --NotebookApp.password='' --NotebookApp.allow_origin='*' + test: python3 -b -m pytest --reruns 2 --only-rerun 'Overflow in cast' --only-rerun 'Table or view not found' tests/ diff --git a/fugue/__init__.py b/fugue/__init__.py index 0af45afb..10575633 100644 --- a/fugue/__init__.py +++ b/fugue/__init__.py @@ -3,19 +3,24 @@ from triad.collections.fs import FileSystem from fugue.bag.array_bag import ArrayBag -from fugue.bag.bag import Bag +from fugue.bag.bag import Bag, BagDisplay from fugue.collections.partition import PartitionCursor, PartitionSpec from fugue.collections.yielded import Yielded, YieldedFile from fugue.constants import register_global_conf from fugue.dataframe.array_dataframe import ArrayDataFrame from fugue.dataframe.arrow_dataframe import ArrowDataFrame -from fugue.dataframe.dataframe import DataFrame, LocalBoundedDataFrame, LocalDataFrame +from fugue.dataframe.dataframe import ( + DataFrame, + DataFrameDisplay, + LocalBoundedDataFrame, + LocalDataFrame, +) from fugue.dataframe.dataframe_iterable_dataframe import LocalDataFrameIterableDataFrame from fugue.dataframe.dataframes import DataFrames from fugue.dataframe.iterable_dataframe import IterableDataFrame from fugue.dataframe.pandas_dataframe import PandasDataFrame from fugue.dataframe.utils import to_local_bounded_df, to_local_df -from fugue.dataset import Dataset, display_dataset +from fugue.dataset import Dataset, DatasetDisplay, get_dataset_display from fugue.execution.execution_engine import ExecutionEngine, MapEngine, SQLEngine from fugue.execution.factory import ( infer_execution_engine, diff --git a/fugue/bag/bag.py b/fugue/bag/bag.py index b7f2c6e0..45c58eb7 100644 --- a/fugue/bag/bag.py +++ b/fugue/bag/bag.py @@ -1,9 +1,7 @@ from abc import abstractmethod from typing import Any, List, Optional -from triad import SerializableRLock - -from ..dataset import Dataset, display_dataset +from ..dataset import Dataset, DatasetDisplay, get_dataset_display class Bag(Dataset): @@ -66,33 +64,40 @@ def is_bounded(self) -> bool: return True -_SHOW_LOCK = SerializableRLock() - - -@display_dataset.candidate( - lambda ds, *args, **kwargs: isinstance(ds, Bag), priority=0.1 -) -def _display_bag( - ds: Bag, n: int = 10, with_count: bool = False, title: Optional[str] = None -): - head_rows = ds.head(n).as_array() - if len(head_rows) < n: - count = len(head_rows) - else: - count = ds.count() if with_count else -1 - with _SHOW_LOCK: - if title is not None and title != "": - print(title) - print(type(ds).__name__) - print(head_rows) - if count >= 0: - print(f"Total count: {count}") - print("") - if ds.has_metadata: - print("Metadata:") - try: - # try pretty print, but if not convertible to json, print original - print(ds.metadata.to_json(indent=True)) - except Exception: # pragma: no cover - print(ds.metadata) - print("") +class BagDisplay(DatasetDisplay): + """:class:`~.Bag` plain display class""" + + @property + def bg(self) -> Bag: + """The target :class:`~.Bag`""" + return self._ds # type: ignore + + def show( + self, n: int = 10, with_count: bool = False, title: Optional[str] = None + ) -> None: + head_rows = self.bg.head(n).as_array() + if len(head_rows) < n: + count = len(head_rows) + else: + count = self.bg.count() if with_count else -1 + with DatasetDisplay._SHOW_LOCK: + if title is not None and title != "": + print(title) + print(type(self.bg).__name__) + print(head_rows) + if count >= 0: + print(f"Total count: {count}") + print("") + if self.bg.has_metadata: + print("Metadata:") + try: + # try pretty print, but if not convertible to json, print original + print(self.bg.metadata.to_json(indent=True)) + except Exception: # pragma: no cover + print(self.bg.metadata) + print("") + + +@get_dataset_display.candidate(lambda ds: isinstance(ds, Bag), priority=0.1) +def _get_bag_display(ds: Bag): + return BagDisplay(ds) diff --git a/fugue/dataframe/dataframe.py b/fugue/dataframe/dataframe.py index 8e837536..2b7b0063 100644 --- a/fugue/dataframe/dataframe.py +++ b/fugue/dataframe/dataframe.py @@ -12,7 +12,7 @@ from .._utils.display import PrettyTable from ..collections.yielded import Yielded -from ..dataset import Dataset, display_dataset +from ..dataset import Dataset, DatasetDisplay, get_dataset_display from ..exceptions import FugueDataFrameOperationError @@ -374,38 +374,45 @@ def result(self) -> DataFrame: return self._df -_SHOW_LOCK = SerializableRLock() - - -@display_dataset.candidate( - lambda ds, *args, **kwargs: isinstance(ds, DataFrame), priority=0.1 -) -def _display_dataframe( - ds: DataFrame, n: int = 10, with_count: bool = False, title: Optional[str] = None -): - best_width = 100 - head_rows = ds.head(n).as_array(type_safe=True) - if len(head_rows) < n: - count = len(head_rows) - else: - count = ds.count() if with_count else -1 - with _SHOW_LOCK: - if title is not None and title != "": - print(title) - print(type(ds).__name__) - tb = PrettyTable(ds.schema, head_rows, best_width) - print("\n".join(tb.to_string())) - if count >= 0: - print(f"Total count: {count}") - print("") - if ds.has_metadata: - print("Metadata:") - try: - # try pretty print, but if not convertible to json, print original - print(ds.metadata.to_json(indent=True)) - except Exception: # pragma: no cover - print(ds.metadata) - print("") +class DataFrameDisplay(DatasetDisplay): + """:class:`~.DataFrame` plain display class""" + + @property + def df(self) -> DataFrame: + """The target :class:`~.DataFrame`""" + return self._ds # type: ignore + + def show( + self, n: int = 10, with_count: bool = False, title: Optional[str] = None + ) -> None: + best_width = 100 + head_rows = self.df.head(n).as_array(type_safe=True) + if len(head_rows) < n: + count = len(head_rows) + else: + count = self.df.count() if with_count else -1 + with DatasetDisplay._SHOW_LOCK: + if title is not None and title != "": + print(title) + print(type(self.df).__name__) + tb = PrettyTable(self.df.schema, head_rows, best_width) + print("\n".join(tb.to_string())) + if count >= 0: + print(f"Total count: {count}") + print("") + if self.df.has_metadata: + print("Metadata:") + try: + # try pretty print, but if not convertible to json, print original + print(self.df.metadata.to_json(indent=True)) + except Exception: # pragma: no cover + print(self.df.metadata) + print("") + + +@get_dataset_display.candidate(lambda ds: isinstance(ds, DataFrame), priority=0.1) +def _get_dataframe_display(ds: DataFrame): + return DataFrameDisplay(ds) def _get_schema_change( diff --git a/fugue/dataset.py b/fugue/dataset.py index 6ae2db73..080337c9 100644 --- a/fugue/dataset.py +++ b/fugue/dataset.py @@ -1,35 +1,11 @@ +import html from abc import ABC, abstractmethod from typing import Any, Optional -from triad import ParamDict, assert_or_throw -from .exceptions import FugueDatasetEmptyError -from ._utils.registry import fugue_plugin - - -@fugue_plugin -def display_dataset( - ds: "Dataset", n: int = 10, with_count: bool = False, title: Optional[str] = None -) -> None: # pragma: no cover - """General function to display a :class:`~.Dataset` - - .. admonition:: Example: how to register a custom display - - .. code-block:: python - from fugue import display_dataset, DataFrame +from triad import ParamDict, SerializableRLock, assert_or_throw - # higher priority will overwrite the existing display functions - @display_dataset.candidate( - lambda ds, *args, **kwargs: isinstance(ds, DataFrame), priority=1.0) - def my_dataframe_display(ds, n=10, with_count=False, title=None): - print(type(ds)) - - :param ds: the Dataset to be displayed - :param n: top n items to display, defaults to 10 - :param with_count: whether to display the total count, defaults to False - :param title: title to display, defaults to None - """ - - raise NotImplementedError(f"No matching display function registered for {type(ds)}") +from ._utils.registry import fugue_plugin +from .exceptions import FugueDatasetEmptyError class Dataset(ABC): @@ -91,6 +67,13 @@ def count(self) -> int: # pragma: no cover """Get number of rows of this dataframe""" raise NotImplementedError + def assert_not_empty(self) -> None: + """Assert this dataframe is not empty + + :raises FugueDatasetEmptyError: if it is empty + """ + assert_or_throw(not self.empty, FugueDatasetEmptyError("dataframe is empty")) + def show( self, n: int = 10, with_count: bool = False, title: Optional[str] = None ) -> None: @@ -107,11 +90,60 @@ def show( need to :func:`fugue.execution.execution_engine.ExecutionEngine.persist` the dataset. """ - return display_dataset(self, n=n, with_count=with_count, title=title) + return get_dataset_display(self).show(n=n, with_count=with_count, title=title) - def assert_not_empty(self) -> None: - """Assert this dataframe is not empty + def __repr__(self): + """String representation of the Dataset""" + return get_dataset_display(self).repr() - :raises FugueDatasetEmptyError: if it is empty + def _repr_html_(self): + """HTML representation of the Dataset""" + return get_dataset_display(self).repr_html() + + +class DatasetDisplay(ABC): + """The base class for display handlers of :class:`~.Dataset` + + :param ds: the Dataset + """ + + _SHOW_LOCK = SerializableRLock() + + def __init__(self, ds: Dataset): + self._ds = ds + + @abstractmethod + def show( + self, n: int = 10, with_count: bool = False, title: Optional[str] = None + ) -> None: # pragma: no cover + """Show the :class:`~.Dataset` + + :param n: top n items to display, defaults to 10 + :param with_count: whether to display the total count, defaults to False + :param title: title to display, defaults to None """ - assert_or_throw(not self.empty, FugueDatasetEmptyError("dataframe is empty")) + raise NotImplementedError + + def repr(self) -> str: + """The string representation of the :class:`~.Dataset` + + :return: the string representation + """ + return str(type(self._ds).__name__) + + def repr_html(self) -> str: + """The HTML representation of the :class:`~.Dataset` + + :return: the HTML representation + """ + return html.escape(self.repr()) + + +@fugue_plugin +def get_dataset_display(ds: "Dataset") -> DatasetDisplay: # pragma: no cover + """Get the display class to display a :class:`~.Dataset` + + :param ds: the Dataset to be displayed + """ + + raise NotImplementedError(f"No matching DatasetDisplay registered for {type(ds)}") diff --git a/fugue/extensions/_builtins/outputters.py b/fugue/extensions/_builtins/outputters.py index c74ef5da..946a5ff2 100644 --- a/fugue/extensions/_builtins/outputters.py +++ b/fugue/extensions/_builtins/outputters.py @@ -1,21 +1,18 @@ from typing import List, no_type_check +from triad import ParamDict, Schema, SerializableRLock, assert_or_throw +from triad.utils.convert import to_type + from fugue.collections.partition import PartitionCursor from fugue.dataframe import DataFrame, DataFrames, LocalDataFrame from fugue.dataframe.array_dataframe import ArrayDataFrame from fugue.dataframe.utils import _df_eq, to_local_bounded_df -from fugue.dataset import display_dataset from fugue.exceptions import FugueWorkflowError from fugue.execution.execution_engine import _generate_comap_empty_dfs from fugue.extensions.outputter import Outputter from fugue.extensions.transformer.convert import _to_output_transformer from fugue.extensions.transformer.transformer import CoTransformer, Transformer from fugue.rpc import EmptyRPCHandler, to_rpc_handler -from triad import SerializableRLock -from triad.collections.dict import ParamDict -from triad.collections.schema import Schema -from triad.utils.assertion import assert_or_throw -from triad.utils.convert import to_type class Show(Outputter): @@ -27,12 +24,10 @@ def process(self, dfs: DataFrames) -> None: n = self.params.get("n", 10) with_count = self.params.get("with_count", False) with Show.LOCK: - n = 0 + m = 0 for df in dfs.values(): - display_dataset( - df, n=n, with_count=with_count, title=title if n == 0 else None - ) - n += 1 + df.show(n=n, with_count=with_count, title=title if m == 0 else None) + m += 1 class AssertEqual(Outputter): diff --git a/fugue_notebook/env.py b/fugue_notebook/env.py index 9102631d..7912020b 100644 --- a/fugue_notebook/env.py +++ b/fugue_notebook/env.py @@ -3,14 +3,23 @@ import json from typing import Any, Dict, List, Optional -import fugue_sql -from fugue import DataFrame, ExecutionEngine, display_dataset, make_execution_engine -from fugue.dataframe import YieldedDataFrame -from fugue.exceptions import FugueSQLSyntaxError from IPython.core.magic import Magics, cell_magic, magics_class, needs_local_scope +from IPython import get_ipython from IPython.display import HTML, display from triad import ParamDict from triad.utils.convert import to_instance +from triad.utils.pyarrow import _field_to_expression + +import fugue_sql +from fugue import ( + DataFrame, + DataFrameDisplay, + ExecutionEngine, + get_dataset_display, + make_execution_engine, +) +from fugue.dataframe import YieldedDataFrame +from fugue.exceptions import FugueSQLSyntaxError class NotebookSetup(object): @@ -84,25 +93,41 @@ def get_engine(self, line: str, lc: Dict[str, Any]) -> ExecutionEngine: return make_execution_engine(engine, cf) -@display_dataset.candidate( - lambda ds, *args, **kwargs: isinstance(ds, DataFrame), priority=3.0 +class JupyterDataFrameDisplay(DataFrameDisplay): + def show( + self, n: int = 10, with_count: bool = False, title: Optional[str] = None + ) -> None: + components: List[Any] = [] + if title is not None: + components.append(HTML(f"

{html.escape(title)}

")) + if with_count: + count = self.df.count() + else: + count = -1 + components.append(HTML(self._generate_df_html(n))) + if count >= 0: + components.append(HTML(f"total count: {count}")) + display(*components) + + def repr_html(self) -> str: + return self._generate_df_html(10) + + def _generate_df_html(self, n: int) -> str: + res: List[str] = [] + pdf = self.df.head(n).as_pandas() + cols = [_field_to_expression(f) for f in self.df.schema.fields] + pdf.columns = cols + res.append(pdf._repr_html_()) + schema = type(self.df).__name__ + ": " + str(self.df.schema) + res.append('' + html.escape(schema) + "") + return "\n".join(res) + + +@get_dataset_display.candidate( + lambda ds: get_ipython() is not None and isinstance(ds, DataFrame), priority=3.0 ) -def _display_dataframe_in_notebook( - ds: DataFrame, n: int = 10, with_count: bool = False, title: Optional[str] = None -): - components: List[Any] = [] - if title is not None: - components.append(HTML(f"

{html.escape(title)}

")) - if with_count: - count = ds.count() - else: - count = -1 - pdf = ds.head(n).as_pandas() - components.append(pdf) - if count >= 0: - components.append(HTML(f"total count: {count}")) - components.append(HTML(f"schema: {ds.schema}")) - display(*components) +def _get_jupyter_dataframe_display(ds: DataFrame): + return JupyterDataFrameDisplay(ds) def _setup_fugue_notebook( diff --git a/fugue_sql/exceptions.py b/fugue_sql/exceptions.py new file mode 100644 index 00000000..bb4b4f56 --- /dev/null +++ b/fugue_sql/exceptions.py @@ -0,0 +1,10 @@ +# pylint: disable-all +# flake8: noqa +# TODO: This folder is to be deprecated +import warnings +from fugue.exceptions import * + +warnings.warn( + "fsql and FugueSQLWorkflow now should be imported directly from fugue, " + "fugue_sql will be removed in 0.9.0" +) diff --git a/tests/fugue/dataframe/test_dataframe.py b/tests/fugue/dataframe/test_dataframe.py index a204e012..506e6de8 100644 --- a/tests/fugue/dataframe/test_dataframe.py +++ b/tests/fugue/dataframe/test_dataframe.py @@ -7,6 +7,8 @@ def test_show(): df = ArrayDataFrame(schema="a:str,b:str") df.show() + assert repr(df) == df._repr_html_() + s = " ".join(["x"] * 2) df = ArrayDataFrame([[s, 1], ["b", 2]], "a:str,b:str") df.show()