diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 80bf2e8554..a5448193b8 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -34,6 +34,8 @@ jobs: flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics - - name: Test with unittest + - name: Run tests with unittest + env: + ICECUBE_PASSWORD: ${{ secrets.ICECUBE_PASSWORD }} run: | ./tests/run.sh diff --git a/CHANGELOG.txt b/CHANGELOG.txt index fb84f64ec6..5139b7aa16 100644 --- a/CHANGELOG.txt +++ b/CHANGELOG.txt @@ -7,6 +7,17 @@ v23.2.1 - Individual datasets of a dataset collection (``dsc``) can now be accessed via ``dsc[name]`` or ``dsc[name1, name2, ...]``. +- Allow the definition of an origin of a dataset via the + core.dataset.DatasetOrigin class and download the dataset automatically from + the origin to the local host. The following transfer methods are provided: + + - wget + - rsync + + Unit tests for these transfer methods are added. + +- The iminuit package is an optional tool now. + v23.2.0 ======= - Complete overhaul of SkyLLH for more generic handling of parameters diff --git a/doc/sphinx/concepts/dataset.ipynb b/doc/sphinx/concepts/dataset.ipynb new file mode 100644 index 0000000000..cc573650cd --- /dev/null +++ b/doc/sphinx/concepts/dataset.ipynb @@ -0,0 +1,402 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Dataset Definition" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "Each analysis requires a dataset definition that defines a particular dataset.\n", + "In practice usually a data sample exists, which is a collection of individual\n", + "datasets. For example the public 10-year IceCube point-source data sample is a\n", + "collection of individual datasets, one for each partial IceCube detector\n", + "configuration.\n", + "\n", + "SkyLLh provides the :py:class:`skyllh.core.dataset.Dataset` class to create an\n", + "individual dataset definition. Such a definition defines the experimental and\n", + "monte-carlo data files and possibly additional information like data binning\n", + "definitions or auxilary data files.\n", + "\n", + "Individual datasets can be combined into a dataset collection via the \n", + ":py:class:`skyllh.core.dataset.DatasetCollection` class.\n", + "\n", + "A dataset collection is usually defined within one Python module providing the\n", + "function ``create_dataset_collection``. For instance the 10-year public \n", + "point-source data sample is defined in the \n", + ":py:mod:`skyllh.datasets.i3.PublicData_10y_ps` module, and the its dataset \n", + "collection can be created via the \n", + ":py:func:`~skyllh.datasets.i3.PublicData_10y_ps.create_dataset_collection`\n", + "function. This function requires a configuration. If no data repository base \n", + "path is set in the configuration, that base path needs to be passed to the \n", + "function as well." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from skyllh.core.config import (\n", + " Config,\n", + ")\n", + "from skyllh.core.dataset import (\n", + " Dataset,\n", + " DatasetCollection,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "# Create configuration instance.\n", + "cfg = Config()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Create individual dataset.\n", + "my_dataset = Dataset(\n", + " cfg=cfg,\n", + " name='My Dataset',\n", + " exp_pathfilenames='exp.npy',\n", + " mc_pathfilenames='mc.npy',\n", + " livetime=365,\n", + " version=1,\n", + " verqualifiers={'patch': 0},\n", + " default_sub_path_fmt='my_dataset_v{version:03d}_p{patch:02d}',\n", + " base_path='/data/ana/analyses/',\n", + ")\n", + "\n", + "# Create collection of individual datasets.\n", + "dsc = DatasetCollection(\n", + " name='My Dataset Collection',\n", + " description='This is my dataset collection containing all my individual '\n", + " 'datasets.')\n", + "dsc.add_datasets((my_dataset,))" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "We can print the dataset collection, which will list all the individual datasets\n", + "of this collection." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "DatasetCollection \"My Dataset Collection\"\n", + "--------------------------------------------------------------------------------\n", + "Description:\n", + "This is my dataset collection containing all my individual datasets.\n", + "Available datasets:\n", + "\n", + " Dataset \"My Dataset\": v001patch00\n", + " { livetime = 365.000 days }\n", + " Experimental data:\n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/exp.npy\n", + " MC data:\n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/mc.npy\n", + " \n" + ] + } + ], + "source": [ + "print(dsc)" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "Individual datasets of the dataset collection can be retrieved via the\n", + ":py:meth:`~skyllh.core.dataset.DatasetCollection.get_dataset` method:" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dataset \"My Dataset\": v001patch00\n", + " { livetime = 365.000 days }\n", + " Experimental data:\n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/exp.npy\n", + " MC data:\n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/mc.npy\n", + " \n" + ] + } + ], + "source": [ + "my_dataset = dsc.get_dataset('My Dataset')\n", + "print(my_dataset)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Auxiliary data files" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "If a dataset requires auxiliary data files, such files can be defined via the\n", + ":py:meth:`~skyllh.core.dataset.Dataset.add_aux_data_definition` method:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + 
"metadata": {}, + "outputs": [], + "source": [ + "my_dataset.add_aux_data_definition('aux_file_key_1', 'aux_data/aux_file1.dat')" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Dataset \"My Dataset\": v001patch00\n", + " { livetime = 365.000 days }\n", + " Experimental data:\n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/exp.npy\n", + " MC data:\n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/mc.npy\n", + " Auxiliary data:\n", + " aux_file_key_1: \n", + " [\u001b[92mFOUND\u001b[0m] /data/ana/analyses/my_dataset_v001_p00/aux_data/aux_file1.dat\n" + ] + } + ], + "source": [ + "print(my_dataset)" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "If the auxiliary data is not present as a file but as actual Python data, such\n", + "data can be added via the :py:meth:`~skyllh.core.dataset.Dataset.add_aux_data`\n", + "method: " + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "my_dataset.add_aux_data('aux_data_1', [1, 2, 3])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Dataset Origin" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "An individual dataset can have an origin, which specifies where the\n", + "dataset can be downloaded automatically. SkyLLH provides the\n", + ":py:class:`skyllh.core.dataset.DatasetOrigin` class to define such an origin.\n", + "\n", + "The origin consists of a host (possibly also a port), a base path and a sub path\n", + "at the origin, and a transfer function which will be used to perform the actual\n", + "data transfer.\n", + "\n", + "SkyLLH provides two dataset transfer methods, ``wget`` and ``rsync``. " + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "from skyllh.core.dataset import (\n", + " DatasetOrigin,\n", + " WGETDatasetTransfer,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "origin = DatasetOrigin(\n", + " host='data.mydomain.com',\n", + " base_path='/downloads/data',\n", + " sub_path='my_dataset',\n", + " transfer_func=WGETDatasetTransfer(protocol='https').transfer)\n", + "my_dataset.origin = origin" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "In the example above we specified that the dataset is available at the URL\n", + "``data.mydomain.com/downloads/data/my_dataset``, which can be transfered\n", + "using ``wget`` via the https protocol.\n", + "\n", + "Hence, the experimental and monte-carlo files ``exp.npy`` and ``mc.npy`` \n", + "of the dataset must be available at \n", + "``https://data.mydomain.com/downloads/data/my_dataset/exp.npy`` and\n", + "``https://data.mydomain.com/downloads/data/my_dataset/mc.npy``, respectively." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Origin as archive file" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "The dataset might be available as an archive file, e.g. a zip file on a \n", + "webserver. 
In such cases the ``filename`` argument of the \n", + ":py:class:`~skyllh.core.dataset.DatasetOrigin` class constructor can be used in\n", + "combination with a post transfer function specified via the \n", + "``post_transfer_func`` argument of the constructor:" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "origin = DatasetOrigin(\n", + " host='data.mydomain.com',\n", + " base_path='/downloads/data',\n", + " sub_path='',\n", + " filename='my_dataset.zip',\n", + " transfer_func=WGETDatasetTransfer(protocol='https').transfer,\n", + " post_transfer_func=WGETDatasetTransfer.post_transfer_unzip)" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "The example above will transfer the single archive file \n", + "``https://data.mydomain.com/downloads/data/my_dataset.zip`` and unzip the file\n", + "on the local host." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Downloading the dataset" + ] + }, + { + "cell_type": "raw", + "metadata": { + "raw_mimetype": "text/restructuredtext" + }, + "source": [ + "If an origin is defined for an individual dataset, that dataset can be \n", + "downloaded automatically using the \n", + ":py:meth:`skyllh.core.dataset.Dataset.make_data_available` method of the\n", + ":py:class:`~skyllh.core.dataset.Dataset` class." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.12" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/doc/sphinx/concepts/index.rst b/doc/sphinx/concepts/index.rst index f103d10a0a..759a1f5a22 100644 --- a/doc/sphinx/concepts/index.rst +++ b/doc/sphinx/concepts/index.rst @@ -9,6 +9,7 @@ This section covers a few concepts SkyLLH is persuing. :maxdepth: 1 config + dataset datafields source_definition flux_definition diff --git a/skyllh/core/config.py b/skyllh/core/config.py index b3c09f75a8..8aac0d8c62 100644 --- a/skyllh/core/config.py +++ b/skyllh/core/config.py @@ -52,6 +52,7 @@ 'repository': { # A base path of repository datasets. 'base_path': None, + 'download_from_origin': True, }, 'units': { # Definition of the internal units to use. These must match with the diff --git a/skyllh/core/dataset.py b/skyllh/core/dataset.py index f957b147e5..4023f3ba55 100644 --- a/skyllh/core/dataset.py +++ b/skyllh/core/dataset.py @@ -1,12 +1,17 @@ # -*- coding: utf-8 -*- -from copy import ( - deepcopy, -) +import abc import os import os.path +import shutil +import stat + import numpy as np +from copy import ( + deepcopy, +) + from skyllh.core import ( display, ) @@ -20,6 +25,9 @@ DataFields, DataFieldStages as DFS, ) +from skyllh.core.debugging import ( + get_logger, +) from skyllh.core.display import ( ANSIColors, ) @@ -32,9 +40,11 @@ from skyllh.core.py import ( classname, float_cast, + get_class_of_func, issequence, issequenceof, list_of_cast, + module_class_method_name, str_cast, ) from skyllh.core.storage import ( @@ -46,6 +56,670 @@ ) +class DatasetOrigin( + object, +): + """The DatasetOrigin class provides information about the origin of a + dataset, so the files of a dataset can be downloaded from the origin. 
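For comparison with the ``wget``-based origin shown in the notebook above, a dataset served from an SSH-reachable machine can be described with the ``rsync`` transfer method added in this patch. The host name, user name and paths below are placeholders, not real endpoints; this is only a minimal sketch of how ``DatasetOrigin`` and ``RSYNCDatasetTransfer`` fit together.

```python
from skyllh.core.dataset import (
    DatasetOrigin,
    RSYNCDatasetTransfer,
)

# Hypothetical origin on an SSH-reachable data server; rsync is used for the
# transfer instead of wget.
origin = DatasetOrigin(
    host='data.example.org',
    username='datauser',
    base_path='/data/user/datasets',
    sub_path='my_dataset',
    transfer_func=RSYNCDatasetTransfer().transfer,
)
```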
+ """ + def __init__( + self, + base_path, + sub_path, + transfer_func, + filename=None, + host=None, + port=None, + username=None, + password=None, + post_transfer_func=None, + **kwargs, + ): + """Creates a new instance to define the origin of a dataset. + + Parameters + ---------- + base_path : str + The dataset's base directory at the origin. + sub_path : str + The dataset's sub directory at the origin. + transfer_func : callable + The callable object that should be used to transfer the dataset. + This function requires the following call signature:: + + __call__(origin, file_list, dst_base_path, user=None, password=None) + + where ``origin`` is an instance of DatasetOrigin, ``file_list`` is + a list of str specifying the files to transfer, ``dst_base_path`` is + an instance of str specifying the destination base path on the local + machine, ``user`` is the user name required to connect to the remote + host, and ``password`` is the password for the user name required to + connect to the remote host. + filename : str | None + If the origin is not a directory but a file, this specifies the + filename. + host : str | None + The name or IP of the remote host. + port : int | None + The port number to use when connecting to the remote host. + username : str | None + The user name required to connect to the remote host. + password : str | None + The password for the user name required to connect to the remote + host. + post_transfer_func : callable | None + The callable object that should be called after the dataset has been + transfered by the ``transfer_func``function. It can be used to + extract an archive file. + This function requires the following call signature:: + + __call__(ds, dst_path) + + where ``ds`` is an instance of ``Dataset``, and ``dst_path`` is the + destination path. + """ + super().__init__(**kwargs) + + self.base_path = base_path + self.sub_path = sub_path + self.transfer_func = transfer_func + self.filename = filename + self.host = host + self.port = port + self.username = username + self.password = password + self.post_transfer_func = post_transfer_func + + @property + def base_path(self): + """The dataset's base directory at the origin. + """ + return self._base_path + + @base_path.setter + def base_path(self, obj): + if not isinstance(obj, str): + raise TypeError( + 'The base_path property must be an instance of str! ' + f'Its current type is {classname(obj)}!') + self._base_path = obj + + @property + def sub_path(self): + """The dataset's sub directory at the origin. + """ + return self._sub_path + + @sub_path.setter + def sub_path(self, obj): + if not isinstance(obj, str): + raise TypeError( + 'The sub_path property must be an instance of str! ' + f'Its current type is {classname(obj)}!') + self._sub_path = obj + + @property + def root_dir(self): + """(read-only) The dataset's root directory at the origin, which is + the combination of ``base_path`` and ``sub_path``. + """ + return os.path.join(self._base_path, self._sub_path) + + @property + def transfer_func(self): + """The callable object that should be used to transfer the dataset. + """ + return self._transfer_func + + @transfer_func.setter + def transfer_func(self, obj): + if not callable(obj): + raise TypeError( + 'The property transfer_func must be a callable object! ' + f'Its current type is {classname(obj)}!') + self._transfer_func = obj + + @property + def filename(self): + """The file name if the origin is a file instaed of a directory. 
+ """ + return self._filename + + @filename.setter + def filename(self, obj): + if obj is not None: + if not isinstance(obj, str): + raise TypeError( + 'The property filename must be None, or an instance of ' + 'str! ' + f'Its current type is {classname(obj)}!') + self._filename = obj + + @property + def is_directory(self): + """(read-only) Flag if the origin refers to a directory (``True``) or a + file (``False``). + """ + return (self._filename is None) + + @property + def host(self): + """The name or IP of the remote host. + """ + return self._host + + @host.setter + def host(self, obj): + if obj is not None: + if not isinstance(obj, str): + raise TypeError( + 'The property host must be None, or an instance of str! ' + f'Its current type is {classname(obj)}!') + self._host = obj + + @property + def port(self): + """The port number to use when connecting to the remote host. + """ + return self._port + + @port.setter + def port(self, obj): + if obj is not None: + if not isinstance(obj, int): + raise TypeError( + 'The property port must be None, or an instance of int! ' + f'Its current type is {classname(obj)}!') + self._port = obj + + @property + def username(self): + """The user name required to connect to the remote host. + """ + return self._username + + @username.setter + def username(self, obj): + if obj is not None: + if not isinstance(obj, str): + raise TypeError( + 'The property username must be None, or an instance of ' + 'str! ' + f'Its current type is {classname(obj)}!') + self._username = obj + + @property + def password(self): + """The password for the user name required to connect to the remote + host. + """ + return self._password + + @password.setter + def password(self, obj): + if obj is not None: + if not isinstance(obj, str): + raise TypeError( + 'The property password must be None, or an instance of ' + 'str! ' + f'Its current type is {classname(obj)}!') + self._password = obj + + @property + def post_transfer_func(self): + """The callable object that should be called after the dataset has been + transfered by the ``transfer_func`` callable. + """ + return self._post_transfer_func + + @post_transfer_func.setter + def post_transfer_func(self, obj): + if obj is not None: + if not callable(obj): + raise TypeError( + 'The property post_transfer_func must be a callable ' + 'object! ' + f'Its current type is {classname(obj)}!') + self._post_transfer_func = obj + + def __str__(self): + """Pretty string representation of this class. + """ + transfer_cls = get_class_of_func(self.transfer_func) + + s = f'{classname(self)} '+'{\n' + s1 = f'base_path = "{self.base_path}"\n' + s1 += f'sub_path = "{self.sub_path}"\n' + if self._filename is not None: + s1 += f'filename = {self._filename}\n' + s1 += f'user@host:port = {self.username}@{self.host}:{self.port}\n' + s1 += 'password = ' + if self.password is not None: + s1 += 'set\n' + else: + s1 += 'not set\n' + s1 += f'transfer class = {classname(transfer_cls)}\n' + s += display.add_leading_text_line_padding( + display.INDENTATION_WIDTH, s1) + s += '}' + + return s + + def is_locally_available(self): + """Checks if the dataset origin is available locally by checking if the + given path exists on the local host. + + Returns + ------- + check : bool + ``True`` if the path specified in this dataset origin is an absolute + path and exists on the local host, ``False`` otherwise. 
+ """ + root_dir = self.root_dir + + if ( + self.is_directory and + os.path.abspath(root_dir) == root_dir and + os.path.exists(root_dir) and + os.path.isdir(root_dir) + ): + return True + + return False + + +class TemporaryTextFile( + object, +): + """This class provides a temporary text file with a given content while + being within a with statement. Exiting the with statement will remove the + temporary text file. + + Example: + + .. code:: + + with TemporaryTextFile('myfile.txt', 'My file content'): + # Do something that requires the text file ``myfile.txt``. + # At this point the text file is removed again. + + """ + + def __init__( + self, + pathfilename, + text, + mode=stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH, + ): + self.pathfilename = pathfilename + self.text = text + self.mode = mode + + def __enter__(self): + with open(self.pathfilename, 'w') as fd: + fd.write(self.text) + os.chmod(self.pathfilename, self.mode) + + def __exit__(self, exc_type, exc_val, exc_tb): + os.remove(self.pathfilename) + + +class SystemCommandError( + Exception, +): + """This custom exception will be raised when a system command failed. + """ + pass + + +class DatasetTransferError( + Exception, +): + """This custom exception defines an error that should be raised when the + actual transfer of the dataset files failed. + """ + pass + + +class DatasetTransfer( + object, + metaclass=abc.ABCMeta, +): + """Base class for a dataset transfer mechanism. + """ + def __init__(self, **kwargs): + super().__init__(**kwargs) + + @staticmethod + def execute_system_command( + cmd, + logger, + success_rcode=0, + ): + """Executes the given system command via a ``os.system`` call. + + Parameters + ---------- + cmd : str + The system command to execute. + logger : instance of logging.Logger + The logger to use for debug messages. + success_rcode : int | None + The return code that indicates success of the system command. + If set to ``None``, no return code checking is performed. + + Raises + ------ + SystemCommandError + If the system command did not return ``success_rcode``. + """ + logger.debug(f'Running command "{cmd}"') + rcode = os.system(cmd) + if (success_rcode is not None) and (rcode != success_rcode): + raise SystemCommandError( + f'The system command "{cmd}" failed with return code {rcode}!') + + @staticmethod + def ensure_dst_path( + dst_path, + ): + """Ensures the existance of the given destination path. + + Parameters + ---------- + dst_path : str + The destination path. + """ + if not os.path.isdir(dst_path): + # Throws if dst_path exists as a file. + os.makedirs(dst_path) + + @abc.abstractmethod + def transfer( + self, + origin, + file_list, + dst_base_path, + username=None, + password=None, + ): + """This method is supposed to transfer the dataset origin path to the + given destination path. + + Parameters + ---------- + origin : instance of DatasetOrigin + The instance of DatasetOrigin defining the origin of the dataset. + file_list : list of str + The list of files, relative to the origin base path, which should be + transfered. + dst_base_path : str + The destination base path into which the dataset files will be + transfered. + username : str | None + The user name required to connect to the remote host. + password : str | None + The password for the user name required to connect to the remote + host. + + Raises + ------ + DatasetTransferError + If the actual transfer of the dataset files failed. 
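Since ``DatasetTransfer`` is an abstract base class, further transfer mechanisms can be added by implementing the ``transfer`` method described above. The following sketch is not part of this patch; it illustrates, under the assumption that the origin directory is mounted locally, how a hypothetical subclass could satisfy the interface using ``shutil``.

```python
import os
import shutil

from skyllh.core.dataset import (
    DatasetTransfer,
    DatasetTransferError,
)


class LocalCopyDatasetTransfer(DatasetTransfer):
    """Hypothetical transfer method that copies files from a locally mounted
    origin directory instead of going over the network.
    """
    def transfer(
            self,
            origin,
            file_list,
            dst_base_path,
            username=None,
            password=None,
    ):
        try:
            for file in file_list:
                src = os.path.join(origin.base_path, file)
                dst = os.path.join(dst_base_path, file)
                dst_dir = os.path.dirname(dst)
                if dst_dir != '':
                    # Re-use the base class helper to create the destination
                    # directory if it does not exist yet.
                    DatasetTransfer.ensure_dst_path(dst_dir)
                shutil.copy2(src, dst)
        except OSError as exc:
            raise DatasetTransferError(str(exc)) from exc
```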
+        """
+        pass
+
+    @staticmethod
+    def post_transfer_unzip(
+        ds,
+        dst_path,
+    ):
+        """This is a post-transfer function. It will unzip the transferred file
+        into the dst_path if the origin path was a zip file.
+        """
+        if ds.origin.filename is None:
+            return
+
+        if not ds.origin.filename.lower().endswith('.zip'):
+            return
+
+        cls = get_class_of_func(DatasetTransfer.post_transfer_unzip)
+        logger = get_logger(f'{classname(cls)}.post_transfer_unzip')
+
+        # Unzip the dataset file.
+        zip_file = os.path.join(dst_path, ds.origin.filename)
+        cmd = f'unzip "{zip_file}" -d "{dst_path}"'
+        DatasetTransfer.execute_system_command(cmd, logger)
+
+        # Remove the zip file.
+        try:
+            os.remove(zip_file)
+        except Exception as exc:
+            logger.warn(str(exc))
+
+
+class RSYNCDatasetTransfer(
+    DatasetTransfer,
+):
+    def __init__(self, **kwargs):
+        super().__init__(
+            **kwargs)
+
+    def transfer(  # noqa: C901
+        self,
+        origin,
+        file_list,
+        dst_base_path,
+        username=None,
+        password=None,
+    ):
+        """Transfers the given dataset to the given destination path using the
+        ``rsync`` program.
+
+        Parameters
+        ----------
+        origin : instance of DatasetOrigin
+            The instance of DatasetOrigin defining the origin of the dataset.
+        file_list : list of str
+            The list of files, relative to the origin base path, which should be
+            transferred.
+        dst_base_path : str
+            The destination base path into which the dataset files will be
+            transferred.
+        username : str | None
+            The user name required to connect to the remote host.
+        password : str | None
+            The password for the user name required to connect to the remote
+            host.
+        """
+        cls = get_class_of_func(self.transfer)
+        logger = get_logger(f'{classname(cls)}.transfer')
+
+        host = origin.host
+
+        # Make sure the origin and destination base paths end with a directory
+        # separator.
+        origin_base_path = origin.base_path
+        if origin_base_path[-len(os.path.sep):] != os.path.sep:
+            origin_base_path += os.path.sep
+        if dst_base_path[-len(os.path.sep):] != os.path.sep:
+            dst_base_path += os.path.sep
+
+        file_list_pathfilename = os.path.join(
+            os.getcwd(),
+            f'.{id(origin)}.rsync_file_list.txt')
+
+        # Create file list file content.
+        # Skip files which already exist.
+        file_list_filecontent = ''
+        for file in file_list:
+            dst_pathfilename = os.path.join(dst_base_path, file)
+            if not os.path.exists(dst_pathfilename):
+                file_list_filecontent += f'{file}\n'
+
+        if username is None:
+            # No user name is defined.
+            with TemporaryTextFile(
+                pathfilename=file_list_pathfilename,
+                text=file_list_filecontent,
+            ):
+                cmd = (
+                    f'rsync '
+                    '-avrRL '
+                    '--progress '
+                    f'--files-from="{file_list_pathfilename}" '
+                    f'{host}:"{origin_base_path}" "{dst_base_path}"'
+                )
+                try:
+                    DatasetTransfer.execute_system_command(cmd, logger)
+                except SystemCommandError as err:
+                    raise DatasetTransferError(str(err))
+
+        elif password is not None:
+            # User name and password are defined.
+ pwdfile = os.path.join( + os.getcwd(), + f'.{id(origin)}.rsync_passwd.txt') + + with TemporaryTextFile( + pathfilename=pwdfile, + text=password, + mode=stat.S_IRUSR, + ): + with TemporaryTextFile( + pathfilename=file_list_pathfilename, + text=file_list_filecontent, + ): + cmd = ( + f'rsync ' + '-avrRL ' + '--progress ' + f'--password-file "{pwdfile}" ' + f'--files-from="{file_list_pathfilename}" ' + f'{username}@{host}:"{origin_base_path}" "{dst_base_path}"' + ) + try: + DatasetTransfer.execute_system_command(cmd, logger) + except SystemCommandError as err: + raise DatasetTransferError(str(err)) + else: + # Only the user name is defined. + with TemporaryTextFile( + pathfilename=file_list_pathfilename, + text=file_list_filecontent, + ): + cmd = ( + f'rsync ' + '-avrRL ' + '--progress ' + f'--files-from="{file_list_pathfilename}" ' + f'{username}@{host}:"{origin_base_path}" "{dst_base_path}"' + ) + try: + DatasetTransfer.execute_system_command(cmd, logger) + except SystemCommandError as err: + raise DatasetTransferError(str(err)) + + +class WGETDatasetTransfer( + DatasetTransfer, +): + def __init__(self, protocol, **kwargs): + super().__init__( + **kwargs) + + self.protocol = protocol + + @property + def protocol(self): + """The protocol to use for the transfer. + """ + return self._protocol + + @protocol.setter + def protocol(self, obj): + if not isinstance(obj, str): + raise TypeError( + 'The property protocol must be an instance of str! ' + f'Its current type is {classname(obj)}!') + self._protocol = obj + + def transfer( + self, + origin, + file_list, + dst_base_path, + username=None, + password=None, + ): + """Transfers the given dataset to the given destination path using the + ``wget`` program. + + Parameters + ---------- + origin : instance of DatasetOrigin + The instance of DatasetOrigin defining the origin of the dataset. + file_list : list of str + The list of files relative to the origin's base path, which + should be transfered. + dst_base_path : str + The destination base path into which the dataset will be transfered. + username : str | None + The user name required to connect to the remote host. + password : str | None + The password for the user name required to connect to the remote + host. + """ + cls = get_class_of_func(self.transfer) + logger = get_logger(f'{classname(cls)}.transfer') + + host = origin.host + port = origin.port + + for file in file_list: + dst_pathfilename = os.path.join(dst_base_path, file) + if os.path.exists(dst_pathfilename): + logger.debug( + f'File "{dst_pathfilename}" already exists. Skipping.') + continue + + path = os.path.join(origin.base_path, file) + + dst_sub_path = os.path.dirname(file) + if dst_sub_path == '': + dst_path = dst_base_path + else: + dst_path = os.path.join(dst_base_path, dst_sub_path) + DatasetTransfer.ensure_dst_path(dst_path) + + cmd = 'wget ' + if username is None: + # No user name is specified. + pass + elif password is not None: + # A user name and password is specified. + cmd += ( + f'--user="{username}" ' + f'--password="{password}" ' + ) + else: + # Only a user name is specified. 
+ cmd += ( + f'--user={username} ' + ) + cmd += f'{self.protocol}://{host}' + if port is not None: + cmd += f':{port}' + if path[0:1] != '/': + cmd += '/' + cmd += f'{path} -P {dst_path}' + try: + DatasetTransfer.execute_system_command(cmd, logger) + except SystemCommandError as err: + raise DatasetTransferError(str(err)) + + class Dataset( HasConfig, ): @@ -148,6 +822,7 @@ def __init__( verqualifiers=None, base_path=None, sub_path_fmt=None, + origin=None, **kwargs, ): """Creates a new dataset object that describes a self-consistent set of @@ -186,6 +861,9 @@ def __init__( sub_path_fmt : str | None The user-defined format of the sub path of the data set. If set to ``None``, the ``default_sub_path_fmt`` will be used. + origin : instance of DatasetOrigin | None + The instance of DatasetOrigin defining the origin of the dataset, + so the dataset can be transfered automatically to the user's device. """ super().__init__(**kwargs) @@ -198,6 +876,7 @@ def __init__( self.verqualifiers = verqualifiers self.base_path = base_path self.sub_path_fmt = sub_path_fmt + self.origin = origin self.description = '' @@ -413,6 +1092,23 @@ def sub_path_fmt(self, fmt): 'str!') self._sub_path_fmt = fmt + @property + def origin(self): + """The instance of DatasetOrigin defining the origin of the dataset. + This can be ``None`` if the dataset has no origin defined. + """ + return self._origin + + @origin.setter + def origin(self, obj): + if obj is not None: + if not isinstance(obj, DatasetOrigin): + raise TypeError( + 'The origin property must be None, or an instance of ' + 'DatasetOrigin! ' + f'Its current type is {classname(obj)}!') + self._origin = obj + @property def root_dir(self): """(read-only) The root directory to use when data files are specified @@ -462,12 +1158,13 @@ def mc_field_name_renaming_dict(self, d): @property def exists(self): - """(read-only) Flag if all the data files of this data set exists. It is + """(read-only) Flag if all the data files of this dataset exists. It is ``True`` if all data files exist and ``False`` otherwise. """ - for pathfilename in (self.exp_abs_pathfilename_list + - self.mc_abs_pathfilename_list): - if not os.path.exists(pathfilename): + file_list = self.create_file_list() + abs_file_list = self.get_abs_pathfilename_list(file_list) + for abs_file in abs_file_list: + if not os.path.exists(abs_file): return False return True @@ -616,6 +1313,31 @@ def __str__(self): # noqa: C901 return s + def remove_data(self): + """Removes the data of this dataset by removing the dataset's root + directory and everything in it. If the root directory is a symbolic + link, only this link will be removed. + + Raises + ------ + RuntimeError + If the dataset's root directory is neither a symlink nor a + directory. + """ + root_dir = self.root_dir + + if os.path.islink(root_dir): + os.remove(root_dir) + return + + if os.path.isdir(root_dir): + shutil.rmtree(root_dir) + return + + raise RuntimeError( + f'The root directory "{root_dir}" of dataset {self.name} is ' + 'neither a symlink nor a directory!') + def get_abs_pathfilename_list( self, pathfilename_list, @@ -647,6 +1369,29 @@ def get_abs_pathfilename_list( return abs_pathfilename_list + def get_missing_files( + self, + ): + """Determines which files of the dataset are missing and returns the + list of files. + + Returns + ------- + missing_files : list of str + The list of files that are missing. The files are relative to the + dataset's root directory. 
+        """
+        file_list = self.create_file_list()
+        abs_file_list = self.get_abs_pathfilename_list(file_list)
+
+        missing_files = [
+            file
+            for (file, abs_file) in zip(file_list, abs_file_list)
+            if not os.path.exists(abs_file)
+        ]
+
+        return missing_files
+
     def update_version_qualifiers(
         self,
         verqualifiers,
@@ -690,6 +1435,127 @@ def update_version_qualifiers(
             'Version qualifier values did not increment and no new version '
             'qualifiers were added!')
 
+    def create_file_list(
+        self,
+    ):
+        """Creates the list of files that are linked to this dataset.
+        The file paths are relative to the dataset's root directory.
+
+        Returns
+        -------
+        file_list : list of str
+            The list of files of this dataset.
+        """
+        file_list = (
+            self._exp_pathfilename_list +
+            self._mc_pathfilename_list
+        )
+
+        for aux_pathfilename_list in self._aux_data_definitions.values():
+            file_list += aux_pathfilename_list
+
+        return file_list
+
+    def make_data_available(
+        self,
+        username=None,
+        password=None,
+    ):
+        """Makes the data of the dataset available.
+        If the root directory of the dataset does not exist locally, the dataset
+        is transferred from its origin to the local host. If the origin is
+        already available locally, only a symlink is created to the origin path.
+
+        Parameters
+        ----------
+        username : str | None
+            The user name required to connect to the remote host of the origin.
+            If set to ``None``, the user name defined by the origin is used.
+        password : str | None
+            The password of the user name required to connect to the remote host
+            of the origin.
+            If set to ``None``, the password defined by the origin is used.
+
+        Returns
+        -------
+        success : bool
+            ``True`` if the data was made available successfully, ``False``
+            otherwise.
+        """
+        logger = get_logger(
+            module_class_method_name(self, 'make_data_available')
+        )
+
+        if len(self.get_missing_files()) == 0:
+            logger.debug(
+                f'All files of dataset "{self.name}" already exist. '
+                'Nothing to download.')
+            return True
+
+        if self.origin is None:
+            logger.warn(
+                f'No origin defined for dataset "{self.name}"! '
+                'Cannot download dataset!')
+            return False
+
+        # Check if the dataset origin is locally available. In that case we
+        # just create a symlink.
+        if self.origin.is_locally_available():
+            root_dir = self.root_dir
+            # Make sure all directories leading to the symlink exist.
+            dirname = os.path.dirname(root_dir)
+            if dirname != '':
+                os.makedirs(dirname)
+
+            cmd = f'ln -s "{self.origin.root_dir}" "{root_dir}"'
+            DatasetTransfer.execute_system_command(cmd, logger)
+            return True
+
+        if self._cfg['repository']['download_from_origin'] is False:
+            logger.warn(
+                f'The data of dataset "{self.name}" is locally not available '
+                'and the download from the origin is disabled through the '
+                'configuration!')
+            return False
+
+        if username is None:
+            username = self.origin.username
+        if password is None:
+            password = self.origin.password
+
+        base_path = generate_base_path(
+            default_base_path=self._cfg['repository']['base_path'],
+            base_path=self._base_path)
+
+        logger.debug(
+            f'Downloading dataset "{self.name}" from origin into base path '
+            f'"{base_path}". username="{username}".')
+
+        # Check if the origin is a directory. If not we just transfer that one
+        # file.
+ if self.origin.is_directory: + file_list = [ + os.path.join(self.origin.sub_path, pathfilename) + for pathfilename in self.create_file_list() + ] + else: + file_list = [ + os.path.join(self.origin.sub_path, self.origin.filename) + ] + + self.origin.transfer_func( + origin=self.origin, + file_list=file_list, + dst_base_path=base_path, + username=username, + password=password) + + if self.origin.post_transfer_func is not None: + self.origin.post_transfer_func( + ds=self, + dst_path=base_path) + + return True + def load_data( self, keep_fields=None, @@ -701,8 +1567,10 @@ def load_data( ): """Loads the data, which is described by the dataset. - Note: This does not call the ``prepare_data`` method! It only loads - the data as the method names says. + .. note: + + This does not call the ``prepare_data`` method! It only loads the + data as the method names says. Parameters ---------- @@ -767,6 +1635,9 @@ def _conv_new2orig_field_names( return orig_field_names + if self._cfg['repository']['download_from_origin'] is True: + self.make_data_available() + if keep_fields is None: keep_fields = [] @@ -1330,7 +2201,7 @@ def remove_aux_data_definition( Parameters ---------- name : str - The name of the dataset that should get removed. + The name of the data definition that should get removed. """ if name not in self._aux_data_definitions: raise KeyError( @@ -1339,6 +2210,21 @@ def remove_aux_data_definition( self._aux_data_definitions.pop(name) + def remove_aux_data_definitions( + self, + names, + ): + """Removes the auxiliary data definition from the dataset. + + Parameters + ---------- + names : sequence of str + The names of the data definitions that should get removed. + """ + for name in names: + self.remove_aux_data_definition( + name=name) + def add_aux_data( self, name, @@ -2059,6 +2945,63 @@ def remove_events( return data_exp +def generate_base_path( + default_base_path, + base_path=None, +): + """Generates the base path. If base_path is None, default_base_path is used. + + Parameters + ---------- + default_base_path : str + The default base path if base_path is None. + base_path : str | None + The user-specified base path. + + Returns + ------- + base_path : str + The generated base path. + """ + if base_path is None: + if default_base_path is None: + raise ValueError( + 'The default_base_path argument must not be None, when the ' + 'base_path argument is set to None!') + base_path = default_base_path + + return base_path + + +def generate_sub_path( + sub_path_fmt, + version, + verqualifiers, +): + """Generates the sub path of the dataset based on the given sub path format. + + Parameters + ---------- + sub_path_fmt : str + The format string of the sub path. + version : int + The version of the dataset. + verqualifiers : dict + The dictionary holding the version qualifiers of the dataset. + + Returns + ------- + sub_path : str + The generated sub path. + """ + fmtdict = dict( + [('version', version)] + list(verqualifiers.items()) + ) + sub_path = sub_path_fmt.format(**fmtdict) + + return sub_path + + def generate_data_file_root_dir( default_base_path, default_sub_path_fmt, @@ -2098,25 +3041,27 @@ def generate_data_file_root_dir( Returns ------- root_dir : str - The generated root directory of the data files. + The generated root directory of the data files. This will have no + trailing directory seperator. 
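The new module-level helpers ``generate_base_path`` and ``generate_sub_path`` factor out logic that was previously inlined in ``generate_data_file_root_dir``. As a quick illustration, using the format string and version qualifiers from the notebook example above (not values from this patch), the sub path resolves as follows:

```python
from skyllh.core.dataset import generate_sub_path

sub_path = generate_sub_path(
    sub_path_fmt='my_dataset_v{version:03d}_p{patch:02d}',
    version=1,
    verqualifiers={'patch': 0})

# sub_path is now 'my_dataset_v001_p00', matching the dataset root directory
# shown in the notebook output above.
```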
""" - if base_path is None: - if default_base_path is None: - raise ValueError( - 'The default_base_path argument must not be None, when the ' - 'base_path argument is set to None!') - base_path = default_base_path + base_path = generate_base_path( + default_base_path=default_base_path, + base_path=base_path) if sub_path_fmt is None: sub_path_fmt = default_sub_path_fmt - fmtdict = dict( - [('version', version)] + list(verqualifiers.items()) - ) - sub_path = sub_path_fmt.format(**fmtdict) + sub_path = generate_sub_path( + sub_path_fmt=sub_path_fmt, + version=version, + verqualifiers=verqualifiers) root_dir = os.path.join(base_path, sub_path) + len_sep = len(os.path.sep) + if root_dir[-len_sep:] == os.path.sep: + root_dir = root_dir[:-len_sep] + return root_dir diff --git a/skyllh/core/minimizers/iminuit.py b/skyllh/core/minimizers/iminuit.py index 4f395b7021..d37822e1f0 100644 --- a/skyllh/core/minimizers/iminuit.py +++ b/skyllh/core/minimizers/iminuit.py @@ -3,10 +3,11 @@ minimizer. """ -import iminuit - import numpy as np +from skyllh.core import ( + tool, +) from skyllh.core.config import ( HasConfig, ) @@ -116,6 +117,7 @@ class IMinuitMinimizerImpl( """The SkyLLH minimizer implementation that utilizes the iminuit minimizer. """ + @tool.requires('iminuit') def __init__( self, ftol=1e-6, @@ -194,6 +196,8 @@ def minimize( if kwargs is None: kwargs = dict() + iminuit = tool.get('iminuit') + func_provides_grads = kwargs.pop('func_provides_grads', True) if func_provides_grads: diff --git a/skyllh/datasets/i3/PublicData_10y_ps.py b/skyllh/datasets/i3/PublicData_10y_ps.py index e5f51b51ce..04cab87c28 100644 --- a/skyllh/datasets/i3/PublicData_10y_ps.py +++ b/skyllh/datasets/i3/PublicData_10y_ps.py @@ -5,6 +5,8 @@ from skyllh.core.dataset import ( DatasetCollection, + DatasetOrigin, + WGETDatasetTransfer, ) from skyllh.i3.dataset import ( I3Dataset, @@ -255,6 +257,16 @@ def create_dataset_collection( 28 January 2021 """ + # Define the origin of the dataset. + origin = DatasetOrigin( + base_path='data-releases', + sub_path='', + filename='20210126_PS-IC40-IC86_VII.zip', + host='icecube.wisc.edu', + transfer_func=WGETDatasetTransfer(protocol='http').transfer, + post_transfer_func=WGETDatasetTransfer.post_transfer_unzip, + ) + # Define the common keyword arguments for all data sets. ds_kwargs = dict( cfg=cfg, @@ -264,6 +276,7 @@ def create_dataset_collection( base_path=base_path, default_sub_path_fmt=default_sub_path_fmt, sub_path_fmt=sub_path_fmt, + origin=origin, ) grl_field_name_renaming_dict = { diff --git a/skyllh/datasets/i3/TestData.py b/skyllh/datasets/i3/TestData.py new file mode 100644 index 0000000000..d7b599a647 --- /dev/null +++ b/skyllh/datasets/i3/TestData.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Author: Dr. Martin Wolf + +from skyllh.core.dataset import ( + DatasetCollection, +) +from skyllh.i3.dataset import ( + I3Dataset, +) + + +def create_dataset_collection( + cfg, + base_path=None, + sub_path_fmt=None, +): + """Defines a dataset collection with a test dataset. + + Parameters + ---------- + cfg : instance of Config + The instance of Config holding the local configuration. + base_path : str | None + The base path of the data files. The actual path of a data file is + assumed to be of the structure //. + If None, use the default path ``cfg['repository']['base_path']``. + sub_path_fmt : str | None + The sub path format of the data files of the public data sample. + If None, use the default sub path format + 'testdata'. 
+ + Returns + ------- + dsc : DatasetCollection + The dataset collection containing all the seasons as individual + I3Dataset objects. + """ + (version, verqualifiers) = (1, dict(p=0)) + + default_sub_path_fmt = 'testdata' + + dsc = DatasetCollection('Public Data 10-year point-source') + + dsc.description = r""" + This dataset collection contains a test dataset which can be used for unit + tests. + """ + + # Define the common keyword arguments for all data sets. + ds_kwargs = dict( + cfg=cfg, + livetime=None, + version=version, + verqualifiers=verqualifiers, + base_path=base_path, + default_sub_path_fmt=default_sub_path_fmt, + sub_path_fmt=sub_path_fmt, + ) + + TestData = I3Dataset( + name='TestData', + exp_pathfilenames='exp.npy', + mc_pathfilenames='mc.npy', + grl_pathfilenames='grl.npy', + **ds_kwargs, + ) + + dsc.add_datasets(( + TestData, + )) + + return dsc diff --git a/skyllh/i3/dataset.py b/skyllh/i3/dataset.py index 1cfb01016a..c1cb89889e 100644 --- a/skyllh/i3/dataset.py +++ b/skyllh/i3/dataset.py @@ -172,6 +172,24 @@ def __str__(self): return s + def create_file_list( + self, + ): + """Creates the list of files of this dataset. + The file paths are relative to the dataset's root directory. + + Returns + ------- + file_list : list of str + The list of files of this dataset. + """ + file_list = ( + super().create_file_list() + + self._grl_pathfilename_list + ) + + return file_list + def load_grl(self, efficiency_mode=None, tl=None): """Loads the good-run-list and returns a DataFieldRecordArray instance which should contain the following data fields: @@ -275,6 +293,16 @@ def load_data( A DatasetData instance holding the experimental and monte-carlo data of this data set. """ + # Load the dataset files first. This will ensure the dataset is + # downloaded if necessary. + data_ = super().load_data( + keep_fields=keep_fields, + livetime=livetime, + dtc_dict=dtc_dict, + dtc_except_fields=dtc_except_fields, + efficiency_mode=efficiency_mode, + tl=tl) + # Load the good-run-list (GRL) data if it is provided for this dataset, # and calculate the livetime based on the GRL. data_grl = None @@ -285,14 +313,8 @@ def load_data( # Load all the defined data. data = I3DatasetData( - super(I3Dataset, self).load_data( - keep_fields=keep_fields, - livetime=livetime, - dtc_dict=dtc_dict, - dtc_except_fields=dtc_except_fields, - efficiency_mode=efficiency_mode, - tl=tl), - data_grl) + data=data_, + data_grl=data_grl) return data diff --git a/tests/core/test_dataset.py b/tests/core/test_dataset.py index 218d6fd00f..9e4513e8a5 100644 --- a/tests/core/test_dataset.py +++ b/tests/core/test_dataset.py @@ -11,6 +11,10 @@ get_data_subset, Dataset, DatasetData, + DatasetOrigin, + DatasetTransferError, + RSYNCDatasetTransfer, + WGETDatasetTransfer, ) from skyllh.core.livetime import ( Livetime, @@ -18,12 +22,97 @@ from skyllh.core.storage import ( DataFieldRecordArray, ) + +from skyllh.datasets.i3 import ( + TestData, +) from skyllh.datasets.i3.PublicData_10y_ps import ( create_dataset_collection, ) -class TestDatasetFunctions(unittest.TestCase): +class TestRSYNCDatasetTransfer( + unittest.TestCase, +): + def setUp(self): + self.cfg = Config() + self.ds = TestData.create_dataset_collection( + cfg=self.cfg, + base_path=os.path.join(os.getcwd(), '.repository')).get_dataset( + 'TestData') + + # Remove the dataset if it already exists. + if self.ds.exists: + self.ds.remove_data() + + # Define the origin and transfer method of this dataset. 
+ self.ds.origin = DatasetOrigin( + base_path='/data/user/mwolf/skyllh', + sub_path='testdata', + host='cobalt', + transfer_func=RSYNCDatasetTransfer().transfer, + ) + + def test_transfer(self): + try: + if not self.ds.make_data_available(): + raise RuntimeError( + f'The data of dataset {self.ds.name} could not be made ' + 'available!') + except DatasetTransferError: + self.skipTest( + f'The data of dataset {self.ds.name} could not be transfered.') + + # Check that there are no missing files. + missing_files = self.ds.get_missing_files() + self.assertEqual(len(missing_files), 0) + + +class TestWGETDatasetTransfer( + unittest.TestCase, +): + def setUp(self): + self.cfg = Config() + self.ds = TestData.create_dataset_collection( + cfg=self.cfg, + base_path=os.path.join(os.getcwd(), '.repository')).get_dataset( + 'TestData') + + # Remove the dataset if it already exists. + if self.ds.exists: + self.ds.remove_data() + + # Define the origin and transfer method of this dataset. + self.ds.origin = DatasetOrigin( + base_path='/data/user/mwolf/skyllh', + sub_path='testdata', + host='convey.icecube.wisc.edu', + username='icecube', + transfer_func=WGETDatasetTransfer(protocol='https').transfer, + ) + + def test_transfer(self): + password = os.environ.get('ICECUBE_PASSWORD', None) + if password is None: + self.skipTest( + f'No password for username "{self.ds.origin.username}" ' + 'provided via the environment!') + + if not self.ds.make_data_available( + password=password, + ): + raise RuntimeError( + f'The data of dataset {self.ds.name} could not be made ' + 'available!') + + # Check that there are no missing files. + missing_files = self.ds.get_missing_files() + self.assertEqual(len(missing_files), 0) + + +class TestDatasetFunctions( + unittest.TestCase, +): def setUp(self): path = os.path.abspath(os.path.dirname(__file__)) self.exp_data = DataFieldRecordArray( @@ -34,12 +123,6 @@ def setUp(self): os.path.join(path, 'testdata/livetime_testdata.npy')) self.livetime = 100 - def tearDown(self): - # self.exp_data.close() - # self.mc_data.close() - # self.livetime_datafile.close() - pass - def test_get_data_subset(self): # Whole interval. t_start = 58442.0
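Taken together, the pieces added in this patch let a dataset be fetched on demand. The following end-to-end sketch mirrors the notebook example above; the host, paths and file names are placeholders.

```python
from skyllh.core.config import Config
from skyllh.core.dataset import (
    Dataset,
    DatasetOrigin,
    WGETDatasetTransfer,
)

cfg = Config()

my_dataset = Dataset(
    cfg=cfg,
    name='My Dataset',
    exp_pathfilenames='exp.npy',
    mc_pathfilenames='mc.npy',
    livetime=365,
    version=1,
    verqualifiers={'patch': 0},
    default_sub_path_fmt='my_dataset_v{version:03d}_p{patch:02d}',
    base_path='/data/ana/analyses/',
)
my_dataset.origin = DatasetOrigin(
    host='data.mydomain.com',
    base_path='/downloads/data',
    sub_path='my_dataset',
    transfer_func=WGETDatasetTransfer(protocol='https').transfer,
)

# Download only what is missing; returns True on success. Setting
# cfg['repository']['download_from_origin'] = False would disable the
# automatic download that load_data() now performs.
if my_dataset.make_data_available():
    assert my_dataset.get_missing_files() == []
```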