⚡ Source gdp adapter using dask #508
base: main
Conversation
Added suggestions for edits and questions. I have not fully tested yet.
clouddrift/datasets.py
    If True, skips downloading the data files and the code assumes the files have already been downloaded.
    This is mainly used to skip downloading files if the remote doesn't provide the HTTP Last-Modified header.
use_fill_values: bool, True (default)
    If True, when drifters are found with no associated metadata, ignore the associated observations.
If False?
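For context, a hedged usage sketch of how these flags might be passed, assuming they are exposed by clouddrift.datasets.gdp_source as the docstring above suggests (the tmp path is a placeholder):

import clouddrift as cd

# skip_download=True reuses files already on disk; use_fill_values controls how
# drifters with no associated metadata are handled (see the docstring above).
ds = cd.datasets.gdp_source(
    tmp_path="/path/to/tmp/gdpsource",  # placeholder path
    skip_download=True,
    use_fill_values=True,
)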
clouddrift/adapters/gdp/gdpsource.py
"comments": "0 (buoy still alive), 1 (buoy ran aground), 2 (picked up by vessel), 3 (stop transmitting), 4 (sporadic transmissions), 5 (bad batteries), 6 (inactive status)", | ||
}, | ||
"position_datetime": { | ||
"comments": "Position datetime derived from the year, month, day and time (represented as a ratio of a day) columns found in the source dataset that represent when the position of the drifter was measured. This value is only different from the sensor_datetime when the position of the drifter was determined onboard the Argos satellites using the doppler shift.", |
Please change to: "Position datetime derived from the year, month, and day (represented as a ratio of a day) when the geographical coordinates of the drifter were obtained. May differ from sensor_datetime."
Even for GPS-tracked drifters, sensor and position times may sometimes differ.
clouddrift/adapters/gdp/gdpsource.py
"comments": "Position datetime derived from the year, month, day and time (represented as a ratio of a day) columns found in the source dataset that represent when the position of the drifter was measured. This value is only different from the sensor_datetime when the position of the drifter was determined onboard the Argos satellites using the doppler shift.", | ||
}, | ||
"sensor_datetime": { | ||
"comments": "Sensor datetime derived from the year, month, day and time (represented as a ratio of a day) columns found in the source dataset that represent when the sensor (like temp) data is recorded", |
Sensor datetime derived from the year, month, and day (represented as a ratio of a day) when sensor data were recorded.
},
"qualityIndex": {
    "long_name": "Quality Index",
    "units": "-",
add:
"comments": "Definitions vary"
The data is accessed from a public HTTPS server at NOAA's Atlantic
Oceanographic and Meteorological Laboratory (AOML) at
https://www.aoml.noaa.gov/ftp/pub/phod/pub/pazos/data/shane/sst/.
Please add something like:
"Metadata are obtained from url_of_metadata."
docs/datasets.rst
@@ -14,6 +14,8 @@ datasets. Currently available datasets are:
- :func:`clouddrift.datasets.gdp6h`: 6-hourly GDP data from a ragged-array
  NetCDF file hosted by the public HTTPS server at
  `NOAA's Atlantic Oceanographic and Meteorological Laboratory (AOML) <https://www.aoml.noaa.gov/phod/gdp/index.php>`_.
- :func:`clouddrift.datasets.gdp_source`: source GDP data without being pre-processed unlike
Change to:
Source GDP data from which the 6-hourly and 1-hourly products are derived.
class gdp_source_files_integration(unittest.TestCase):
    def test_load_and_create_aggregate(self):
        """
        Test that each drifters segment has maintianed the row order as its read in
Please review the grammar and spelling of this sentence.
Tested and ran into the following error:
dask by default utilizes multi-threading for distributing chunked tasks that can be performed in parallel. To leverage all of your CPU cores, include the dask code below before running the dataset:

from dask.distributed import LocalCluster
import clouddrift as cd

cluster = LocalCluster()
client = cluster.get_client()
ds = cd.datasets.gdp_source(tmp_path="/home/ksantana/.clouddrift/tmp/gdpsource")
ds

Although in theory this should enable multiple threads to run in parallel, in practice only one thread runs at a time due to the Python GIL. The snippet may no longer be needed if the GIL becomes disabled by default in a future version of Python.
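One way to work around the GIL, as a minimal sketch rather than part of this PR, is to use process-based workers so each worker runs in its own Python process; the worker count below is an arbitrary example value and the path is a placeholder:

from dask.distributed import LocalCluster
import clouddrift as cd

# processes=True with one thread per worker sidesteps the GIL because tasks run
# in separate Python processes; tune n_workers to your CPU count.
cluster = LocalCluster(processes=True, n_workers=4, threads_per_worker=1)
client = cluster.get_client()

ds = cd.datasets.gdp_source(tmp_path="/path/to/tmp/gdpsource")  # placeholder path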
@selipot this is ready for re-review.
Feedback: Include this info above in the docstring.
Hi @selipot, the changes here are ready to be tested. When you get a chance, could you try generating the entire aggregate ragged-array zarr archive again using the code in this PR? As a heads up, the process is going to take longer (3x as much time, I would say) due to the serial nature of the current […]. I suspect we could improve this if we were to rework […]. Ways of achieving this would be to rewrite the […]
I prefer the latter as it will reduce complexity and leverage tools already being used in the library.

from dask.distributed import LocalCluster

cluster = LocalCluster()
client = cluster.get_client()
I can't make this work on my laptop (I run into memory/dask/worker errors). I also attempted it on my desktop with more than 100 GB of RAM and also ran into memory issues. I suspect a memory-limit configuration/setup is needed. To be discussed.
Yes, this is likely caused by the tmp directory used for intermediary computations, which sometimes lies on a partition with limited space. I've added more details in the docstring, but in short, use the following snippet (replacing the path with your home directory) to see if that fixes the memory limitation issue:

import dask.config as daskc

daskc.set({"temporary-directory": "/home/ksantana/.clouddrift/tmp"})
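A quick way to confirm the setting took effect, as a sketch using the standard dask.config API (the path is a placeholder):

import dask.config as daskc

daskc.set({"temporary-directory": "/path/to/large/partition"})  # placeholder path
print(daskc.get("temporary-directory"))  # should print the path set above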
Hey @selipot, give the ragged-array generation another shot. It should scale better now and work within the limits of your system. Here's the code to run:

from dask.distributed import LocalCluster
import dask.config as daskc
import clouddrift as cd

# Change to your home dir
daskc.set({"temporary-directory": "/home/ksantana/.clouddrift/tmp"})

# Update using your system memory limit
cluster = LocalCluster(memory_limit="60GiB")
client = cluster.get_client()

# Update to your home dir
ds = cd.adapters.gdp_source.to_raggedarray(
    tmp_path="/home/ksantana/.clouddrift/tmp/gdpsource",
    skip_download=True,
    use_fill_values=False,
    max=1
)
We should note the process takes longer now; as such, the integration test is skipped because the workflows end up getting cancelled as a result of their long run time.
This migrates the adapter to leverage Dask for parallelization. This makes the adapter much more scalable, as Dask can scale out to many machines. The memory footprint of the adapter is also improved thanks to Dask's lazy-loading (delayed evaluation) model.
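For readers unfamiliar with the pattern, here is a minimal, hypothetical sketch of the lazy, per-file parallelism Dask enables; process_file and the file names are placeholders, not the adapter's actual implementation:

import dask
import pandas as pd

def process_file(path: str) -> pd.DataFrame:
    # Placeholder for per-file parsing of a GDP source file.
    return pd.read_csv(path)

files = ["buoydata_1.dat", "buoydata_2.dat"]  # hypothetical file names

# Nothing is read yet; each call only adds a task to the graph.
tasks = [dask.delayed(process_file)(f) for f in files]

# Computation happens here, spread across the cluster's workers; each worker
# only holds the files it is currently processing, keeping memory use low.
results = dask.compute(*tasks)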