From 36da2365950128a3c1003af95e650ff3b9ba5700 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Wed, 19 Jul 2023 17:48:03 +0200 Subject: [PATCH 01/34] Initial commit --- skyllh/core/dataset.py | 222 ++++++++++++++++++++++-- skyllh/datasets/i3/PublicData_10y_ps.py | 12 ++ 2 files changed, 223 insertions(+), 11 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 1128c9c6e8..0c165b01f2 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -1,12 +1,15 @@ # -*- coding: utf-8 -*- -from copy import ( - deepcopy, -) +import abc import os import os.path + import numpy as np +from copy import ( + deepcopy, +) + from skyllh.core import ( display, ) @@ -20,6 +23,9 @@ DataFields, DataFieldStages as DFS, ) +from skyllh.core.debugging import ( + get_logger, +) from skyllh.core.display import ( ANSIColors, ) @@ -32,6 +38,7 @@ from skyllh.core.py import ( classname, float_cast, + get_class_of_func, issequence, issequenceof, list_of_cast, @@ -46,6 +53,130 @@ ) +class DatasetOrigin( + object, +): + """The DatasetOrigin class provides information about the origin of a + dataset, so the files of a dataset can be downloaded from the origin. + """ + def __init__( + self, + path, + transfer_func, + protocol=None, + host=None, + port=None, + user=None, + password=None, + post_transfer_func=None, + **kwargs, + ): + super().__init__(**kwargs) + + self.path = path + self.transfer_func = transfer_func + self.protocol = protocol + self.host = host + self.port = port + self.user = user + self.password = password + self.post_transfer_func = post_transfer_func + + +class DatasetTransfer( + object, + metaclass=abc.ABCMeta, +): + """Base class for a dataset transfer mechanism. + """ + def __init__(self, **kwargs): + super().__init__(**kwargs) + + @staticmethod + def ensure_dst_path(dst_path): + """Ensures the existance of the given destination path. + + Parameters + ---------- + dst_path : str + The destination path. 
+ """ + if not os.path.isdir(dst_path): + # Throws if dst_path exists as a file. + os.makedirs(dst_path) + + @staticmethod + @abc.abstractstaticmethod + def transfer(ds, dst_path): + """This method is supposed to transfer the dataset origin path to the + given destination path. + + Parameters + ---------- + ds : instance of Dataset + The instance of Dataset having the ``origin`` property defined, + specifying the origin of the dataset. + dst_path : str + The destination path into which the dataset will be transfered. + """ + pass + + @staticmethod + def post_transfer_unzip(ds, dst_path): + """This is a post-transfer function. It will unzip the transfered file + into the dst_path if the origin path was a zip file. + """ + if not ds.origin.path.lower().endswith('.zip'): + return + + cls = get_class_of_func(DatasetTransfer.post_transfer_unzip) + logger = get_logger(f'{classname(cls)}.post_transfer_unzip') + + fname = os.path.basename(ds.origin.path) + # Unzip the dataset file. + zip_file = os.path.join(dst_path, fname) + cmd = f'unzip "{zip_file}" -d "{dst_path}"' + rcode = os.system(cmd) + if rcode != 0: + raise RuntimeError( + f'The post-transfer command "{cmd}" failed with return ' + f'code {rcode}!') + # Remove the zip file. + try: + os.remove(zip_file) + except Exception as exc: + logger.warn(str(exc)) + + +class WGETDatasetTransfer( + DatasetTransfer, +): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + @staticmethod + def transfer(ds, dst_path): + """Transfers the given dataset to the given destination path. + + Parameters + ---------- + ds : instance of Dataset + The instance of Dataset containing the origin property specifying + the origin of the dataset. + dst_path : str + The destination path into which the dataset will be transfered. 
+ """ + protocol = ds.origin.protocol + host = ds.origin.host + path = ds.origin.path + cmd = f'wget {protocol}://{host}/{path} -P {dst_path}' + rcode = os.system(cmd) + if rcode != 0: + raise RuntimeError( + f'The transfer command "{cmd}" failed with return code ' + f'{rcode}!') + + class Dataset( HasConfig, ): @@ -148,6 +279,7 @@ def __init__( verqualifiers=None, base_path=None, sub_path_fmt=None, + origin=None, **kwargs, ): """Creates a new dataset object that describes a self-consistent set of @@ -186,6 +318,9 @@ def __init__( sub_path_fmt : str | None The user-defined format of the sub path of the data set. If set to ``None``, the ``default_sub_path_fmt`` will be used. + origin : instance of DatasetOrigin | None + The instance of DatasetOrigin defining the origin of the dataset, + so the dataset can be transfered automatically to the user's device. """ super().__init__(**kwargs) @@ -198,6 +333,7 @@ def __init__( self.verqualifiers = verqualifiers self.base_path = base_path self.sub_path_fmt = sub_path_fmt + self.origin = origin self.description = '' @@ -413,6 +549,23 @@ def sub_path_fmt(self, fmt): 'str!') self._sub_path_fmt = fmt + @property + def origin(self): + """The instance of DatasetOrigin defining the origin of the dataset. + This can be ``None`` if the dataset has no origin defined. + """ + return self._origin + + @origin.setter + def origin(self, obj): + if obj is not None: + if not isinstance(obj, DatasetOrigin): + raise TypeError( + 'The origin property must be None, or an instance of ' + 'DatasetOrigin! 
' + f'Its current type is {classname(obj)}!') + self._origin = obj + @property def root_dir(self): """(read-only) The root directory to use when data files are specified @@ -690,6 +843,23 @@ def update_version_qualifiers( 'Version qualifier values did not increment and no new version ' 'qualifiers were added!') + def download_from_origin(self): + """Downloads the dataset from the origin using the transfer and + post-transfer function if the root directory of the dataset does not + exist. + """ + root_dir = self.root_dir + if os.path.exists(root_dir) and os.path.isdir(root_dir): + return + + if self.origin is None: + return + + self.origin.transfer_func( + ds=self, + dst_path=root_dir) + # if self.origin + def load_data( self, keep_fields=None, @@ -701,8 +871,10 @@ def load_data( ): """Loads the data, which is described by the dataset. - Note: This does not call the ``prepare_data`` method! It only loads - the data as the method names says. + .. note: + + This does not call the ``prepare_data`` method! It only loads the + data as the method names says. Parameters ---------- @@ -767,6 +939,9 @@ def _conv_new2orig_field_names( return orig_field_names + # Download the dataset if necessary. + self.download_from_origin() + if keep_fields is None: keep_fields = [] @@ -2026,6 +2201,34 @@ def remove_events( return data_exp +def generate_base_path( + default_base_path, + base_path=None, +): + """Generates the base path. If base_path is None, default_base_path is used. + + Parameters + ---------- + default_base_path : str + The default base path if base_path is None. + base_path : str | None + The user-specified base path. + + Returns + ------- + base_path : str + The generated base path. 
+ """ + if base_path is None: + if default_base_path is None: + raise ValueError( + 'The default_base_path argument must not be None, when the ' + 'base_path argument is set to None!') + base_path = default_base_path + + return base_path + + def generate_data_file_root_dir( default_base_path, default_sub_path_fmt, @@ -2067,12 +2270,9 @@ def generate_data_file_root_dir( root_dir : str The generated root directory of the data files. """ - if base_path is None: - if default_base_path is None: - raise ValueError( - 'The default_base_path argument must not be None, when the ' - 'base_path argument is set to None!') - base_path = default_base_path + base_path = generate_base_path( + default_base_path=default_base_path, + base_path=base_path) if sub_path_fmt is None: sub_path_fmt = default_sub_path_fmt diff --git a/skyllh/datasets/i3/PublicData_10y_ps.py b/skyllh/datasets/i3/PublicData_10y_ps.py index 3b28646d26..ad0a1d9dbe 100644 --- a/skyllh/datasets/i3/PublicData_10y_ps.py +++ b/skyllh/datasets/i3/PublicData_10y_ps.py @@ -5,6 +5,8 @@ from skyllh.core.dataset import ( DatasetCollection, + DatasetOrigin, + WGETDatasetTransfer, ) from skyllh.i3.dataset import ( I3Dataset, @@ -255,6 +257,15 @@ def create_dataset_collection( 28 January 2021 """ + # Define the origin of the dataset. + origin = DatasetOrigin( + path='data-releases/20210126_PS-IC40-IC86_VII.zip', + host='icecube.wisc.edu', + protocol='http', + transfer_func=WGETDatasetTransfer.transfer, + post_transfer_func=WGETDatasetTransfer.post_transfer_unzip, + ) + # Define the common keyword arguments for all data sets. 
ds_kwargs = dict( cfg=cfg, @@ -264,6 +275,7 @@ def create_dataset_collection( base_path=base_path, default_sub_path_fmt=default_sub_path_fmt, sub_path_fmt=sub_path_fmt, + origin=origin, ) grl_field_name_renaming_dict = { From 4839fd5f08dd4764d750cdc8c9e26d3fef735054 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Thu, 20 Jul 2023 10:39:15 +0200 Subject: [PATCH 02/34] Add config option to download dataset from origin --- skyllh/core/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/skyllh/core/config.py b/skyllh/core/config.py index 3ecfc147fe..eb7aa2f15e 100644 --- a/skyllh/core/config.py +++ b/skyllh/core/config.py @@ -51,6 +51,7 @@ 'repository': { # A base path of repository datasets. 'base_path': None, + 'download_from_origin': True, }, 'units': { # Definition of the internal units to use. These must match with the From a848f9fbd4ee72f20812dae1df42bebd0b75cb4e Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Thu, 20 Jul 2023 10:39:38 +0200 Subject: [PATCH 03/34] Complete method --- skyllh/core/dataset.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 0c165b01f2..9912739a59 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -855,10 +855,18 @@ def download_from_origin(self): if self.origin is None: return + base_path = generate_base_path( + default_base_path=self._cfg['repository']['base_path'], + base_path=self._base_path) + self.origin.transfer_func( ds=self, - dst_path=root_dir) - # if self.origin + dst_path=base_path) + + if self.origin.post_transfer_func is not None: + self.origin.post_transfer_func( + ds=self, + dst_path=base_path) def load_data( self, @@ -939,8 +947,8 @@ def _conv_new2orig_field_names( return orig_field_names - # Download the dataset if necessary. 
- self.download_from_origin() + if self._cfg['repository']['download_from_origin'] is True: + self.download_from_origin() if keep_fields is None: keep_fields = [] From 03333054db105f83817b75a6362afd2f6ecc72ae Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Thu, 20 Jul 2023 10:53:44 +0200 Subject: [PATCH 04/34] Make iminuit an optional tool --- skyllh/core/minimizers/iminuit.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/skyllh/core/minimizers/iminuit.py b/skyllh/core/minimizers/iminuit.py index 4f395b7021..d37822e1f0 100644 --- a/skyllh/core/minimizers/iminuit.py +++ b/skyllh/core/minimizers/iminuit.py @@ -3,10 +3,11 @@ minimizer. """ -import iminuit - import numpy as np +from skyllh.core import ( + tool, +) from skyllh.core.config import ( HasConfig, ) @@ -116,6 +117,7 @@ class IMinuitMinimizerImpl( """The SkyLLH minimizer implementation that utilizes the iminuit minimizer. """ + @tool.requires('iminuit') def __init__( self, ftol=1e-6, @@ -194,6 +196,8 @@ def minimize( if kwargs is None: kwargs = dict() + iminuit = tool.get('iminuit') + func_provides_grads = kwargs.pop('func_provides_grads', True) if func_provides_grads: From d1d5530a3ecbe8007196b9cfa4a0c49d82a2a819 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Thu, 20 Jul 2023 11:04:07 +0200 Subject: [PATCH 05/34] Add some print debugging --- skyllh/core/dataset.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 9912739a59..a67184ad9c 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -136,6 +136,7 @@ def post_transfer_unzip(ds, dst_path): # Unzip the dataset file. 
zip_file = os.path.join(dst_path, fname) cmd = f'unzip "{zip_file}" -d "{dst_path}"' + print(f'post_transfer_unzip: cmd: {cmd}') rcode = os.system(cmd) if rcode != 0: raise RuntimeError( @@ -170,6 +171,7 @@ def transfer(ds, dst_path): host = ds.origin.host path = ds.origin.path cmd = f'wget {protocol}://{host}/{path} -P {dst_path}' + print(f'Transfer cmd: {cmd}') rcode = os.system(cmd) if rcode != 0: raise RuntimeError( @@ -849,12 +851,17 @@ def download_from_origin(self): exist. """ root_dir = self.root_dir + print(f'Checking root_dir "{root_dir}"') if os.path.exists(root_dir) and os.path.isdir(root_dir): + print('The root dir exists.') return if self.origin is None: + print('No dataset origin defined!') return + print(f'Downloading dataset "{self.name}" from orgin.') + base_path = generate_base_path( default_base_path=self._cfg['repository']['base_path'], base_path=self._base_path) @@ -947,6 +954,7 @@ def _conv_new2orig_field_names( return orig_field_names + print(f'download_from_origin = {self._cfg["repository"]["download_from_origin"]}') if self._cfg['repository']['download_from_origin'] is True: self.download_from_origin() From 8266a955388d2ed0637a905f93cf5ce8be1f906c Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Thu, 20 Jul 2023 11:27:50 +0200 Subject: [PATCH 06/34] Load the base data first --- skyllh/i3/dataset.py | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/skyllh/i3/dataset.py b/skyllh/i3/dataset.py index 1cfb01016a..08aff27028 100644 --- a/skyllh/i3/dataset.py +++ b/skyllh/i3/dataset.py @@ -275,6 +275,16 @@ def load_data( A DatasetData instance holding the experimental and monte-carlo data of this data set. """ + # Load the dataset files first. This will ensure the dataset is + # downloaded if necessary. 
+ data_ = super().load_data( + keep_fields=keep_fields, + livetime=livetime, + dtc_dict=dtc_dict, + dtc_except_fields=dtc_except_fields, + efficiency_mode=efficiency_mode, + tl=tl) + # Load the good-run-list (GRL) data if it is provided for this dataset, # and calculate the livetime based on the GRL. data_grl = None @@ -285,14 +295,8 @@ def load_data( # Load all the defined data. data = I3DatasetData( - super(I3Dataset, self).load_data( - keep_fields=keep_fields, - livetime=livetime, - dtc_dict=dtc_dict, - dtc_except_fields=dtc_except_fields, - efficiency_mode=efficiency_mode, - tl=tl), - data_grl) + data=data_, + data_grl=data_grl) return data From 339cbd3ae6bdcdbb8f238d0d6513c7a0bf30d7af Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Thu, 20 Jul 2023 12:15:54 +0200 Subject: [PATCH 07/34] Add pretty string and logger debug messages --- skyllh/core/dataset.py | 46 +++++++++++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 7 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index a67184ad9c..010f505f1d 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -42,6 +42,7 @@ issequence, issequenceof, list_of_cast, + module_class_method_name, str_cast, ) from skyllh.core.storage import ( @@ -82,6 +83,27 @@ def __init__( self.password = password self.post_transfer_func = post_transfer_func + def __str__(self): + """Pretty string representation of this class. 
+ """ + transfer_cls = get_class_of_func(self.transfer_func) + + s = f'{classname(self)} '+'{\n' + s1 = f'path = {self.path}\n' + s1 += f'protocol = {self.protocol}\n' + s1 += f'user@host:port = {self.user}@{self.host}:{self.port}\n' + s1 += 'password = ' + if self.password is not None: + s1 += 'set\n' + else: + s1 += 'not set\n' + s1 += f'transfer class = {classname(transfer_cls)}\n' + s += display.add_leading_text_line_padding( + display.INDENTATION_WIDTH, s1) + s += '}' + + return s + class DatasetTransfer( object, @@ -136,7 +158,7 @@ def post_transfer_unzip(ds, dst_path): # Unzip the dataset file. zip_file = os.path.join(dst_path, fname) cmd = f'unzip "{zip_file}" -d "{dst_path}"' - print(f'post_transfer_unzip: cmd: {cmd}') + logger.debug(f'Running command "{cmd}"') rcode = os.system(cmd) if rcode != 0: raise RuntimeError( @@ -167,11 +189,14 @@ def transfer(ds, dst_path): dst_path : str The destination path into which the dataset will be transfered. """ + cls = get_class_of_func(WGETDatasetTransfer.transfer) + logger = get_logger(f'{classname(cls)}.transfer') + protocol = ds.origin.protocol host = ds.origin.host path = ds.origin.path cmd = f'wget {protocol}://{host}/{path} -P {dst_path}' - print(f'Transfer cmd: {cmd}') + logger.debug(f'Running command "{cmd}"') rcode = os.system(cmd) if rcode != 0: raise RuntimeError( @@ -850,17 +875,25 @@ def download_from_origin(self): post-transfer function if the root directory of the dataset does not exist. """ + logger = get_logger( + module_class_method_name(self, 'download_from_origin') + ) + root_dir = self.root_dir - print(f'Checking root_dir "{root_dir}"') if os.path.exists(root_dir) and os.path.isdir(root_dir): - print('The root dir exists.') + logger.debug( + f'The root dir "{root_dir}" of dataset "{self.name}" exists. ' + 'Not downloading anything.') return if self.origin is None: - print('No dataset origin defined!') + logger.warn( + f'No origin defined for dataset "{self.name}"! 
' + 'Cannot download dataset!') return - print(f'Downloading dataset "{self.name}" from orgin.') + logger.debug( + f'Downloading dataset "{self.name}" from orgin.') base_path = generate_base_path( default_base_path=self._cfg['repository']['base_path'], @@ -954,7 +987,6 @@ def _conv_new2orig_field_names( return orig_field_names - print(f'download_from_origin = {self._cfg["repository"]["download_from_origin"]}') if self._cfg['repository']['download_from_origin'] is True: self.download_from_origin() From fd91a99eb2c1350a33f2f66c2f0a2c8c38d13ca8 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Thu, 20 Jul 2023 13:44:56 +0200 Subject: [PATCH 08/34] Add doc string --- skyllh/core/dataset.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 010f505f1d..e21878b01b 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -72,6 +72,37 @@ def __init__( post_transfer_func=None, **kwargs, ): + """Creates a new instance to define the origin of a dataset. + + Parameters + ---------- + path : str + The dataset's root directory at the origin. + transfer_func : callable + The callable object that should be used to transfer the dataset. + This function requires the following call signature:: + + __call__(ds, dst_path) + + where ``ds`` is an instance of Dataset, and ``dst_path`` is an + instance of str specifying the destination path on the local + machine. + protocol : str | None + The protcol to use for the transfer, e.g. ``"http"`` or ``rsync``. + host : str | None + The name or IP of the remote host. + port : int | None + The port number to use when connecting to the remote host. + user : str | None + The user name required to connect to the remote host. + password : str | None + The password for the user name required to connect to the remote + host. 
+ post_transfer_func : callable | None + The callable object that should be called after the dataset has been + transfered by the ``transfer_func``function. It can be used to + extract an archive file. + """ super().__init__(**kwargs) self.path = path From 10aab7354c3c3cb64a2783716b86c403b18e606c Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Fri, 21 Jul 2023 14:00:00 +0200 Subject: [PATCH 09/34] Create symlink if origin is available locally --- skyllh/core/dataset.py | 245 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 218 insertions(+), 27 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index e21878b01b..0beb94f701 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -135,6 +135,38 @@ def __str__(self): return s + def is_locally_available(self): + """Checks if the dataset origin is available locally by checking if the + given path exists on the local host. + + Returns + ------- + check : bool + ``True`` if the path specified in this dataset origin is an absolute + path and exists on the local host, ``False`` otherwise. + """ + if ( + os.path.abspath(self.path) == self.path and + os.path.exists(self.path) and + os.path.isdir(self.path) + ): + return True + + return False + + +class TemporaryTextFile(object): + def __init__(self, pathfilename, text): + self.pathfilename = pathfilename + self.text = text + + def __enter__(self): + with open(self.pathfilename, 'w') as fd: + fd.write(self.text) + + def __exit__(self, exc_type, exc_val, exc_tb): + os.remove(self.pathfilename) + class DatasetTransfer( object, @@ -146,7 +178,38 @@ def __init__(self, **kwargs): super().__init__(**kwargs) @staticmethod - def ensure_dst_path(dst_path): + def execute_system_command( + cmd, + logger, + success_rcode=0, + ): + """Executes the given system command via a ``os.system`` call. + + Parameters + ---------- + cmd : str + The system command to execute. + logger : instance of logging.Logger + The logger to use for debug messages. 
+ success_rcode : int | None + The return code that indicates success of the system command. + If set to ``None``, no return code checking is performed. + + Raises + ------ + RuntimeError + If the system command did not return ``success_rcode``. + """ + logger.debug(f'Running command "{cmd}"') + rcode = os.system(cmd) + if (success_rcode is not None) and (rcode != success_rcode): + raise RuntimeError( + f'The system command "{cmd}" failed with return code {rcode}!') + + @staticmethod + def ensure_dst_path( + dst_path, + ): """Ensures the existance of the given destination path. Parameters @@ -160,7 +223,12 @@ def ensure_dst_path(dst_path): @staticmethod @abc.abstractstaticmethod - def transfer(ds, dst_path): + def transfer( + ds, + dst_path, + user=None, + password=None, + ): """This method is supposed to transfer the dataset origin path to the given destination path. @@ -171,6 +239,11 @@ def transfer(ds, dst_path): specifying the origin of the dataset. dst_path : str The destination path into which the dataset will be transfered. + user : str | None + The user name required to connect to the remote host. + password : str | None + The password for the user name required to connect to the remote + host. """ pass @@ -186,15 +259,12 @@ def post_transfer_unzip(ds, dst_path): logger = get_logger(f'{classname(cls)}.post_transfer_unzip') fname = os.path.basename(ds.origin.path) + # Unzip the dataset file. zip_file = os.path.join(dst_path, fname) cmd = f'unzip "{zip_file}" -d "{dst_path}"' - logger.debug(f'Running command "{cmd}"') - rcode = os.system(cmd) - if rcode != 0: - raise RuntimeError( - f'The post-transfer command "{cmd}" failed with return ' - f'code {rcode}!') + DatasetTransfer.execute_system_command(cmd, logger) + # Remove the zip file. 
try: os.remove(zip_file) @@ -202,6 +272,78 @@ def post_transfer_unzip(ds, dst_path): logger.warn(str(exc)) +class RSYNCDatasetTransfer( + DatasetTransfer, +): + def __init__(self, **kwargs): + super().__init__(**kwargs) + + @staticmethod + def transfer( + ds, + dst_path, + user=None, + password=None, + ): + """Transfers the given dataset to the given destination path using the + ``rsync`` program. + + Parameters + ---------- + ds : instance of Dataset + The instance of Dataset containing the origin property specifying + the origin of the dataset. + dst_path : str + The destination path into which the dataset will be transfered. + user : str | None + The user name required to connect to the remote host. + password : str | None + The password for the user name required to connect to the remote + host. + """ + execute_system_command = DatasetTransfer.execute_system_command + + cls = get_class_of_func(RSYNCDatasetTransfer.transfer) + logger = get_logger(f'{classname(cls)}.transfer') + + host = ds.origin.host + path = ds.origin.path + + if user is None: + # No user name is defined. + cmd = ( + f'rsync ' + '-avL ' + '--progress ' + f'{host}:"{path}" "{dst_path}"' + ) + execute_system_command(cmd, logger) + elif password is not None: + # User and password is defined. + pwdfile = os.path.join(os.getcwd(), f'{id(ds)}.passwd') + with TemporaryTextFile( + pathfilename=pwdfile, + text=password + ): + cmd = ( + f'rsync ' + '-avL ' + '--progress ' + f'--password-file "{pwdfile}" ' + f'{user}@{host}:"{path}" "{dst_path}"' + ) + execute_system_command(cmd, logger) + else: + # Only the user name is defined. + cmd = ( + f'rsync ' + '-avL ' + '--progress ' + f'{user}@{host}:"{path}" "{dst_path}"' + ) + execute_system_command(cmd, logger) + + class WGETDatasetTransfer( DatasetTransfer, ): @@ -209,8 +351,14 @@ def __init__(self, **kwargs): super().__init__(**kwargs) @staticmethod - def transfer(ds, dst_path): - """Transfers the given dataset to the given destination path. 
+ def transfer( + ds, + dst_path, + user=None, + password=None, + ): + """Transfers the given dataset to the given destination path using the + ``wget`` program. Parameters ---------- @@ -219,6 +367,11 @@ def transfer(ds, dst_path): the origin of the dataset. dst_path : str The destination path into which the dataset will be transfered. + user : str | None + The user name required to connect to the remote host. + password : str | None + The password for the user name required to connect to the remote + host. """ cls = get_class_of_func(WGETDatasetTransfer.transfer) logger = get_logger(f'{classname(cls)}.transfer') @@ -227,12 +380,7 @@ def transfer(ds, dst_path): host = ds.origin.host path = ds.origin.path cmd = f'wget {protocol}://{host}/{path} -P {dst_path}' - logger.debug(f'Running command "{cmd}"') - rcode = os.system(cmd) - if rcode != 0: - raise RuntimeError( - f'The transfer command "{cmd}" failed with return code ' - f'{rcode}!') + DatasetTransfer.execute_system_command(cmd, logger) class Dataset( @@ -901,30 +1049,69 @@ def update_version_qualifiers( 'Version qualifier values did not increment and no new version ' 'qualifiers were added!') - def download_from_origin(self): - """Downloads the dataset from the origin using the transfer and - post-transfer function if the root directory of the dataset does not - exist. + def make_data_available( + self, + user=None, + password=None, + ): + """Makes the data of the dataset available. + If the root directory of the dataset does not exist locally, the dataset + is transfered from its origin to the local host. If the origin is + already available locally, only a symlink is created to the origin path. + + Parameters + ---------- + user : str | None + The user name required to connect to the remote host of the origin. + If set to ``None``, the + password : str | None + The password of the user name required to connect to the remote host + of the origin. 
+ + Returns + ------- + success : bool + ``True`` if the data was made available successfully, ``False`` + otherwise. """ logger = get_logger( - module_class_method_name(self, 'download_from_origin') + module_class_method_name(self, 'make_data_available') ) root_dir = self.root_dir if os.path.exists(root_dir) and os.path.isdir(root_dir): logger.debug( f'The root dir "{root_dir}" of dataset "{self.name}" exists. ' - 'Not downloading anything.') - return + 'Nothing to download.') + return True if self.origin is None: logger.warn( f'No origin defined for dataset "{self.name}"! ' 'Cannot download dataset!') - return + return False + + # Check if the dataset origin is locally available. In that case we + # just create a symlink. + if self.origin.is_locally_available(): + cmd = f'ln -s "{self.origin.path}" "{root_dir}"' + DatasetTransfer.execute_system_command(cmd, logger) + return True + + if self._cfg['repository']['download_from_origin'] is False: + logger.warn( + f'The data of dataset "{self.name}" is locally not available ' + 'and the download from the origin is disabled through the ' + 'configuration!') + return False + + if user is None: + user = self.origin.user + if password is None: + password = self.origin.password logger.debug( - f'Downloading dataset "{self.name}" from orgin.') + f'Downloading dataset "{self.name}" from origin. 
user="{user}".') base_path = generate_base_path( default_base_path=self._cfg['repository']['base_path'], @@ -932,13 +1119,17 @@ def download_from_origin(self): self.origin.transfer_func( ds=self, - dst_path=base_path) + dst_path=base_path, + user=user, + password=password) if self.origin.post_transfer_func is not None: self.origin.post_transfer_func( ds=self, dst_path=base_path) + return True + def load_data( self, keep_fields=None, @@ -1019,7 +1210,7 @@ def _conv_new2orig_field_names( return orig_field_names if self._cfg['repository']['download_from_origin'] is True: - self.download_from_origin() + self.make_data_available() if keep_fields is None: keep_fields = [] From 00893044520380fafa00f4129343a57d88bbd091 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Fri, 21 Jul 2023 14:33:23 +0200 Subject: [PATCH 10/34] Add is_directory flag --- skyllh/core/dataset.py | 5 +++++ skyllh/datasets/i3/PublicData_10y_ps.py | 1 + 2 files changed, 6 insertions(+) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 0beb94f701..7621e7e59f 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -64,6 +64,7 @@ def __init__( self, path, transfer_func, + is_directory=True, protocol=None, host=None, port=None, @@ -87,6 +88,9 @@ def __init__( where ``ds`` is an instance of Dataset, and ``dst_path`` is an instance of str specifying the destination path on the local machine. + is_directory : bool + Flag if the remote path refers to a directory (``True``) or not + ``False``. protocol : str | None The protcol to use for the transfer, e.g. ``"http"`` or ``rsync``. 
host : str | None @@ -107,6 +111,7 @@ def __init__( self.path = path self.transfer_func = transfer_func + self.is_directory = is_directory self.protocol = protocol self.host = host self.port = port diff --git a/skyllh/datasets/i3/PublicData_10y_ps.py b/skyllh/datasets/i3/PublicData_10y_ps.py index ad0a1d9dbe..241430f9bf 100644 --- a/skyllh/datasets/i3/PublicData_10y_ps.py +++ b/skyllh/datasets/i3/PublicData_10y_ps.py @@ -260,6 +260,7 @@ def create_dataset_collection( # Define the origin of the dataset. origin = DatasetOrigin( path='data-releases/20210126_PS-IC40-IC86_VII.zip', + is_directory=False, host='icecube.wisc.edu', protocol='http', transfer_func=WGETDatasetTransfer.transfer, From cd84014ddb75423e2ad7a2cf36e2367a8076c31b Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Fri, 21 Jul 2023 15:22:49 +0200 Subject: [PATCH 11/34] Add properties to DatasetOrigin --- skyllh/core/dataset.py | 150 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 145 insertions(+), 5 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 7621e7e59f..2d463b2af5 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -83,16 +83,18 @@ def __init__( The callable object that should be used to transfer the dataset. This function requires the following call signature:: - __call__(ds, dst_path) + __call__(ds, dst_path, user=None, password=None) - where ``ds`` is an instance of Dataset, and ``dst_path`` is an - instance of str specifying the destination path on the local - machine. + where ``ds`` is an instance of Dataset, ``dst_path`` is an instance + of str specifying the destination path on the local machine, + ``user`` is the user name required to connect to the remote host, + and ``password`` is the password for the user name required to + connect to the remote host. is_directory : bool Flag if the remote path refers to a directory (``True``) or not ``False``. protocol : str | None - The protcol to use for the transfer, e.g. 
``"http"`` or ``rsync``. + The protocol to use for the transfer, e.g. ``"http"`` or ``"file"``. host : str | None The name or IP of the remote host. port : int | None @@ -119,6 +121,142 @@ def __init__( self.password = password self.post_transfer_func = post_transfer_func + @property + def path(self): + """The dataset's root directory at the origin. + """ + return self._path + + @path.setter + def path(self, obj): + if not isinstance(obj, str): + raise TypeError( + 'The path property must be an instance of str! ' + f'Its current type is {classname(obj)}!') + self._path = obj + + @property + def transfer_func(self): + """The callable object that should be used to transfer the dataset. + """ + return self._transfer_func + + @transfer_func.setter + def transfer_func(self, obj): + if not callable(obj): + raise TypeError( + 'The property transfer_func must be a callable object! ' + f'Its current type is {classname(obj)}!') + self._transfer_func = obj + + @property + def is_directory(self): + """Flag if the remote path refers to a directory (``True``) or not + (``False``). + """ + return self._is_directory + + @is_directory.setter + def is_directory(self, obj): + if not isinstance(obj, bool): + return TypeError( + 'The is_directory property must be an instance of bool! ' + f'Its current type is {classname(obj)}!') + self._is_directory = obj + + @property + def protocol(self): + """The protocol to use for the transfer. + """ + return self._protocol + + @protocol.setter + def protocol(self, obj): + if obj is not None: + if not isinstance(obj, str): + raise TypeError( + 'The property protocol must be None, or an instance of ' + 'str! ' + f'Its current type is {classname(obj)}!') + self._protocol = obj + + @property + def host(self): + """The name or IP of the remote host. + """ + return self._host + + @host.setter + def host(self, obj): + if obj is not None: + if not isinstance(obj, str): + raise TypeError( + 'The property host must be None, or an instance of str! 
' + f'Its current type is {classname(obj)}!') + self._host = obj + + @property + def port(self): + """The port number to use when connecting to the remote host. + """ + return self._port + + @port.setter + def port(self, obj): + if obj is not None: + if not isinstance(obj, int): + raise TypeError( + 'The property port must be None, or an instance of int! ' + f'Its current type is {classname(obj)}!') + self._port = obj + + @property + def user(self): + """The user name required to connect to the remote host. + """ + return self._user + + @user.setter + def user(self, obj): + if obj is not None: + if not isinstance(obj, str): + raise TypeError( + 'The property user must be None, or an instance of str! ' + f'Its current type is {classname(obj)}!') + self._user = obj + + @property + def password(self): + """The password for the user name required to connect to the remote + host. + """ + return self._password + + @password.setter + def password(self, obj): + if obj is not None: + if not isinstance(obj, str): + raise TypeError( + 'The property password must be None, or an instance of ' + 'str! ' + f'Its current type is {classname(obj)}!') + self._password = obj + + @property + def post_transfer_func(self): + """The callable object that should be called after the dataset has been + transfered by the ``transfer_func`` callable. + """ + return self._post_transfer_func + + @post_transfer_func.setter + def post_transfer_func(self, obj): + if not callable(obj): + raise TypeError( + 'The property post_transfer_func must be a callable object! ' + f'Its current type is {classname(obj)}!') + self._post_transfer_func = obj + def __str__(self): """Pretty string representation of this class. 
""" @@ -126,6 +264,7 @@ def __str__(self): s = f'{classname(self)} '+'{\n' s1 = f'path = {self.path}\n' + s1 += f'is_directory = {self.is_directory}\n' s1 += f'protocol = {self.protocol}\n' s1 += f'user@host:port = {self.user}@{self.host}:{self.port}\n' s1 += 'password = ' @@ -151,6 +290,7 @@ def is_locally_available(self): path and exists on the local host, ``False`` otherwise. """ if ( + self.is_directory and os.path.abspath(self.path) == self.path and os.path.exists(self.path) and os.path.isdir(self.path) From 2d8e8e011db39d61285aea23bcfbc0ecd5247a87 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Fri, 21 Jul 2023 16:23:26 +0200 Subject: [PATCH 12/34] Implement file mode feature --- skyllh/core/dataset.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 2d463b2af5..67f349a343 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -3,6 +3,7 @@ import abc import os import os.path +import stat import numpy as np @@ -300,14 +301,37 @@ def is_locally_available(self): return False -class TemporaryTextFile(object): - def __init__(self, pathfilename, text): +class TemporaryTextFile( + object, +): + """This class provides a temporary text file with a given content while + being within a with statement. Exiting the with statement will remove the + temporary text file. + + Example: + + .. code:: + + with TemporaryTextFile('myfile.txt', 'My file content'): + # Do something that requires the text file ``myfile.txt``. + # At this point the text file is removed again. 
+ + """ + + def __init__( + self, + pathfilename, + text, + mode=stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH, + ): self.pathfilename = pathfilename self.text = text + self.mode = mode def __enter__(self): with open(self.pathfilename, 'w') as fd: fd.write(self.text) + os.chmod(self.pathfilename, self.mode) def __exit__(self, exc_type, exc_val, exc_tb): os.remove(self.pathfilename) @@ -468,7 +492,8 @@ def transfer( pwdfile = os.path.join(os.getcwd(), f'{id(ds)}.passwd') with TemporaryTextFile( pathfilename=pwdfile, - text=password + text=password, + mode=stat.S_IRUSR, ): cmd = ( f'rsync ' From 44851408e34f7715382607f215eaf4a2d7fc0351 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Sun, 23 Jul 2023 11:41:11 +0200 Subject: [PATCH 13/34] Transfer individual dataset files --- skyllh/core/dataset.py | 296 +++++++++++++++++------- skyllh/datasets/i3/PublicData_10y_ps.py | 5 +- skyllh/i3/dataset.py | 20 ++ 3 files changed, 240 insertions(+), 81 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 67f349a343..1568950ee3 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -63,9 +63,10 @@ class DatasetOrigin( """ def __init__( self, - path, + base_path, + sub_path, transfer_func, - is_directory=True, + filename=None, protocol=None, host=None, port=None, @@ -78,8 +79,10 @@ def __init__( Parameters ---------- - path : str - The dataset's root directory at the origin. + base_path : str + The dataset's base directory at the origin. + sub_path : str + The dataset's sub directory at the origin. transfer_func : callable The callable object that should be used to transfer the dataset. This function requires the following call signature:: @@ -91,9 +94,9 @@ def __init__( ``user`` is the user name required to connect to the remote host, and ``password`` is the password for the user name required to connect to the remote host. - is_directory : bool - Flag if the remote path refers to a directory (``True``) or not - ``False``. 
+ filename : str | None + If the origin is not a directory but a file, this specifies the + filename. protocol : str | None The protocol to use for the transfer, e.g. ``"http"`` or ``"file"``. host : str | None @@ -112,9 +115,10 @@ def __init__( """ super().__init__(**kwargs) - self.path = path + self.base_path = base_path + self.sub_path = sub_path self.transfer_func = transfer_func - self.is_directory = is_directory + self.filename = filename self.protocol = protocol self.host = host self.port = port @@ -123,18 +127,39 @@ def __init__( self.post_transfer_func = post_transfer_func @property - def path(self): - """The dataset's root directory at the origin. + def base_path(self): + """The dataset's base directory at the origin. """ - return self._path + return self._base_path - @path.setter - def path(self, obj): + @base_path.setter + def base_path(self, obj): if not isinstance(obj, str): raise TypeError( - 'The path property must be an instance of str! ' + 'The base_path property must be an instance of str! ' f'Its current type is {classname(obj)}!') - self._path = obj + self._base_path = obj + + @property + def sub_path(self): + """The dataset's sub directory at the origin. + """ + return self._sub_path + + @sub_path.setter + def sub_path(self, obj): + if not isinstance(obj, str): + raise TypeError( + 'The sub_path property must be an instance of str! ' + f'Its current type is {classname(obj)}!') + self._sub_path = obj + + @property + def root_dir(self): + """(read-only) The dataset's root directory at the origin, which is + the combination of ``base_path`` and ``sub_path``. + """ + return os.path.join(self._base_path, self._sub_path) @property def transfer_func(self): @@ -151,19 +176,27 @@ def transfer_func(self, obj): self._transfer_func = obj @property - def is_directory(self): - """Flag if the remote path refers to a directory (``True``) or not - (``False``). + def filename(self): + """The file name if the origin is a file instaed of a directory. 
""" - return self._is_directory + return self._filename - @is_directory.setter - def is_directory(self, obj): - if not isinstance(obj, bool): - return TypeError( - 'The is_directory property must be an instance of bool! ' - f'Its current type is {classname(obj)}!') - self._is_directory = obj + @filename.setter + def filename(self, obj): + if obj is not None: + if not isinstance(obj, str): + raise TypeError( + 'The property filename must be None, or an instance of ' + 'str! ' + f'Its current type is {classname(obj)}!') + self._filename = obj + + @property + def is_directory(self): + """(read-only) Flag if the origin refers to a directory (``True``) or a + file (``False``). + """ + return (self._filename is None) @property def protocol(self): @@ -264,8 +297,10 @@ def __str__(self): transfer_cls = get_class_of_func(self.transfer_func) s = f'{classname(self)} '+'{\n' - s1 = f'path = {self.path}\n' - s1 += f'is_directory = {self.is_directory}\n' + s1 = f'base_path = "{self.base_path}"\n' + s1 += f'sub_path = "{self.sub_path}"\n' + if self._filename is not None: + s1 += f'filename = {self._filename}\n' s1 += f'protocol = {self.protocol}\n' s1 += f'user@host:port = {self.user}@{self.host}:{self.port}\n' s1 += 'password = ' @@ -449,8 +484,9 @@ def __init__(self, **kwargs): @staticmethod def transfer( - ds, - dst_path, + origin, + file_list, + dst_base_path, user=None, password=None, ): @@ -462,56 +498,87 @@ def transfer( ds : instance of Dataset The instance of Dataset containing the origin property specifying the origin of the dataset. - dst_path : str - The destination path into which the dataset will be transfered. + file_list : list of str + The list of files, relative to the origin base path, which should be + transfered. + dst_base_path : str + The destination base path into which the dataset files will be + transfered. user : str | None The user name required to connect to the remote host. 
password : str | None The password for the user name required to connect to the remote host. """ - execute_system_command = DatasetTransfer.execute_system_command - cls = get_class_of_func(RSYNCDatasetTransfer.transfer) logger = get_logger(f'{classname(cls)}.transfer') - host = ds.origin.host - path = ds.origin.path + host = origin.host + + # Make sure the origin and destination base paths end with a directory + # seperator. + origin_base_path = origin.base_path + if origin_base_path[-len(os.path.sep):] != os.path.sep: + origin_base_path += os.path.sep + if dst_base_path[-len(os.path.sep):] != os.path.sep: + dst_base_path += os.path.sep + + file_list_pathfilename = os.path.join( + os.getcwd(), + f'.{id(origin)}.rsync_file_list.txt') if user is None: # No user name is defined. - cmd = ( - f'rsync ' - '-avL ' - '--progress ' - f'{host}:"{path}" "{dst_path}"' - ) - execute_system_command(cmd, logger) + with TemporaryTextFile( + pathfilename=file_list_pathfilename, + text='\n'.join(file_list) + ): + cmd = ( + f'rsync ' + '-avrRL ' + '--progress ' + f'--files-from="{file_list_pathfilename}" ' + f'{host}:"{origin_base_path}" "{dst_base_path}"' + ) + DatasetTransfer.execute_system_command(cmd, logger) elif password is not None: # User and password is defined. - pwdfile = os.path.join(os.getcwd(), f'{id(ds)}.passwd') + pwdfile = os.path.join( + os.getcwd(), + f'.{id(origin)}.rsync_passwd.txt') + with TemporaryTextFile( pathfilename=pwdfile, text=password, mode=stat.S_IRUSR, + ): + with TemporaryTextFile( + pathfilename=file_list_pathfilename, + text='\n'.join(file_list) + ): + cmd = ( + f'rsync ' + '-avrRL ' + '--progress ' + f'--password-file "{pwdfile}" ' + f'--files-from="{file_list_pathfilename}" ' + f'{user}@{host}:"{origin_base_path}" "{dst_base_path}"' + ) + DatasetTransfer.execute_system_command(cmd, logger) + else: + # Only the user name is defined. 
+ with TemporaryTextFile( + pathfilename=file_list_pathfilename, + text='\n'.join(file_list) ): cmd = ( f'rsync ' - '-avL ' + '-avrRL ' '--progress ' - f'--password-file "{pwdfile}" ' - f'{user}@{host}:"{path}" "{dst_path}"' + f'--files-from="{file_list_pathfilename}" ' + f'{user}@{host}:"{origin_base_path}" "{dst_base_path}"' ) - execute_system_command(cmd, logger) - else: - # Only the user name is defined. - cmd = ( - f'rsync ' - '-avL ' - '--progress ' - f'{user}@{host}:"{path}" "{dst_path}"' - ) - execute_system_command(cmd, logger) + DatasetTransfer.execute_system_command(cmd, logger) class WGETDatasetTransfer( @@ -522,8 +589,9 @@ def __init__(self, **kwargs): @staticmethod def transfer( - ds, - dst_path, + origin, + file_list, + dst_base_path, user=None, password=None, ): @@ -532,11 +600,13 @@ def transfer( Parameters ---------- - ds : instance of Dataset - The instance of Dataset containing the origin property specifying - the origin of the dataset. - dst_path : str - The destination path into which the dataset will be transfered. + origin : instance of DatasetOrigin + The instance of DatasetOrigin defining the origin of the dataset. + file_list : list of str + The list of files relative to the origin's base path, which + should be transfered. + dst_base_path : str + The destination base path into which the dataset will be transfered. user : str | None The user name required to connect to the remote host. 
password : str | None @@ -546,11 +616,13 @@ def transfer( cls = get_class_of_func(WGETDatasetTransfer.transfer) logger = get_logger(f'{classname(cls)}.transfer') - protocol = ds.origin.protocol - host = ds.origin.host - path = ds.origin.path - cmd = f'wget {protocol}://{host}/{path} -P {dst_path}' - DatasetTransfer.execute_system_command(cmd, logger) + protocol = origin.protocol + host = origin.host + + for file in file_list: + path = os.path.join(origin.base_path, file) + cmd = f'wget {protocol}://{host}/{path} -P {dst_base_path}' + DatasetTransfer.execute_system_command(cmd, logger) class Dataset( @@ -1219,6 +1291,29 @@ def update_version_qualifiers( 'Version qualifier values did not increment and no new version ' 'qualifiers were added!') + def create_transfer_file_list( + self, + ): + """Creates the list of files that need to be transfered from the origin + for this dataset. The paths are relative paths starting after the sub + path of the dataset. + + Returns + ------- + file_list : list of str + The list of files that need to be transfered from the origin for + this dataset. + """ + file_list = ( + self._exp_pathfilename_list + + self._mc_pathfilename_list + ) + + for aux_pathfilename_list in self._aux_data_definitions.values(): + file_list += aux_pathfilename_list + + return file_list + def make_data_available( self, user=None, @@ -1280,16 +1375,30 @@ def make_data_available( if password is None: password = self.origin.password - logger.debug( - f'Downloading dataset "{self.name}" from origin. user="{user}".') - base_path = generate_base_path( default_base_path=self._cfg['repository']['base_path'], base_path=self._base_path) + logger.debug( + f'Downloading dataset "{self.name}" from origin into base path ' + f'"{base_path}". user="{user}".') + + # Check if the origin is a directory. If not we just transfer that one + # file. 
+ if self.origin.is_directory: + file_list = [ + os.path.join(self.origin.sub_path, pathfilename) + for pathfilename in self.create_transfer_file_list() + ] + else: + file_list = [ + os.path.join(self.origin.sub_path, self.origin.filename) + ] + self.origin.transfer_func( - ds=self, - dst_path=base_path, + origin=self.origin, + file_list=file_list, + dst_base_path=base_path, user=user, password=password) @@ -2669,6 +2778,35 @@ def generate_base_path( return base_path +def generate_sub_path( + sub_path_fmt, + version, + verqualifiers, +): + """Generates the sub path of the dataset based on the given sub path format. + + Parameters + ---------- + sub_path_fmt : str + The format string of the sub path. + version : int + The version of the dataset. + verqualifiers : dict + The dictionary holding the version qualifiers of the dataset. + + Returns + ------- + sub_path : str + The generated sub path. + """ + fmtdict = dict( + [('version', version)] + list(verqualifiers.items()) + ) + sub_path = sub_path_fmt.format(**fmtdict) + + return sub_path + + def generate_data_file_root_dir( default_base_path, default_sub_path_fmt, @@ -2717,10 +2855,10 @@ def generate_data_file_root_dir( if sub_path_fmt is None: sub_path_fmt = default_sub_path_fmt - fmtdict = dict( - [('version', version)] + list(verqualifiers.items()) - ) - sub_path = sub_path_fmt.format(**fmtdict) + sub_path = generate_sub_path( + sub_path_fmt=sub_path_fmt, + version=version, + verqualifiers=verqualifiers) root_dir = os.path.join(base_path, sub_path) diff --git a/skyllh/datasets/i3/PublicData_10y_ps.py b/skyllh/datasets/i3/PublicData_10y_ps.py index 241430f9bf..f6409dbb65 100644 --- a/skyllh/datasets/i3/PublicData_10y_ps.py +++ b/skyllh/datasets/i3/PublicData_10y_ps.py @@ -259,8 +259,9 @@ def create_dataset_collection( # Define the origin of the dataset. 
origin = DatasetOrigin( - path='data-releases/20210126_PS-IC40-IC86_VII.zip', - is_directory=False, + base_path='data-releases', + sub_path='', + filename='20210126_PS-IC40-IC86_VII.zip', host='icecube.wisc.edu', protocol='http', transfer_func=WGETDatasetTransfer.transfer, diff --git a/skyllh/i3/dataset.py b/skyllh/i3/dataset.py index 08aff27028..435fd42e6a 100644 --- a/skyllh/i3/dataset.py +++ b/skyllh/i3/dataset.py @@ -172,6 +172,26 @@ def __str__(self): return s + def create_transfer_file_list( + self, + ): + """Creates the list of files that need to be transfered from the origin + for this dataset. The paths are relative paths starting after the sub + path of the dataset. + + Returns + ------- + file_list : list of str + The list of files that need to be transfered from the origin for + this dataset. + """ + file_list = ( + super().create_transfer_file_list() + + self._grl_pathfilename_list + ) + + return file_list + def load_grl(self, efficiency_mode=None, tl=None): """Loads the good-run-list and returns a DataFieldRecordArray instance which should contain the following data fields: From 237c0f6093a5051f56fdfd845766960fa5e9fb14 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Sun, 23 Jul 2023 12:12:41 +0200 Subject: [PATCH 14/34] Use new DatasetOrigin interface, ensure directories exist --- skyllh/core/dataset.py | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 1568950ee3..a103307e25 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -325,11 +325,13 @@ def is_locally_available(self): ``True`` if the path specified in this dataset origin is an absolute path and exists on the local host, ``False`` otherwise. 
""" + root_dir = self.root_dir + if ( self.is_directory and - os.path.abspath(self.path) == self.path and - os.path.exists(self.path) and - os.path.isdir(self.path) + os.path.abspath(root_dir) == root_dir and + os.path.exists(root_dir) and + os.path.isdir(root_dir) ): return True @@ -456,16 +458,18 @@ def post_transfer_unzip(ds, dst_path): """This is a post-transfer function. It will unzip the transfered file into the dst_path if the origin path was a zip file. """ - if not ds.origin.path.lower().endswith('.zip'): + if ds.origin.filename is None: + return + + fname = os.path.join(ds.origin.root_dir, ds.origin.filename) + if not fname.lower().endswith('.zip'): return cls = get_class_of_func(DatasetTransfer.post_transfer_unzip) logger = get_logger(f'{classname(cls)}.post_transfer_unzip') - fname = os.path.basename(ds.origin.path) - # Unzip the dataset file. - zip_file = os.path.join(dst_path, fname) + zip_file = os.path.join(dst_path, ds.origin.filename) cmd = f'unzip "{zip_file}" -d "{dst_path}"' DatasetTransfer.execute_system_command(cmd, logger) @@ -1344,6 +1348,7 @@ def make_data_available( ) root_dir = self.root_dir + if os.path.exists(root_dir) and os.path.isdir(root_dir): logger.debug( f'The root dir "{root_dir}" of dataset "{self.name}" exists. ' @@ -1359,7 +1364,12 @@ def make_data_available( # Check if the dataset origin is locally available. In that case we # just create a symlink. if self.origin.is_locally_available(): - cmd = f'ln -s "{self.origin.path}" "{root_dir}"' + # Make sure all directories leading to the symlink exist. + dirname = os.path.dirname(root_dir) + if dirname != '': + os.makedirs(dirname) + + cmd = f'ln -s "{self.origin.root_dir}" "{root_dir}"' DatasetTransfer.execute_system_command(cmd, logger) return True @@ -2846,7 +2856,8 @@ def generate_data_file_root_dir( Returns ------- root_dir : str - The generated root directory of the data files. + The generated root directory of the data files. 
This will have no + trailing directory seperator. """ base_path = generate_base_path( default_base_path=default_base_path, @@ -2862,6 +2873,10 @@ def generate_data_file_root_dir( root_dir = os.path.join(base_path, sub_path) + len_sep = len(os.path.sep) + if root_dir[-len_sep:] == os.path.sep: + root_dir = root_dir[:-len_sep] + return root_dir From f4c0b58db9d187c3069df58c0869ab7c3e055c02 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Sat, 29 Jul 2023 19:11:21 +0200 Subject: [PATCH 15/34] Allow None for post_transfer_func --- skyllh/core/dataset.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index a103307e25..433a9adc97 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -285,10 +285,12 @@ def post_transfer_func(self): @post_transfer_func.setter def post_transfer_func(self, obj): - if not callable(obj): - raise TypeError( - 'The property post_transfer_func must be a callable object! ' - f'Its current type is {classname(obj)}!') + if obj is not None: + if not callable(obj): + raise TypeError( + 'The property post_transfer_func must be a callable ' + 'object! ' + f'Its current type is {classname(obj)}!') self._post_transfer_func = obj def __str__(self): From d695a65f3f36e11554eae0f681fc566d7eb62025 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Sat, 29 Jul 2023 20:18:43 +0200 Subject: [PATCH 16/34] Rename user to username --- skyllh/core/dataset.py | 74 +++++++++++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 27 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 433a9adc97..91da0fee83 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -70,7 +70,7 @@ def __init__( protocol=None, host=None, port=None, - user=None, + username=None, password=None, post_transfer_func=None, **kwargs, @@ -103,7 +103,7 @@ def __init__( The name or IP of the remote host. 
port : int | None The port number to use when connecting to the remote host. - user : str | None + username : str | None The user name required to connect to the remote host. password : str | None The password for the user name required to connect to the remote @@ -122,7 +122,7 @@ def __init__( self.protocol = protocol self.host = host self.port = port - self.user = user + self.username = username self.password = password self.post_transfer_func = post_transfer_func @@ -245,19 +245,20 @@ def port(self, obj): self._port = obj @property - def user(self): + def username(self): """The user name required to connect to the remote host. """ - return self._user + return self._username - @user.setter - def user(self, obj): + @username.setter + def username(self, obj): if obj is not None: if not isinstance(obj, str): raise TypeError( - 'The property user must be None, or an instance of str! ' + 'The property username must be None, or an instance of ' + 'str! ' f'Its current type is {classname(obj)}!') - self._user = obj + self._username = obj @property def password(self): @@ -304,7 +305,7 @@ def __str__(self): if self._filename is not None: s1 += f'filename = {self._filename}\n' s1 += f'protocol = {self.protocol}\n' - s1 += f'user@host:port = {self.user}@{self.host}:{self.port}\n' + s1 += f'user@host:port = {self.username}@{self.host}:{self.port}\n' s1 += 'password = ' if self.password is not None: s1 += 'set\n' @@ -434,7 +435,7 @@ def ensure_dst_path( def transfer( ds, dst_path, - user=None, + username=None, password=None, ): """This method is supposed to transfer the dataset origin path to the @@ -447,7 +448,7 @@ def transfer( specifying the origin of the dataset. dst_path : str The destination path into which the dataset will be transfered. - user : str | None + username : str | None The user name required to connect to the remote host. 
password : str | None The password for the user name required to connect to the remote @@ -456,7 +457,10 @@ def transfer( pass @staticmethod - def post_transfer_unzip(ds, dst_path): + def post_transfer_unzip( + ds, + dst_path, + ): """This is a post-transfer function. It will unzip the transfered file into the dst_path if the origin path was a zip file. """ @@ -493,7 +497,7 @@ def transfer( origin, file_list, dst_base_path, - user=None, + username=None, password=None, ): """Transfers the given dataset to the given destination path using the @@ -510,7 +514,7 @@ def transfer( dst_base_path : str The destination base path into which the dataset files will be transfered. - user : str | None + username : str | None The user name required to connect to the remote host. password : str | None The password for the user name required to connect to the remote @@ -533,7 +537,7 @@ def transfer( os.getcwd(), f'.{id(origin)}.rsync_file_list.txt') - if user is None: + if username is None: # No user name is defined. with TemporaryTextFile( pathfilename=file_list_pathfilename, @@ -568,7 +572,7 @@ def transfer( '--progress ' f'--password-file "{pwdfile}" ' f'--files-from="{file_list_pathfilename}" ' - f'{user}@{host}:"{origin_base_path}" "{dst_base_path}"' + f'{username}@{host}:"{origin_base_path}" "{dst_base_path}"' ) DatasetTransfer.execute_system_command(cmd, logger) else: @@ -582,7 +586,7 @@ def transfer( '-avrRL ' '--progress ' f'--files-from="{file_list_pathfilename}" ' - f'{user}@{host}:"{origin_base_path}" "{dst_base_path}"' + f'{username}@{host}:"{origin_base_path}" "{dst_base_path}"' ) DatasetTransfer.execute_system_command(cmd, logger) @@ -598,7 +602,7 @@ def transfer( origin, file_list, dst_base_path, - user=None, + username=None, password=None, ): """Transfers the given dataset to the given destination path using the @@ -613,7 +617,7 @@ def transfer( should be transfered. dst_base_path : str The destination base path into which the dataset will be transfered. 
- user : str | None + username : str | None The user name required to connect to the remote host. password : str | None The password for the user name required to connect to the remote @@ -627,7 +631,23 @@ def transfer( for file in file_list: path = os.path.join(origin.base_path, file) - cmd = f'wget {protocol}://{host}/{path} -P {dst_base_path}' + + cmd = 'wget ' + if username is None: + # No user name is specified. + pass + elif password is not None: + # A user name and password is specified. + cmd += ( + f'--user="{username}" ' + f'--password="{password}" ' + ) + else: + # Only a user name is specified. + cmd += ( + f'--user={username} ' + ) + cmd += f'{protocol}://{host}/{path} -P {dst_base_path}' DatasetTransfer.execute_system_command(cmd, logger) @@ -1322,7 +1342,7 @@ def create_transfer_file_list( def make_data_available( self, - user=None, + username=None, password=None, ): """Makes the data of the dataset available. @@ -1332,7 +1352,7 @@ def make_data_available( Parameters ---------- - user : str | None + username : str | None The user name required to connect to the remote host of the origin. If set to ``None``, the password : str | None @@ -1382,8 +1402,8 @@ def make_data_available( 'configuration!') return False - if user is None: - user = self.origin.user + if username is None: + username = self.origin.username if password is None: password = self.origin.password @@ -1393,7 +1413,7 @@ def make_data_available( logger.debug( f'Downloading dataset "{self.name}" from origin into base path ' - f'"{base_path}". user="{user}".') + f'"{base_path}". username="{username}".') # Check if the origin is a directory. If not we just transfer that one # file. 
@@ -1411,7 +1431,7 @@ def make_data_available( origin=self.origin, file_list=file_list, dst_base_path=base_path, - user=user, + username=username, password=password) if self.origin.post_transfer_func is not None: From 7b05f95d3686c159ab04d2b96d5cd43c934a87ca Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Sat, 29 Jul 2023 20:34:54 +0200 Subject: [PATCH 17/34] Support port in wget --- skyllh/core/dataset.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 91da0fee83..1e49fb84ef 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -628,6 +628,7 @@ def transfer( protocol = origin.protocol host = origin.host + port = origin.port for file in file_list: path = os.path.join(origin.base_path, file) @@ -647,7 +648,12 @@ def transfer( cmd += ( f'--user={username} ' ) - cmd += f'{protocol}://{host}/{path} -P {dst_base_path}' + cmd += f'{protocol}://{host}' + if port is not None: + cmd += f':{port}' + if path[0:1] != '/': + cmd += '/' + cmd += f'{path} -P {dst_base_path}' DatasetTransfer.execute_system_command(cmd, logger) From a6dfb2feadfee3b442866dc41c990baba5f01112 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Sat, 29 Jul 2023 20:54:56 +0200 Subject: [PATCH 18/34] Create local sub dirs --- skyllh/core/dataset.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 1e49fb84ef..ae02e898b2 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -633,6 +633,13 @@ def transfer( for file in file_list: path = os.path.join(origin.base_path, file) + dst_sub_path = os.path.dirname(file) + if dst_sub_path == '': + dst_path = dst_base_path + else: + dst_path = os.path.join(dst_base_path, dst_sub_path) + DatasetTransfer.ensure_dst_path(dst_path) + cmd = 'wget ' if username is None: # No user name is specified. 
@@ -653,7 +660,7 @@ def transfer( cmd += f':{port}' if path[0:1] != '/': cmd += '/' - cmd += f'{path} -P {dst_base_path}' + cmd += f'{path} -P {dst_path}' DatasetTransfer.execute_system_command(cmd, logger) From 087dc5eec78c8d7c6db08234a8c261973fd927fa Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Sat, 29 Jul 2023 21:24:19 +0200 Subject: [PATCH 19/34] Add convenient method --- skyllh/core/dataset.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index ae02e898b2..c5bac8f6bd 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -2099,7 +2099,7 @@ def remove_aux_data_definition( Parameters ---------- name : str - The name of the dataset that should get removed. + The name of the data definition that should get removed. """ if name not in self._aux_data_definitions: raise KeyError( @@ -2108,6 +2108,21 @@ def remove_aux_data_definition( self._aux_data_definitions.pop(name) + def remove_aux_data_definitions( + self, + names, + ): + """Removes the auxiliary data definition from the dataset. + + Parameters + ---------- + names : sequence of str + The names of the data definitions that should get removed. + """ + for name in names: + self.remove_aux_data_definition( + name=name) + def add_aux_data( self, name, From 88a6533cf6f1ba567fd94f30683db3dd51f29601 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Sun, 30 Jul 2023 21:45:10 +0200 Subject: [PATCH 20/34] Skip existing files --- skyllh/core/dataset.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index c5bac8f6bd..43d0379b9e 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -631,6 +631,12 @@ def transfer( port = origin.port for file in file_list: + dst_pathfilename = os.path.join(dst_base_path, file) + if os.path.exists(dst_pathfilename): + logger.debug( + f'File "{dst_pathfilename}" already exists. 
Skipping.') + continue + path = os.path.join(origin.base_path, file) dst_sub_path = os.path.dirname(file) From b4fba844b3ed807361ca34c5b33475ed9982ef1b Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Sun, 30 Jul 2023 22:08:45 +0200 Subject: [PATCH 21/34] Skip existing files --- skyllh/core/dataset.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 43d0379b9e..d2bc9251f8 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -537,11 +537,19 @@ def transfer( os.getcwd(), f'.{id(origin)}.rsync_file_list.txt') + # Create file list file content. + # Skip files which already exists. + file_list_filecontent = '' + for file in file_list: + dst_pathfilename = os.path.join(dst_base_path, file) + if not os.path.exists(dst_pathfilename): + file_list_filecontent += f'{file}\n' + if username is None: # No user name is defined. with TemporaryTextFile( pathfilename=file_list_pathfilename, - text='\n'.join(file_list) + text=file_list_filecontent, ): cmd = ( f'rsync ' @@ -564,7 +572,7 @@ def transfer( ): with TemporaryTextFile( pathfilename=file_list_pathfilename, - text='\n'.join(file_list) + text=file_list_filecontent, ): cmd = ( f'rsync ' @@ -579,7 +587,7 @@ def transfer( # Only the user name is defined. 
with TemporaryTextFile( pathfilename=file_list_pathfilename, - text='\n'.join(file_list) + text=file_list_filecontent, ): cmd = ( f'rsync ' From f1fcfdb1c0c8b49be2a8c2fe662a3f8d47dcd694 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Mon, 31 Jul 2023 21:18:28 +0200 Subject: [PATCH 22/34] Add TestData dataset definition --- skyllh/datasets/i3/TestData.py | 72 ++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 skyllh/datasets/i3/TestData.py diff --git a/skyllh/datasets/i3/TestData.py b/skyllh/datasets/i3/TestData.py new file mode 100644 index 0000000000..d7b599a647 --- /dev/null +++ b/skyllh/datasets/i3/TestData.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Author: Dr. Martin Wolf + +from skyllh.core.dataset import ( + DatasetCollection, +) +from skyllh.i3.dataset import ( + I3Dataset, +) + + +def create_dataset_collection( + cfg, + base_path=None, + sub_path_fmt=None, +): + """Defines a dataset collection with a test dataset. + + Parameters + ---------- + cfg : instance of Config + The instance of Config holding the local configuration. + base_path : str | None + The base path of the data files. The actual path of a data file is + assumed to be of the structure //. + If None, use the default path ``cfg['repository']['base_path']``. + sub_path_fmt : str | None + The sub path format of the data files of the public data sample. + If None, use the default sub path format + 'testdata'. + + Returns + ------- + dsc : DatasetCollection + The dataset collection containing all the seasons as individual + I3Dataset objects. + """ + (version, verqualifiers) = (1, dict(p=0)) + + default_sub_path_fmt = 'testdata' + + dsc = DatasetCollection('Public Data 10-year point-source') + + dsc.description = r""" + This dataset collection contains a test dataset which can be used for unit + tests. + """ + + # Define the common keyword arguments for all data sets. 
+ ds_kwargs = dict( + cfg=cfg, + livetime=None, + version=version, + verqualifiers=verqualifiers, + base_path=base_path, + default_sub_path_fmt=default_sub_path_fmt, + sub_path_fmt=sub_path_fmt, + ) + + TestData = I3Dataset( + name='TestData', + exp_pathfilenames='exp.npy', + mc_pathfilenames='mc.npy', + grl_pathfilenames='grl.npy', + **ds_kwargs, + ) + + dsc.add_datasets(( + TestData, + )) + + return dsc From 380a0f3a83b3118c33a62e78e4e29bc372d92d10 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Mon, 31 Jul 2023 21:19:06 +0200 Subject: [PATCH 23/34] Rename method --- skyllh/i3/dataset.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/skyllh/i3/dataset.py b/skyllh/i3/dataset.py index 435fd42e6a..c1cb89889e 100644 --- a/skyllh/i3/dataset.py +++ b/skyllh/i3/dataset.py @@ -172,21 +172,19 @@ def __str__(self): return s - def create_transfer_file_list( + def create_file_list( self, ): - """Creates the list of files that need to be transfered from the origin - for this dataset. The paths are relative paths starting after the sub - path of the dataset. + """Creates the list of files of this dataset. + The file paths are relative to the dataset's root directory. Returns ------- file_list : list of str - The list of files that need to be transfered from the origin for - this dataset. + The list of files of this dataset. 
""" file_list = ( - super().create_transfer_file_list() + + super().create_file_list() + self._grl_pathfilename_list ) From 16067f4565ab54281fa472265ae229aa8c4fd804 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Mon, 31 Jul 2023 21:20:42 +0200 Subject: [PATCH 24/34] Improve dataset handling and error handling of data transfers --- skyllh/core/dataset.py | 126 ++++++++++++++++++++++++++++++++++------- 1 file changed, 104 insertions(+), 22 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index d2bc9251f8..ad62d757e0 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -3,6 +3,7 @@ import abc import os import os.path +import shutil import stat import numpy as np @@ -377,6 +378,23 @@ def __exit__(self, exc_type, exc_val, exc_tb): os.remove(self.pathfilename) +class SystemCommandError( + Exception, +): + """This custom exception will be raised when a system command failed. + """ + pass + + +class DatasetTransferError( + Exception, +): + """This custom exception defines an error that should be raised when the + actual transfer of the dataset files failed. + """ + pass + + class DatasetTransfer( object, metaclass=abc.ABCMeta, @@ -406,13 +424,13 @@ def execute_system_command( Raises ------ - RuntimeError + SystemCommandError If the system command did not return ``success_rcode``. """ logger.debug(f'Running command "{cmd}"') rcode = os.system(cmd) if (success_rcode is not None) and (rcode != success_rcode): - raise RuntimeError( + raise SystemCommandError( f'The system command "{cmd}" failed with return code {rcode}!') @staticmethod @@ -453,6 +471,11 @@ def transfer( password : str | None The password for the user name required to connect to the remote host. + + Raises + ------ + DatasetTransferError + If the actual transfer of the dataset files failed. 
""" pass @@ -493,7 +516,7 @@ def __init__(self, **kwargs): super().__init__(**kwargs) @staticmethod - def transfer( + def transfer( # noqa: C901 origin, file_list, dst_base_path, @@ -558,7 +581,11 @@ def transfer( f'--files-from="{file_list_pathfilename}" ' f'{host}:"{origin_base_path}" "{dst_base_path}"' ) - DatasetTransfer.execute_system_command(cmd, logger) + try: + DatasetTransfer.execute_system_command(cmd, logger) + except SystemCommandError as err: + raise DatasetTransferError(str(err)) + elif password is not None: # User and password is defined. pwdfile = os.path.join( @@ -582,7 +609,10 @@ def transfer( f'--files-from="{file_list_pathfilename}" ' f'{username}@{host}:"{origin_base_path}" "{dst_base_path}"' ) - DatasetTransfer.execute_system_command(cmd, logger) + try: + DatasetTransfer.execute_system_command(cmd, logger) + except SystemCommandError as err: + raise DatasetTransferError(str(err)) else: # Only the user name is defined. with TemporaryTextFile( @@ -596,7 +626,10 @@ def transfer( f'--files-from="{file_list_pathfilename}" ' f'{username}@{host}:"{origin_base_path}" "{dst_base_path}"' ) - DatasetTransfer.execute_system_command(cmd, logger) + try: + DatasetTransfer.execute_system_command(cmd, logger) + except SystemCommandError as err: + raise DatasetTransferError(str(err)) class WGETDatasetTransfer( @@ -675,7 +708,10 @@ def transfer( if path[0:1] != '/': cmd += '/' cmd += f'{path} -P {dst_path}' - DatasetTransfer.execute_system_command(cmd, logger) + try: + DatasetTransfer.execute_system_command(cmd, logger) + except SystemCommandError as err: + raise DatasetTransferError(str(err)) class Dataset( @@ -1116,12 +1152,13 @@ def mc_field_name_renaming_dict(self, d): @property def exists(self): - """(read-only) Flag if all the data files of this data set exists. It is + """(read-only) Flag if all the data files of this dataset exists. It is ``True`` if all data files exist and ``False`` otherwise. 
""" - for pathfilename in (self.exp_abs_pathfilename_list + - self.mc_abs_pathfilename_list): - if not os.path.exists(pathfilename): + file_list = self.create_file_list() + abs_file_list = self.get_abs_pathfilename_list(file_list) + for abs_file in abs_file_list: + if not os.path.exists(abs_file): return False return True @@ -1270,6 +1307,31 @@ def __str__(self): # noqa: C901 return s + def remove_data(self): + """Removes the data of this dataset by removing the dataset's root + directory and everything in it. If the root directory is a symbolic + link, only this link will be removed. + + Raises + ------ + RuntimeError + If the dataset's root directory is neither a symlink nor a + directory. + """ + root_dir = self.root_dir + + if os.path.islink(root_dir): + os.remove(root_dir) + return + + if os.path.isdir(root_dir): + shutil.rmtree(root_dir) + return + + raise RuntimeError( + f'The root directory "{root_dir}" of dataset {self.name} is ' + 'neither a symlink nor a directory!') + def get_abs_pathfilename_list( self, pathfilename_list, @@ -1301,6 +1363,29 @@ def get_abs_pathfilename_list( return abs_pathfilename_list + def get_missing_files( + self, + ): + """Determines which files of the dataset are missing and returns the + list of files. + + Returns + ------- + missing_files : list of str + The list of files that are missing. The files are relative to the + dataset's root directory. 
+ """ + file_list = self.create_file_list() + abs_file_list = self.get_abs_pathfilename_list(file_list) + + missing_files = [ + file + for (file, abs_file) in zip(file_list, abs_file_list) + if not os.path.exists(abs_file) + ] + + return missing_files + def update_version_qualifiers( self, verqualifiers, @@ -1344,18 +1429,16 @@ def update_version_qualifiers( 'Version qualifier values did not increment and no new version ' 'qualifiers were added!') - def create_transfer_file_list( + def create_file_list( self, ): - """Creates the list of files that need to be transfered from the origin - for this dataset. The paths are relative paths starting after the sub - path of the dataset. + """Creates the list of files that are linked to this dataset. + The file paths are relative to the dataset's root directory. Returns ------- file_list : list of str - The list of files that need to be transfered from the origin for - this dataset. + The list of files of this dataset. """ file_list = ( self._exp_pathfilename_list + @@ -1396,11 +1479,9 @@ def make_data_available( module_class_method_name(self, 'make_data_available') ) - root_dir = self.root_dir - - if os.path.exists(root_dir) and os.path.isdir(root_dir): + if len(self.get_missing_files()) == 0: logger.debug( - f'The root dir "{root_dir}" of dataset "{self.name}" exists. ' + f'All files of dataset "{self.name}" already exist. ' 'Nothing to download.') return True @@ -1413,6 +1494,7 @@ def make_data_available( # Check if the dataset origin is locally available. In that case we # just create a symlink. if self.origin.is_locally_available(): + root_dir = self.root_dir # Make sure all directories leading to the symlink exist. 
dirname = os.path.dirname(root_dir) if dirname != '': @@ -1447,7 +1529,7 @@ def make_data_available( if self.origin.is_directory: file_list = [ os.path.join(self.origin.sub_path, pathfilename) - for pathfilename in self.create_transfer_file_list() + for pathfilename in self.create_file_list() ] else: file_list = [ From cd8a2c8b9aad4ac4bafcfec3d2286586383ab263 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Mon, 31 Jul 2023 21:21:12 +0200 Subject: [PATCH 25/34] Add unit test for rsync dataset transfers --- tests/core/test_dataset.py | 46 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/core/test_dataset.py b/tests/core/test_dataset.py index fca6f6e0a1..a9093cc6aa 100644 --- a/tests/core/test_dataset.py +++ b/tests/core/test_dataset.py @@ -4,9 +4,15 @@ import os.path import unittest +from skyllh.core.config import ( + Config, +) from skyllh.core.dataset import ( get_data_subset, DatasetData, + DatasetOrigin, + DatasetTransferError, + RSYNCDatasetTransfer, ) from skyllh.core.livetime import ( Livetime, @@ -14,6 +20,46 @@ from skyllh.core.storage import ( DataFieldRecordArray, ) +from skyllh.datasets.i3 import ( + TestData, +) + + +class TestRSYNCDatasetTransfer( + unittest.TestCase, +): + def setUp(self): + self.cfg = Config() + self.ds = TestData.create_dataset_collection( + cfg=self.cfg, + base_path=os.path.join(os.getcwd(), '.repository')).get_dataset( + 'TestData') + + # Remove the dataset if it already exists. + if self.ds.exists: + self.ds.remove_data() + + # Define the origin and transfer method of this dataset. 
+ self.ds.origin = DatasetOrigin( + base_path='/data/user/mwolf/skyllh', + sub_path='testdata', + host='cobalt', + transfer_func=RSYNCDatasetTransfer.transfer, + ) + + def test_transfer(self): + try: + if not self.ds.make_data_available(): + raise RuntimeError( + f'The data of dataset {self.ds.name} could not be made ' + 'available!') + except DatasetTransferError: + self.skipTest( + f'The data of dataset {self.ds.name} could not be transfered.') + + # Check that there are no missing files. + missing_files = self.ds.get_missing_files() + self.assertEqual(len(missing_files), 0) class TestDatasetFunctions(unittest.TestCase): From 9f3a34471954818ec50f990e7118891d49def3e7 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Tue, 1 Aug 2023 09:27:59 +0200 Subject: [PATCH 26/34] Add unit test for wget dataset transfer --- .github/workflows/pythonpackage.yml | 4 ++- tests/core/test_dataset.py | 44 +++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 80bf2e8554..a5448193b8 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -34,6 +34,8 @@ jobs: flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide flake8 . 
--count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - - name: Test with unittest + - name: Run tests with unittest + env: + ICECUBE_PASSWORD: ${{ secrets.ICECUBE_PASSWORD }} run: | ./tests/run.sh diff --git a/tests/core/test_dataset.py b/tests/core/test_dataset.py index a9093cc6aa..2c27e61b10 100644 --- a/tests/core/test_dataset.py +++ b/tests/core/test_dataset.py @@ -13,6 +13,7 @@ DatasetOrigin, DatasetTransferError, RSYNCDatasetTransfer, + WGETDatasetTransfer, ) from skyllh.core.livetime import ( Livetime, @@ -62,6 +63,49 @@ def test_transfer(self): self.assertEqual(len(missing_files), 0) +class TestWGETDatasetTransfer( + unittest.TestCase, +): + def setUp(self): + self.cfg = Config() + self.ds = TestData.create_dataset_collection( + cfg=self.cfg, + base_path=os.path.join(os.getcwd(), '.repository')).get_dataset( + 'TestData') + + # Remove the dataset if it already exists. + if self.ds.exists: + self.ds.remove_data() + + # Define the origin and transfer method of this dataset. + self.ds.origin = DatasetOrigin( + base_path='/data/user/mwolf/skyllh', + sub_path='testdata', + host='convey.icecube.wisc.edu', + transfer_func=WGETDatasetTransfer.transfer, + ) + + def test_transfer(self): + username = 'icecube' + password = os.environ.get('ICECUBE_PASSWORD', None) + if password is None: + self.skipTest( + f'No password for username "{username}" provided via the ' + 'environment!') + + if not self.ds.make_data_available( + username=username, + password=password, + ): + raise RuntimeError( + f'The data of dataset {self.ds.name} could not be made ' + 'available!') + + # Check that there are no missing files. 
+ missing_files = self.ds.get_missing_files() + self.assertEqual(len(missing_files), 0) + + class TestDatasetFunctions(unittest.TestCase): def setUp(self): path = os.path.abspath(os.path.dirname(__file__)) From d63e7789b182ee24837cd264ed6c72fc47e7963a Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Tue, 1 Aug 2023 09:30:36 +0200 Subject: [PATCH 27/34] Fix origin --- tests/core/test_dataset.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/core/test_dataset.py b/tests/core/test_dataset.py index 2c27e61b10..cbc15e1687 100644 --- a/tests/core/test_dataset.py +++ b/tests/core/test_dataset.py @@ -82,6 +82,7 @@ def setUp(self): base_path='/data/user/mwolf/skyllh', sub_path='testdata', host='convey.icecube.wisc.edu', + protocol='https', transfer_func=WGETDatasetTransfer.transfer, ) From f9780ea7bc1c7d4e8b53333840957b4e92248311 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Tue, 1 Aug 2023 10:02:29 +0200 Subject: [PATCH 28/34] Make DatasetTransfer.transfer an instance method --- skyllh/core/dataset.py | 72 +++++++++++++------------ skyllh/datasets/i3/PublicData_10y_ps.py | 3 +- tests/core/test_dataset.py | 16 +++--- 3 files changed, 47 insertions(+), 44 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index ad62d757e0..219333e2dd 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -199,22 +199,6 @@ def is_directory(self): """ return (self._filename is None) - @property - def protocol(self): - """The protocol to use for the transfer. - """ - return self._protocol - - @protocol.setter - def protocol(self, obj): - if obj is not None: - if not isinstance(obj, str): - raise TypeError( - 'The property protocol must be None, or an instance of ' - 'str! ' - f'Its current type is {classname(obj)}!') - self._protocol = obj - @property def host(self): """The name or IP of the remote host. @@ -448,11 +432,11 @@ def ensure_dst_path( # Throws if dst_path exists as a file. 
os.makedirs(dst_path) - @staticmethod - @abc.abstractstaticmethod + @abc.abstractmethod def transfer( - ds, - dst_path, + self, + origin, + dst_base_path, username=None, password=None, ): @@ -461,11 +445,14 @@ def transfer( Parameters ---------- - ds : instance of Dataset - The instance of Dataset having the ``origin`` property defined, - specifying the origin of the dataset. - dst_path : str - The destination path into which the dataset will be transfered. + origin : instance of DatasetOrigin + The instance of DatasetOrigin defining the origin of the dataset. + file_list : list of str + The list of files, relative to the origin base path, which should be + transfered. + dst_base_path : str + The destination base path into which the dataset files will be + transfered. username : str | None The user name required to connect to the remote host. password : str | None @@ -513,10 +500,11 @@ class RSYNCDatasetTransfer( DatasetTransfer, ): def __init__(self, **kwargs): - super().__init__(**kwargs) + super().__init__( + **kwargs) - @staticmethod def transfer( # noqa: C901 + self, origin, file_list, dst_base_path, @@ -543,7 +531,7 @@ def transfer( # noqa: C901 The password for the user name required to connect to the remote host. """ - cls = get_class_of_func(RSYNCDatasetTransfer.transfer) + cls = get_class_of_func(self.transfer) logger = get_logger(f'{classname(cls)}.transfer') host = origin.host @@ -635,11 +623,28 @@ def transfer( # noqa: C901 class WGETDatasetTransfer( DatasetTransfer, ): - def __init__(self, **kwargs): - super().__init__(**kwargs) + def __init__(self, protocol, **kwargs): + super().__init__( + **kwargs) + + self.protocol = protocol + + @property + def protocol(self): + """The protocol to use for the transfer. + """ + return self._protocol + + @protocol.setter + def protocol(self, obj): + if not isinstance(obj, str): + raise TypeError( + 'The property protocol must be an instance of str! 
' + f'Its current type is {classname(obj)}!') + self._protocol = obj - @staticmethod def transfer( + self, origin, file_list, dst_base_path, @@ -664,10 +669,9 @@ def transfer( The password for the user name required to connect to the remote host. """ - cls = get_class_of_func(WGETDatasetTransfer.transfer) + cls = get_class_of_func(self.transfer) logger = get_logger(f'{classname(cls)}.transfer') - protocol = origin.protocol host = origin.host port = origin.port @@ -702,7 +706,7 @@ def transfer( cmd += ( f'--user={username} ' ) - cmd += f'{protocol}://{host}' + cmd += f'{self.protocol}://{host}' if port is not None: cmd += f':{port}' if path[0:1] != '/': diff --git a/skyllh/datasets/i3/PublicData_10y_ps.py b/skyllh/datasets/i3/PublicData_10y_ps.py index f6409dbb65..fdba7e791e 100644 --- a/skyllh/datasets/i3/PublicData_10y_ps.py +++ b/skyllh/datasets/i3/PublicData_10y_ps.py @@ -263,8 +263,7 @@ def create_dataset_collection( sub_path='', filename='20210126_PS-IC40-IC86_VII.zip', host='icecube.wisc.edu', - protocol='http', - transfer_func=WGETDatasetTransfer.transfer, + transfer_func=WGETDatasetTransfer(protocol='http').transfer, post_transfer_func=WGETDatasetTransfer.post_transfer_unzip, ) diff --git a/tests/core/test_dataset.py b/tests/core/test_dataset.py index cbc15e1687..074ee4bc65 100644 --- a/tests/core/test_dataset.py +++ b/tests/core/test_dataset.py @@ -45,7 +45,7 @@ def setUp(self): base_path='/data/user/mwolf/skyllh', sub_path='testdata', host='cobalt', - transfer_func=RSYNCDatasetTransfer.transfer, + transfer_func=RSYNCDatasetTransfer().transfer, ) def test_transfer(self): @@ -82,20 +82,18 @@ def setUp(self): base_path='/data/user/mwolf/skyllh', sub_path='testdata', host='convey.icecube.wisc.edu', - protocol='https', - transfer_func=WGETDatasetTransfer.transfer, + username='icecube', + transfer_func=WGETDatasetTransfer(protocol='https').transfer, ) def test_transfer(self): - username = 'icecube' password = os.environ.get('ICECUBE_PASSWORD', None) if password 
is None: self.skipTest( - f'No password for username "{username}" provided via the ' - 'environment!') + f'No password for username "{self.ds.origin.username}" ' + 'provided via the environment!') if not self.ds.make_data_available( - username=username, password=password, ): raise RuntimeError( @@ -107,7 +105,9 @@ def test_transfer(self): self.assertEqual(len(missing_files), 0) -class TestDatasetFunctions(unittest.TestCase): +class TestDatasetFunctions( + unittest.TestCase, +): def setUp(self): path = os.path.abspath(os.path.dirname(__file__)) self.exp_data = DataFieldRecordArray( From e9c04af53a27008f7e538455b718eb307202a936 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Fri, 4 Aug 2023 12:26:06 +0200 Subject: [PATCH 29/34] protocol is not part of origin --- skyllh/core/dataset.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index 219333e2dd..f6010a24fd 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -68,7 +68,6 @@ def __init__( sub_path, transfer_func, filename=None, - protocol=None, host=None, port=None, username=None, @@ -98,8 +97,6 @@ def __init__( filename : str | None If the origin is not a directory but a file, this specifies the filename. - protocol : str | None - The protocol to use for the transfer, e.g. ``"http"`` or ``"file"``. host : str | None The name or IP of the remote host. 
port : int | None @@ -120,7 +117,6 @@ def __init__( self.sub_path = sub_path self.transfer_func = transfer_func self.filename = filename - self.protocol = protocol self.host = host self.port = port self.username = username @@ -289,7 +285,6 @@ def __str__(self): s1 += f'sub_path = "{self.sub_path}"\n' if self._filename is not None: s1 += f'filename = {self._filename}\n' - s1 += f'protocol = {self.protocol}\n' s1 += f'user@host:port = {self.username}@{self.host}:{self.port}\n' s1 += 'password = ' if self.password is not None: From fdc250a4f2cf0d02270316245eef05ccf806a39e Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Fri, 4 Aug 2023 13:11:58 +0200 Subject: [PATCH 30/34] Add docs for dataset --- doc/sphinx/concepts/dataset.ipynb | 402 ++++++++++++++++++++++++++++++ doc/sphinx/concepts/index.rst | 1 + 2 files changed, 403 insertions(+) create mode 100644 doc/sphinx/concepts/dataset.ipynb diff --git a/doc/sphinx/concepts/dataset.ipynb b/doc/sphinx/concepts/dataset.ipynb new file mode 100644 index 0000000000..203479946d --- /dev/null +++ b/doc/sphinx/concepts/dataset.ipynb @@ -0,0 +1,402 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dataset Definition" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "Each analysis requires a dataset definition that defines a particular dataset.\n", + "In practice usually a data sample exists, which is a collection of individual\n", + "datasets. For example the public 10-year IceCube point-source data sample is a\n", + "collection of individual datasets, one for each partial IceCube detector\n", + "configuration.\n", + "\n", + "SkyLLh provides the :py:class:`skyllh.core.dataset.Dataset` class to create an\n", + "individual dataset definition. 
Such a definition defines the experimental and\n", + "monte-carlo data files and possibly additional information like data binning\n", + "definitions or auxilary data files.\n", + "\n", + "Individual datasets can be combined into a dataset collection via the \n", + ":py:class:`skyllh.core.dataset.DatasetCollection` class.\n", + "\n", + "A dataset collection is usually defined within one Python module providing the\n", + "function ``create_dataset_collection``. For instance the 10-year public \n", + "point-source data sample is defined in the \n", + ":py:mod:`skyllh.datasets.i3.PublicData_10y_ps` module, and the its dataset \n", + "collection can be created via the \n", + ":py:func:`~skyllh.datasets.i3.PublicData_10y_ps.create_dataset_collection`\n", + "function. This function requires a configuration. If no data repository base \n", + "path is set in the configuration, that base path needs to be passed to the \n", + "function as well." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from skyllh.core.config import (\n", + " Config,\n", + ")\n", + "from skyllh.core.dataset import (\n", + " Dataset,\n", + " DatasetCollection,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "# Create configuration instance.\n", + "cfg = Config()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Create individual dataset.\n", + "my_dataset = Dataset(\n", + " cfg=cfg,\n", + " name='My Dataset',\n", + " exp_pathfilenames='exp.npy',\n", + " mc_pathfilenames='mc.npy',\n", + " livetime=365,\n", + " version=1,\n", + " verqualifiers={'patch': 0},\n", + " default_sub_path_fmt='my_dataset_v{version:03d}_p{patch:02d}',\n", + " base_path='/data/ana/analyses/',\n", + 
")\n", + "\n", + "# Create collection of individual datasets.\n", + "dsc = DatasetCollection(\n", + " name='My Dataset Collection',\n", + " description='This is my dataset collection containing all my individual '\n", + " 'datasets.')\n", + "dsc.add_datasets((my_dataset,))" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "We can print the dataset collection, which will list all the individual datasets\n", + "of this collection." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "DatasetCollection \"My Dataset Collection\"\n", + "--------------------------------------------------------------------------------\n", + "Description:\n", + "This is my dataset collection containing all my individual datasets.\n", + "Available datasets:\n", + "\n", + " Dataset \"My Dataset\": v001patch00\n", + " { livetime = 365.000 days }\n", + " Experimental data:\n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/exp.npy\n", + " MC data:\n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/mc.npy\n", + " \n" + ] + } + ], + "source": [ + "print(dsc)" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "Individual datasets of the dataset collection can be retrieved via the\n", + ":py:meth:`~skyllh.core.dataset.DatasetCollection.get_dataset` method:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dataset \"My Dataset\": v001patch00\n", + " { livetime = 365.000 days }\n", + " Experimental data:\n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/exp.npy\n", + " MC data:\n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/mc.npy\n", + " \n" + ] + } + ], + 
"source": [ + "my_dataset = dsc.get_dataset('My Dataset')\n", + "print(my_dataset)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Auxiliary data files" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "If a dataset requires auxiliary data files, such files can be defined via the\n", + ":py:meth:`~skyllh.core.dataset.Dataset.add_aux_data_definition` method:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "my_dataset.add_aux_data_definition('aux_file_key_1', 'aux_data/aux_file1.dat')" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dataset \"My Dataset\": v001patch00\n", + " { livetime = 365.000 days }\n", + " Experimental data:\n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/exp.npy\n", + " MC data:\n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/mc.npy\n", + " Auxiliary data:\n", + " aux_file_key_1: \n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/aux_data/aux_file1.dat\n" + ] + } + ], + "source": [ + "print(my_dataset)" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "If the auxiliary data is not present as a file but as actual Python data, such\n", + "data can be added via the :py:meth:`~skyllh.core.dataset.Dataset.add_aux_data`\n", + "method: " + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "my_dataset.add_aux_data('aux_data_1', [1, 2, 3])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Dataset Origin" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "An individual dataset can have an origin, 
which specifies where the\n",
+    "dataset can be downloaded automatically. SkyLLH provides the\n",
+    ":py:class:`skyllh.core.dataset.DatasetOrigin` class to define such an origin.\n",
+    "\n",
+    "The origin consists of a host (possibly also a port), a base path and a sub path\n",
+    "at the origin, and a transfer function which will be used to perform the actual\n",
+    "data transfer.\n",
+    "\n",
+    "SkyLLH provides two dataset transfer methods, ``wget`` and ``rsync``. "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from skyllh.core.dataset import (\n",
+    "    DatasetOrigin,\n",
+    "    WGETDatasetTransfer,\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "origin = DatasetOrigin(\n",
+    "    host='data.mydomain.com',\n",
+    "    base_path='/downloads/data',\n",
+    "    sub_path='my_dataset',\n",
+    "    transfer_func=WGETDatasetTransfer(protocol='https').transfer)\n",
+    "my_dataset.origin = origin"
+   ]
+  },
+  {
+   "cell_type": "raw",
+   "metadata": {
+    "raw_mimetype": "text/restructuredtext"
+   },
+   "source": [
+    "In the example above we specified that the dataset is available at the URL\n",
+    "``data.mydomain.com/downloads/data/my_dataset``, which can be transferred\n",
+    "using ``wget`` via the https protocol.\n",
+    "\n",
+    "Hence, the experimental and monte-carlo files ``exp.npy`` and ``mc.npy`` \n",
+    "of the dataset must be available at \n",
+    "``https://data.mydomain.com/downloads/data/my_dataset/exp.npy`` and\n",
+    "``https://data.mydomain.com/downloads/data/my_dataset/mc.npy``, respectively."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Origin as archive file"
+   ]
+  },
+  {
+   "cell_type": "raw",
+   "metadata": {
+    "raw_mimetype": "text/restructuredtext"
+   },
+   "source": [
+    "The dataset might be available as an archive file, e.g. a zip file on a \n",
+    "webserver. 
In such cases the ``filename``argument of the \n", + ":py:class:`~skyllh.core.dataset.DatasetOrigin` class constructor can be used in\n", + "combination with a post transfer function specified via the \n", + "``post_transfer_func`` argument of the constructor:" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "origin = DatasetOrigin(\n", + " host='data.mydomain.com',\n", + " base_path='/downloads/data',\n", + " sub_path='',\n", + " filename='my_dataset.zip',\n", + " transfer_func=WGETDatasetTransfer(protocol='https').transfer,\n", + " post_transfer_func=WGETDatasetTransfer.post_transfer_unzip)" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "The example above will transfer the single archive file \n", + "``https://data.mydomain.com/downloads/data/my_dataset.zip`` and unzip the file\n", + "on the local host." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Downloading the dataset" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "If an origin is defined for an individual dataset, that dataset can be \n", + "downloaded automatically using the \n", + ":py:meth:`skyllh.core.dataset.Dataset.make_data_available` method of the\n", + ":py:class:`~skyllh.core.dataset.Dataset` class." 
+ ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.12" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/doc/sphinx/concepts/index.rst b/doc/sphinx/concepts/index.rst index f103d10a0a..759a1f5a22 100644 --- a/doc/sphinx/concepts/index.rst +++ b/doc/sphinx/concepts/index.rst @@ -9,6 +9,7 @@ This section covers a few concepts SkyLLH is persuing. :maxdepth: 1 config + dataset datafields source_definition flux_definition From 71e5d567b1a2f064a7f2b56c7f9c2ba815ef9fd3 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Fri, 4 Aug 2023 13:40:46 +0200 Subject: [PATCH 31/34] Fix typo --- doc/sphinx/concepts/dataset.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/sphinx/concepts/dataset.ipynb b/doc/sphinx/concepts/dataset.ipynb index 203479946d..cc573650cd 100644 --- a/doc/sphinx/concepts/dataset.ipynb +++ b/doc/sphinx/concepts/dataset.ipynb @@ -325,7 +325,7 @@ }, "source": [ "The dataset might be available as an archive file, e.g. a zip file on a \n", - "webserver. In such cases the ``filename``argument of the \n", + "webserver. 
In such cases the ``filename`` argument of the \n", ":py:class:`~skyllh.core.dataset.DatasetOrigin` class constructor can be used in\n", "combination with a post transfer function specified via the \n", "``post_transfer_func`` argument of the constructor:" From 9ad0cb73eac0e9ceddc4682efe9657e2acca670d Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Mon, 7 Aug 2023 17:26:23 +0200 Subject: [PATCH 32/34] Add changelog info --- CHANGELOG.txt | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/CHANGELOG.txt b/CHANGELOG.txt index c9fa3c9ae5..645420b1d6 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -1,5 +1,18 @@ This file contains a log-book for major changes between releases. +v23.2.1 +======= +- Allow the definition of an origin of a dataset via the + core.dataset.DatasetOrigin class and download the dataset automatically from + the origin to the local host. The following transfer methods are provided: + + - wget + - rsync + + Unit tests for these transfer methods are added. + +- The iminuit package is an optional tool now. + v23.2.0 ======= - Complete overhaul of SkyLLH for more generic handling of parameters From fdeb1753f27d8dc3acef9eda6d168c942c2e02f4 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Tue, 8 Aug 2023 14:42:42 +0200 Subject: [PATCH 33/34] Remove obsolete method --- tests/core/test_dataset.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/core/test_dataset.py b/tests/core/test_dataset.py index 074ee4bc65..bdb15dbe64 100644 --- a/tests/core/test_dataset.py +++ b/tests/core/test_dataset.py @@ -118,12 +118,6 @@ def setUp(self): os.path.join(path, 'testdata/livetime_testdata.npy')) self.livetime = 100 - def tearDown(self): - # self.exp_data.close() - # self.mc_data.close() - # self.livetime_datafile.close() - pass - def test_get_data_subset(self): # Whole interval. 
t_start = 58442.0 From 0bb71d7e4096a364c68ab8408e2965f428b21fb4 Mon Sep 17 00:00:00 2001 From: Martin Wolf Date: Tue, 8 Aug 2023 15:59:41 +0200 Subject: [PATCH 34/34] Improve doc string --- skyllh/core/dataset.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index f6010a24fd..4ad7d417be 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -87,12 +87,13 @@ def __init__( The callable object that should be used to transfer the dataset. This function requires the following call signature:: - __call__(ds, dst_path, user=None, password=None) + __call__(origin, file_list, dst_base_path, user=None, password=None) - where ``ds`` is an instance of Dataset, ``dst_path`` is an instance - of str specifying the destination path on the local machine, - ``user`` is the user name required to connect to the remote host, - and ``password`` is the password for the user name required to + where ``origin`` is an instance of DatasetOrigin, ``file_list`` is + a list of str specifying the files to transfer, ``dst_base_path`` is + an instance of str specifying the destination base path on the local + machine, ``user`` is the user name required to connect to the remote + host, and ``password`` is the password for the user name required to connect to the remote host. filename : str | None If the origin is not a directory but a file, this specifies the @@ -110,6 +111,12 @@ def __init__( The callable object that should be called after the dataset has been transfered by the ``transfer_func``function. It can be used to extract an archive file. + This function requires the following call signature:: + + __call__(ds, dst_path) + + where ``ds`` is an instance of ``Dataset``, and ``dst_path`` is the + destination path. 
""" super().__init__(**kwargs) @@ -431,6 +438,7 @@ def ensure_dst_path( def transfer( self, origin, + file_list, dst_base_path, username=None, password=None, @@ -472,8 +480,7 @@ def post_transfer_unzip( if ds.origin.filename is None: return - fname = os.path.join(ds.origin.root_dir, ds.origin.filename) - if not fname.lower().endswith('.zip'): + if not ds.origin.filename.lower().endswith('.zip'): return cls = get_class_of_func(DatasetTransfer.post_transfer_unzip)