From f7248dd2e39da4719e4cd17ff788b3f887fc7b4b Mon Sep 17 00:00:00 2001 From: zmoon Date: Wed, 6 Sep 2023 10:12:01 -0600 Subject: [PATCH 01/14] Start OpenAQ CLI based on AirNow's --- melodies_monet/_cli.py | 156 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index 28d43a99..8d16d9c5 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -471,6 +471,162 @@ def get_airnow( ds.to_netcdf(dst / out_name) +@app.command() +def get_openaq( + start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"), + end_date: str = typer.Option(..., "-e", "--end-date", help=f"End date. {_DATE_FMT_NOTE} {_DATE_END_NOTE}"), + out_name: str = typer.Option(None, "-o", + help=( + "Output file name (or full/relative path). " + "By default the name is generated like 'OpenAQ__.nc'." + ) + ), + dst: Path = typer.Option(".", "-d", "--dst", help=( + "Destination directory (to control output location " + "if using default output file name)." + ) + ), + compress: bool = typer.Option(True, help=( + "If true, pack float to int and apply compression using zlib with complevel 7. " + "This can take time if the dataset is large, but can lead to " + "significant space savings." + ) + ), + num_workers: int = typer.Option(1, "-n", "--num-workers", help="Number of download workers."), + verbose: bool = typer.Option(False), + debug: bool = typer.Option( + False, "--debug/", help="Print more messages (including full tracebacks)." + ), +): + """Download OpenAQ data using monetio and reformat for MM usage.""" + import warnings + + import monetio as mio + import pandas as pd + + from .util.write_util import write_ncf + + global DEBUG + + DEBUG = debug + + typer.echo(HEADER) + + start_date = pd.Timestamp(start_date) + end_date = pd.Timestamp(end_date) + dates = pd.date_range(start_date, end_date, freq="D") + if verbose: + print("Dates:") + print(dates) + + # Set destination and file name + fmt = r"%Y%m%d" + if out_name is None: + out_name = f"OpenAQ_{start_date:{fmt}}_{end_date:{fmt}}.nc" + else: + p = Path(out_name) + if p.name == out_name: + # `out_name` is just the file name + out_name = p.name + else: + # `out_name` has path + if dst != Path("."): + typer.echo(f"warning: overriding `dst` setting {dst.as_posix()!r} with `out_name` {p.as_posix()!r}") + dst = p.parent + out_name = p.name + + with _timer("Fetching data with monetio"): + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", + message="The (error|warn)_bad_lines argument has been deprecated" + ) + df = mio.openaq.add_data( + dates, + n_procs=num_workers, + ) + + with _timer("Forming xarray Dataset"): + df = df.dropna(subset=["latitude", "longitude"]) + + site_vns = [ + "site", + "siteid", + "utcoffset", + "latitude", + "longitude", + "cmsa_name", + "msa_code", + "msa_name", + "state_name", + "epa_region", + ] + # NOTE: time_local not included since it varies in time as well + + # site_vn_str = [ + # "site", # site name + # "siteid", # site code (9 or 12 digits/chars) + # # + # "cmsa_name", + # "msa_code", + # "msa_name", + # "state_name", + # "epa_region", + # ] + + # df[site_vn_str] = df[site_vn_str].astype("string") + + ds_site = ( + df[site_vns] + # .replace(["", " ", None], pd.NA) # TODO: monetio should do? 
+ .groupby("siteid") + .first() + .to_xarray() + .swap_dims(siteid="x") + ) + + # Extract units info so we can add as attrs + unit_suff = "_unit" + unit_cols = [n for n in df.columns if n.endswith(unit_suff)] + assert (df[unit_cols].nunique() == 1).all() + units = df[unit_cols][~df[unit_cols].isnull()].iloc[0].to_dict() + + cols = [n for n in df.columns if not n.endswith(unit_suff)] + ds = ( + df[cols] + .set_index(["time", "siteid"]) + .to_xarray() + .swap_dims(siteid="x") + .drop_vars(site_vns) + .merge(ds_site) + .set_coords(["latitude", "longitude"]) + .assign(x=range(ds_site.dims["x"])) + ) + + # Add units + for k, u in units.items(): + vn = k[:-len(unit_suff)] + ds[vn].attrs.update(units=u) + + # Fill in local time array + # (in the df, not all sites have rows for all times, so we have NaTs at this point) + ds["time_local"] = ds.time + ds.utcoffset.astype("timedelta64[h]") + + # Expand + ds = ( + ds + .expand_dims("y") + .transpose("time", "y", "x") + ) + + with _timer("Writing netCDF file"): + if compress: + write_ncf(ds, dst / out_name, verbose=verbose) + else: + ds.to_netcdf(dst / out_name) + + + cli = app _typer_click_object = typer.main.get_command(app) # for sphinx-click in docs From 818bb04be57f33ccfb9713304d8ab1d4e229be46 Mon Sep 17 00:00:00 2001 From: zmoon Date: Wed, 6 Sep 2023 12:29:51 -0600 Subject: [PATCH 02/14] WIP: OpenAQ CLI mostly working --- melodies_monet/_cli.py | 111 ++++++++++++++++++++++++----------------- 1 file changed, 64 insertions(+), 47 deletions(-) diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index 8d16d9c5..49e8892f 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -536,81 +536,98 @@ def get_openaq( out_name = p.name with _timer("Fetching data with monetio"): - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", - message="The (error|warn)_bad_lines argument has been deprecated" - ) - df = mio.openaq.add_data( - dates, - n_procs=num_workers, - ) + # with warnings.catch_warnings(): + # warnings.filterwarnings( + # "ignore", + # message="The (error|warn)_bad_lines argument has been deprecated" + # ) + # df = mio.openaq.add_data( + # dates, + # n_procs=num_workers, + # ) + + # FIXME: local testing only + df = pd.read_csv("openaq_2019-08.csv.gz", index_col=0, parse_dates=["time", "time_local"]) + df["utcoffset"] = pd.to_timedelta(df["utcoffset"]) # str in the CSV + + # Remove dates outside of requested range + # TODO: fix in monetio? + good = (df.time >= start_date) & (df.time <= end_date) + df = df[good] + + # Address time-wise non-unique site IDs + # Some (most?) are just slightly different lat/lon + # But seems like a few are actual time-wise lat/lon duplicates + # TODO: fix in monetio (maybe OpenAQ *has* site IDs? 
or can just make them up) + df = df.drop_duplicates(["time", "siteid"]) with _timer("Forming xarray Dataset"): + df = df.drop(columns=["index"], errors="ignore") df = df.dropna(subset=["latitude", "longitude"]) + # ['index', + # 'time', + # 'latitude', + # 'longitude', + # 'sourceName', + # 'sourceType', + # 'city', + # 'country', + # 'utcoffset', + # 'bc_umg3', # TODO: should be "ugm3" + # 'co_ppm', + # 'no2_ppm', + # 'o3_ppm', + # 'pm10_ugm3', + # 'pm25_ugm3', + # 'so2_ppm', + # 'siteid', + # 'time_local'] + site_vns = [ - "site", - "siteid", - "utcoffset", + "siteid", # based on country and lat/lon "latitude", "longitude", - "cmsa_name", - "msa_code", - "msa_name", - "state_name", - "epa_region", + "utcoffset", + "city", + "country", # 2-char codes + "sourceName", + "sourceType", # "government" ] # NOTE: time_local not included since it varies in time as well - # site_vn_str = [ - # "site", # site name - # "siteid", # site code (9 or 12 digits/chars) - # # - # "cmsa_name", - # "msa_code", - # "msa_name", - # "state_name", - # "epa_region", - # ] - - # df[site_vn_str] = df[site_vn_str].astype("string") - ds_site = ( df[site_vns] - # .replace(["", " ", None], pd.NA) # TODO: monetio should do? .groupby("siteid") .first() .to_xarray() .swap_dims(siteid="x") ) - # Extract units info so we can add as attrs - unit_suff = "_unit" - unit_cols = [n for n in df.columns if n.endswith(unit_suff)] - assert (df[unit_cols].nunique() == 1).all() - units = df[unit_cols][~df[unit_cols].isnull()].iloc[0].to_dict() - - cols = [n for n in df.columns if not n.endswith(unit_suff)] ds = ( - df[cols] + df.drop(columns=[vn for vn in site_vns if vn not in ["siteid"]]) .set_index(["time", "siteid"]) .to_xarray() .swap_dims(siteid="x") - .drop_vars(site_vns) .merge(ds_site) .set_coords(["latitude", "longitude"]) .assign(x=range(ds_site.dims["x"])) ) - # Add units - for k, u in units.items(): - vn = k[:-len(unit_suff)] - ds[vn].attrs.update(units=u) + # Rename species vars and add units as attr + nice_us = {"ppm": "ppmv", "ugm3": "ug m-3"} + for vn0 in [n for n in df.columns if n.endswith(("_ppm", "_ugm3", "_umg3"))]: + i_last_underscore = vn0.rfind("_") + vn, u = vn0[:i_last_underscore], vn0[i_last_underscore + 1:] + if u == "umg3": + u = "ugm3" + nice_u = nice_us[u] + ds[vn0].attrs.update(units=nice_u) + ds = ds.rename_vars({vn0: vn}) # Fill in local time array # (in the df, not all sites have rows for all times, so we have NaTs at this point) - ds["time_local"] = ds.time + ds.utcoffset.astype("timedelta64[h]") + ds["time_local"] = ds.time + ds.utcoffset # Expand ds = ( @@ -619,14 +636,14 @@ def get_openaq( .transpose("time", "y", "x") ) + breakpoint() + with _timer("Writing netCDF file"): if compress: write_ncf(ds, dst / out_name, verbose=verbose) else: ds.to_netcdf(dst / out_name) - - cli = app _typer_click_object = typer.main.get_command(app) # for sphinx-click in docs From d4c37ed3b2997e2dd22209f97980be911ca799d0 Mon Sep 17 00:00:00 2001 From: zmoon Date: Wed, 6 Sep 2023 14:24:50 -0600 Subject: [PATCH 03/14] Drop times not on the hour probably comparing to hourly (or lower time res) model output maybe later would be good to have some options here, e.g. 
nearest or other interp --- melodies_monet/_cli.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index 49e8892f..edb4ac25 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -555,6 +555,11 @@ def get_openaq( good = (df.time >= start_date) & (df.time <= end_date) df = df[good] + # Drop times not on the hour + good = df.time == df.time.dt.floor("H") + typer.secho(f"Dropping {(~good).sum()}/{len(good)} rows that aren't on the hour.", fg=INFO_COLOR) + df = df[good] + # Address time-wise non-unique site IDs # Some (most?) are just slightly different lat/lon # But seems like a few are actual time-wise lat/lon duplicates From 82c3f134fd41681e43f2512667d7e131b68f4fc7 Mon Sep 17 00:00:00 2001 From: zmoon Date: Wed, 6 Sep 2023 14:28:04 -0600 Subject: [PATCH 04/14] Progress bar if --verbose --- melodies_monet/_cli.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index edb4ac25..eef80fb2 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -535,6 +535,11 @@ def get_openaq( dst = p.parent out_name = p.name + if verbose: + from dask.diagnostics import ProgressBar + + ProgressBar().register() + with _timer("Fetching data with monetio"): # with warnings.catch_warnings(): # warnings.filterwarnings( From 3d1b6dfb8402c9af663f33eb1b2ed1fff851c535 Mon Sep 17 00:00:00 2001 From: zmoon Date: Wed, 6 Sep 2023 14:30:50 -0600 Subject: [PATCH 05/14] cleanup --- melodies_monet/_cli.py | 49 ++++++++++++------------------------------ 1 file changed, 14 insertions(+), 35 deletions(-) diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index eef80fb2..63b5d135 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -541,19 +541,19 @@ def get_openaq( ProgressBar().register() with _timer("Fetching data with monetio"): - # with warnings.catch_warnings(): - # warnings.filterwarnings( - # "ignore", - # message="The (error|warn)_bad_lines argument has been deprecated" - # ) - # df = mio.openaq.add_data( - # dates, - # n_procs=num_workers, - # ) - - # FIXME: local testing only - df = pd.read_csv("openaq_2019-08.csv.gz", index_col=0, parse_dates=["time", "time_local"]) - df["utcoffset"] = pd.to_timedelta(df["utcoffset"]) # str in the CSV + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", + message="The (error|warn)_bad_lines argument has been deprecated" + ) + df = mio.openaq.add_data( + dates, + n_procs=num_workers, + ) + + # # FIXME: local testing only + # df = pd.read_csv("openaq_2019-08.csv.gz", index_col=0, parse_dates=["time", "time_local"]) + # df["utcoffset"] = pd.to_timedelta(df["utcoffset"]) # str in the CSV # Remove dates outside of requested range # TODO: fix in monetio? 
@@ -562,7 +562,7 @@ def get_openaq( # Drop times not on the hour good = df.time == df.time.dt.floor("H") - typer.secho(f"Dropping {(~good).sum()}/{len(good)} rows that aren't on the hour.", fg=INFO_COLOR) + typer.echo(f"Dropping {(~good).sum()}/{len(good)} rows that aren't on the hour.") df = df[good] # Address time-wise non-unique site IDs @@ -575,25 +575,6 @@ def get_openaq( df = df.drop(columns=["index"], errors="ignore") df = df.dropna(subset=["latitude", "longitude"]) - # ['index', - # 'time', - # 'latitude', - # 'longitude', - # 'sourceName', - # 'sourceType', - # 'city', - # 'country', - # 'utcoffset', - # 'bc_umg3', # TODO: should be "ugm3" - # 'co_ppm', - # 'no2_ppm', - # 'o3_ppm', - # 'pm10_ugm3', - # 'pm25_ugm3', - # 'so2_ppm', - # 'siteid', - # 'time_local'] - site_vns = [ "siteid", # based on country and lat/lon "latitude", @@ -646,8 +627,6 @@ def get_openaq( .transpose("time", "y", "x") ) - breakpoint() - with _timer("Writing netCDF file"): if compress: write_ncf(ds, dst / out_name, verbose=verbose) From 0f64f03eaed2bc06de39c417aa60a16371ecd90a Mon Sep 17 00:00:00 2001 From: zmoon Date: Wed, 11 Sep 2024 13:36:06 -0500 Subject: [PATCH 06/14] Option to use new reader (on by default) which uses the v2 web API the openaq-fetches S3 bucket is still private --- melodies_monet/_cli.py | 140 ++++++++++++++++++++++++++++++----------- 1 file changed, 103 insertions(+), 37 deletions(-) diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index 63b5d135..de35c5f5 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -7,6 +7,7 @@ import time from contextlib import contextmanager from pathlib import Path +from typing import List try: import typer @@ -486,6 +487,18 @@ def get_openaq( "if using default output file name)." ) ), + param: List[str] = typer.Option(["o3", "pm25", "pm10"], "-p", "--params", help=( + "Parameters. " + "Use '-p' more than once to get multiple parameters. " + "Other examples: 'no', 'no2', 'nox', 'so2', 'co', 'bc'. " + "Only applicable to the web API methods ('api-v2')." + ) + ), + method: str = typer.Option("api-v2", "-m", "--method", help=( + "Method (reader) to use for fetching data. " + "Options: 'api-v2', 'openaq-fetches'." + ) + ), compress: bool = typer.Option(True, help=( "If true, pack float to int and apply compression using zlib with complevel 7. " "This can take time if the dataset is large, but can lead to " @@ -498,7 +511,7 @@ def get_openaq( False, "--debug/", help="Print more messages (including full tracebacks)." 
), ): - """Download OpenAQ data using monetio and reformat for MM usage.""" + """Download hourly OpenAQ data using monetio and reformat for MM usage.""" import warnings import monetio as mio @@ -512,13 +525,26 @@ def get_openaq( typer.echo(HEADER) + if method not in {"api-v2", "openaq-fetches"}: + typer.secho(f"Error: method {method!r} not recognized", fg=ERROR_COLOR) + raise typer.Exit(2) + start_date = pd.Timestamp(start_date) end_date = pd.Timestamp(end_date) - dates = pd.date_range(start_date, end_date, freq="D") + + if method in {"openaq-fetches"}: + dates = pd.date_range(start_date, end_date, freq="D") + elif method in {"api-v2"}: + dates = pd.date_range(start_date, end_date, freq="H") + else: + raise AssertionError if verbose: print("Dates:") print(dates) + if verbose and method in {"api-v2"}: + print("Params:", param) + # Set destination and file name fmt = r"%Y%m%d" if out_name is None: @@ -535,57 +561,96 @@ def get_openaq( dst = p.parent out_name = p.name - if verbose: + if verbose and method in {"openaq-fetches"}: from dask.diagnostics import ProgressBar ProgressBar().register() with _timer("Fetching data with monetio"): - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", - message="The (error|warn)_bad_lines argument has been deprecated" - ) - df = mio.openaq.add_data( + if method == "openaq-fetches": + with warnings.catch_warnings(): + warnings.filterwarnings( + "ignore", + message="The (error|warn)_bad_lines argument has been deprecated" + ) + df = mio.openaq.add_data( + dates, + n_procs=num_workers, + wide_fmt=True, + ) + + # Address time-wise non-unique site IDs + # Some (most?) are just slightly different lat/lon + # But seems like a few are actual time-wise lat/lon duplicates + df = df.drop_duplicates(["time", "siteid"]) + + elif method == "api-v2": + df = mio.obs.openaq_v2.add_data( dates, - n_procs=num_workers, + parameters=param, + sensor_type="reference grade", + wide_fmt=True, + timeout=60, + retry=15, + threads=num_workers if num_workers > 1 else None, ) - # # FIXME: local testing only - # df = pd.read_csv("openaq_2019-08.csv.gz", index_col=0, parse_dates=["time", "time_local"]) - # df["utcoffset"] = pd.to_timedelta(df["utcoffset"]) # str in the CSV - - # Remove dates outside of requested range - # TODO: fix in monetio? - good = (df.time >= start_date) & (df.time <= end_date) - df = df[good] + dupes = df[df.duplicated(["time", "siteid"], keep=False)] + if not dupes.empty: + typer.echo( + f"warning: {len(dupes)} unexpected time-siteid duplicated rows:" + ) + if verbose: + typer.echo(dupes) + df = df.drop_duplicates(["time", "siteid"]) + else: + raise AssertionError # Drop times not on the hour good = df.time == df.time.dt.floor("H") typer.echo(f"Dropping {(~good).sum()}/{len(good)} rows that aren't on the hour.") df = df[good] - # Address time-wise non-unique site IDs - # Some (most?) are just slightly different lat/lon - # But seems like a few are actual time-wise lat/lon duplicates - # TODO: fix in monetio (maybe OpenAQ *has* site IDs? 
or can just make them up) - df = df.drop_duplicates(["time", "siteid"]) - with _timer("Forming xarray Dataset"): df = df.drop(columns=["index"], errors="ignore") df = df.dropna(subset=["latitude", "longitude"]) - site_vns = [ - "siteid", # based on country and lat/lon - "latitude", - "longitude", - "utcoffset", - "city", - "country", # 2-char codes - "sourceName", - "sourceType", # "government" - ] - # NOTE: time_local not included since it varies in time as well + if method == "openaq-fetches": + site_vns = [ + "siteid", # based on country and lat/lon + "latitude", + "longitude", + "utcoffset", + # + "city", + "country", # 2-char codes + # + "sourceName", + "sourceType", # "government" + ] + # NOTE: time_local not included since it varies in time as well + elif method == "api-v2": + site_vns = [ + "siteid", # real OpenAQ location ID + "latitude", + "longitude", + "utcoffset", + # + "location", + "city", + "country", + # + "entity", + "sensor_type", + "is_mobile", + "is_analysis", + ] + for vn in ["city", "is_analysis"]: # may have been dropped for being all null + if vn not in df.columns: + site_vns.remove(vn) + + else: + raise AssertionError ds_site = ( df[site_vns] @@ -595,6 +660,7 @@ def get_openaq( .swap_dims(siteid="x") ) + breakpoint() ds = ( df.drop(columns=[vn for vn in site_vns if vn not in ["siteid"]]) .set_index(["time", "siteid"]) @@ -606,8 +672,8 @@ def get_openaq( ) # Rename species vars and add units as attr - nice_us = {"ppm": "ppmv", "ugm3": "ug m-3"} - for vn0 in [n for n in df.columns if n.endswith(("_ppm", "_ugm3", "_umg3"))]: + nice_us = {"ppm": "ppmv", "ugm3": "ug m-3", "ppb": "pbbv"} + for vn0 in [n for n in df.columns if n.endswith(("_ppm", "ppb", "_ugm3", "_umg3"))]: i_last_underscore = vn0.rfind("_") vn, u = vn0[:i_last_underscore], vn0[i_last_underscore + 1:] if u == "umg3": From 1301e432ccc2e073824637c71bc6d515f597dbb0 Mon Sep 17 00:00:00 2001 From: zmoon Date: Wed, 11 Sep 2024 14:09:04 -0500 Subject: [PATCH 07/14] Current release doesn't have wide-fmt option --- melodies_monet/_cli.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index e9b45ec8..915465a9 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -1938,7 +1938,7 @@ def get_openaq( df = mio.openaq.add_data( dates, n_procs=num_workers, - wide_fmt=True, + # wide_fmt=True, ) # Address time-wise non-unique site IDs @@ -2022,7 +2022,6 @@ def get_openaq( .swap_dims(siteid="x") ) - breakpoint() ds = ( df.drop(columns=[vn for vn in site_vns if vn not in ["siteid"]]) .set_index(["time", "siteid"]) From 3ab475fffca0a451de4df2e61bc7d20dce1b7a2c Mon Sep 17 00:00:00 2001 From: zmoon Date: Wed, 11 Sep 2024 15:04:07 -0500 Subject: [PATCH 08/14] Remove duplicates oopsies --- melodies_monet/_cli.py | 674 ----------------------------------------- 1 file changed, 674 deletions(-) diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index 915465a9..f00cbda9 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -1160,680 +1160,6 @@ def get_aqs( ds.to_netcdf(dst / out_name) -@app.command() -def get_ish_lite( - start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"), - end_date: str = typer.Option(..., "-e", "--end-date", help=f"End date. {_DATE_FMT_NOTE} {_DATE_END_NOTE}"), - country: str = typer.Option(None, "--country", - help=( - "Two-letter country code (e.g., in order of site count, " - "US, RS, CA, AS, BR, IN, CH, NO, JA, UK, FR, ...)." 
- ) - ), - state: str = typer.Option(None, "--state", help="Two-letter state code (e.g., MD, ...)."), - box: Tuple[float, float, float, float] = typer.Option((None, None, None, None), "--box", - help=( - "Bounding box for site selection. " - "(latmin, lonmin, latmax, lonmax) in [-180, 180) format. " - "Can't be used if specifying country or state." - ) - ), - out_name: str = typer.Option(None, "-o", - help=( - "Output file name (or full/relative path). " - "By default the name is generated like 'ISH-Lite__.nc'." - ) - ), - dst: Path = typer.Option(".", "-d", "--dst", help=( - "Destination directory (to control output location " - "if using default output file name)." - ) - ), - compress: bool = typer.Option(True, help=( - "If true, pack float to int and apply compression using zlib with complevel 7. " - "This can take time if the dataset is large, but can lead to " - "significant space savings." - ) - ), - num_workers: int = typer.Option(1, "-n", "--num-workers", help="Number of download workers."), - verbose: bool = typer.Option(False), - debug: bool = typer.Option( - False, "--debug/", help="Print more messages (including full tracebacks)." - ), -): - """Download ISH-Lite data using monetio and reformat for MM usage. - - Note that the data are stored in yearly files by site, so the runtime - mostly depends on the number of unique years that your date range includes, - as well as any site selection narrowing. - You can use --country or --state or --box to select groups of sites. - ISH-Lite is an hourly product. - """ - import warnings - - import monetio as mio - import pandas as pd - - from .util.write_util import write_ncf - - global DEBUG - - DEBUG = debug - - if verbose: - from dask.diagnostics import ProgressBar - - ProgressBar().register() - - typer.echo(HEADER) - - start_date = pd.Timestamp(start_date) - end_date = pd.Timestamp(end_date) - dates = pd.date_range(start_date, end_date, freq="h") - if verbose: - print("Dates:") - print(dates) - - if box == (None, None, None, None): - box = None - - # Set destination and file name - fmt = r"%Y%m%d" - if out_name is None: - out_name = f"ISH-Lite_{start_date:{fmt}}_{end_date:{fmt}}.nc" - else: - p = Path(out_name) - if p.name == out_name: - # `out_name` is just the file name - out_name = p.name - else: - # `out_name` has path - if dst != Path("."): - typer.echo(f"warning: overriding `dst` setting {dst.as_posix()!r} with `out_name` {p.as_posix()!r}") - dst = p.parent - out_name = p.name - - with _timer("Fetching data with monetio"): - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", - message="The (error|warn)_bad_lines argument has been deprecated" - ) - df = mio.ish_lite.add_data( - dates, - box=box, - state=state, - country=country, - resample=False, - n_procs=num_workers, - verbose=verbose, - ) - - with _timer("Computing UTC offset for selected ISH-Lite sites"): - import datetime - - from timezonefinder import TimezoneFinder - from pytz import timezone, utc - - tf = TimezoneFinder(in_memory=True) - ref_date = datetime.datetime(2022, 1, 1, 0, 0) - - def get_utc_offset(*, lat, lon): - s = tf.timezone_at(lng=lon, lat=lat) - assert s is not None - - tz_target = timezone(s) - ref_date_tz_target = tz_target.localize(ref_date) - ref_date_utc = utc.localize(ref_date) - uo_h = (ref_date_utc - ref_date_tz_target).total_seconds() / 3600 - - return uo_h - - - locs = df[["siteid", "latitude", "longitude"]].groupby("siteid").first().reset_index() - locs["utcoffset"] = locs.apply(lambda r: get_utc_offset(lat=r.latitude, 
lon=r.longitude), axis="columns") - - df = df.merge(locs[["siteid", "utcoffset"]], on="siteid", how="left") - - - with _timer("Forming xarray Dataset"): - df = df.dropna(subset=["latitude", "longitude"]) - - df = df.rename( - columns={ - "station name": "station_name", - "elev(m)": "elevation", - }, - errors="ignore", - ) - - site_vns = [ - "siteid", - "latitude", - "longitude", - "country", - "state", - "station_name", - "usaf", - "wban", - "icao", - "elevation", - "utcoffset", - "begin", - "end", - ] - # NOTE: time_local not included since it varies in time as well as by site - - ds_site = ( - df[site_vns] - .groupby("siteid") - .first() - .to_xarray() - .swap_dims(siteid="x") - ) - - # TODO: units? - units = {} - - cols = list(df.columns) - ds = ( - df[cols] - .set_index(["time", "siteid"]) - .to_xarray() - .swap_dims(siteid="x") - .drop_vars(site_vns) - .merge(ds_site) - .set_coords(["latitude", "longitude"]) - .assign(x=range(ds_site.dims["x"])) - ) - - # Add units - for k, u in units.items(): - vn = k - ds[vn].attrs.update(units=u) - - # Fill in local time array - # (in the df, not all sites have rows for all times, so we have NaTs at this point) - ds["time_local"] = ds.time + (ds.utcoffset * 60).astype("timedelta64[m]") - - # Expand - ds = ( - ds - .expand_dims("y") - .transpose("time", "y", "x") - ) - - with _timer("Writing netCDF file"): - if compress: - write_ncf(ds, dst / out_name, verbose=verbose) - else: - ds.to_netcdf(dst / out_name) - - -@app.command() -def get_ish( - start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"), - end_date: str = typer.Option(..., "-e", "--end-date", help=f"End date. {_DATE_FMT_NOTE} {_DATE_END_NOTE}"), - freq: str = typer.Option("h", "-f", "--freq", help=( - "Frequency to resample to. " - "Mean is used to reduce the time groups (as opposed to nearest, e.g.)." - ) - ), - country: str = typer.Option(None, "--country", - help=( - "Two-letter country code (e.g., in order of site count, " - "US, RS, CA, AS, BR, IN, CH, NO, JA, UK, FR, ...)." - ) - ), - state: str = typer.Option(None, "--state", help="Two-letter state code (e.g., MD, ...)."), - box: Tuple[float, float, float, float] = typer.Option((None, None, None, None), "--box", - help=( - "Bounding box for site selection. " - "(latmin, lonmin, latmax, lonmax) in [-180, 180) format. " - "Can't be used if specifying country or state." - ) - ), - out_name: str = typer.Option(None, "-o", - help=( - "Output file name (or full/relative path). " - "By default the name is generated like 'ISH__.nc'." - ) - ), - dst: Path = typer.Option(".", "-d", "--dst", help=( - "Destination directory (to control output location " - "if using default output file name)." - ) - ), - compress: bool = typer.Option(True, help=( - "If true, pack float to int and apply compression using zlib with complevel 7. " - "This can take time if the dataset is large, but can lead to " - "significant space savings." - ) - ), - num_workers: int = typer.Option(1, "-n", "--num-workers", help="Number of download workers."), - verbose: bool = typer.Option(False), - debug: bool = typer.Option( - False, "--debug/", help="Print more messages (including full tracebacks)." - ), -): - """Download ISH data using monetio and reformat for MM usage. - - Note that the data are stored in yearly files by site, so the runtime - mostly depends on the number of unique years that your date range includes, - as well as any site selection narrowing. - You can use --country or --state or --box to select groups of sites. 
- Time resolution may be sub-hourly, depending on site, - thus we resample to hourly by default. - """ - import warnings - - import monetio as mio - import pandas as pd - - from .util.write_util import write_ncf - - global DEBUG - - DEBUG = debug - - if verbose: - from dask.diagnostics import ProgressBar - - ProgressBar().register() - - typer.echo(HEADER) - - start_date = pd.Timestamp(start_date) - end_date = pd.Timestamp(end_date) - dates = pd.date_range(start_date, end_date, freq="h") - if verbose: - print("Dates:") - print(dates) - - if box == (None, None, None, None): - box = None - - # Set destination and file name - fmt = r"%Y%m%d" - if out_name is None: - out_name = f"ISH_{start_date:{fmt}}_{end_date:{fmt}}.nc" - else: - p = Path(out_name) - if p.name == out_name: - # `out_name` is just the file name - out_name = p.name - else: - # `out_name` has path - if dst != Path("."): - typer.echo(f"warning: overriding `dst` setting {dst.as_posix()!r} with `out_name` {p.as_posix()!r}") - dst = p.parent - out_name = p.name - - with _timer("Fetching data with monetio"), _ignore_pandas_numeric_only_futurewarning(): - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", - message="The (error|warn)_bad_lines argument has been deprecated" - ) - df = mio.ish.add_data( - dates, - box=box, - state=state, - country=country, - resample=True, - window=freq, - n_procs=num_workers, - verbose=verbose, - ) - - with _timer("Computing UTC offset for selected ISH sites"): - import datetime - - from timezonefinder import TimezoneFinder - from pytz import timezone, utc - - tf = TimezoneFinder(in_memory=True) - ref_date = datetime.datetime(2022, 1, 1, 0, 0) - - def get_utc_offset(*, lat, lon): - s = tf.timezone_at(lng=lon, lat=lat) - assert s is not None - - tz_target = timezone(s) - ref_date_tz_target = tz_target.localize(ref_date) - ref_date_utc = utc.localize(ref_date) - uo_h = (ref_date_utc - ref_date_tz_target).total_seconds() / 3600 - - return uo_h - - - locs = df[["siteid", "latitude", "longitude"]].groupby("siteid").first().reset_index() - locs["utcoffset"] = locs.apply(lambda r: get_utc_offset(lat=r.latitude, lon=r.longitude), axis="columns") - - df = df.merge(locs[["siteid", "utcoffset"]], on="siteid", how="left") - - - with _timer("Forming xarray Dataset"): - df = ( - df.dropna(subset=["latitude", "longitude"]) - .rename( - columns={ - "station name": "station_name", - "elev(m)": "elevation", - }, - errors="ignore", - ) - .drop(columns=["elev"], errors="ignore") # keep just elevation from the site meta file - ) - - site_vns = [ - "siteid", - "latitude", - "longitude", - "country", - "state", - "station_name", - "usaf", - "wban", - "icao", - "elevation", - "utcoffset", - "begin", - "end", - ] - # NOTE: time_local not included since it varies in time as well as by site - - ds_site = ( - df[site_vns] - .groupby("siteid") - .first() - .to_xarray() - .swap_dims(siteid="x") - ) - - # TODO: units? 
- units = {} - - cols = list(df.columns) - ds = ( - df[cols] - .set_index(["time", "siteid"]) - .to_xarray() - .swap_dims(siteid="x") - .drop_vars(site_vns) - .merge(ds_site) - .set_coords(["latitude", "longitude"]) - .assign(x=range(ds_site.dims["x"])) - ) - - # Add units - for k, u in units.items(): - vn = k - ds[vn].attrs.update(units=u) - - # Fill in local time array - # (in the df, not all sites have rows for all times, so we have NaTs at this point) - ds["time_local"] = ds.time + (ds.utcoffset * 60).astype("timedelta64[m]") - - # Expand - ds = ( - ds - .expand_dims("y") - .transpose("time", "y", "x") - ) - - with _timer("Writing netCDF file"): - if compress: - write_ncf(ds, dst / out_name, verbose=verbose) - else: - ds.to_netcdf(dst / out_name) - - -@app.command() -def get_aqs( - start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"), - end_date: str = typer.Option(..., "-e", "--end-date", help=f"End date. {_DATE_FMT_NOTE} {_DATE_END_NOTE}"), - daily: bool = typer.Option(False, help=( - "Whether to retrieve the daily averaged data product. " - "By default, the hourly data is fetched." - ) - ), - param: List[str] = typer.Option(["O3", "PM2.5", "PM10"], "-p", "--params", help=( - "Parameter groups. " - "Use '-p' more than once to get multiple groups. " - "Other examples: 'SPEC' (speciated PM2.5), 'PM10SPEC' (speciated PM10), " - "'VOC', 'NONOxNOy', 'SO2', 'NO2', 'CO', 'PM2.5_FRM'." - ) - ), - # TODO: add network selection option once working in monetio - out_name: str = typer.Option(None, "-o", - help=( - "Output file name (or full/relative path). " - "By default the name is generated like 'AQS__.nc'." - ) - ), - dst: Path = typer.Option(".", "-d", "--dst", help=( - "Destination directory (to control output location " - "if using default output file name)." - ) - ), - compress: bool = typer.Option(True, help=( - "If true, pack float to int and apply compression using zlib with complevel 7. " - "This can take time if the dataset is large, but can lead to " - "significant space savings." - ) - ), - num_workers: int = typer.Option(1, "-n", "--num-workers", help="Number of download workers."), - verbose: bool = typer.Option(False), - debug: bool = typer.Option( - False, "--debug/", help="Print more messages (including full tracebacks)." - ), -): - """Download EPA AQS data using monetio and reformat for MM usage. - - These are archived data, stored in per-year per-parameter-group files, described at - https://aqs.epa.gov/aqsweb/airdata/download_files.html - - Recent-past data are generally not available from this source. 
- """ - import warnings - - import monetio as mio - import pandas as pd - - from .util.write_util import write_ncf - - global DEBUG - - DEBUG = debug - - if verbose: - from dask.diagnostics import ProgressBar - - ProgressBar().register() - - typer.echo(HEADER) - - start_date = pd.Timestamp(start_date) - end_date = pd.Timestamp(end_date) - dates = pd.date_range(start_date, end_date, freq="h" if not daily else "D") - if verbose: - print("Dates:") - print(dates) - print("Params:") - print(param) - - # Set destination and file name - fmt = r"%Y%m%d" - if out_name is None: - out_name = f"AQS_{start_date:{fmt}}_{end_date:{fmt}}.nc" - else: - p = Path(out_name) - if p.name == out_name: - # `out_name` is just the file name - out_name = p.name - else: - # `out_name` has path - if dst != Path("."): - typer.echo(f"warning: overriding `dst` setting {dst.as_posix()!r} with `out_name` {p.as_posix()!r}") - dst = p.parent - out_name = p.name - - with _timer("Fetching data with monetio"): - with warnings.catch_warnings(): - warnings.filterwarnings( - "ignore", - message="The (error|warn)_bad_lines argument has been deprecated" - ) - try: - df = mio.aqs.add_data( - dates, - param=param, - daily=daily, - network=None, - download=False, - local=False, - wide_fmt=True, # column for each variable - n_procs=num_workers, - meta=False, # TODO: enable or add option once monetio fixes released - ) - except KeyError as e: - if daily and str(e) == "'time'": - typer.echo("Note that the daily option currently requires monetio >0.2.5") - raise - - if not daily: - with _timer("Fetching site metadata"): - # Need UTC offset in order to compute local time - # But currently the `meta=True` option doesn't work - meta0 = pd.read_csv( - "https://aqs.epa.gov/aqsweb/airdata/aqs_sites.zip", - encoding="ISO-8859-1", - usecols=[0, 1, 2, 17], - dtype=str, - ) - meta = ( - meta0.copy() - .assign( - siteid=meta0["State Code"] + meta0["County Code"] + meta0["Site Number"], - utcoffset=meta0["GMT Offset"].astype(int), - ) - .drop( - columns=["State Code", "County Code", "Site Number", "GMT Offset"], - ) - ) - - with _timer("Forming xarray Dataset"): - # Select requested time period (older monetio doesn't do this) - df = df[df.time.between(dates[0], dates[-1], inclusive="both")] - - df = df.dropna(subset=["latitude", "longitude"]) - - # Variables associated with a measurement, - # currently not properly useful in the wide format. - if daily: - v_vns = [ - "parameter_code", - "poc", - "parameter_name", - "sample_duration", - "pollutant_standard", - "event_type", - "observation_count", - "observation_percent", - "1st_max_value", - "1st_max_hour", - "aqi", - "method_code", - "method_name", - ] - else: - v_vns = [ - "parameter_code", - "poc", # parameter occurrence code - "parameter_name", - "mdl", # method detection limit - "uncertainty", - "method_type", - "method_code", - "method_name", - ] - df = df.drop(columns=v_vns).drop_duplicates() - # TODO: may be better to get long fmt and drop these first and then pivot - # TODO: option to average duplicate measurements at same site instead of keeping first? 
- if "datum" in df: - df = df.drop(columns=["datum"]) - - site_vns = [ - "siteid", - "state_code", - "county_code", - "site_num", - "latitude", - "longitude", - ] - if daily: - site_vns.extend(["local_site_name", "address", "city_name", "msa_name"]) - # NOTE: time_local not included since it varies in time as well - if not daily: - df = df.merge(meta, on="siteid", how="left") - site_vns.append("utcoffset") - - ds_site = ( - df[site_vns] - .groupby("siteid") - .first() - .to_xarray() - .swap_dims(siteid="x") - ) - - # Extract units info so we can add as attrs - unit_suff = "_unit" - unit_cols = [n for n in df.columns if n.endswith(unit_suff)] - assert (df[unit_cols].nunique() == 1).all() - units = df[unit_cols][~df[unit_cols].isnull()].iloc[0].to_dict() - - cols = [n for n in df.columns if not n.endswith(unit_suff)] - ds = ( - df[cols] - .drop(columns=[vn for vn in site_vns if vn != "siteid"]) - .drop_duplicates(["time", "siteid"], keep="first") - .set_index(["time", "siteid"]) - .to_xarray() - .swap_dims(siteid="x") - .merge(ds_site) - .set_coords(["latitude", "longitude"]) - .assign(x=range(ds_site.dims["x"])) - ) - - # Add units - for k, u in units.items(): - vn = k[:-len(unit_suff)] - ds[vn].attrs.update(units=u) - - # Fill in local time array - # (in the df, not all sites have rows for all times, so we have NaTs at this point) - if not daily: - ds["time_local"] = ds.time + ds.utcoffset.astype("timedelta64[h]") - - # Expand - ds = ( - ds - .expand_dims("y") - .transpose("time", "y", "x") - ) - - # Can't have `/` in variable name for netCDF - to_rename = [vn for vn in ds.data_vars if "/" in vn] - ds = ds.rename_vars({vn: vn.replace("/", "_") for vn in to_rename}) - - with _timer("Writing netCDF file"): - if compress: - write_ncf(ds, dst / out_name, verbose=verbose) - else: - ds.to_netcdf(dst / out_name) - - @app.command() def get_openaq( start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"), From 4b120cb0bab0755e40c37df29601ffa1d14de9b8 Mon Sep 17 00:00:00 2001 From: zmoon Date: Wed, 11 Sep 2024 15:05:24 -0500 Subject: [PATCH 09/14] Add to doc jump links --- docs/cli.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/cli.rst b/docs/cli.rst index c39108b0..91555888 100644 --- a/docs/cli.rst +++ b/docs/cli.rst @@ -18,6 +18,7 @@ any Python code:: * |get-aqs|_ -- get AQS data * |get-ish|_ -- get ISH data * |get-ish-lite|_ -- get ISH-Lite data +* |get-openaq|_ -- get OpenAQ data .. |run| replace:: ``run`` .. _run: #melodies-monet-run @@ -37,6 +38,9 @@ any Python code:: .. |get-ish-lite| replace:: ``get-ish-lite`` .. _get-ish-lite: #melodies-monet-get-ish-lite +.. |get-openaq| replace:: ``get-openaq`` +.. _get-openaq: #melodies-monet-get-openaq + .. 
click:: melodies_monet._cli:_typer_click_object :prog: melodies-monet :nested: full From 06c30d9ebbe0016fd736116ced2cb2ad8b07d427 Mon Sep 17 00:00:00 2001 From: zmoon Date: Mon, 23 Sep 2024 13:11:16 -0500 Subject: [PATCH 10/14] Set basic logging level with env var --- melodies_monet/_cli.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index f00cbda9..ef777742 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -4,11 +4,18 @@ """ melodies-monet -- MELODIES MONET CLI """ +import os import time from contextlib import contextmanager from pathlib import Path from typing import List +_LOGGING_LEVEL = os.environ.get("MM_LOGGING_LEVEL", None) +if _LOGGING_LEVEL is not None: + import logging + + logging.basicConfig(level=_LOGGING_LEVEL.upper()) + try: import typer except ImportError as e: From 4147693cac62c9a77247442bd6b631f70ddfae79 Mon Sep 17 00:00:00 2001 From: zmoon Date: Mon, 23 Sep 2024 13:13:02 -0500 Subject: [PATCH 11/14] Clean up imports --- melodies_monet/_cli.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index ef777742..a87be9a5 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -8,7 +8,7 @@ import time from contextlib import contextmanager from pathlib import Path -from typing import List +from typing import List, Tuple _LOGGING_LEVEL = os.environ.get("MM_LOGGING_LEVEL", None) if _LOGGING_LEVEL is not None: @@ -27,8 +27,6 @@ ) raise SystemExit(1) -from typing import Tuple - DEBUG = False INFO_COLOR = typer.colors.CYAN ERROR_COLOR = typer.colors.BRIGHT_RED From 8da70076352865302db5d1a29696d8d9a6b0ee8a Mon Sep 17 00:00:00 2001 From: zmoon Date: Fri, 18 Oct 2024 17:26:13 -0500 Subject: [PATCH 12/14] Add flags for sensor types still just reference-grade by default to get just low-cost, have to use two flags: --no-reference-grade --low-cost --- melodies_monet/_cli.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index a87be9a5..1b039658 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -1187,6 +1187,8 @@ def get_openaq( "Only applicable to the web API methods ('api-v2')." ) ), + reference_grade: bool = typer.Option(True, help="Include reference-grade sensors."), + low_cost: bool = typer.Option(False, help="Include low-cost sensors."), method: str = typer.Option("api-v2", "-m", "--method", help=( "Method (reader) to use for fetching data. " "Options: 'api-v2', 'openaq-fetches'." @@ -1254,6 +1256,18 @@ def get_openaq( dst = p.parent out_name = p.name + sensor_types = [] + if reference_grade: + sensor_types.append("reference grade") + if low_cost: + sensor_types.append("low-cost sensor") + if not sensor_types and method in {"api-v2"}: + typer.secho( + "Error: no sensor types selected. 
Use --reference-grade and/or --low-cost", + fg=ERROR_COLOR, + ) + raise typer.Exit(2) + if verbose and method in {"openaq-fetches"}: from dask.diagnostics import ProgressBar @@ -1281,7 +1295,7 @@ def get_openaq( df = mio.obs.openaq_v2.add_data( dates, parameters=param, - sensor_type="reference grade", + sensor_type=sensor_types, wide_fmt=True, timeout=60, retry=15, From 3641810f6dd099291eb98da52db9414f97ec1859 Mon Sep 17 00:00:00 2001 From: zmoon Date: Wed, 23 Oct 2024 11:02:37 -0500 Subject: [PATCH 13/14] Add some tests skipped if API key not set via the env var --- melodies_monet/tests/test_get_data_cli.py | 48 +++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/melodies_monet/tests/test_get_data_cli.py b/melodies_monet/tests/test_get_data_cli.py index a61289d5..73aaa803 100644 --- a/melodies_monet/tests/test_get_data_cli.py +++ b/melodies_monet/tests/test_get_data_cli.py @@ -7,6 +7,8 @@ import subprocess import numpy as np +import os +import pytest import xarray as xr from melodies_monet.tutorial import fetch_example @@ -14,6 +16,8 @@ ds0_aeronet = xr.open_dataset(fetch_example("aeronet:2019-09")) ds0_airnow = xr.open_dataset(fetch_example("airnow:2019-09")) +have_openaq_api_key = len(os.environ.get("OPENAQ_API_KEY", "")) > 0 + def test_get_aeronet(tmp_path): fn = "x.nc" @@ -155,3 +159,47 @@ def test_get_aqs_hourly(tmp_path): for v in ds.data_vars if ds[v].dims == ("time", "y", "x") } == {"OZONE", "time_local"} + + +@pytest.mark.skipif(not have_openaq_api_key, reason="OPENAQ_API_KEY not set") +def test_get_openaq(tmp_path): + fn = "x.nc" + cmd = [ + "melodies-monet", "get-openaq", + "-s", "2024-09-10", "-e" "2024-09-10 00:59", + "--dst", tmp_path.as_posix(), "-o", fn, + ] + subprocess.run(cmd, check=True) + + ds = xr.open_dataset(tmp_path / fn) + + assert ds.time.size == 1 + assert { + v + for v in ds.data_vars + if ds[v].dims == ("time", "y", "x") + } == {"o3", "pm25", "pm10", "time_local"} + assert (ds.sensor_type == "reference grade").all() + + +@pytest.mark.skipif(not have_openaq_api_key, reason="OPENAQ_API_KEY not set") +def test_get_openaq_lowcost(tmp_path): + fn = "x.nc" + cmd = [ + "melodies-monet", "get-openaq", + "-s", "2024-09-10", "-e" "2024-09-10 00:59", + "-p", "pm25", + "--no-reference-grade", "--low-cost", + "--dst", tmp_path.as_posix(), "-o", fn, + ] + subprocess.run(cmd, check=True) + + ds = xr.open_dataset(tmp_path / fn) + + assert ds.time.size == 1 + assert { + v + for v in ds.data_vars + if ds[v].dims == ("time", "y", "x") + } == {"pm25", "time_local"} + assert (ds.sensor_type == "low-cost sensor").all() From 60fe02f1ce80cf72abf12e7b80fe607788214358 Mon Sep 17 00:00:00 2001 From: Zachary Moon Date: Tue, 5 Nov 2024 15:56:26 -0500 Subject: [PATCH 14/14] tweaks --- melodies_monet/_cli.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py index 1b039658..35539f9f 100644 --- a/melodies_monet/_cli.py +++ b/melodies_monet/_cli.py @@ -1180,7 +1180,7 @@ def get_openaq( "if using default output file name)." ) ), - param: List[str] = typer.Option(["o3", "pm25", "pm10"], "-p", "--params", help=( + param: List[str] = typer.Option(["o3", "pm25", "pm10"], "-p", "--param", help=( "Parameters. " "Use '-p' more than once to get multiple parameters. " "Other examples: 'no', 'no2', 'nox', 'so2', 'co', 'bc'. 
" @@ -1230,7 +1230,7 @@ def get_openaq( if method in {"openaq-fetches"}: dates = pd.date_range(start_date, end_date, freq="D") elif method in {"api-v2"}: - dates = pd.date_range(start_date, end_date, freq="H") + dates = pd.date_range(start_date, end_date, freq="h") else: raise AssertionError if verbose: