From 9b0538700d33acdacfd50af8d024eacbf7118a72 Mon Sep 17 00:00:00 2001 From: Kevin Santana Date: Wed, 11 Sep 2024 00:10:16 -0400 Subject: [PATCH] use position datetime as other coord and also pre-compute before applying transforms to dataframe --- clouddrift/adapters/gdp/gdpsource.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/clouddrift/adapters/gdp/gdpsource.py b/clouddrift/adapters/gdp/gdpsource.py index 83a77903..1163a10e 100644 --- a/clouddrift/adapters/gdp/gdpsource.py +++ b/clouddrift/adapters/gdp/gdpsource.py @@ -11,6 +11,7 @@ import numpy as np import pandas as pd import xarray as xr +from dask import delayed from tqdm import tqdm from clouddrift.adapters.gdp import get_gdp_metadata @@ -23,12 +24,11 @@ _FILENAME_TEMPLATE = "buoydata_{start}_{end}_{suffix}.dat.gz" _SECONDS_IN_DAY = 86_400 -_COORDS = ["id", "obs_index"] +_COORDS = ["id", "position_datetime"] _DATA_VARS = [ "latitude", "longitude", - "position_datetime", "sensor_datetime", "drogue", "sst", @@ -123,7 +123,7 @@ "sensor6": np.float32, } -_INPUT_COLS_PREFILTER_DTYPES = { +_INPUT_COLS_PREFILTER_DTYPES: dict[str, type[object]] = { "posObsMonth": np.str_, "posObsYear": np.float64, "senObsMonth": np.str_, @@ -329,9 +329,9 @@ def _preprocess(id_, **kwargs) -> xr.Dataset: coords = { "id": (["traj"], np.array([id_]).astype(np.int64)), - "obs_index": ( + "position_datetime": ( ["obs"], - traj_data_df[["obs_index"]].values.flatten().astype(np.int32), + traj_data_df[["position_datetime"]].values.flatten().astype(np.datetime64), ), } @@ -381,7 +381,7 @@ def _parse_datetime_with_day_ratio( def _process( - df: pd.DataFrame, + df: dd.DataFrame, gdp_metadata_df: pd.DataFrame, use_fill_values: bool, ) -> dict[int, xr.Dataset]: @@ -389,7 +389,7 @@ def _process( # Transform the initial dataframe filtering out rows with really anomolous values # examples include: years in the future, years way in the past before GDP program, etc... 
- preremove_df_chunk = df + preremove_df_chunk = df.compute() df_chunk = _apply_remove( preremove_df_chunk, filters=[ @@ -506,13 +506,13 @@ def to_raggedarray( ): decompr.write(compr.read()) - df = dd.read_csv( + df: dd.DataFrame = dd.read_csv( data_files, sep=r"\s+", header=None, names=_INPUT_COLS, engine="c", - dtype=_INPUT_COLS_PREFILTER_DTYPES, + dtype=_INPUT_COLS_DTYPES | _INPUT_COLS_PREFILTER_DTYPES, blocksize="1GB", assume_missing=True, )