
⚡ Source gdp adapter using dask #508

Open
kevinsantana11 wants to merge 19 commits into main from source-gdp-adapter-dask

Conversation

@kevinsantana11 (Contributor):

This migrates the adapter to leverage Dask for parallelization. This makes the adapter much more scalable, as Dask can scale out to many machines. The adapter's memory footprint will also be improved thanks to Dask's lazy loading.
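
For context, below is a minimal sketch of the lazy-loading pattern this change relies on; the glob pattern, header handling, and the final reduction are illustrative and not the adapter's actual configuration.

import dask.dataframe as dd

# Build a lazy task graph over the whitespace-delimited source files; nothing
# is read into memory yet. The glob pattern here is a placeholder.
df = dd.read_csv(
    "buoydata_*.dat",
    sep=r"\s+",
    header=None,
    assume_missing=True,  # treat integer-looking columns as floats so missing values don't error
)

# Only an explicit computation (here, counting rows) triggers I/O, and the
# files are processed chunk by chunk rather than loaded all at once.
print(len(df))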

@selipot (Member) left a comment:

Added suggestions for edits and questions. I have not fully tested yet.

If True, skips downloading the data files and the code assumes the files have already been downloaded.
This is mainly used to skip downloading files if the remote doesn't provide the HTTP Last-Modified header.
use_fill_values: bool, True (default)
If True, when drifters are found with no associated metadata, ignore the associated observations.
@selipot (Member):

If False?

clouddrift/adapters/gdp/gdpsource.py: resolved review comment (hidden)
"comments": "0 (buoy still alive), 1 (buoy ran aground), 2 (picked up by vessel), 3 (stop transmitting), 4 (sporadic transmissions), 5 (bad batteries), 6 (inactive status)",
},
"position_datetime": {
"comments": "Position datetime derived from the year, month, day and time (represented as a ratio of a day) columns found in the source dataset that represent when the position of the drifter was measured. This value is only different from the sensor_datetime when the position of the drifter was determined onboard the Argos satellites using the doppler shift.",
@selipot (Member):

Please change to: "Position datetime derived from the year, month, and day (represented as a ratio of a day) when the geographical coordinates of the drifter were obtained. May differ from sensor_datetime."

Even for GPS-tracked drifters, sensor and position times may sometimes differ.
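
For background on this wording, here is a rough, illustrative sketch of how a year/month/fractional-day triple might be turned into a datetime; it is not the adapter's actual _parse_datetime_with_day_ratio helper.

import numpy as np
import pandas as pd

def datetime_from_day_ratio(month, day, year):
    # The "day" column carries a fractional part that encodes the time of day,
    # e.g. 17.5 means noon on the 17th.
    month = np.asarray(month, dtype=int)
    year = np.asarray(year, dtype=int)
    day = np.asarray(day, dtype=float)

    day_int = np.floor(day).astype(int)   # whole day of the month
    day_frac = day - day_int              # remaining fraction of that day

    base = pd.to_datetime(pd.DataFrame({"year": year, "month": month, "day": day_int}))
    return base.to_numpy() + pd.to_timedelta(day_frac, unit="D").to_numpy()

# Example: a day value of 17.5 in August 2024 corresponds to noon on August 17, 2024.
print(datetime_from_day_ratio([8], [17.5], [2024]))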

"comments": "Position datetime derived from the year, month, day and time (represented as a ratio of a day) columns found in the source dataset that represent when the position of the drifter was measured. This value is only different from the sensor_datetime when the position of the drifter was determined onboard the Argos satellites using the doppler shift.",
},
"sensor_datetime": {
"comments": "Sensor datetime derived from the year, month, day and time (represented as a ratio of a day) columns found in the source dataset that represent when the sensor (like temp) data is recorded",
@selipot (Member):

Sensor datetime derived from the year, month, and day (represented as a ratio of a day) when sensor data were recorded.

},
"qualityIndex": {
"long_name": "Quality Index",
"units": "-",
@selipot (Member):

add:
"comments": "Definitions vary"

The data is accessed from a public HTTPS server at NOAA's Atlantic
Oceanographic and Meteorological Laboratory (AOML) at
https://www.aoml.noaa.gov/ftp/pub/phod/pub/pazos/data/shane/sst/.

@selipot (Member):

Please add something like:

"Metadata are obtained from url_of_metadata."

@@ -14,6 +14,8 @@ datasets. Currently available datasets are:
- :func:`clouddrift.datasets.gdp6h`: 6-hourly GDP data from a ragged-array
NetCDF file hosted by the public HTTPS server at
`NOAA's Atlantic Oceanographic and Meteorological Laboratory (AOML) <https://www.aoml.noaa.gov/phod/gdp/index.php>`_.
- :func:`clouddrift.datasets.gdp_source`: source GDP data without being pre-processed unlike
@selipot (Member):

Change to:
Source GDP data from which the 6-hourly and 1-hourly products are derived.

class gdp_source_files_integration(unittest.TestCase):
def test_load_and_create_aggregate(self):
"""
Test that each drifters segment has maintianed the row order as its read in
@selipot (Member):

Please review the grammar and spelling of this sentence.

@selipot (Member) commented on Aug 17, 2024:

Tested and ran into the following error:

In [6]: ds = gdp_source(tmp_path=tmp_path, use_fill_values=True,skip_download=True)
https://www.aoml.noaa.gov/ftp/pub/phod/buoydata/dirfl_1_5000.dat: 100%|████████████████████████████████████████████████████████████████████████████████| 0.57k/0.57k [00:00<00:00, 831kB/s]
https://www.aoml.noaa.gov/ftp/pub/phod/buoydata/dirfl_5001_10000.dat: 100%|████████████████████████████████████████████████████████████████████████████| 0.57k/0.57k [00:00<00:00, 707kB/s]
https://www.aoml.noaa.gov/ftp/pub/phod/buoydata/dirfl_10001_15000.dat: 100%|███████████████████████████████████████████████████████████████████████████| 0.61k/0.61k [00:00<00:00, 775kB/s]
https://www.aoml.noaa.gov/ftp/pub/phod/buoydata/dirfl_15001_20000.dat: 0.06kB [00:00, 315kB/s]
https://www.aoml.noaa.gov/ftp/pub/phod/buoydata/dirfl_15001_current.dat: 100%|████████████████████████████████████████████████████████████████████████| 1.56k/1.56k [00:01<00:00, 1.43MB/s]
Decompressing files: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4/4 [00:00<00:00, 47662.55file/s]
/Users/selipot/projects.git/clouddrift/clouddrift/adapters/gdp/gdpsource.py:416: UserWarning: Filters removed 3 rows from chunk
  warnings.warn(
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[6], line 1
----> 1 ds = gdp_source(tmp_path=tmp_path, use_fill_values=True,skip_download=True)

File ~/projects.git/clouddrift/clouddrift/datasets.py:223, in gdp_source(tmp_path, max, skip_download, use_fill_values, decode_times)
    163 """Returns the NOAA Global Drifter Program (GDP) source (raw) dataset as a ragged array
    164 Xarray dataset.
    165 
   (...)
    220     wmo_number         (traj) int64 ...
    221 """
    222 file_selection_label = "all" if max is None else f"first-{max}"
--> 223 return _dataset_filecache(
    224     f"gdpsource_agg_{file_selection_label}.zarr",
    225     decode_times,
    226     lambda: adapters.gdp_source.to_raggedarray(
    227         tmp_path, skip_download, max, use_fill_values=use_fill_values
    228     ),
    229 )

File ~/projects.git/clouddrift/clouddrift/datasets.py:709, in _dataset_filecache(filename, decode_times, get_ds)
    706 os.makedirs(os.path.dirname(fp), exist_ok=True)
    708 if not os.path.exists(fp):
--> 709     ds = get_ds()
    710     if ext == ".nc":
    711         ds.to_netcdf(fp)

File ~/projects.git/clouddrift/clouddrift/datasets.py:226, in gdp_source.<locals>.<lambda>()
    163 """Returns the NOAA Global Drifter Program (GDP) source (raw) dataset as a ragged array
    164 Xarray dataset.
    165 
   (...)
    220     wmo_number         (traj) int64 ...
    221 """
    222 file_selection_label = "all" if max is None else f"first-{max}"
    223 return _dataset_filecache(
    224     f"gdpsource_agg_{file_selection_label}.zarr",
    225     decode_times,
--> 226     lambda: adapters.gdp_source.to_raggedarray(
    227         tmp_path, skip_download, max, use_fill_values=use_fill_values
    228     ),
    229 )

File ~/projects.git/clouddrift/clouddrift/adapters/gdp/gdpsource.py:514, in to_raggedarray(tmp_path, skip_download, max, use_fill_values)
    502             decompr.write(compr.read())
    504 df = dd.read_csv(
    505     data_files,
    506     sep=r"\s+",
   (...)
    512     assume_missing=True
    513 )
--> 514 drifter_datasets = _process(df, gdp_metadata_df, use_fill_values)
    516 # Sort the drifters by their start date.
    517 deploy_date_id_map = {
    518     ds["id"].data[0]: ds["start_date"].data[0] for ds in drifter_datasets
    519 }

File ~/projects.git/clouddrift/clouddrift/adapters/gdp/gdpsource.py:424, in _process(df, gdp_metadata_df, use_fill_values)
    421     return drifter_ds_map
    423 df_chunk = df_chunk.astype(_INPUT_COLS_DTYPES)
--> 424 df_chunk = _apply_transform(
    425     df_chunk,
    426     {
    427         "position_datetime": (
    428             ["posObsMonth", "posObsDay", "posObsYear"],
    429             _parse_datetime_with_day_ratio,
    430         ),
    431         "sensor_datetime": (
    432             ["senObsMonth", "senObsDay", "senObsYear"],
    433             _parse_datetime_with_day_ratio,
    434         ),
    435     },
    436 )
    438 # Find and process drifters found and documented in the drifter metadata.
    439 ids_in_data = np.unique(df_chunk[["id"]].values)

File ~/projects.git/clouddrift/clouddrift/adapters/gdp/gdpsource.py:360, in _apply_transform(df, transforms)
    358 args = list()
    359 for col in input_cols:
--> 360     arg = df[[col]].values.flatten()
    361     args.append(arg)
    362 tmp_df = tmp_df.assign(**{output_col: func(*args)})

File /opt/homebrew/Caskroom/mambaforge/base/envs/clouddrift/lib/python3.12/site-packages/dask/array/core.py:2185, in Array.ravel(self)
   2175 """Return a flattened array.
   2176 
   2177 Refer to :func:`dask.array.ravel` for full documentation.
   (...)
   2181 dask.array.ravel : equivalent function
   2182 """
   2183 from dask.array.routines import ravel
-> 2185 return ravel(self)

File /opt/homebrew/Caskroom/mambaforge/base/envs/clouddrift/lib/python3.12/site-packages/dask/array/routines.py:1914, in ravel(array_like)
   1912 @derived_from(np)
   1913 def ravel(array_like):
-> 1914     return asanyarray(array_like).reshape((-1,))

File /opt/homebrew/Caskroom/mambaforge/base/envs/clouddrift/lib/python3.12/site-packages/dask/array/core.py:2215, in Array.reshape(self, merge_chunks, limit, *shape)
   2213 if len(shape) == 1 and not isinstance(shape[0], Number):
   2214     shape = shape[0]
-> 2215 return reshape(self, shape, merge_chunks=merge_chunks, limit=limit)

File /opt/homebrew/Caskroom/mambaforge/base/envs/clouddrift/lib/python3.12/site-packages/dask/array/reshape.py:218, in reshape(x, shape, merge_chunks, limit)
    216     if len(shape) == 1 and x.ndim == 1:
    217         return x
--> 218     missing_size = sanitize_index(x.size / reduce(mul, known_sizes, 1))
    219     shape = tuple(missing_size if s == -1 else s for s in shape)
    221 if np.isnan(sum(x.shape)):

File /opt/homebrew/Caskroom/mambaforge/base/envs/clouddrift/lib/python3.12/site-packages/dask/array/slicing.py:72, in sanitize_index(ind)
     66     return slice(
     67         _sanitize_index_element(ind.start),
     68         _sanitize_index_element(ind.stop),
     69         _sanitize_index_element(ind.step),
     70     )
     71 elif isinstance(ind, Number):
---> 72     return _sanitize_index_element(ind)
     73 elif is_dask_collection(ind):
     74     return ind

File /opt/homebrew/Caskroom/mambaforge/base/envs/clouddrift/lib/python3.12/site-packages/dask/array/slicing.py:26, in _sanitize_index_element(ind)
     24 """Sanitize a one-element index."""
     25 if isinstance(ind, Number):
---> 26     ind2 = int(ind)
     27     if ind2 != ind:
     28         raise IndexError("Bad index.  Must be integer-like: %s" % ind)

ValueError: cannot convert float NaN to integer
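
For context on the traceback, a minimal standalone reproducer (not taken from this PR) of the same failure mode: an array derived from a Dask dataframe has unknown chunk lengths, so flattening it via reshape(-1) cannot determine the total size.

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame({"x": [1.0, 2.0, 3.0]})
ddf = dd.from_pandas(pdf, npartitions=2)

arr = ddf[["x"]].values   # dask array with shape (nan, 1): row counts are not known yet
try:
    arr.flatten()         # reshape(-1) needs the total size, which is NaN here
except ValueError as err:
    print(err)            # cannot convert float NaN to integer

# One generic way around it (not necessarily the fix applied in this PR) is to
# compute the partition lengths before converting to an array.
flat = ddf["x"].to_dask_array(lengths=True)
print(flat.compute())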

@kevinsantana11 force-pushed the source-gdp-adapter-dask branch from c64a4e9 to 30979c3 on August 24, 2024 04:30
codecov bot commented on Sep 29, 2024:

Codecov Report

Attention: Patch coverage is 10.93750% with 57 lines in your changes missing coverage. Please review.

Files with missing lines: clouddrift/adapters/gdp/gdpsource.py (patch coverage 10.93%, 57 lines missing ⚠️)

@kevinsantana11 (Contributor, Author) commented:

By default, Dask uses multi-threading to distribute chunked tasks that can be performed in parallel. To leverage all of your CPU cores, include the Dask code below before running the gdp_source dataset function.

from dask.distributed import LocalCluster
import clouddrift as cd

cluster = LocalCluster() 
client = cluster.get_client()

ds = cd.datasets.gdp_source(tmp_path="/home/ksantana/.clouddrift/tmp/gdpsource")
ds

Code snippet initializing Dask to create a local cluster that uses multi-processing.

Although in theory multi-threading should allow multiple threads to run in parallel, in reality only one thread runs at a time because of the Python GIL.

This snippet may not be needed in the future if the GIL is disabled by default in a later version of Python.

@kevinsantana11 (Contributor, Author) commented:

@selipot this is ready for re-review.

@kevinsantana11 (Contributor, Author) commented:

from dask.distributed import LocalCluster
import clouddrift as cd

cluster = LocalCluster() 
client = cluster.get_client()

Feedback: include the snippet above in the docstring.

@kevinsantana11 (Contributor, Author) commented:

Hi @selipot, the changes here are ready to be tested. When you get a chance, could you try generating the entire aggregate ragged-array Zarr archive again using the code in this PR?

As a heads-up, the process is going to take longer (roughly 3x as much time, I would say) due to the serial nature of the current RaggedArray.from_files function. In the current adapter implementation, the serial nature of the function is avoided since it is executed per chunk instead of per drifter or over the entire dataset.

I suspect that if we reworked RaggedArray.from_files to take better advantage of multi-processing, we could make the process faster than the current script.

One way to achieve this would be to rewrite RaggedArray.from_files so that the allocate and number_of_observations functions are parallelized, either by:

  • directly using a process pool executor, similarly to how this process was initially parallelized (a rough sketch of this option follows the snippet below), or
  • replacing the dataset querying and manipulation operations with pandas or xarray operations, similarly to this PR. Doing so would let us switch the process from serial to parallel with the 3 lines shown below:

I prefer the latter, as it reduces complexity and leverages tools already used in the library.

from dask.distributed import LocalCluster
cluster = LocalCluster() 
client = cluster.get_client()
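
To make the first option concrete, here is a hypothetical sketch of fanning per-file preprocessing out to a process pool; the function bodies and file names are placeholders rather than the actual RaggedArray.from_files internals.

from concurrent.futures import ProcessPoolExecutor

import pandas as pd

def preprocess_one_file(path):
    # Placeholder for the per-file work (e.g., counting observations per
    # drifter before allocating the ragged array).
    return pd.read_csv(path, sep=r"\s+", header=None)

def preprocess_all(paths):
    # Each file is handled in a separate process, sidestepping the GIL for the
    # CPU-bound parts of the preprocessing.
    with ProcessPoolExecutor() as executor:
        return list(executor.map(preprocess_one_file, paths))

if __name__ == "__main__":
    frames = preprocess_all(["buoydata_1_5000.dat", "buoydata_5001_10000.dat"])
    print(sum(len(frame) for frame in frames))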

@selipot (Member) commented on Oct 7, 2024:

I can't make this work on my laptop (I run into memory/dask/workers errors). I also attempted it on my desktop with more than 100 GB of RAM and ran into memory issues there as well. I suspect a memory-limit configuration/setup is needed. To be discussed.

@kevinsantana11 (Contributor, Author) commented on Oct 8, 2024:

> I can't make this work on my laptop (I run into memory/dask/workers errors). I also attempted it on my desktop with more than 100 GB of RAM and ran into memory issues there as well. I suspect a memory-limit configuration/setup is needed. To be discussed.

Yes, this is likely caused by the tmp directory used for intermediate computations, which sometimes lies on a partition with limited space. I've added more details in the docstring, but in short, use the following snippet (replacing the path with a directory under your home directory) to see if that fixes the memory limitation issue:

import dask.config as daskc
daskc.set({"temporary-directory": "/home/ksantana/.clouddrift/tmp"})
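
As an optional sanity check (illustrative, not part of the PR), the setting can be read back before starting the cluster:

import dask.config as daskc

# Should echo the directory set above, confirming Dask's spill/temporary files
# will be written there.
print(daskc.get("temporary-directory"))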

@kevinsantana11 (Contributor, Author) commented:

Hey @selipot, give the ragged-array generation another shot. It should scale better now and work within the limits of your system. Here's the code to run:

from dask.distributed import LocalCluster
import dask.config as daskc
import clouddrift as cd

# Change to your home dir
daskc.set({"temporary-directory": "/home/ksantana/.clouddrift/tmp"})

# Update using your system memory limit
cluster = LocalCluster(memory_limit="60GiB")
client = cluster.get_client()

# Update to your home dir
ds = cd.adapters.gdp_source.to_raggedarray(
    tmp_path="/home/ksantana/.clouddrift/tmp/gdpsource",
    skip_download=True,
    use_fill_values=False,
    max=1
)

@kevinsantana11 (Contributor, Author) commented:

We should note that the process takes longer now; as a result, the integration test is skipped, because the workflows end up being cancelled due to their long run time.
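
For reference, one common way such a skip can be expressed with unittest (a sketch; the exact decorator and message used in the PR may differ):

import unittest

@unittest.skip("long-running integration test; workflows were being cancelled")
class gdp_source_files_integration(unittest.TestCase):
    def test_load_and_create_aggregate(self):
        # Body omitted here; see the test module in the PR.
        ...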
