Get local time; remove breakpoints
zmoon committed Feb 13, 2024
1 parent b20af44 commit 845e284
Showing 1 changed file with 34 additions and 13 deletions.
47 changes: 34 additions & 13 deletions melodies_monet/_cli.py
@@ -1020,12 +1020,34 @@ def get_aqs(
             local=False,
             wide_fmt=True,  # column for each variable
             n_procs=num_workers,
-            meta=False,
+            meta=False,  # TODO: enable or add option once monetio fixes released
         )
 
-    breakpoint()
+    if not daily:
+        with _timer("Fetching site metadata"):
+            # Need UTC offset in order to compute local time
+            # But currently the `meta=True` option doesn't work
+            meta0 = pd.read_csv(
+                "https://aqs.epa.gov/aqsweb/airdata/aqs_sites.zip",
+                encoding="ISO-8859-1",
+                usecols=[0, 1, 2, 17],
+                dtype=str,
+            )
+            meta = (
+                meta0.copy()
+                .assign(
+                    siteid=meta0["State Code"] + meta0["County Code"] + meta0["Site Number"],
+                    utcoffset=meta0["GMT Offset"].astype(int),
+                )
+                .drop(
+                    columns=["State Code", "County Code", "Site Number", "GMT Offset"],
+                )
+            )
 
     with _timer("Forming xarray Dataset"):
+        # Select requested time period (currently monetio doesn't do this)
+        df = df[df.time.between(dates[0], dates[-1], inclusive="both")]
+
         df = df.dropna(subset=["latitude", "longitude"])
 
         v_vns = [
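
Note: a rough sketch of what the new metadata step yields, using invented rows in place of the real aqs_sites.zip download (only the column names and the transformation match the diff):

import pandas as pd

# Stand-in for the downloaded sites table; these rows are made up
meta0 = pd.DataFrame(
    {
        "State Code": ["06", "36"],
        "County Code": ["037", "061"],
        "Site Number": ["0001", "0135"],
        "GMT Offset": ["-8", "-5"],
    },
    dtype=str,
)

# Same transformation as the diff: 9-digit siteid plus integer UTC offset
meta = (
    meta0.copy()
    .assign(
        siteid=meta0["State Code"] + meta0["County Code"] + meta0["Site Number"],
        utcoffset=meta0["GMT Offset"].astype(int),
    )
    .drop(columns=["State Code", "County Code", "Site Number", "GMT Offset"])
)
print(meta)
#       siteid  utcoffset
# 0  060370001         -8
# 1  360610135         -5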
@@ -1039,7 +1061,7 @@ def get_aqs(
             "method_name",
         ]
         df = df.drop(columns=v_vns).drop_duplicates()
-        # TODO: maybe better to get long fmt and drop these first and then pivot
+        # TODO: may be better to get long fmt and drop these first and then pivot
         # TODO: option to average duplicate measurements at same site instead of keeping first?
 
         site_vns = [
@@ -1051,8 +1073,9 @@ def get_aqs(
             "longitude",
         ]
         # NOTE: time_local not included since it varies in time as well
-        if daily:
-            site_vns.remove("utcoffset")  # not present in the daily data product
+        if not daily:
+            df = df.merge(meta, on="siteid", how="left")
+            site_vns.append("utcoffset")
 
         ds_site = (
             df[site_vns]
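
Note: utcoffset was previously assumed present in the hourly data and only removed for the daily product; it now comes from the merged sites table instead, so the branch flips to appending it. A toy sketch of that left merge (both frames are made up):

import pandas as pd

meta = pd.DataFrame({"siteid": ["060370001"], "utcoffset": [-8]})
df = pd.DataFrame(
    {
        "siteid": ["060370001", "360610135"],
        "OZONE": [0.031, 0.025],  # invented values
    }
)

# how="left" keeps every observation row; sites missing from meta get NaN
df = df.merge(meta, on="siteid", how="left")
print(df)
#       siteid  OZONE  utcoffset
# 0  060370001  0.031       -8.0
# 1  360610135  0.025        NaN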
@@ -1071,10 +1094,10 @@ def get_aqs(
         cols = [n for n in df.columns if not n.endswith(unit_suff)]
         ds = (
             df[cols]
-            .drop(columns=[vn for vn in site_vns if vn != "siteid"])
             .set_index(["time", "siteid"])
             .to_xarray()
             .swap_dims(siteid="x")
+            .drop_vars(site_vns)
             .merge(ds_site)
             .set_coords(["latitude", "longitude"])
             .assign(x=range(ds_site.dims["x"]))
@@ -1085,10 +1108,10 @@ def get_aqs(
             vn = k[:-len(unit_suff)]
             ds[vn].attrs.update(units=u)
 
-        # # Fill in local time array
-        # # (in the df, not all sites have rows for all times, so we have NaTs at this point)
-        # if not daily:
-        #     ds["time_local"] = ds.time + ds.utcoffset.astype("timedelta64[h]")
+        # Fill in local time array
+        # (in the df, not all sites have rows for all times, so we have NaTs at this point)
+        if not daily:
+            ds["time_local"] = ds.time + ds.utcoffset.astype("timedelta64[h]")
 
         # Expand
         ds = (
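
Note: the now-enabled local-time line relies on datetime64 + timedelta64 broadcasting. A minimal self-contained sketch with a toy dataset of the same shape (two sites, hourly UTC times):

import numpy as np
import pandas as pd
import xarray as xr

ds = xr.Dataset(
    coords={
        "time": pd.date_range("2024-02-13", periods=2, freq="h"),
        "x": [0, 1],
    },
    data_vars={"utcoffset": ("x", np.array([-8, -5]))},  # hours relative to UTC
)

# Integer hours become timedelta64[h]; adding them to the time coordinate
# broadcasts over (time, x), giving each site its own local clock
ds["time_local"] = ds.time + ds.utcoffset.astype("timedelta64[h]")
print(ds["time_local"].sel(x=0).values)  # 2024-02-12T16:00 and 17:00 (UTC-8)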
@@ -1097,12 +1120,10 @@ def get_aqs(
             .transpose("time", "y", "x")
         )
 
-        # Apparently can't have `/` in variable nane
+        # Can't have `/` in variable name for netCDF
         to_rename = [vn for vn in ds.data_vars if "/" in vn]
         ds = ds.rename_vars({vn: vn.replace("/", "_") for vn in to_rename})
 
-        breakpoint()
-
     with _timer("Writing netCDF file"):
         if compress:
             write_ncf(ds, dst / out_name, verbose=verbose)
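Note: the corrected comment refers to netCDF naming rules; `/` serves as the group-path separator in netCDF-4, so it cannot appear in a variable name. A minimal demonstration of the rename, with a hypothetical variable name:

import xarray as xr

ds = xr.Dataset({"NO/NO2": ("x", [0.5, 0.8])})  # hypothetical name containing "/"

to_rename = [vn for vn in ds.data_vars if "/" in vn]
ds = ds.rename_vars({vn: vn.replace("/", "_") for vn in to_rename})
print(list(ds.data_vars))  # ['NO_NO2']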
