Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CLI: get-openaq #270

Merged
merged 16 commits into from
Nov 5, 2024
4 changes: 4 additions & 0 deletions docs/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ any Python code::
* |get-aqs|_ -- get AQS data
* |get-ish|_ -- get ISH data
* |get-ish-lite|_ -- get ISH-Lite data
* |get-openaq|_ -- get OpenAQ data

.. |run| replace:: ``run``
.. _run: #melodies-monet-run
Expand All @@ -37,6 +38,9 @@ any Python code::
.. |get-ish-lite| replace:: ``get-ish-lite``
.. _get-ish-lite: #melodies-monet-get-ish-lite

.. |get-openaq| replace:: ``get-openaq``
.. _get-openaq: #melodies-monet-get-openaq

.. click:: melodies_monet._cli:_typer_click_object
:prog: melodies-monet
:nested: full
251 changes: 248 additions & 3 deletions melodies_monet/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@
"""
melodies-monet -- MELODIES MONET CLI
"""
import os
import time
from contextlib import contextmanager
from pathlib import Path
from typing import List
from typing import List, Tuple

_LOGGING_LEVEL = os.environ.get("MM_LOGGING_LEVEL", None)
if _LOGGING_LEVEL is not None:
import logging

logging.basicConfig(level=_LOGGING_LEVEL.upper())

try:
import typer
Expand All @@ -20,8 +27,6 @@
)
raise SystemExit(1)

from typing import Tuple

DEBUG = False
INFO_COLOR = typer.colors.CYAN
ERROR_COLOR = typer.colors.BRIGHT_RED
Expand Down Expand Up @@ -1160,6 +1165,246 @@ def get_aqs(
ds.to_netcdf(dst / out_name)


@app.command()
def get_openaq(
start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"),
end_date: str = typer.Option(..., "-e", "--end-date", help=f"End date. {_DATE_FMT_NOTE} {_DATE_END_NOTE}"),
out_name: str = typer.Option(None, "-o",
help=(
"Output file name (or full/relative path). "
"By default the name is generated like 'OpenAQ_<start-date>_<end-date>.nc'."
)
),
dst: Path = typer.Option(".", "-d", "--dst", help=(
"Destination directory (to control output location "
"if using default output file name)."
)
),
param: List[str] = typer.Option(["o3", "pm25", "pm10"], "-p", "--params", help=(
zmoon marked this conversation as resolved.
Show resolved Hide resolved
"Parameters. "
"Use '-p' more than once to get multiple parameters. "
"Other examples: 'no', 'no2', 'nox', 'so2', 'co', 'bc'. "
"Only applicable to the web API methods ('api-v2')."
)
),
reference_grade: bool = typer.Option(True, help="Include reference-grade sensors."),
low_cost: bool = typer.Option(False, help="Include low-cost sensors."),
method: str = typer.Option("api-v2", "-m", "--method", help=(
"Method (reader) to use for fetching data. "
"Options: 'api-v2', 'openaq-fetches'."
)
),
compress: bool = typer.Option(True, help=(
"If true, pack float to int and apply compression using zlib with complevel 7. "
"This can take time if the dataset is large, but can lead to "
"significant space savings."
)
),
num_workers: int = typer.Option(1, "-n", "--num-workers", help="Number of download workers."),
verbose: bool = typer.Option(False),
debug: bool = typer.Option(
False, "--debug/", help="Print more messages (including full tracebacks)."
),
):
"""Download hourly OpenAQ data using monetio and reformat for MM usage."""
import warnings

import monetio as mio
import pandas as pd

from .util.write_util import write_ncf

global DEBUG

DEBUG = debug

typer.echo(HEADER)

if method not in {"api-v2", "openaq-fetches"}:
typer.secho(f"Error: method {method!r} not recognized", fg=ERROR_COLOR)
raise typer.Exit(2)

start_date = pd.Timestamp(start_date)
end_date = pd.Timestamp(end_date)

if method in {"openaq-fetches"}:
dates = pd.date_range(start_date, end_date, freq="D")
elif method in {"api-v2"}:
dates = pd.date_range(start_date, end_date, freq="H")
zmoon marked this conversation as resolved.
Show resolved Hide resolved
else:
raise AssertionError
if verbose:
print("Dates:")
print(dates)

if verbose and method in {"api-v2"}:
print("Params:", param)

# Set destination and file name
fmt = r"%Y%m%d"
if out_name is None:
out_name = f"OpenAQ_{start_date:{fmt}}_{end_date:{fmt}}.nc"
else:
p = Path(out_name)
if p.name == out_name:
# `out_name` is just the file name
out_name = p.name
else:
# `out_name` has path
if dst != Path("."):
typer.echo(f"warning: overriding `dst` setting {dst.as_posix()!r} with `out_name` {p.as_posix()!r}")
dst = p.parent
out_name = p.name

sensor_types = []
if reference_grade:
sensor_types.append("reference grade")
if low_cost:
sensor_types.append("low-cost sensor")
if not sensor_types and method in {"api-v2"}:
typer.secho(
"Error: no sensor types selected. Use --reference-grade and/or --low-cost",
fg=ERROR_COLOR,
)
raise typer.Exit(2)

if verbose and method in {"openaq-fetches"}:
from dask.diagnostics import ProgressBar

ProgressBar().register()

with _timer("Fetching data with monetio"):
if method == "openaq-fetches":
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message="The (error|warn)_bad_lines argument has been deprecated"
)
df = mio.openaq.add_data(
dates,
n_procs=num_workers,
# wide_fmt=True,
)

