refactor: 🏗️ modularize and clean up project structure
Reorganized the project into modules, with dedicated subdirectories and files for dbt, huggingface, indicators, and others (see the layout sketch below). Removed duplicate functionality and legacy code, updated import paths, and centralized asset and resource definitions per module to improve readability and maintainability. Also updated the Makefile so Dagster asset materialization points at the correct module path.
davidgasquez committed Oct 26, 2024
1 parent cadd50b commit e74bd34
Showing 19 changed files with 458 additions and 329 deletions.
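For orientation, a rough sketch of the resulting module layout, reconstructed only from the paths and imports visible in this diff (the full commit touches more files than are shown here):

    datadex/
    ├── definitions.py        # top-level Definitions object, merges the per-module definitions below
    ├── resources.py          # shared resources (io_manager); inferred from the import in definitions.py
    ├── dbt/
    │   ├── assets.py
    │   ├── definitions.py
    │   └── resources.py
    ├── huggingface/
    │   ├── assets.py
    │   ├── definitions.py
    │   └── resources.py
    ├── indicators/
    │   ├── assets.py
    │   └── definitions.py
    └── others/
        └── definitions.py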
2 changes: 1 addition & 1 deletion Makefile
@@ -2,7 +2,7 @@

run:
uv run dagster-dbt project prepare-and-package --file datadex/dbt_project.py
uv run dagster asset materialize --select \* -m datadex
uv run dagster asset materialize --select \* -m datadex.definitions

dev:
uv run dagster dev
25 changes: 0 additions & 25 deletions datadex/__init__.py

This file was deleted.

8 changes: 3 additions & 5 deletions datadex/assets/indicators.py
@@ -50,17 +50,16 @@ def world_bank_wdi() -> pl.DataFrame:

# Reshape the dataframe
df = df.melt(
id_vars=["Country Name", "Country Code",
"Indicator Name", "Indicator Code"],
id_vars=["Country Name", "Country Code", "Indicator Name", "Indicator Code"],
value_name="Indicator Value",
variable_name="Year",
)

# Make one column per Indicator Name
df = df.pivot(
index=["Country Name", "Country Code", "Year"],
columns="Indicator Name",
values="Indicator Value",
on="Indicator Name",
)

# Cast to floats
@@ -75,8 +74,7 @@

# Clean column names
df = df.rename(
lambda column_name: slugify(
column_name.replace("%", "percent"), separator="_")
lambda column_name: slugify(column_name.replace("%", "percent"), separator="_")
)

return df
64 changes: 0 additions & 64 deletions datadex/assets/others.py

This file was deleted.

4 changes: 2 additions & 2 deletions datadex/assets/dbt.py → datadex/dbt/assets.py
@@ -1,9 +1,9 @@
import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets

from ..dbt_project import dbt_project
from datadex.dbt.resources import dbt_project


@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_project_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
def dbt(context: dg.AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
8 changes: 8 additions & 0 deletions datadex/dbt/definitions.py
@@ -0,0 +1,8 @@
import dagster as dg

from datadex.dbt import assets
from datadex.dbt.resources import dbt_resource

dbt_assets = dg.load_assets_from_modules([assets])

definitions = dg.Definitions(assets=dbt_assets, resources={"dbt": dbt_resource})
15 changes: 15 additions & 0 deletions datadex/dbt/resources.py
@@ -0,0 +1,15 @@
from pathlib import Path

from dagster_dbt import DbtCliResource, DbtProject

RELATIVE_PATH_TO_MY_DBT_PROJECT = "../../dbt"

dbt_project = DbtProject(
project_dir=Path(__file__)
.joinpath("..", RELATIVE_PATH_TO_MY_DBT_PROJECT)
.resolve(),
)

dbt_project.prepare_if_dev()
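# prepare_if_dev() re-parses the dbt project and regenerates the manifest when running
# under `dagster dev`; in packaged/production runs the manifest is expected to be built
# ahead of time (e.g. the `dagster-dbt project prepare-and-package` step in the Makefile).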

dbt_resource = DbtCliResource(project_dir=dbt_project)
9 changes: 0 additions & 9 deletions datadex/dbt_project.py

This file was deleted.

17 changes: 17 additions & 0 deletions datadex/definitions.py
@@ -0,0 +1,17 @@
import dagster as dg

import datadex.dbt.definitions as dbt_definitions
import datadex.huggingface.definitions as huggingface_definitions
import datadex.indicators.definitions as indicators_definitions
import datadex.others.definitions as others_definitions
from datadex.resources import io_manager

common_resources = {"io_manager": io_manager}

definitions = dg.Definitions.merge(
dg.Definitions(resources=common_resources),
dbt_definitions.definitions,
indicators_definitions.definitions,
huggingface_definitions.definitions,
others_definitions.definitions,
)
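This merged object is the single code location that the Makefile's updated `-m datadex.definitions` flag now points at. A minimal local invocation, assuming the same uv-based setup as the Makefile, would look something like:

    uv run dagster dev -m datadex.definitions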
datadex/huggingface/assets.py
@@ -1,14 +1,14 @@
import polars as pl
import dagster as dg
import polars as pl

from ..resources import DatasetPublisher
from datadex.huggingface.resources import HuggingFaceDatasetPublisher


def create_hf_asset(dataset_name: str):
@dg.asset(
name="huggingface_" + dataset_name, ins={"data": dg.AssetIn(dataset_name)}
)
def hf_asset(data: pl.DataFrame, dp: DatasetPublisher) -> None:
def hf_asset(data: pl.DataFrame, dp: HuggingFaceDatasetPublisher) -> None:
"""
Upload data to HuggingFace.
"""
@@ -31,6 +31,7 @@ def hf_asset(data: pl.DataFrame, dp: DatasetPublisher) -> None:
dp.publish(
dataset=data,
dataset_name=dataset_name,
username="datonic",
readme=readme_content,
generate_datapackage=True,
)
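The rest of this module is collapsed in the diff, but since `datadex/huggingface/definitions.py` below imports an `assets` list from it, the factory is presumably applied along these lines (a sketch; the dataset names are assumptions borrowed from the indicator assets elsewhere in this commit):

    datasets = ["owid_energy_data", "owid_co2_data", "world_bank_wdi"]  # assumed upstream asset names
    assets = [create_hf_asset(name) for name in datasets]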
11 changes: 11 additions & 0 deletions datadex/huggingface/definitions.py
@@ -0,0 +1,11 @@
import dagster as dg

from datadex.huggingface.assets import assets
from datadex.huggingface.resources import HuggingFaceDatasetPublisher

definitions = dg.Definitions(
assets=assets,
resources={
"dp": HuggingFaceDatasetPublisher(hf_token=dg.EnvVar("HUGGINGFACE_TOKEN"))
},
)
72 changes: 72 additions & 0 deletions datadex/huggingface/resources.py
@@ -0,0 +1,72 @@
import os
import tempfile
from typing import Optional

import polars as pl
import yaml
from dagster import ConfigurableResource, InitResourceContext, get_dagster_logger
from huggingface_hub import HfApi
from pydantic import PrivateAttr

log = get_dagster_logger()


class HuggingFaceDatasetPublisher(ConfigurableResource):
hf_token: str

_api: HfApi = PrivateAttr()

def setup_for_execution(self, context: InitResourceContext) -> None:
self._api = HfApi(token=self.hf_token)

def publish(
self,
dataset: pl.DataFrame,
dataset_name: str,
username: str,
readme: Optional[str] = None,
generate_datapackage: bool = False,
):
with tempfile.TemporaryDirectory() as temp_dir:
# Define the file path
data_dir = os.path.join(temp_dir, "data")
os.makedirs(data_dir, exist_ok=True)
file_path = os.path.join(data_dir, f"{dataset_name}.parquet")

# Write the dataset to a parquet file
dataset.write_parquet(file_path)

if readme:
readme_path = os.path.join(temp_dir, "README.md")
with open(readme_path, "w") as readme_file:
readme_file.write(readme)

if generate_datapackage:
datapackage = {
"name": dataset_name,
"resources": [
{"path": f"data/{dataset_name}.parquet", "format": "parquet"}
],
}
datapackage_path = os.path.join(temp_dir, "datapackage.yaml")
with open(datapackage_path, "w") as dp_file:
yaml.dump(datapackage, dp_file)

# Check if the repository exists
repo_id = f"{username}/{dataset_name}"

try:
self._api.repo_info(repo_id=repo_id, repo_type="dataset")
log.info(f"Repository {repo_id} exists.")
except Exception:
log.info(
f"Repository {repo_id} does not exist. Creating a new repository."
)
self._api.create_repo(
repo_id=repo_id, repo_type="dataset", private=False
)

# Upload the entire folder to Hugging Face
self._api.upload_large_folder(
folder_path=temp_dir, repo_id=repo_id, repo_type="dataset"
)
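Within Dagster the publisher is wired in as the `dp` resource (see `datadex/huggingface/definitions.py` above), but it can also be exercised directly. A minimal sketch, assuming a valid HUGGINGFACE_TOKEN in the environment and a made-up dataset (note that this performs a real repo creation and upload on the Hub):

    import os

    import dagster as dg
    import polars as pl

    from datadex.huggingface.resources import HuggingFaceDatasetPublisher

    publisher = HuggingFaceDatasetPublisher(hf_token=os.environ["HUGGINGFACE_TOKEN"])
    # Dagster calls this lifecycle hook automatically at run time; invoke it manually here.
    publisher.setup_for_execution(dg.build_init_resource_context())

    df = pl.DataFrame({"year": [2020, 2021], "value": [1.0, 2.0]})  # placeholder data
    publisher.publish(
        dataset=df,
        dataset_name="example_dataset",  # hypothetical name
        username="datonic",
        readme="# Example dataset",
        generate_datapackage=True,
    )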
80 changes: 80 additions & 0 deletions datadex/indicators/assets.py
@@ -0,0 +1,80 @@
import io
import zipfile

import dagster as dg
import httpx
import polars as pl
from slugify import slugify


@dg.asset()
def owid_energy_data() -> pl.DataFrame:
"""
Raw Energy data from Our World in Data.
"""
energy_owid_url = (
"https://raw.githubusercontent.com/owid/energy-data/master/owid-energy-data.csv"
)

return pl.read_csv(energy_owid_url)


@dg.asset()
def owid_co2_data() -> pl.DataFrame:
"""
Raw CO2 data from Our World in Data.
"""
co2_owid_url = (
"https://raw.githubusercontent.com/owid/co2-data/master/owid-co2-data.csv"
)

return pl.read_csv(co2_owid_url)


@dg.asset()
def world_bank_wdi() -> pl.DataFrame:
"""
World Development Indicators (WDI) is the World Bank's premier compilation of cross-country comparable data on development.
Bulk data download is available at https://datatopics.worldbank.org/world-development-indicators/
"""

url = "https://databankfiles.worldbank.org/public/ddpext_download/WDI_CSV.zip"

response = httpx.get(url)

zipfile.ZipFile(io.BytesIO(response.content)).extractall(path="/tmp/")

# Load the WDICSV.csv file as a DataFrame
df = pl.read_csv("/tmp/WDICSV.csv")

# Reshape the dataframe
df = df.melt(
id_vars=["Country Name", "Country Code", "Indicator Name", "Indicator Code"],
value_name="Indicator Value",
variable_name="Year",
)

# Make one column per Indicator Name
df = df.pivot(
index=["Country Name", "Country Code", "Year"],
values="Indicator Value",
on="Indicator Name",
)

# Cast to floats
df = df.select(
[
pl.col("Country Name"),
pl.col("Country Code"),
pl.col("Year").cast(pl.Int32),
*[pl.col(col).cast(pl.Float32) for col in df.columns[3:]],
]
)

# Clean column names
df = df.rename(
lambda column_name: slugify(column_name.replace("%", "percent"), separator="_")
)

return df
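As a quick illustration of the melt → pivot reshape above, a toy sketch with made-up values (the indicator names and numbers are placeholders, not real WDI data):

    import polars as pl

    toy = pl.DataFrame(
        {
            "Country Name": ["Spain", "Spain"],
            "Country Code": ["ESP", "ESP"],
            "Indicator Name": ["GDP growth (annual %)", "Population, total"],
            "Indicator Code": ["NY.GDP.MKTP.KD.ZG", "SP.POP.TOTL"],
            "2020": [-11.2, 47365655.0],
            "2021": [6.4, 47415750.0],
        }
    )

    # Long format: one row per (country, indicator, year).
    long = toy.melt(
        id_vars=["Country Name", "Country Code", "Indicator Name", "Indicator Code"],
        value_name="Indicator Value",
        variable_name="Year",
    )

    # Wide format: one row per (country, year), one column per indicator name.
    wide = long.pivot(
        index=["Country Name", "Country Code", "Year"],
        values="Indicator Value",
        on="Indicator Name",
    )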
7 changes: 7 additions & 0 deletions datadex/indicators/definitions.py
@@ -0,0 +1,7 @@
import dagster as dg

from datadex.indicators import assets

indicators_assets = dg.load_assets_from_modules([assets])

definitions = dg.Definitions(assets=indicators_assets)