CLI: initial AQS support #249

Merged · 11 commits · May 1, 2024
245 changes: 245 additions & 0 deletions melodies_monet/_cli.py
@@ -7,6 +7,7 @@
import time
from contextlib import contextmanager
from pathlib import Path
from typing import List

try:
    import typer
@@ -914,6 +915,250 @@ def get_utc_offset(*, lat, lon):
        ds.to_netcdf(dst / out_name)


@app.command()
def get_aqs(
    start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"),
    end_date: str = typer.Option(..., "-e", "--end-date", help=f"End date. {_DATE_FMT_NOTE} {_DATE_END_NOTE}"),
    daily: bool = typer.Option(False, help=(
            "Whether to retrieve the daily averaged data product. "
            "By default, the hourly data is fetched."
        )
    ),
    param: List[str] = typer.Option(["O3", "PM2.5", "PM10"], "-p", "--params", help=(
            "Parameter groups. "
            "Use '-p' more than once to get multiple groups. "
            "Other examples: 'SPEC' (speciated PM2.5), 'PM10SPEC' (speciated PM10), "
            "'VOC', 'NONOxNOy', 'SO2', 'NO2', 'CO', 'PM2.5_FRM'."
        )
    ),
    # TODO: add network selection option once working in monetio
    out_name: str = typer.Option(None, "-o",
        help=(
            "Output file name (or full/relative path). "
            "By default the name is generated like 'AQS_<start-date>_<end-date>.nc'."
        )
    ),
    dst: Path = typer.Option(".", "-d", "--dst", help=(
            "Destination directory (to control output location "
            "if using default output file name)."
        )
    ),
    compress: bool = typer.Option(True, help=(
            "If true, pack float to int and apply compression using zlib with complevel 7. "
            "This can take time if the dataset is large, but can lead to "
            "significant space savings."
        )
    ),
    num_workers: int = typer.Option(1, "-n", "--num-workers", help="Number of download workers."),
    verbose: bool = typer.Option(False),
    debug: bool = typer.Option(
        False, "--debug/", help="Print more messages (including full tracebacks)."
    ),
):
    """Download EPA AQS data using monetio and reformat for MM usage.

    These are archived data, stored in per-year per-parameter-group files, described at
    https://aqs.epa.gov/aqsweb/airdata/download_files.html

    Recent-past data are generally not available from this source.
    """
    import warnings

    import monetio as mio
    import pandas as pd

    from .util.write_util import write_ncf

    global DEBUG

    DEBUG = debug

    if verbose:
        from dask.diagnostics import ProgressBar

        ProgressBar().register()

    typer.echo(HEADER)

    start_date = pd.Timestamp(start_date)
    end_date = pd.Timestamp(end_date)
    dates = pd.date_range(start_date, end_date, freq="H" if not daily else "D")
    if verbose:
        print("Dates:")
        print(dates)
        print("Params:")
        print(param)

    # Set destination and file name
    fmt = r"%Y%m%d"
    if out_name is None:
        out_name = f"AQS_{start_date:{fmt}}_{end_date:{fmt}}.nc"
    else:
        p = Path(out_name)
        if p.name == out_name:
            # `out_name` is just the file name
            out_name = p.name
        else:
            # `out_name` has path
            if dst != Path("."):
                typer.echo(f"warning: overriding `dst` setting {dst.as_posix()!r} with `out_name` {p.as_posix()!r}")
            dst = p.parent
            out_name = p.name

    with _timer("Fetching data with monetio"):
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                message="The (error|warn)_bad_lines argument has been deprecated"
            )
            try:
                df = mio.aqs.add_data(
                    dates,
                    param=param,
                    daily=daily,
                    network=None,
                    download=False,
                    local=False,
                    wide_fmt=True,  # column for each variable
                    n_procs=num_workers,
                    meta=False,  # TODO: enable or add option once monetio fixes released
                )
            except KeyError as e:
                if daily and str(e) == "'time'":
                    typer.echo("Note that the daily option currently requires monetio >0.2.5")
                raise
    if not daily:
        with _timer("Fetching site metadata"):
            # Need UTC offset in order to compute local time
            # But currently the `meta=True` option doesn't work
            meta0 = pd.read_csv(
                "https://aqs.epa.gov/aqsweb/airdata/aqs_sites.zip",
                encoding="ISO-8859-1",
                usecols=[0, 1, 2, 17],
                dtype=str,
            )
            meta = (
                meta0.copy()
                .assign(
                    siteid=meta0["State Code"] + meta0["County Code"] + meta0["Site Number"],
                    utcoffset=meta0["GMT Offset"].astype(int),
                )
                .drop(
                    columns=["State Code", "County Code", "Site Number", "GMT Offset"],
                )
            )

    with _timer("Forming xarray Dataset"):
        # Select requested time period (older monetio doesn't do this)
        df = df[df.time.between(dates[0], dates[-1], inclusive="both")]

        df = df.dropna(subset=["latitude", "longitude"])

        # Variables associated with a measurement,
        # currently not properly useful in the wide format.
        if daily:
            v_vns = [
                "parameter_code",
                "poc",
                "parameter_name",
                "sample_duration",
                "pollutant_standard",
                "event_type",
                "observation_count",
                "observation_percent",
                "1st_max_value",
                "1st_max_hour",
                "aqi",
                "method_code",
                "method_name",
            ]
        else:
            v_vns = [
                "parameter_code",
                "poc",  # parameter occurrence code
                "parameter_name",
                "mdl",  # method detection limit
                "uncertainty",
                "method_type",
                "method_code",
                "method_name",
            ]
        df = df.drop(columns=v_vns).drop_duplicates()
        # TODO: may be better to get long fmt and drop these first and then pivot
        # TODO: option to average duplicate measurements at same site instead of keeping first?
        if "datum" in df:
            df = df.drop(columns=["datum"])

        site_vns = [
            "siteid",
            "state_code",
            "county_code",
            "site_num",
            "latitude",
            "longitude",
        ]
        if daily:
            site_vns.extend(["local_site_name", "address", "city_name", "msa_name"])
            # NOTE: time_local not included since it varies in time as well
        if not daily:
            df = df.merge(meta, on="siteid", how="left")
            site_vns.append("utcoffset")

        ds_site = (
            df[site_vns]
            .groupby("siteid")
            .first()
            .to_xarray()
            .swap_dims(siteid="x")
        )

        # Extract units info so we can add as attrs
        unit_suff = "_unit"
        unit_cols = [n for n in df.columns if n.endswith(unit_suff)]
        assert (df[unit_cols].nunique() == 1).all()
        units = df[unit_cols][~df[unit_cols].isnull()].iloc[0].to_dict()
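        # `units` maps unit-column names to unit strings,
        # e.g. {"OZONE_unit": "ppb"} (hypothetical value);
        # the "_unit" suffix is stripped below when attaching attrs.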

        cols = [n for n in df.columns if not n.endswith(unit_suff)]
        ds = (
            df[cols]
            .drop(columns=[vn for vn in site_vns if vn != "siteid"])
            .drop_duplicates(["time", "siteid"], keep="first")
            .set_index(["time", "siteid"])
            .to_xarray()
            .swap_dims(siteid="x")
            .merge(ds_site)
            .set_coords(["latitude", "longitude"])
            .assign(x=range(ds_site.dims["x"]))
        )

        # Add units
        for k, u in units.items():
            vn = k[:-len(unit_suff)]
            ds[vn].attrs.update(units=u)

        # Fill in local time array
        # (in the df, not all sites have rows for all times, so we have NaTs at this point)
        if not daily:
            ds["time_local"] = ds.time + ds.utcoffset.astype("timedelta64[h]")

        # Expand
        ds = (
            ds
            .expand_dims("y")
            .transpose("time", "y", "x")
        )

        # Can't have `/` in variable name for netCDF
        to_rename = [vn for vn in ds.data_vars if "/" in vn]
        ds = ds.rename_vars({vn: vn.replace("/", "_") for vn in to_rename})

    with _timer("Writing netCDF file"):
        if compress:
            write_ncf(ds, dst / out_name, verbose=verbose)
        else:
            ds.to_netcdf(dst / out_name)


cli = app

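For reference, a minimal sketch (not part of the diff) of reading the resulting file back with xarray, mirroring what the tests below do; the file name here is hypothetical, following the default 'AQS_<start-date>_<end-date>.nc' pattern:

import xarray as xr

# Hypothetical default-named output from `melodies-monet get-aqs`
ds = xr.open_dataset("AQS_20190801_20190802.nc")
print(ds["OZONE"].attrs["units"])  # units are attached as variable attrs
print(ds.sizes)                    # dims: "time", "y" (singleton), "x" (site)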
41 changes: 41 additions & 0 deletions melodies_monet/tests/test_get_data_cli.py
@@ -114,3 +114,44 @@ def test_get_ish_box(tmp_path):

    assert ds.time.size == 24
    assert np.unique(ds.state) == ["CO"]


def test_get_aqs_daily(tmp_path):
    fn = "x.nc"
    cmd = [
        "melodies-monet", "get-aqs",
        "-s", "2019-08-01", "-e", "2019-08-02",
        "-p", "O3",
        "--daily",
        "--dst", tmp_path.as_posix(), "-o", fn,
    ]
    subprocess.run(cmd, check=True)

    ds = xr.open_dataset(tmp_path / fn)

    assert ds.time.size == 2, "two days"
    assert {
        v
        for v in ds.data_vars
        if ds[v].dims == ("time", "y", "x")
    } == {"OZONE"}


def test_get_aqs_hourly(tmp_path):
    fn = "x.nc"
    cmd = [
        "melodies-monet", "get-aqs",
        "-s", "1980-08-01", "-e", "1980-08-01 23:00",
        "-p", "O3",
        "--dst", tmp_path.as_posix(), "-o", fn,
    ]
    subprocess.run(cmd, check=True)

    ds = xr.open_dataset(tmp_path / fn)

    assert ds.time.size == 24, "one day"
    assert {
        v
        for v in ds.data_vars
        if ds[v].dims == ("time", "y", "x")
    } == {"OZONE", "time_local"}