# Address time-wise non-unique site IDs
# Some (most?) are just slightly different lat/lon
# But seems like a few are actual time-wise lat/lon duplicates
df = df.drop_duplicates(["time", "siteid"])

elif method == "api-v2":
df = mio.obs.openaq_v2.add_data(
dates,
parameters=param,
sensor_type=sensor_types,
wide_fmt=True,
timeout=60,
retry=15,
threads=num_workers if num_workers > 1 else None,
)

dupes = df[df.duplicated(["time", "siteid"], keep=False)]
if not dupes.empty:
typer.echo(
f"warning: {len(dupes)} unexpected time-siteid duplicated rows:"
)
if verbose:
typer.echo(dupes)
df = df.drop_duplicates(["time", "siteid"])
else:
raise AssertionError

# Drop times not on the hour
good = df.time == df.time.dt.floor("H")
typer.echo(f"Dropping {(~good).sum()}/{len(good)} rows that aren't on the hour.")
df = df[good]

with _timer("Forming xarray Dataset"):
df = df.drop(columns=["index"], errors="ignore")
df = df.dropna(subset=["latitude", "longitude"])

if method == "openaq-fetches":
site_vns = [
"siteid", # based on country and lat/lon
"latitude",
"longitude",
"utcoffset",
#
"city",
"country", # 2-char codes
#
"sourceName",
"sourceType", # "government"
]
# NOTE: time_local not included since it varies in time as well
elif method == "api-v2":
site_vns = [
"siteid", # real OpenAQ location ID
"latitude",
"longitude",
"utcoffset",
#
"location",
"city",
"country",
#
"entity",
"sensor_type",
"is_mobile",
"is_analysis",
]
for vn in ["city", "is_analysis"]: # may have been dropped for being all null
if vn not in df.columns:
site_vns.remove(vn)

else:
raise AssertionError

ds_site = (
df[site_vns]
.groupby("siteid")
.first()
.to_xarray()
.swap_dims(siteid="x")
)

ds = (
df.drop(columns=[vn for vn in site_vns if vn not in ["siteid"]])
.set_index(["time", "siteid"])
.to_xarray()
.swap_dims(siteid="x")
.merge(ds_site)
.set_coords(["latitude", "longitude"])
.assign(x=range(ds_site.dims["x"]))
)

# Rename species vars and add units as attr
nice_us = {"ppm": "ppmv", "ugm3": "ug m-3", "ppb": "pbbv"}
for vn0 in [n for n in df.columns if n.endswith(("_ppm", "ppb", "_ugm3", "_umg3"))]:
i_last_underscore = vn0.rfind("_")
vn, u = vn0[:i_last_underscore], vn0[i_last_underscore + 1:]
if u == "umg3":
u = "ugm3"
nice_u = nice_us[u]
ds[vn0].attrs.update(units=nice_u)
ds = ds.rename_vars({vn0: vn})

# Fill in local time array
# (in the df, not all sites have rows for all times, so we have NaTs at this point)
ds["time_local"] = ds.time + ds.utcoffset

# Expand
ds = (
ds
.expand_dims("y")
.transpose("time", "y", "x")
)

with _timer("Writing netCDF file"):
if compress:
write_ncf(ds, dst / out_name, verbose=verbose)
else:
ds.to_netcdf(dst / out_name)

cli = app

_typer_click_object = typer.main.get_command(app) # for sphinx-click in docs
Expand Down
47 changes: 47 additions & 0 deletions melodies_monet/tests/test_get_data_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"""
Check for consistency with the tutorial datasets and that options work.
"""
import os
import subprocess

import numpy as np
Expand All @@ -30,6 +31,8 @@
ds0_aeronet = xr.open_dataset(fetch_example("aeronet:2019-09"))
ds0_airnow = xr.open_dataset(fetch_example("airnow:2019-09"))

have_openaq_api_key = len(os.environ.get("OPENAQ_API_KEY", "")) > 0


def test_get_aeronet(tmp_path):
fn = "x.nc"
Expand Down Expand Up @@ -173,3 +176,47 @@ def test_get_aqs_hourly(tmp_path):
for v in ds.data_vars
if ds[v].dims == ("time", "y", "x")
} == {"OZONE", "time_local"}


@pytest.mark.skipif(not have_openaq_api_key, reason="OPENAQ_API_KEY not set")
def test_get_openaq(tmp_path):
fn = "x.nc"
cmd = [
"melodies-monet", "get-openaq",
"-s", "2024-09-10", "-e" "2024-09-10 00:59",
"--dst", tmp_path.as_posix(), "-o", fn,
]
subprocess.run(cmd, check=True)

ds = xr.open_dataset(tmp_path / fn)

assert ds.time.size == 1
assert {
v
for v in ds.data_vars
if ds[v].dims == ("time", "y", "x")
} == {"o3", "pm25", "pm10", "time_local"}
assert (ds.sensor_type == "reference grade").all()


@pytest.mark.skipif(not have_openaq_api_key, reason="OPENAQ_API_KEY not set")
def test_get_openaq_lowcost(tmp_path):
fn = "x.nc"
cmd = [
"melodies-monet", "get-openaq",
"-s", "2024-09-10", "-e" "2024-09-10 00:59",
"-p", "pm25",
"--no-reference-grade", "--low-cost",
"--dst", tmp_path.as_posix(), "-o", fn,
]
subprocess.run(cmd, check=True)

ds = xr.open_dataset(tmp_path / fn)

assert ds.time.size == 1
assert {
v
for v in ds.data_vars
if ds[v].dims == ("time", "y", "x")
} == {"pm25", "time_local"}
assert (ds.sensor_type == "low-cost sensor").all()