Commit

Make transformer schema more flexible (#28)
* setup fugue sql

* SQL: add basic extensions and tests

* update

* update

* clean up sql files

* fix syntax, add save load

* add test for load

* FugueSQLWorkflow

* update version

* Add pandas udf support, add SQL persist broadcast

* update

* update

* update

* update

* update

* make transformer schema more flexible
Han Wang authored Jun 13, 2020
1 parent 05ab46d commit b709814
Showing 6 changed files with 8 additions and 15 deletions.
2 changes: 1 addition & 1 deletion fugue/__init__.py
@@ -1,6 +1,6 @@
# flake8: noqa

__version__ = "0.2.6"
__version__ = "0.2.7"

from fugue.collections.partition import PartitionCursor, PartitionSpec
from fugue.dataframe.array_dataframe import ArrayDataFrame
13 changes: 3 additions & 10 deletions fugue/extensions/transformer/convert.py
@@ -77,18 +77,11 @@ def __uuid__(self) -> str:
        return to_uuid(self._wrapper.__uuid__(), self._output_schema_arg)

    def _parse_schema(self, obj: Any, df: DataFrame) -> Schema:
-        if obj is None:
-            return Schema()
        if isinstance(obj, str):
-            if "*" in obj:
-                obj = obj.replace("*", str(df.schema))
-            return Schema(obj)
+            return df.schema.transform(obj)
        if isinstance(obj, List):
-            s = Schema()
-            for x in obj:
-                s += self._parse_schema(x, df)
-            return s
-        return Schema(obj)
+            return df.schema.transform(*obj)
+        return df.schema.transform(obj)

    @staticmethod
    def from_func(func: Callable, schema: Any) -> "_FuncAsTransformer":
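
Taken together, this hunk replaces the hand-rolled "*" substitution and list concatenation with calls to the input schema's transform method (presumably the reason for the triad>=0.3.6 bump further down). Below is a minimal sketch of the schema specs a transformer can now declare; the import path and the exact semantics of the list form are assumptions read off this patch, not verified documentation.

import pandas as pd

# Import path assumed from this repository layout (fugue/extensions/transformer).
from fugue.extensions.transformer import transformer


# String spec: "*" stands for the whole input schema (already supported before this change).
@transformer("*,p:int")
def t_str(df: pd.DataFrame, p=1) -> pd.DataFrame:
    df["p"] = p
    return df


# List spec: the fragments are forwarded to Schema.transform as separate arguments.
@transformer(["*", "p:int"])
def t_list(df: pd.DataFrame, p=1) -> pd.DataFrame:
    df["p"] = p
    return df


# Callable spec: receives the input schema and returns the output schema
# (this is the new flexibility the updated test below exercises).
@transformer(lambda s: s + "p:int")
def t_callable(df: pd.DataFrame, p=1) -> pd.DataFrame:
    df["p"] = p
    return df
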
2 changes: 1 addition & 1 deletion fugue/workflow/workflow.py
@@ -175,7 +175,7 @@ def to_self_type(self: TDF, df: "WorkflowDataFrame") -> TDF:
        return df  # type: ignore

    def drop(  # type: ignore
-        self: TDF, columns: Dict[str, str], if_exists: bool = False
+        self: TDF, columns: List[str], if_exists: bool = False
    ) -> TDF:
        df = self.workflow.process(
            self, using=DropColumns, params=dict(columns=columns, if_exists=if_exists)
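
For reference, this is how the corrected annotation reads at the call site. A minimal sketch only; FugueWorkflow, dag.df, show and run are assumed to exist under these names in this version and are not part of the diff.

from fugue.workflow.workflow import FugueWorkflow

dag = FugueWorkflow()
df = dag.df([[0, 1, 2]], "a:int,b:int,c:int")
# columns is a plain list of column names (the old Dict[str, str] annotation was wrong);
# if_exists=True presumably skips names that are not in the schema instead of raising.
df.drop(["b", "not_a_column"], if_exists=True).show()
dag.run()
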
2 changes: 1 addition & 1 deletion fugue_test/builtin_suite.py
@@ -384,7 +384,7 @@ def transform(self, df: LocalDataFrame) -> LocalDataFrame:
        return PandasDataFrame(pdf, self.output_schema)


-@transformer("*,p:int")
+@transformer(lambda s: s + "p:int")
def mock_tf0(df: pd.DataFrame, p=1) -> pd.DataFrame:
    df["p"] = p
    return df
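
The old string spec and the new callable spec describe the same output schema for mock_tf0. A small sketch against triad's Schema; the printed result is an expectation inferred from how the lambda is used above, not a verified output.

from triad.collections.schema import Schema

input_schema = Schema("a:int")


def new_spec(s: Schema) -> Schema:
    # equivalent to the decorator argument lambda s: s + "p:int"
    return s + "p:int"


# Old spec "*,p:int": the "*" expands to the input schema, giving a:int,p:int.
print(new_spec(input_schema))  # expected: a:int,p:int
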
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,4 +1,4 @@
-triad>=0.3.5
+triad>=0.3.6
adagio>=0.1.6
sqlalchemy

2 changes: 1 addition & 1 deletion setup.py
@@ -17,7 +17,7 @@
    author_email="[email protected]",
    keywords="distributed spark dask sql dsl domain specific language",
    url="http://github.com/fugue-project/fugue",
-    install_requires=["triad>=0.3.5", "adagio>=0.1.6", "sqlalchemy"],
+    install_requires=["triad>=0.3.6", "adagio>=0.1.6", "sqlalchemy"],
    extras_require={
        "sql": ["antlr4-python3-runtime", "jinjasql"],
        "spark": ["pyspark"],
