diff --git a/docs/appendix/machine-specific-install.rst b/docs/appendix/machine-specific-install.rst
index f4aa435b..236155b8 100644
--- a/docs/appendix/machine-specific-install.rst
+++ b/docs/appendix/machine-specific-install.rst
@@ -60,7 +60,7 @@ environment for running and developing MELODIES MONET.
 
 * You will need a NOAA HPC account to access the RDHPCS wiki link above.
 * Both Anaconda/Miniconda will work well for MELODIES MONET. See
-  `conda instructions `__
+  `conda instructions `__
   to determine, which is the best option for you.
 
 * Pick a directory for your download and run the following wget command with
diff --git a/docs/environment-docs.yml b/docs/environment-docs.yml
index 7fa4cbce..f0d0d7c6 100644
--- a/docs/environment-docs.yml
+++ b/docs/environment-docs.yml
@@ -16,6 +16,7 @@ dependencies:
   #
   # Extras
   - pooch
+  - timezonefinder
   - typer
   - wrf-python  # for WRF-Chem reader in monetio
   #
diff --git a/melodies_monet/_cli.py b/melodies_monet/_cli.py
index d8743eb9..b4f49904 100644
--- a/melodies_monet/_cli.py
+++ b/melodies_monet/_cli.py
@@ -19,6 +19,8 @@
     )
     raise SystemExit(1)
 
+from typing import Tuple
+
 DEBUG = False
 INFO_COLOR = typer.colors.CYAN
 ERROR_COLOR = typer.colors.BRIGHT_RED
@@ -356,6 +358,11 @@ def get_airnow(
 
     DEBUG = debug
 
+    if verbose:
+        from dask.diagnostics import ProgressBar
+
+        ProgressBar().register()
+
     typer.echo(HEADER)
 
     start_date = pd.Timestamp(start_date)
@@ -410,7 +417,7 @@ def get_airnow(
             "state_name",
             "epa_region",
         ]
-        # NOTE: time_local not included since it varies in time as well
+        # NOTE: time_local not included since it varies in time as well as by site
 
         if daily:
             site_vns.remove("utcoffset")  # not present in the daily data product
@@ -478,6 +485,436 @@ def get_airnow(
             ds.to_netcdf(dst / out_name)
 
 
+@app.command()
+def get_ish_lite(
+    start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"),
+    end_date: str = typer.Option(..., "-e", "--end-date", help=f"End date. {_DATE_FMT_NOTE} {_DATE_END_NOTE}"),
+    country: str = typer.Option(None, "--country",
+        help=(
+            "Two-letter country code (e.g., in order of site count, "
+            "US, RS, CA, AS, BR, IN, CH, NO, JA, UK, FR, ...)."
+        )
+    ),
+    state: str = typer.Option(None, "--state", help="Two-letter state code (e.g., MD, ...)."),
+    box: Tuple[float, float, float, float] = typer.Option((None, None, None, None), "--box",
+        help=(
+            "Bounding box for site selection. "
+            "(latmin, lonmin, latmax, lonmax) in [-180, 180) format. "
+            "Can't be used if specifying country or state."
+        )
+    ),
+    out_name: str = typer.Option(None, "-o",
+        help=(
+            "Output file name (or full/relative path). "
+            "By default the name is generated like 'ISH-Lite_<start-date>_<end-date>.nc'."
+        )
+    ),
+    dst: Path = typer.Option(".", "-d", "--dst", help=(
+            "Destination directory (to control output location "
+            "if using default output file name)."
+        )
+    ),
+    compress: bool = typer.Option(True, help=(
+            "If true, pack float to int and apply compression using zlib with complevel 7. "
+            "This can take time if the dataset is large, but can lead to "
+            "significant space savings."
+        )
+    ),
+    num_workers: int = typer.Option(1, "-n", "--num-workers", help="Number of download workers."),
+    verbose: bool = typer.Option(False),
+    debug: bool = typer.Option(
+        False, "--debug/", help="Print more messages (including full tracebacks)."
+    ),
+):
+    """Download ISH-Lite data using monetio and reformat for MM usage.
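+
+    For example, to fetch one day of hourly data for US sites:
+
+        melodies-monet get-ish-lite -s 2023-01-01 -e "2023-01-01 23:00" --country US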
+
+    Note that the data are stored in yearly files by site, so the runtime
+    mostly depends on the number of unique years that your date range includes,
+    as well as any site selection narrowing.
+    You can use --country or --state or --box to select groups of sites.
+    ISH-Lite is an hourly product.
+    """
+    import warnings
+
+    import monetio as mio
+    import pandas as pd
+
+    from .util.write_util import write_ncf
+
+    global DEBUG
+
+    DEBUG = debug
+
+    if verbose:
+        from dask.diagnostics import ProgressBar
+
+        ProgressBar().register()
+
+    typer.echo(HEADER)
+
+    start_date = pd.Timestamp(start_date)
+    end_date = pd.Timestamp(end_date)
+    dates = pd.date_range(start_date, end_date, freq="H")
+    if verbose:
+        print("Dates:")
+        print(dates)
+
+    if box == (None, None, None, None):
+        box = None
+
+    # Set destination and file name
+    fmt = r"%Y%m%d"
+    if out_name is None:
+        out_name = f"ISH-Lite_{start_date:{fmt}}_{end_date:{fmt}}.nc"
+    else:
+        p = Path(out_name)
+        if p.name == out_name:
+            # `out_name` is just the file name
+            out_name = p.name
+        else:
+            # `out_name` has path
+            if dst != Path("."):
+                typer.echo(f"warning: overriding `dst` setting {dst.as_posix()!r} with `out_name` {p.as_posix()!r}")
+            dst = p.parent
+            out_name = p.name
+
+    with _timer("Fetching data with monetio"):
+        with warnings.catch_warnings():
+            warnings.filterwarnings(
+                "ignore",
+                message="The (error|warn)_bad_lines argument has been deprecated"
+            )
+            df = mio.ish_lite.add_data(
+                dates,
+                box=box,
+                state=state,
+                country=country,
+                resample=False,
+                n_procs=num_workers,
+                verbose=verbose,
+            )
+
+    with _timer("Computing UTC offset for selected ISH-Lite sites"):
+        import datetime
+
+        from timezonefinder import TimezoneFinder
+        from pytz import timezone, utc
+
+        tf = TimezoneFinder(in_memory=True)
+        # The offset is evaluated at a fixed reference date, so each site gets
+        # a constant offset (DST transitions are not accounted for).
+        ref_date = datetime.datetime(2022, 1, 1, 0, 0)
+
+        def get_utc_offset(*, lat, lon):
+            s = tf.timezone_at(lng=lon, lat=lat)
+            assert s is not None
+
+            tz_target = timezone(s)
+            ref_date_tz_target = tz_target.localize(ref_date)
+            ref_date_utc = utc.localize(ref_date)
+            uo_h = (ref_date_utc - ref_date_tz_target).total_seconds() / 3600
+
+            return uo_h
+
+
+        locs = df[["siteid", "latitude", "longitude"]].groupby("siteid").first().reset_index()
+        locs["utcoffset"] = locs.apply(lambda r: get_utc_offset(lat=r.latitude, lon=r.longitude), axis="columns")
+
+        df = df.merge(locs[["siteid", "utcoffset"]], on="siteid", how="left")
+
+
+    with _timer("Forming xarray Dataset"):
+        df = df.dropna(subset=["latitude", "longitude"])
+
+        df = df.rename(
+            columns={
+                "station name": "station_name",
+                "elev(m)": "elevation",
+            },
+            errors="ignore",
+        )
+
+        site_vns = [
+            "siteid",
+            "latitude",
+            "longitude",
+            "country",
+            "state",
+            "station_name",
+            "usaf",
+            "wban",
+            "icao",
+            "elevation",
+            "utcoffset",
+            "begin",
+            "end",
+        ]
+        # NOTE: time_local not included since it varies in time as well as by site
+
+        ds_site = (
+            df[site_vns]
+            .groupby("siteid")
+            .first()
+            .to_xarray()
+            .swap_dims(siteid="x")
+        )
+
+        # TODO: units?
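+        # `units` maps variable name -> units string; entries are attached as
+        # variable attrs in the "Add units" step below (empty until confirmed).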
+        units = {}
+
+        cols = list(df.columns)
+        ds = (
+            df[cols]
+            .set_index(["time", "siteid"])
+            .to_xarray()
+            .swap_dims(siteid="x")
+            .drop_vars(site_vns)
+            .merge(ds_site)
+            .set_coords(["latitude", "longitude"])
+            .assign(x=range(ds_site.dims["x"]))
+        )
+
+        # Add units
+        for k, u in units.items():
+            vn = k
+            ds[vn].attrs.update(units=u)
+
+        # Fill in local time array
+        # (in the df, not all sites have rows for all times, so we have NaTs at this point)
+        ds["time_local"] = ds.time + (ds.utcoffset * 60).astype("timedelta64[m]")
+
+        # Expand
+        ds = (
+            ds
+            .expand_dims("y")
+            .transpose("time", "y", "x")
+        )
+
+    with _timer("Writing netCDF file"):
+        if compress:
+            write_ncf(ds, dst / out_name, verbose=verbose)
+        else:
+            ds.to_netcdf(dst / out_name)
+
+
+@app.command()
+def get_ish(
+    start_date: str = typer.Option(..., "-s", "--start-date", help=f"Start date. {_DATE_FMT_NOTE}"),
+    end_date: str = typer.Option(..., "-e", "--end-date", help=f"End date. {_DATE_FMT_NOTE} {_DATE_END_NOTE}"),
+    freq: str = typer.Option("H", "-f", "--freq", help=(
+            "Frequency to resample to. "
+            "Mean is used to reduce the time groups (as opposed to, e.g., nearest)."
+        )
+    ),
+    country: str = typer.Option(None, "--country",
+        help=(
+            "Two-letter country code (e.g., in order of site count, "
+            "US, RS, CA, AS, BR, IN, CH, NO, JA, UK, FR, ...)."
+        )
+    ),
+    state: str = typer.Option(None, "--state", help="Two-letter state code (e.g., MD, ...)."),
+    box: Tuple[float, float, float, float] = typer.Option((None, None, None, None), "--box",
+        help=(
+            "Bounding box for site selection. "
+            "(latmin, lonmin, latmax, lonmax) in [-180, 180) format. "
+            "Can't be used if specifying country or state."
+        )
+    ),
+    out_name: str = typer.Option(None, "-o",
+        help=(
+            "Output file name (or full/relative path). "
+            "By default the name is generated like 'ISH_<start-date>_<end-date>.nc'."
+        )
+    ),
+    dst: Path = typer.Option(".", "-d", "--dst", help=(
+            "Destination directory (to control output location "
+            "if using default output file name)."
+        )
+    ),
+    compress: bool = typer.Option(True, help=(
+            "If true, pack float to int and apply compression using zlib with complevel 7. "
+            "This can take time if the dataset is large, but can lead to "
+            "significant space savings."
+        )
+    ),
+    num_workers: int = typer.Option(1, "-n", "--num-workers", help="Number of download workers."),
+    verbose: bool = typer.Option(False),
+    debug: bool = typer.Option(
+        False, "--debug/", help="Print more messages (including full tracebacks)."
+    ),
+):
+    """Download ISH data using monetio and reformat for MM usage.
+
+    Note that the data are stored in yearly files by site, so the runtime
+    mostly depends on the number of unique years that your date range includes,
+    as well as any site selection narrowing.
+    You can use --country or --state or --box to select groups of sites.
+    Time resolution may be sub-hourly, depending on site,
+    so we resample to hourly by default.
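+
+    For example, to fetch one day of data reduced to 3-hourly means
+    (assuming a pandas-style frequency alias for --freq):
+
+        melodies-monet get-ish -s 2023-01-01 -e "2023-01-01 23:00" -f 3H --country US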
+    """
+    import warnings
+
+    import monetio as mio
+    import pandas as pd
+
+    from .util.write_util import write_ncf
+
+    global DEBUG
+
+    DEBUG = debug
+
+    if verbose:
+        from dask.diagnostics import ProgressBar
+
+        ProgressBar().register()
+
+    typer.echo(HEADER)
+
+    start_date = pd.Timestamp(start_date)
+    end_date = pd.Timestamp(end_date)
+    dates = pd.date_range(start_date, end_date, freq="H")
+    if verbose:
+        print("Dates:")
+        print(dates)
+
+    if box == (None, None, None, None):
+        box = None
+
+    # Set destination and file name
+    fmt = r"%Y%m%d"
+    if out_name is None:
+        out_name = f"ISH_{start_date:{fmt}}_{end_date:{fmt}}.nc"
+    else:
+        p = Path(out_name)
+        if p.name == out_name:
+            # `out_name` is just the file name
+            out_name = p.name
+        else:
+            # `out_name` has path
+            if dst != Path("."):
+                typer.echo(f"warning: overriding `dst` setting {dst.as_posix()!r} with `out_name` {p.as_posix()!r}")
+            dst = p.parent
+            out_name = p.name
+
+    with _timer("Fetching data with monetio"), _ignore_pandas_numeric_only_futurewarning():
+        with warnings.catch_warnings():
+            warnings.filterwarnings(
+                "ignore",
+                message="The (error|warn)_bad_lines argument has been deprecated"
+            )
+            df = mio.ish.add_data(
+                dates,
+                box=box,
+                state=state,
+                country=country,
+                resample=True,
+                window=freq,
+                n_procs=num_workers,
+                verbose=verbose,
+            )
+
+    with _timer("Computing UTC offset for selected ISH sites"):
+        import datetime
+
+        from timezonefinder import TimezoneFinder
+        from pytz import timezone, utc
+
+        tf = TimezoneFinder(in_memory=True)
+        # The offset is evaluated at a fixed reference date, so each site gets
+        # a constant offset (DST transitions are not accounted for).
+        ref_date = datetime.datetime(2022, 1, 1, 0, 0)
+
+        def get_utc_offset(*, lat, lon):
+            s = tf.timezone_at(lng=lon, lat=lat)
+            assert s is not None
+
+            tz_target = timezone(s)
+            ref_date_tz_target = tz_target.localize(ref_date)
+            ref_date_utc = utc.localize(ref_date)
+            uo_h = (ref_date_utc - ref_date_tz_target).total_seconds() / 3600
+
+            return uo_h
+
+
+        locs = df[["siteid", "latitude", "longitude"]].groupby("siteid").first().reset_index()
+        locs["utcoffset"] = locs.apply(lambda r: get_utc_offset(lat=r.latitude, lon=r.longitude), axis="columns")
+
+        df = df.merge(locs[["siteid", "utcoffset"]], on="siteid", how="left")
+
+
+    with _timer("Forming xarray Dataset"):
+        df = (
+            df.dropna(subset=["latitude", "longitude"])
+            .rename(
+                columns={
+                    "station name": "station_name",
+                    "elev(m)": "elevation",
+                },
+                errors="ignore",
+            )
+            .drop(columns=["elev"], errors="ignore")  # keep just elevation from the site meta file
+        )
+
+        site_vns = [
+            "siteid",
+            "latitude",
+            "longitude",
+            "country",
+            "state",
+            "station_name",
+            "usaf",
+            "wban",
+            "icao",
+            "elevation",
+            "utcoffset",
+            "begin",
+            "end",
+        ]
+        # NOTE: time_local not included since it varies in time as well as by site
+
+        ds_site = (
+            df[site_vns]
+            .groupby("siteid")
+            .first()
+            .to_xarray()
+            .swap_dims(siteid="x")
+        )
+
+        # TODO: units?
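+        # `units` maps variable name -> units string; entries are attached as
+        # variable attrs in the "Add units" step below (empty until confirmed).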
+ units = {} + + cols = list(df.columns) + ds = ( + df[cols] + .set_index(["time", "siteid"]) + .to_xarray() + .swap_dims(siteid="x") + .drop_vars(site_vns) + .merge(ds_site) + .set_coords(["latitude", "longitude"]) + .assign(x=range(ds_site.dims["x"])) + ) + + # Add units + for k, u in units.items(): + vn = k + ds[vn].attrs.update(units=u) + + # Fill in local time array + # (in the df, not all sites have rows for all times, so we have NaTs at this point) + ds["time_local"] = ds.time + (ds.utcoffset * 60).astype("timedelta64[m]") + + # Expand + ds = ( + ds + .expand_dims("y") + .transpose("time", "y", "x") + ) + + with _timer("Writing netCDF file"): + if compress: + write_ncf(ds, dst / out_name, verbose=verbose) + else: + ds.to_netcdf(dst / out_name) + + + cli = app _typer_click_object = typer.main.get_command(app) # for sphinx-click in docs diff --git a/melodies_monet/tests/test_get_data_cli.py b/melodies_monet/tests/test_get_data_cli.py index 25bfb679..515e7e9c 100644 --- a/melodies_monet/tests/test_get_data_cli.py +++ b/melodies_monet/tests/test_get_data_cli.py @@ -81,4 +81,36 @@ def test_get_airnow_comp(tmp_path): ds[vn] = ds[vn].where(ds[vn] != -1) ds[vn] = ds[vn].where(~ ((ds[vn] == 0) & (ds0[vn] != 0))) # assert (np.abs((ds[vn] - ds0[vn]) / ds0[vn]).to_series().dropna() < 2e-6).all() - # assert (np.abs(ds[vn] - ds0[vn]).to_series().dropna() < 3e-7).all() + assert (np.abs(ds[vn] - ds0[vn]).to_series().dropna() < 3e-7).all() + + +def test_get_ish_lite_box(tmp_path): + fn = "x.nc" + cmd = [ + "melodies-monet", "get-ish-lite", + "-s", "2023-01-01", "-e", "2023-01-01 23:00", + "--box", "39.5", "-105.75", "40.5", "-104.75", + "--dst", tmp_path.as_posix(), "-o", fn, + ] + subprocess.run(cmd, check=True) + + ds = xr.open_dataset(tmp_path / fn) + + assert ds.time.size == 24 + assert np.unique(ds.state) == ["CO"] + + +def test_get_ish_box(tmp_path): + fn = "x.nc" + cmd = [ + "melodies-monet", "get-ish", + "-s", "2023-01-01", "-e", "2023-01-01 23:00", + "--box", "39.5", "-105.75", "40.5", "-104.75", + "--dst", tmp_path.as_posix(), "-o", fn, + ] + subprocess.run(cmd, check=True) + + ds = xr.open_dataset(tmp_path / fn) + + assert ds.time.size == 24 + assert np.unique(ds.state) == ["CO"]