Skip to content

Commit

Permalink
chore: 🎨 change dasgter imports
Browse files Browse the repository at this point in the history
  • Loading branch information
davidgasquez committed Oct 26, 2024
1 parent 93da061 commit 1492480
Show file tree
Hide file tree
Showing 13 changed files with 2,926 additions and 87 deletions.
File renamed without changes.
6 changes: 3 additions & 3 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "Datadex",
"build": {
"dockerfile": "../Dockerfile",
"dockerfile": "Dockerfile",
"context": ".."
},
"customizations": {
Expand All @@ -24,8 +24,8 @@
},
"notebook.formatOnSave.enabled": true,
"notebook.codeActionsOnSave": {
"source.fixAll.ruff": true,
"source.organizeImports.ruff": true
"source.fixAll.ruff": "explicit",
"source.organizeImports.ruff": "explicit"
},
"python.analysis.typeCheckingMode": "basic",
"python.analysis.autoImportCompletions": true,
Expand Down
12 changes: 12 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for more information:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
# https://containers.dev/guide/dependabot

version: 2
updates:
- package-ecosystem: "devcontainers"
directory: "/"
schedule:
interval: weekly
12 changes: 5 additions & 7 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
- name: Install uv
uses: astral-sh/setup-uv@v2
with:
python-version: 3.12
cache: pip
- name: Install dependencies
run: |
pip install -e .
enable-cache: true
- name: Install Python 3.12
run: uv python install 3.12
- name: Run
run: |
make run
Expand Down
11 changes: 5 additions & 6 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
.DEFAULT_GOAL := run

run:
dagster-dbt project prepare-and-package --file datadex/dbt_project.py
dagster asset materialize --select \* -m datadex
uv run dagster-dbt project prepare-and-package --file datadex/dbt_project.py
uv run dagster asset materialize --select \* -m datadex

dev:
dagster dev
uv run dagster dev

preview:
quarto preview portal

setup:
@command -v uv >/dev/null 2>&1 || pip install -U uv
uv venv
uv pip install -U -e ".[dev]"
command -v uv >/dev/null 2>&1 || pip install -U uv
uv sync
. .venv/bin/activate

dbt-docs:
Expand Down
13 changes: 7 additions & 6 deletions datadex/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os

from dagster import EnvVar, Definitions, load_assets_from_modules
import dagster as dg

from dagster_dbt import DbtCliResource
from dagster_duckdb_polars import DuckDBPolarsIOManager

Expand All @@ -10,15 +11,15 @@

DATABASE_PATH = os.getenv("DATABASE_PATH", "data/database.duckdb")

all_assets = load_assets_from_modules([indicators, huggingface, others, spain, dbt])
all_assets = dg.load_assets_from_modules([indicators, huggingface, others, spain, dbt])

resources = {
"dbt": DbtCliResource(project_dir=dbt_project),
"io_manager": DuckDBPolarsIOManager(database=DATABASE_PATH, schema="main"),
"iucn_redlist_api": IUCNRedListAPI(token=EnvVar("IUCN_REDLIST_TOKEN")),
"aemet_api": AEMETAPI(token=EnvVar("AEMET_API_TOKEN")),
"iucn_redlist_api": IUCNRedListAPI(token=dg.EnvVar("IUCN_REDLIST_TOKEN")),
"aemet_api": AEMETAPI(token=dg.EnvVar("AEMET_API_TOKEN")),
"miteco_api": MITECOArcGisAPI(),
"dp": DatasetPublisher(hf_token=EnvVar("HUGGINGFACE_TOKEN")),
"dp": DatasetPublisher(hf_token=dg.EnvVar("HUGGINGFACE_TOKEN")),
}

defs = Definitions(assets=[*all_assets], resources=resources)
defs = dg.Definitions(assets=[*all_assets], resources=resources)
4 changes: 2 additions & 2 deletions datadex/assets/dbt.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from dagster import AssetExecutionContext
import dagster as dg
from dagster_dbt import DbtCliResource, dbt_assets

from ..dbt_project import dbt_project


@dbt_assets(manifest=dbt_project.manifest_path)
def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource):
def dbt_project_assets(context: dg.AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
6 changes: 4 additions & 2 deletions datadex/assets/huggingface.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import polars as pl
from dagster import AssetIn, asset
import dagster as dg

from ..resources import DatasetPublisher


def create_hf_asset(dataset_name: str):
@asset(name="huggingface_" + dataset_name, ins={"data": AssetIn(dataset_name)})
@dg.asset(
name="huggingface_" + dataset_name, ins={"data": dg.AssetIn(dataset_name)}
)
def hf_asset(data: pl.DataFrame, dp: DatasetPublisher) -> None:
"""
Upload data to HuggingFace.
Expand Down
12 changes: 6 additions & 6 deletions datadex/assets/others.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@

import httpx
import polars as pl
from dagster import AssetExecutionContext, Backoff, RetryPolicy, asset
import dagster as dg

from ..resources import IUCNRedListAPI


@asset(
retry_policy=RetryPolicy(max_retries=5, delay=2, backoff=Backoff.EXPONENTIAL),
@dg.asset(
retry_policy=dg.RetryPolicy(max_retries=5, delay=2, backoff=dg.Backoff.EXPONENTIAL),
)
def threatened_animal_species(
context: AssetExecutionContext, iucn_redlist_api: IUCNRedListAPI
context: dg.AssetExecutionContext, iucn_redlist_api: IUCNRedListAPI
) -> pl.DataFrame:
"""
Threatened animal species data from the IUCN Red List API.
Expand All @@ -33,8 +33,8 @@ def threatened_animal_species(
return pl.DataFrame(all_results, infer_schema_length=None)


@asset(
retry_policy=RetryPolicy(max_retries=5, delay=2, backoff=Backoff.EXPONENTIAL),
@dg.asset(
retry_policy=dg.RetryPolicy(max_retries=5, delay=2, backoff=dg.Backoff.EXPONENTIAL),
)
def wikidata_asteroids() -> pl.DataFrame:
"""
Expand Down
24 changes: 13 additions & 11 deletions datadex/assets/spain.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@

import httpx
import polars as pl
from dagster import Backoff, RetryPolicy, AssetExecutionContext, asset
import dagster as dg
from slugify import slugify

from ..resources import AEMETAPI, MITECOArcGisAPI


@asset(
retry_policy=RetryPolicy(max_retries=3, delay=10, backoff=Backoff.EXPONENTIAL),
@dg.asset(
retry_policy=dg.RetryPolicy(
max_retries=3, delay=10, backoff=dg.Backoff.EXPONENTIAL
),
)
async def spain_energy_demand(context: AssetExecutionContext) -> pl.DataFrame:
async def spain_energy_demand(context: dg.AssetExecutionContext) -> pl.DataFrame:
"""
Spain energy demand data.
"""
Expand Down Expand Up @@ -60,8 +62,8 @@ async def spain_energy_demand(context: AssetExecutionContext) -> pl.DataFrame:
return df


@asset(
retry_policy=RetryPolicy(max_retries=5, delay=1, backoff=Backoff.EXPONENTIAL),
@dg.asset(
retry_policy=dg.RetryPolicy(max_retries=5, delay=1, backoff=dg.Backoff.EXPONENTIAL),
)
def spain_ipc() -> pl.DataFrame:
"""
Expand Down Expand Up @@ -96,7 +98,7 @@ def spain_ipc() -> pl.DataFrame:
return df


@asset()
@dg.asset()
def spain_aemet_stations_data(aemet_api: AEMETAPI) -> pl.DataFrame:
"""
Spain AEMET stations data.
Expand Down Expand Up @@ -125,9 +127,9 @@ def convert_to_decimal(coord):
return df


@asset()
@dg.asset()
def spain_aemet_weather_data(
context: AssetExecutionContext, aemet_api: AEMETAPI
context: dg.AssetExecutionContext, aemet_api: AEMETAPI
) -> pl.DataFrame:
"""
Spain weather data since 1940.
Expand Down Expand Up @@ -167,9 +169,9 @@ def spain_aemet_weather_data(
return df


@asset()
@dg.asset()
def spain_water_reservoirs_data(
context: AssetExecutionContext, miteco_api: MITECOArcGisAPI
context: dg.AssetExecutionContext, miteco_api: MITECOArcGisAPI
) -> pl.DataFrame:
"""
Spain water reservoirs data since 1988.
Expand Down
20 changes: 10 additions & 10 deletions datadex/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
import yaml
import httpx
import polars as pl
from dagster import InitResourceContext, ConfigurableResource, get_dagster_logger
import dagster as dg
from pydantic import PrivateAttr
from tenacity import retry, wait_exponential, stop_after_attempt
from huggingface_hub import HfApi

log = get_dagster_logger()
log = dg.get_dagster_logger()


class IUCNRedListAPI(ConfigurableResource):
class IUCNRedListAPI(dg.ConfigurableResource):
token: str

def get_species(self, page):
Expand All @@ -30,7 +30,7 @@ def get_species(self, page):
return r.json()["result"]


class REDataAPI(ConfigurableResource):
class REDataAPI(dg.ConfigurableResource):
endpoint: str = "https://apidatos.ree.es/en/datos"
first_day: str = "2014-01-01"

Expand Down Expand Up @@ -61,13 +61,13 @@ def get_market_prices(self, start_date: str, end_date: str, time_trunc="hour"):
return self.query(category, widget, start_date, end_date, time_trunc)


class AEMETAPI(ConfigurableResource):
class AEMETAPI(dg.ConfigurableResource):
endpoint: str = "https://opendata.aemet.es/opendata/api"
token: str

_client: httpx.Client = PrivateAttr()

def setup_for_execution(self, context: InitResourceContext) -> None:
def setup_for_execution(self, context: dg.InitResourceContext) -> None:
transport = httpx.HTTPTransport(retries=5)
limits = httpx.Limits(max_keepalive_connections=5, max_connections=10)
self._client = httpx.Client(
Expand Down Expand Up @@ -131,11 +131,11 @@ def get_all_stations(self):

return self.query(url)

def teardown_after_execution(self, context: InitResourceContext) -> None:
def teardown_after_execution(self, context: dg.InitResourceContext) -> None:
self._client.close()


class MITECOArcGisAPI(ConfigurableResource):
class MITECOArcGisAPI(dg.ConfigurableResource):
endpoint: str = (
"https://services-eu1.arcgis.com/RvnYk1PBUJ9rrAuT/ArcGIS/rest/services/"
)
Expand Down Expand Up @@ -168,12 +168,12 @@ def get_water_reservoirs_data(self, start_date=None, end_date=None):
return query_response


class DatasetPublisher(ConfigurableResource):
class DatasetPublisher(dg.ConfigurableResource):
hf_token: str

_api: HfApi = PrivateAttr()

def setup_for_execution(self, context: InitResourceContext) -> None:
def setup_for_execution(self, context: dg.InitResourceContext) -> None:
self._api = HfApi(token=self.hf_token)

def publish(
Expand Down
54 changes: 20 additions & 34 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,37 @@ name = "datadex"
version = "1.0.0"
authors = [{ name = "David Gasquez" }]
dependencies = [
"altair",
"dagster-dbt==0.24.2",
"dagster-duckdb-polars==0.24.2",
"dagster-duckdb",
"dagster==1.8.2",
"datasets",
"dbt-core",
"dbt-duckdb",
"duckdb==1.0.0",
"huggingface_hub",
"httpx[http2]",
"ipykernel",
"ipywidgets",
"pyarrow",
"python-slugify",
"tenacity",
"dagster-dbt>=0.24.13",
"dagster-duckdb>=0.24.13",
"dagster-duckdb-polars>=0.24.13",
"dagster>=1.8.13",
"datasets>=3.0.2",
"dbt-core>=1.8.8",
"dbt-duckdb>=1.9.0",
"duckdb>=1.1.2",
"httpx>=0.27.2",
"huggingface-hub>=0.26.1",
"ipykernel>=6.29.5",
"pyarrow>=17.0.0",
"python-slugify>=8.0.4",
"tenacity>=9.0.0",
]

requires-python = ">=3.11, <=3.13"
readme = "README.md"
license = { text = "MIT" }

[project.urls]
Homepage = "https://davidgasquez.github.io/datadex/"
Changelog = "https://github.com/davidgasquez/datadex/commits/main/"
Issues = "https://github.com/davidgasquez/datadex/issues"
CI = "https://github.com/davidgasquez/datadex/actions"
Homepage = "https://datadex.datonic.io/"
Changelog = "https://github.com/datonic/datadex/commits/main/"
Issues = "https://github.com/datonic/datadex/issues"
CI = "https://github.com/datonic/datadex/actions"

[project.optional-dependencies]
dev = ["dagster-webserver", "ruff"]

[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[tool.uv]
dev-dependencies = ["dagster-webserver>=1.8.13"]

[tool.setuptools]
packages = ["datadex"]

[tool.setuptools.package-data]
"datadex" = ["../dbt/**"]

[tool.dagster]
module_name = "datadex"

[tool.ruff.lint.isort]
case-sensitive = true
combine-as-imports = true
length-sort = true
Loading

0 comments on commit 1492480

Please sign in to comment.