diff --git a/docs/cli.rst b/docs/cli.rst index c39108b0..91555888 100644 --- a/docs/cli.rst +++ b/docs/cli.rst @@ -18,6 +18,7 @@ any Python code:: * |get-aqs|_ -- get AQS data * |get-ish|_ -- get ISH data * |get-ish-lite|_ -- get ISH-Lite data +* |get-openaq|_ -- get OpenAQ data .. |run| replace:: ``run`` .. _run: #melodies-monet-run @@ -37,6 +38,9 @@ any Python code:: .. |get-ish-lite| replace:: ``get-ish-lite`` .. _get-ish-lite: #melodies-monet-get-ish-lite +.. |get-openaq| replace:: ``get-openaq`` +.. _get-openaq: #melodies-monet-get-openaq + .. click:: melodies_monet._cli:_typer_click_object :prog: melodies-monet :nested: full diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index 849ad646..35539f9f 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -4,10 +4,17 @@ """ melodies-monet -- MELODIES MONET CLI """ +import os import time from contextlib import contextmanager from pathlib import Path -from typing import List +from typing import List, Tuple + +_LOGGING_LEVEL = os.environ.get("MM_LOGGING_LEVEL", None) +if _LOGGING_LEVEL is not None: + import logging + + logging.basicConfig(level=_LOGGING_LEVEL.upper()) try: import typer @@ -20,8 +27,6 @@ ) raise SystemExit(1) -from typing import Tuple - DEBUG = False INFO_COLOR = typer.colors.CYAN ERROR_COLOR = typer.colors.BRIGHT_RED @@ -1160,6 +1165,246 @@ def get_aqs( ds.to_netcdf(dst / out_name) +@app.command() +def get_openaq( + start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"), + end_date: str = typer.Option(..., "-e", "--end-date", help=f"End date. {_DATE_FMT_NOTE} {_DATE_END_NOTE}"), + out_name: str = typer.Option(None, "-o", + help=( + "Output file name (or full/relative path). " + "By default the name is generated like 'OpenAQ__.nc'." + ) + ), + dst: Path = typer.Option(".", "-d", "--dst", help=( + "Destination directory (to control output location " + "if using default output file name)." 
+ ) + ), + param: List[str] = typer.Option(["o3", "pm25", "pm10"], "-p", "--param", help=( + "Parameters. " + "Use '-p' more than once to get multiple parameters. " + "Other examples: 'no', 'no2', 'nox', 'so2', 'co', 'bc'. " + "Only applicable to the web API methods ('api-v2')." + ) + ), + reference_grade: bool = typer.Option(True, help="Include reference-grade sensors."), + low_cost: bool = typer.Option(False, help="Include low-cost sensors."), + method: str = typer.Option("api-v2", "-m", "--method", help=( + "Method (reader) to use for fetching data. " + "Options: 'api-v2', 'openaq-fetches'." + ) + ), + compress: bool = typer.Option(True, help=( + "If true, pack float to int and apply compression using zlib with complevel 7. " + "This can take time if the dataset is large, but can lead to " + "significant space savings." + ) + ), + num_workers: int = typer.Option(1, "-n", "--num-workers", help="Number of download workers."), + verbose: bool = typer.Option(False), + debug: bool = typer.Option( + False, "--debug/", help="Print more messages (including full tracebacks)." 
+ ), +): + """Download hourly OpenAQ data using monetio and reformat for MM usage.""" + import warnings + + import monetio as mio + import pandas as pd + + from .util.write_util import write_ncf + + global DEBUG + + DEBUG = debug + + typer.echo(HEADER) + + if method not in {"api-v2", "openaq-fetches"}: + typer.secho(f"Error: method {method!r} not recognized", fg=ERROR_COLOR) + raise typer.Exit(2) + + start_date = pd.Timestamp(start_date) + end_date = pd.Timestamp(end_date) + + if method in {"openaq-fetches"}: + dates = pd.date_range(start_date, end_date, freq="D") + elif method in {"api-v2"}: + dates = pd.date_range(start_date, end_date, freq="h") + else: + raise AssertionError + if verbose: + print("Dates:") + print(dates) + + if verbose and method in {"api-v2"}: + print("Params:", param) + + # Set destination and file name + fmt = r"%Y%m%d" + if out_name is None: + out_name = f"OpenAQ_{start_date:{fmt}}_{end_date:{fmt}}.nc" + else: + p = Path(out_name) + if p.name == out_name: + # `out_name` is just the file name + out_name = p.name + else: + # `out_name` has path + if dst != Path("."): + typer.echo(f"warning: overriding `dst` setting {dst.as_posix()!r} with `out_name` {p.as_posix()!r}") + dst = p.parent + out_name = p.name + + sensor_types = [] + if reference_grade: + sensor_types.append("reference grade") + if low_cost: + sensor_types.append("low-cost sensor") + if not sensor_types and method in {"api-v2"}: + typer.secho( + "Error: no sensor types selected. 
Use --reference-grade and/or --low-cost", + fg=ERROR_COLOR, + ) + raise typer.Exit(2) + + if verbose and method in {"openaq-fetches"}: + from dask.diagnostics import ProgressBar + + ProgressBar().register() + + with _timer("Fetching data with monetio"): + if method == "openaq-fetches": + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", + message="The (error|warn)_bad_lines argument has been deprecated" + ) + df = mio.openaq.add_data( + dates, + n_procs=num_workers, + # wide_fmt=True, + ) + + # Address time-wise non-unique site IDs + # Some (most?) are just slightly different lat/lon + # But seems like a few are actual time-wise lat/lon duplicates + df = df.drop_duplicates(["time", "siteid"]) + + elif method == "api-v2": + df = mio.obs.openaq_v2.add_data( + dates, + parameters=param, + sensor_type=sensor_types, + wide_fmt=True, + timeout=60, + retry=15, + threads=num_workers if num_workers > 1 else None, + ) + + dupes = df[df.duplicated(["time", "siteid"], keep=False)] + if not dupes.empty: + typer.echo( + f"warning: {len(dupes)} unexpected time-siteid duplicated rows:" + ) + if verbose: + typer.echo(dupes) + df = df.drop_duplicates(["time", "siteid"]) + else: + raise AssertionError + + # Drop times not on the hour + good = df.time == df.time.dt.floor("H") + typer.echo(f"Dropping {(~good).sum()}/{len(good)} rows that aren't on the hour.") + df = df[good] + + with _timer("Forming xarray Dataset"): + df = df.drop(columns=["index"], errors="ignore") + df = df.dropna(subset=["latitude", "longitude"]) + + if method == "openaq-fetches": + site_vns = [ + "siteid", # based on country and lat/lon + "latitude", + "longitude", + "utcoffset", + # + "city", + "country", # 2-char codes + # + "sourceName", + "sourceType", # "government" + ] + # NOTE: time_local not included since it varies in time as well + elif method == "api-v2": + site_vns = [ + "siteid", # real OpenAQ location ID + "latitude", + "longitude", + "utcoffset", + # + "location", + "city", + 
"country", + # + "entity", + "sensor_type", + "is_mobile", + "is_analysis", + ] + for vn in ["city", "is_analysis"]: # may have been dropped for being all null + if vn not in df.columns: + site_vns.remove(vn) + + else: + raise AssertionError + + ds_site = ( + df[site_vns] + .groupby("siteid") + .first() + .to_xarray() + .swap_dims(siteid="x") + ) + + ds = ( + df.drop(columns=[vn for vn in site_vns if vn not in ["siteid"]]) + .set_index(["time", "siteid"]) + .to_xarray() + .swap_dims(siteid="x") + .merge(ds_site) + .set_coords(["latitude", "longitude"]) + .assign(x=range(ds_site.dims["x"])) + ) + + # Rename species vars and add units as attr + nice_us = {"ppm": "ppmv", "ugm3": "ug m-3", "ppb": "ppbv"} + for vn0 in [n for n in df.columns if n.endswith(("_ppm", "_ppb", "_ugm3", "_umg3"))]: + i_last_underscore = vn0.rfind("_") + vn, u = vn0[:i_last_underscore], vn0[i_last_underscore + 1:] + if u == "umg3": + u = "ugm3" + nice_u = nice_us[u] + ds[vn0].attrs.update(units=nice_u) + ds = ds.rename_vars({vn0: vn}) + + # Fill in local time array + # (in the df, not all sites have rows for all times, so we have NaTs at this point) + ds["time_local"] = ds.time + ds.utcoffset + + # Expand + ds = ( + ds + .expand_dims("y") + .transpose("time", "y", "x") + ) + + with _timer("Writing netCDF file"): + if compress: + write_ncf(ds, dst / out_name, verbose=verbose) + else: + ds.to_netcdf(dst / out_name) + cli = app _typer_click_object = typer.main.get_command(app) # for sphinx-click in docs diff --git a/melodies_monet/tests/test_get_data_cli.py b/melodies_monet/tests/test_get_data_cli.py index 3cd6c3e5..42ba2e8d 100644 --- a/melodies_monet/tests/test_get_data_cli.py +++ b/melodies_monet/tests/test_get_data_cli.py @@ -4,6 +4,7 @@ """ Check for consistency with the tutorial datasets and that options work. 
""" +import os import subprocess import numpy as np @@ -30,6 +31,8 @@ ds0_aeronet = xr.open_dataset(fetch_example("aeronet:2019-09")) ds0_airnow = xr.open_dataset(fetch_example("airnow:2019-09")) +have_openaq_api_key = len(os.environ.get("OPENAQ_API_KEY", "")) > 0 + def test_get_aeronet(tmp_path): fn = "x.nc" @@ -173,3 +176,47 @@ def test_get_aqs_hourly(tmp_path): for v in ds.data_vars if ds[v].dims == ("time", "y", "x") } == {"OZONE", "time_local"} + + +@pytest.mark.skipif(not have_openaq_api_key, reason="OPENAQ_API_KEY not set") +def test_get_openaq(tmp_path): + fn = "x.nc" + cmd = [ + "melodies-monet", "get-openaq", + "-s", "2024-09-10", "-e", "2024-09-10 00:59", + "--dst", tmp_path.as_posix(), "-o", fn, + ] + subprocess.run(cmd, check=True) + + ds = xr.open_dataset(tmp_path / fn) + + assert ds.time.size == 1 + assert { + v + for v in ds.data_vars + if ds[v].dims == ("time", "y", "x") + } == {"o3", "pm25", "pm10", "time_local"} + assert (ds.sensor_type == "reference grade").all() + + +@pytest.mark.skipif(not have_openaq_api_key, reason="OPENAQ_API_KEY not set") +def test_get_openaq_lowcost(tmp_path): + fn = "x.nc" + cmd = [ + "melodies-monet", "get-openaq", + "-s", "2024-09-10", "-e", "2024-09-10 00:59", + "-p", "pm25", + "--no-reference-grade", "--low-cost", + "--dst", tmp_path.as_posix(), "-o", fn, + ] + subprocess.run(cmd, check=True) + + ds = xr.open_dataset(tmp_path / fn) + + assert ds.time.size == 1 + assert { + v + for v in ds.data_vars + if ds[v].dims == ("time", "y", "x") + } == {"pm25", "time_local"} + assert (ds.sensor_type == "low-cost sensor").all()