diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py
index b4f49904..11244f77 100644
--- a/melodies_monet/_cli.py
+++ b/melodies_monet/_cli.py
@@ -7,6 +7,7 @@
 import time
 from contextlib import contextmanager
 from pathlib import Path
+from typing import List
 
 try:
     import typer
@@ -914,6 +915,250 @@ def get_utc_offset(*, lat, lon):
             ds.to_netcdf(dst / out_name)
 
+@app.command()
+def get_aqs(
+    start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"),
+    end_date: str = typer.Option(..., "-e", "--end-date", help=f"End date. {_DATE_FMT_NOTE} {_DATE_END_NOTE}"),
+    daily: bool = typer.Option(False, help=(
+            "Whether to retrieve the daily averaged data product. "
+            "By default, the hourly data is fetched."
+        )
+    ),
+    param: List[str] = typer.Option(["O3", "PM2.5", "PM10"], "-p", "--params", help=(
+            "Parameter groups. "
+            "Use '-p' more than once to get multiple groups. "
+            "Other examples: 'SPEC' (speciated PM2.5), 'PM10SPEC' (speciated PM10), "
+            "'VOC', 'NONOxNOy', 'SO2', 'NO2', 'CO', 'PM2.5_FRM'."
+        )
+    ),
+    # TODO: add network selection option once working in monetio
+    out_name: str = typer.Option(None, "-o",
+        help=(
+            "Output file name (or full/relative path). "
+            "By default the name is generated like 'AQS_<start-date>_<end-date>.nc'."
+        )
+    ),
+    dst: Path = typer.Option(".", "-d", "--dst", help=(
+            "Destination directory (to control output location "
+            "if using default output file name)."
+        )
+    ),
+    compress: bool = typer.Option(True, help=(
+            "If true, pack float to int and apply compression using zlib with complevel 7. "
+            "This can take time if the dataset is large, but can lead to "
+            "significant space savings."
+        )
+    ),
+    num_workers: int = typer.Option(1, "-n", "--num-workers", help="Number of download workers."),
+    verbose: bool = typer.Option(False),
+    debug: bool = typer.Option(
+        False, "--debug/", help="Print more messages (including full tracebacks)."
+    ),
+):
+    """Download EPA AQS data using monetio and reformat for MM usage.
+
+    These are archived data, stored in per-year per-parameter-group files, described at
+    https://aqs.epa.gov/aqsweb/airdata/download_files.html
+
+    Recent-past data are generally not available from this source.
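+
+    Example invocation (illustrative only; the dates and parameter group
+    here are arbitrary):
+
+        melodies-monet get-aqs -s 2019-08-01 -e 2019-08-02 -p O3 --daily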
+ """ + import warnings + + import monetio as mio + import pandas as pd + + from .util.write_util import write_ncf + + global DEBUG + + DEBUG = debug + + if verbose: + from dask.diagnostics import ProgressBar + + ProgressBar().register() + + typer.echo(HEADER) + + start_date = pd.Timestamp(start_date) + end_date = pd.Timestamp(end_date) + dates = pd.date_range(start_date, end_date, freq="H" if not daily else "D") + if verbose: + print("Dates:") + print(dates) + print("Params:") + print(param) + + # Set destination and file name + fmt = r"%Y%m%d" + if out_name is None: + out_name = f"AQS_{start_date:{fmt}}_{end_date:{fmt}}.nc" + else: + p = Path(out_name) + if p.name == out_name: + # `out_name` is just the file name + out_name = p.name + else: + # `out_name` has path + if dst != Path("."): + typer.echo(f"warning: overriding `dst` setting {dst.as_posix()!r} with `out_name` {p.as_posix()!r}") + dst = p.parent + out_name = p.name + + with _timer("Fetching data with monetio"): + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", + message="The (error|warn)_bad_lines argument has been deprecated" + ) + try: + df = mio.aqs.add_data( + dates, + param=param, + daily=daily, + network=None, + download=False, + local=False, + wide_fmt=True, # column for each variable + n_procs=num_workers, + meta=False, # TODO: enable or add option once monetio fixes released + ) + except KeyError as e: + if daily and str(e) == "'time'": + typer.echo("Note that the daily option currently requires monetio >0.2.5") + raise + + if not daily: + with _timer("Fetching site metadata"): + # Need UTC offset in order to compute local time + # But currently the `meta=True` option doesn't work + meta0 = pd.read_csv( + "https://aqs.epa.gov/aqsweb/airdata/aqs_sites.zip", + encoding="ISO-8859-1", + usecols=[0, 1, 2, 17], + dtype=str, + ) + meta = ( + meta0.copy() + .assign( + siteid=meta0["State Code"] + meta0["County Code"] + meta0["Site Number"], + utcoffset=meta0["GMT Offset"].astype(int), + ) + .drop( + columns=["State Code", "County Code", "Site Number", "GMT Offset"], + ) + ) + + with _timer("Forming xarray Dataset"): + # Select requested time period (older monetio doesn't do this) + df = df[df.time.between(dates[0], dates[-1], inclusive="both")] + + df = df.dropna(subset=["latitude", "longitude"]) + + # Variables associated with a measurement, + # currently not properly useful in the wide format. + if daily: + v_vns = [ + "parameter_code", + "poc", + "parameter_name", + "sample_duration", + "pollutant_standard", + "event_type", + "observation_count", + "observation_percent", + "1st_max_value", + "1st_max_hour", + "aqi", + "method_code", + "method_name", + ] + else: + v_vns = [ + "parameter_code", + "poc", # parameter occurrence code + "parameter_name", + "mdl", # method detection limit + "uncertainty", + "method_type", + "method_code", + "method_name", + ] + df = df.drop(columns=v_vns).drop_duplicates() + # TODO: may be better to get long fmt and drop these first and then pivot + # TODO: option to average duplicate measurements at same site instead of keeping first? 
+ if "datum" in df: + df = df.drop(columns=["datum"]) + + site_vns = [ + "siteid", + "state_code", + "county_code", + "site_num", + "latitude", + "longitude", + ] + if daily: + site_vns.extend(["local_site_name", "address", "city_name", "msa_name"]) + # NOTE: time_local not included since it varies in time as well + if not daily: + df = df.merge(meta, on="siteid", how="left") + site_vns.append("utcoffset") + + ds_site = ( + df[site_vns] + .groupby("siteid") + .first() + .to_xarray() + .swap_dims(siteid="x") + ) + + # Extract units info so we can add as attrs + unit_suff = "_unit" + unit_cols = [n for n in df.columns if n.endswith(unit_suff)] + assert (df[unit_cols].nunique() == 1).all() + units = df[unit_cols][~df[unit_cols].isnull()].iloc[0].to_dict() + + cols = [n for n in df.columns if not n.endswith(unit_suff)] + ds = ( + df[cols] + .drop(columns=[vn for vn in site_vns if vn != "siteid"]) + .drop_duplicates(["time", "siteid"], keep="first") + .set_index(["time", "siteid"]) + .to_xarray() + .swap_dims(siteid="x") + .merge(ds_site) + .set_coords(["latitude", "longitude"]) + .assign(x=range(ds_site.dims["x"])) + ) + + # Add units + for k, u in units.items(): + vn = k[:-len(unit_suff)] + ds[vn].attrs.update(units=u) + + # Fill in local time array + # (in the df, not all sites have rows for all times, so we have NaTs at this point) + if not daily: + ds["time_local"] = ds.time + ds.utcoffset.astype("timedelta64[h]") + + # Expand + ds = ( + ds + .expand_dims("y") + .transpose("time", "y", "x") + ) + + # Can't have `/` in variable name for netCDF + to_rename = [vn for vn in ds.data_vars if "/" in vn] + ds = ds.rename_vars({vn: vn.replace("/", "_") for vn in to_rename}) + + with _timer("Writing netCDF file"): + if compress: + write_ncf(ds, dst / out_name, verbose=verbose) + else: + ds.to_netcdf(dst / out_name) + cli = app diff --git a/melodies_monet/tests/test_get_data_cli.py b/melodies_monet/tests/test_get_data_cli.py index 515e7e9c..a61289d5 100644 --- a/melodies_monet/tests/test_get_data_cli.py +++ b/melodies_monet/tests/test_get_data_cli.py @@ -114,3 +114,44 @@ def test_get_ish_box(tmp_path): assert ds.time.size == 24 assert np.unique(ds.state) == ["CO"] + + +def test_get_aqs_daily(tmp_path): + fn = "x.nc" + cmd = [ + "melodies-monet", "get-aqs", + "-s", "2019-08-01", "-e", "2019-08-02", + "-p", "O3", + "--daily", + "--dst", tmp_path.as_posix(), "-o", fn, + ] + subprocess.run(cmd, check=True) + + ds = xr.open_dataset(tmp_path / fn) + + assert ds.time.size == 2, "two days" + assert { + v + for v in ds.data_vars + if ds[v].dims == ("time", "y", "x") + } == {"OZONE"} + + +def test_get_aqs_hourly(tmp_path): + fn = "x.nc" + cmd = [ + "melodies-monet", "get-aqs", + "-s", "1980-08-01", "-e", "1980-08-01 23:00", + "-p", "O3", + "--dst", tmp_path.as_posix(), "-o", fn, + ] + subprocess.run(cmd, check=True) + + ds = xr.open_dataset(tmp_path / fn) + + assert ds.time.size == 24, "one day" + assert { + v + for v in ds.data_vars + if ds[v].dims == ("time", "y", "x") + } == {"OZONE", "time_local"}