Commit 9b05387
use position datetime as other coord and also pre-compute before applying transforms to dataframe
kevinsantana11 committed Sep 11, 2024
1 parent d95f185 commit 9b05387
Showing 1 changed file with 9 additions and 9 deletions.
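In short, the commit does two things: it promotes position_datetime from a data variable to a coordinate alongside id, and it materializes each dask chunk with .compute() before the row-filtering transforms run, so the filters operate on an in-memory pandas DataFrame. A minimal sketch of the pre-compute idea, using a hypothetical column and filter rather than the module's real ones:

    import dask.dataframe as dd
    import pandas as pd

    # Hypothetical chunk with one anomalous year, standing in for the GDP source rows.
    pdf = pd.DataFrame({"posObsYear": [1998.0, 2087.0, 2015.0]})
    ddf = dd.from_pandas(pdf, npartitions=1)

    # Pre-compute: materialize the lazy dask chunk into pandas first, then apply
    # the removal filters to the in-memory frame rather than to the lazy graph.
    chunk = ddf.compute()
    cleaned = chunk[chunk["posObsYear"] <= 2024]  # hypothetical anomaly filter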
18 changes: 9 additions & 9 deletions clouddrift/adapters/gdp/gdpsource.py
@@ -11,6 +11,7 @@
 import numpy as np
 import pandas as pd
 import xarray as xr
+from dask import delayed
 from tqdm import tqdm
 
 from clouddrift.adapters.gdp import get_gdp_metadata
@@ -23,12 +24,11 @@
 _FILENAME_TEMPLATE = "buoydata_{start}_{end}_{suffix}.dat.gz"
 _SECONDS_IN_DAY = 86_400
 
-_COORDS = ["id", "obs_index"]
+_COORDS = ["id", "position_datetime"]
 
 _DATA_VARS = [
     "latitude",
     "longitude",
-    "position_datetime",
     "sensor_datetime",
     "drogue",
     "sst",
@@ -123,7 +123,7 @@
     "sensor6": np.float32,
 }
 
-_INPUT_COLS_PREFILTER_DTYPES = {
+_INPUT_COLS_PREFILTER_DTYPES: dict[str, type[object]] = {
     "posObsMonth": np.str_,
     "posObsYear": np.float64,
     "senObsMonth": np.str_,
@@ -329,9 +329,9 @@ def _preprocess(id_, **kwargs) -> xr.Dataset:
 
     coords = {
         "id": (["traj"], np.array([id_]).astype(np.int64)),
-        "obs_index": (
+        "position_datetime": (
             ["obs"],
-            traj_data_df[["obs_index"]].values.flatten().astype(np.int32),
+            traj_data_df[["position_datetime"]].values.flatten().astype(np.datetime64),
         ),
     }
 
@@ -381,15 +381,15 @@ def _parse_datetime_with_day_ratio(
 
 
 def _process(
-    df: pd.DataFrame,
+    df: dd.DataFrame,
     gdp_metadata_df: pd.DataFrame,
     use_fill_values: bool,
 ) -> dict[int, xr.Dataset]:
     """Process each dataframe chunk. Return a dictionary mapping each drifter to a unique xarray Dataset."""
 
     # Transform the initial dataframe, filtering out rows with really anomalous values.
     # Examples include: years in the future, years way in the past before the GDP program, etc.
-    preremove_df_chunk = df
+    preremove_df_chunk = df.compute()
     df_chunk = _apply_remove(
         preremove_df_chunk,
         filters=[
@@ -506,13 +506,13 @@ def to_raggedarray(
     ):
         decompr.write(compr.read())
 
-    df = dd.read_csv(
+    df: dd.DataFrame = dd.read_csv(
         data_files,
         sep=r"\s+",
         header=None,
         names=_INPUT_COLS,
         engine="c",
-        dtype=_INPUT_COLS_PREFILTER_DTYPES,
+        dtype={**_INPUT_COLS_DTYPES, **_INPUT_COLS_PREFILTER_DTYPES},
         blocksize="1GB",
         assume_missing=True,
     )
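The dtype argument here needs one merged mapping of base dtypes plus the prefilter overrides; note that dict.update mutates in place and returns None, so the merge has to be built as an expression. A minimal sketch of the pattern, with abbreviated, hypothetical dtype maps and a hypothetical file pattern (the module's real maps cover many more columns):

    import dask.dataframe as dd
    import numpy as np

    # Abbreviated, hypothetical dtype maps standing in for the module's real ones.
    _INPUT_COLS_DTYPES = {"id": np.int64, "latitude": np.float32}
    _INPUT_COLS_PREFILTER_DTYPES = {"posObsMonth": np.str_, "posObsYear": np.float64}

    # dict.update returns None, so it cannot be passed inline as a dtype argument;
    # build the merged mapping as an expression instead.
    merged_dtypes = {**_INPUT_COLS_DTYPES, **_INPUT_COLS_PREFILTER_DTYPES}

    df = dd.read_csv(
        "buoydata_*.dat",  # hypothetical file pattern
        sep=r"\s+",
        header=None,
        names=list(merged_dtypes),
        dtype=merged_dtypes,
        assume_missing=True,
    )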
