diff --git a/0.7.0rc20/.buildinfo b/0.7.0rc20/.buildinfo
new file mode 100644
index 00000000..b981b525
--- /dev/null
+++ b/0.7.0rc20/.buildinfo
@@ -0,0 +1,4 @@
+# Sphinx build info version 1
+# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
+config: 0f34b93955b29a71fada116c42b45829
+tags: 645f666f9bcd5a90fca523b33c5a78b7
diff --git a/0.7.0rc20/.doctrees/environment.pickle b/0.7.0rc20/.doctrees/environment.pickle
new file mode 100644
index 00000000..5264c6fd
Binary files /dev/null and b/0.7.0rc20/.doctrees/environment.pickle differ
diff --git a/0.7.0rc20/.doctrees/index.doctree b/0.7.0rc20/.doctrees/index.doctree
new file mode 100644
index 00000000..840f4868
Binary files /dev/null and b/0.7.0rc20/.doctrees/index.doctree differ
diff --git a/0.7.0rc20/.doctrees/merger.doctree b/0.7.0rc20/.doctrees/merger.doctree
new file mode 100644
index 00000000..6e87fd72
Binary files /dev/null and b/0.7.0rc20/.doctrees/merger.doctree differ
diff --git a/0.7.0rc20/.doctrees/merger.harmony.doctree b/0.7.0rc20/.doctrees/merger.harmony.doctree
new file mode 100644
index 00000000..b084b25d
Binary files /dev/null and b/0.7.0rc20/.doctrees/merger.harmony.doctree differ
diff --git a/0.7.0rc20/.doctrees/modules.doctree b/0.7.0rc20/.doctrees/modules.doctree
new file mode 100644
index 00000000..331c17af
Binary files /dev/null and b/0.7.0rc20/.doctrees/modules.doctree differ
diff --git a/0.7.0rc20/_modules/index.html b/0.7.0rc20/_modules/index.html
new file mode 100644
index 00000000..4c5917b5
--- /dev/null
+++ b/0.7.0rc20/_modules/index.html
@@ -0,0 +1,112 @@
+"""A Harmony CLI wrapper around Concise"""
+
+from argparse import ArgumentParser
+import harmony
+from podaac.merger.harmony.service import ConciseService
+
+
+def main(config=None):
+ """Main Harmony CLI entrypoint"""
+
+ parser = ArgumentParser()
+ harmony.setup_cli(parser)
+
+ args = parser.parse_args()
+ if harmony.is_harmony_cli(args):
+ harmony.run_cli(parser, args, ConciseService, cfg=config)
+ else:
+ parser.error("Only --harmony CLIs are supported")
+
+
+if __name__ == "__main__":
+ main()
+
+"""A utility for downloading multiple granules simultaneously"""
+
+from copy import deepcopy
+from multiprocessing import Manager, Process
+from os import cpu_count
+from pathlib import Path
+import queue
+import re
+from urllib.parse import urlparse
+
+from harmony.logging import build_logger
+from harmony.util import download
+
+
+def multi_core_download(urls, destination_dir, access_token, cfg, process_count=None):
+ """
+ A method which automagically scales downloads to the number of CPU
+ cores. For further explanation, see documentation on "multi-track
+ drifting"
+
+ Parameters
+ ----------
+ urls : list
+ list of urls to download
+ destination_dir : str
+ output path for downloaded files
+ access_token : str
+ access token as provided in Harmony input
+ cfg : dict
+ Harmony configuration information
+ process_count : int
+ Number of worker processes to run (expected >= 1)
+
+ Returns
+ -------
+ list
+ list of downloaded files as pathlib.Path objects
+ """
+
+ if process_count is None:
+ process_count = cpu_count()
+
+ with Manager() as manager:
+ url_queue = manager.Queue(len(urls))
+ path_list = manager.list()
+
+ for url in urls:
+ url_queue.put(url)
+
+ # Spawn worker processes
+ processes = []
+ for _ in range(process_count):
+ download_process = Process(target=_download_worker, args=(url_queue, path_list, destination_dir, access_token, cfg))
+ processes.append(download_process)
+ download_process.start()
+
+ # Ensure worker processes exit successfully
+ for process in processes:
+ process.join()
+ if process.exitcode != 0:
+ raise RuntimeError(f'Download failed - exit code: {process.exitcode}')
+
+ process.close()
+
+ path_list = deepcopy(path_list) # ensure GC can cleanup multiprocessing
+
+ return [Path(path) for path in path_list]
+
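+# Illustrative usage sketch (not part of the original module). The URLs, output
+# directory, and token below are hypothetical placeholders; in practice they come
+# from the Harmony message, and cfg is assumed to come from harmony.util.config().
+#
+#   from harmony.util import config
+#
+#   cfg = config(validate=False)  # assumed local/test configuration
+#   urls = ['https://example.com/granules/g1.nc', 'https://example.com/granules/g2.nc']
+#   paths = multi_core_download(urls, '/tmp/granules', access_token=None, cfg=cfg, process_count=2)
+#   # paths -> list of pathlib.Path objects named after the origin granules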
+
+def _download_worker(url_queue, path_list, destination_dir, access_token, cfg):
+ """
+ A method to be executed in a separate process which processes the url_queue
+ and places paths to completed downloads into the path_list. Downloads are
+ handled by harmony.util.download
+
+ Parameters
+ ----------
+ url_queue : queue.Queue
+ URLs to process; filled before the workers start and only drained afterwards
+ path_list : list
+ paths to completed file downloads
+ destination_dir : str
+ output path for downloaded files
+ access_token : str
+ access token as provided in Harmony input
+ cfg : dict
+ Harmony configuration information
+ """
+
+ logger = build_logger(cfg)
+
+ while not url_queue.empty():
+ try:
+ url = url_queue.get_nowait()
+ except queue.Empty:
+ break
+
+ path = Path(download(url, destination_dir, logger=logger, access_token=access_token, cfg=cfg))
+ filename_match = re.match(r'.*\/(.+\..+)', urlparse(url).path)
+
+ if filename_match is not None:
+ filename = filename_match.group(1)
+ dest_path = path.parent.joinpath(filename)
+ path = path.rename(dest_path)
+ else:
+ logger.warning('Origin filename could not be ascertained - %s', url)
+
+ path_list.append(str(path))
+
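+
+# Quick, self-contained illustration of the filename recovery above. The URL is
+# hypothetical; the regex keeps everything after the final '/' of the URL path,
+# provided it contains a '.':
+example_url = 'https://example.com/data/GRANULE_0001.nc?request-id=1234'
+example_match = re.match(r'.*\/(.+\..+)', urlparse(example_url).path)
+assert example_match.group(1) == 'GRANULE_0001.nc'
+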
+"""A Harmony service wrapper around the Concise module"""
+
+from datetime import datetime, timezone
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from shutil import copyfile
+from urllib.parse import urlsplit
+from uuid import uuid4
+
+from harmony.adapter import BaseHarmonyAdapter
+from harmony.util import bbox_to_geometry, stage
+from pystac import Catalog, Item
+from pystac.item import Asset
+
+from podaac.merger.merge import merge_netcdf_files
+from podaac.merger.harmony.download_worker import multi_core_download
+from podaac.merger.harmony.util import get_bbox, get_datetime, get_granule_url
+
+
+NETCDF4_MIME = 'application/x-netcdf4' # pylint: disable=invalid-name
+
+
+class ConciseService(BaseHarmonyAdapter):
+ """
+ A harmony-service-lib wrapper around the Concise module. This wrapper does
+ not support Harmony calls that lack STAC catalogs, as support for such
+ calls is being deprecated in harmony-service-lib
+ """
+
+ def invoke(self):
+ """
+ Primary entrypoint into the service wrapper. Overrides BaseHarmonyAdapter.invoke
+ """
+ if not self.catalog:
+ # Message-only support is being deprecated in Harmony, so we should expect to
+ # only see requests with catalogs when invoked with a newer Harmony instance
+ # https://github.com/nasa/harmony-service-lib-py/blob/21bcfbda17caf626fb14d2ac4f8673be9726b549/harmony/adapter.py#L71
+ raise RuntimeError('Invoking CONCISE without a STAC catalog is not supported')
+
+ return (self.message, self.process_catalog(self.catalog))
+
+ def process_catalog(self, catalog: Catalog):
+ """
+ Recursively process a catalog and all its children. Adapted from
+ BaseHarmonyAdapter._process_catalog_recursive to specifically
+ support our many-to-one use case
+
+ Parameters
+ ----------
+ catalog : pystac.Catalog or pystac.Collection
+ a catalog/collection to process for merging
+
+ Returns
+ -------
+ pystac.Catalog
+ A new catalog containing the results from the merge
+ """
+ result = catalog.clone()
+ result.id = str(uuid4())
+ result.clear_children()
+
+ # Get all the items from the catalog, including from child or linked catalogs
+ items = list(self.get_all_catalog_items(catalog))
+
+ # Quick return if catalog contains no items
+ if len(items) == 0:
+ return result
+
+ # -- Process metadata --
+ bbox = []
+ granule_urls = []
+ datetimes = [
+ datetime.max.replace(tzinfo=timezone.utc), # start
+ datetime.min.replace(tzinfo=timezone.utc) # end
+ ]
+
+ for item in items:
+ get_bbox(item, bbox)
+ get_granule_url(item, granule_urls)
+ get_datetime(item, datetimes)
+
+ # Items did not have a bbox; valid under spec
+ if len(bbox) == 0:
+ bbox = None
+
+ # -- Perform merging --
+ collection = self._get_item_source(items[0]).collection
+ filename = f'{collection}_merged.nc4'
+
+ with TemporaryDirectory() as temp_dir:
+ self.logger.info('Starting granule downloads')
+ input_files = multi_core_download(granule_urls, temp_dir, self.message.accessToken, self.config)
+ self.logger.info('Finished granule downloads')
+
+ output_path = Path(temp_dir).joinpath(filename).resolve()
+ merge_netcdf_files(input_files, output_path, granule_urls, logger=self.logger)
+ staged_url = self._stage(str(output_path), filename, NETCDF4_MIME)
+
+ # -- Output to STAC catalog --
+ result.clear_items()
+ properties = {
+ "start_datetime": datetimes[0].isoformat(),
+ "end_datetime": datetimes[1].isoformat()
+ }
+
+ item = Item(str(uuid4()), bbox_to_geometry(bbox), bbox, None, properties)
+ asset = Asset(staged_url, title=filename, media_type=NETCDF4_MIME, roles=['data'])
+ item.add_asset('data', asset)
+ result.add_item(item)
+
+ return result
+
+ def _stage(self, local_filename, remote_filename, mime):
+ """
+ Stages a local file either to S3 (utilizing harmony.util.stage) or to
+ the local filesystem by performing a file copy. Staging location is
+ determined by message.stagingLocation or the --harmony-data-location
+ CLI argument override
+
+ Parameters
+ ----------
+ local_filename : string
+ A path and filename to the local file that should be staged
+ remote_filename : string
+ The basename to give to the remote file
+ mime : string
+ The mime type to apply to the staged file for use when it is served, e.g. "application/x-netcdf4"
+
+ Returns
+ -------
+ url : string
+ A URL to the staged file
+ """
+ url_components = urlsplit(self.message.stagingLocation)
+ scheme = url_components.scheme
+
+ if scheme == 'file':
+ dest_path = Path(url_components.path).joinpath(remote_filename)
+ self.logger.info('Staging to local filesystem: \'%s\'', str(dest_path))
+
+ copyfile(local_filename, dest_path)
+ return dest_path.as_uri()
+
+ return stage(local_filename, remote_filename, mime,
+ logger=self.logger,
+ location=self.message.stagingLocation,
+ cfg=self.config
+ )
+
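+# Self-contained sketch of the staging-location decision above. The locations are
+# hypothetical and only urlsplit is exercised here; no copy or S3 upload is performed.
+for example_location in ('file:///tmp/staging', 's3://example-bucket/staging/'):
+    example_scheme = urlsplit(example_location).scheme
+    example_target = 'local copyfile' if example_scheme == 'file' else 'harmony.util.stage (S3)'
+    print(f'{example_location} -> staged via {example_target}')
+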
+"""Misc utility functions"""
+
+
+def is_netcdf_asset(asset, strict):
+ """
+ Determine if an asset is netcdf4-python compatible. netcdf4-python currently supports
+ HDF5, NetCDF3, and NetCDF4. If strict mode is enabled, the determination is made
+ from the asset's MIME type; otherwise it is made from the presence of the 'data'
+ role.
+
+ Parameters
+ ----------
+ asset : pystac.Asset
+ an asset to check
+ strict : bool
+ if True, check the asset's MIME type; if False, check the asset's roles
+
+ Returns
+ -------
+ bool
+ True if netcdf4-python compatible; False otherwise
+ """
+
+ if strict:
+ accepted_types = ['application/x-hdf5', 'application/x-netcdf', 'application/x-netcdf4']
+ return asset.media_type in accepted_types
+
+ return 'data' in asset.roles
+
+
+def get_granule_url(item, granule_urls, strict=True):
+ """
+ Processes an item to find a netcdf4-python compatible asset. If no asset is
+ found in strict mode, the search is rerun in lax mode; if none is found in lax
+ mode either, a RuntimeError is raised
+
+ Parameters
+ ----------
+ item : pystac.Item
+ an item to process
+ granule_urls : list
+ list to append the asset's url to
+ strict : bool
+ whether to start the search in strict (MIME type) mode
+ """
+
+ for asset in item.assets.values():
+ if is_netcdf_asset(asset, strict):
+ granule_urls.append(asset.href)
+ return
+
+ if not strict:
+ raise RuntimeError(f'A NetCDF4 asset was not found in this item: {item.id}')
+
+ get_granule_url(item, granule_urls, False) # Rerun in lax-mode
+
+
+def get_bbox(item, current_bbox):
+ """
+ Accumulate bboxes from items to generate a bbox which encompasses all items
+
+ Parameters
+ ----------
+ item : pystac.Item
+ an item to process
+ current_bbox : list
+ the bbox to accumulate all items to
+ """
+
+ if len(current_bbox) == 0:
+ if item.bbox is not None: # Spec allows for null geometry and bbox
+ current_bbox[:] = item.bbox
+ else:
+ # xmin
+ if item.bbox[0] < current_bbox[0]:
+ current_bbox[0] = item.bbox[0]
+ # ymin
+ if item.bbox[1] < current_bbox[1]:
+ current_bbox[1] = item.bbox[1]
+ # xmax
+ if item.bbox[2] > current_bbox[2]:
+ current_bbox[2] = item.bbox[2]
+ # ymax
+ if item.bbox[3] > current_bbox[3]:
+ current_bbox[3] = item.bbox[3]
+
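+# Self-contained sketch of the accumulation above, using a minimal stand-in for
+# pystac.Item (the bboxes are hypothetical; ordering is [xmin, ymin, xmax, ymax]):
+from types import SimpleNamespace
+
+example_bbox = []
+for _bbox in ([-10.0, -5.0, 10.0, 5.0], [0.0, -20.0, 30.0, 2.0]):
+    get_bbox(SimpleNamespace(bbox=_bbox), example_bbox)
+assert example_bbox == [-10.0, -20.0, 30.0, 5.0]
+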
+
+def get_datetime(item, datetimes):
+ """
+ Accumulate datetimes from items to generate a datetime pair that
+ encompasses all items
+
+ Parameters
+ ----------
+ item : pystac.Item
+ an item to process
+ datetimes : list
+ datetime pair to accumulate to; first element is start_datetime,
+ second is end_datetime
+ """
+
+ if item.datetime is None:
+ item_start_dt = item.common_metadata.start_datetime
+ item_end_dt = item.common_metadata.end_datetime
+ else:
+ item_start_dt = item.datetime
+ item_end_dt = item.datetime
+
+ if item_start_dt < datetimes[0]:
+ datetimes[0] = item_start_dt
+
+ if item_end_dt > datetimes[1]:
+ datetimes[1] = item_end_dt
+
+"""Main module containing merge implementation"""
+
+from time import perf_counter
+from logging import getLogger
+from os import cpu_count
+import netCDF4 as nc
+import numpy as np
+
+from podaac.merger.merge_worker import run_merge
+from podaac.merger.path_utils import get_group_path, resolve_dim, resolve_group
+from podaac.merger.preprocess_worker import run_preprocess
+
+
+def is_file_empty(parent_group):
+ """
+ Test whether every variable in a dataset, including variables in nested groups, has size 0
+ """
+
+ for var in parent_group.variables.values():
+ if var.size != 0:
+ return False
+ for child_group in parent_group.groups.values():
+ if not is_file_empty(child_group):
+ return False
+ return True
+
+
+def merge_netcdf_files(original_input_files, output_file, granule_urls, logger=getLogger(__name__), perf_stats=None, process_count=None): # pylint: disable=too-many-locals
+ """
+ Main entrypoint to merge implementation. Merges n >= 2 granules together as a single
+ granule. Named in reference to original Java implementation.
+
+ Parameters
+ ----------
+ original_input_files: list
+ list of string paths to NetCDF4 files to merge; empty granules are filtered out
+ output_file: str
+ output path for merged product
+ granule_urls: list
+ list of source granule URLs, recorded in the output's history_json attribute
+ logger: logger
+ logger object
+ perf_stats: dict
+ dictionary used to store performance stats
+ process_count: int
+ number of processes to run (expected >= 1)
+ """
+
+ if perf_stats is None:
+ perf_stats = {}
+
+ if process_count is None:
+ process_count = cpu_count()
+ elif process_count <= 0:
+ raise RuntimeError('process_count should be > 0')
+
+ # -- initial preprocessing --
+ logger.info('Preprocessing data...')
+ start = perf_counter()
+
+ input_files = []
+
+ # only concatenate files that are not empty
+ for file in original_input_files:
+ with nc.Dataset(file, 'r') as dataset:
+ is_empty = is_file_empty(dataset)
+ if is_empty is False:
+ input_files.append(file)
+
+ preprocess = run_preprocess(input_files, process_count, granule_urls)
+ group_list = preprocess['group_list']
+ max_dims = preprocess['max_dims']
+ var_info = preprocess['var_info']
+ var_metadata = preprocess['var_metadata']
+ group_metadata = preprocess['group_metadata']
+
+ perf_stats['preprocess'] = perf_counter() - start
+ logger.info('Preprocessing completed: %f', perf_stats['preprocess'])
+
+ merged_dataset = nc.Dataset(output_file, 'w', format='NETCDF4')
+ merged_dataset.set_auto_maskandscale(False)
+ init_dataset(merged_dataset, group_list, var_info, max_dims, input_files)
+
+ # -- merge datasets --
+ logger.info('Merging datasets...')
+ start = perf_counter()
+ run_merge(merged_dataset, input_files, var_info, max_dims, process_count, logger)
+
+ perf_stats['merge'] = perf_counter() - start
+ logger.info('Merging completed: %f', perf_stats['merge'])
+
+ # -- finalize metadata --
+ logger.info('Finalizing metadata...')
+ start = perf_counter()
+
+ for group_path in group_list:
+ group = merged_dataset if group_path == '/' else merged_dataset[group_path]
+
+ group_attrs = group_metadata[group_path]
+ clean_metadata(group_attrs)
+ group.setncatts(group_attrs)
+
+ for var in group.variables.values():
+ if var.name == 'subset_files' and group == merged_dataset:
+ continue # Skip /subset_files for metadata finalization
+
+ var_path = get_group_path(group, var.name)
+ var_attrs = var_metadata[var_path]
+ clean_metadata(var_attrs)
+ var.setncatts(var_attrs)
+
+ perf_stats['metadata'] = perf_counter() - start
+ logger.info('Metadata completed: %f', perf_stats['metadata'])
+
+ merged_dataset.close()
+ logger.info('Done!')
+
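+# Minimal usage sketch (not part of the original module). The paths are hypothetical
+# and the input directory is assumed to contain compatible NetCDF4 granules:
+#
+#   from pathlib import Path
+#
+#   input_files = list(Path('/tmp/granules').resolve().iterdir())
+#   merge_netcdf_files(input_files, '/tmp/merged.nc4', granule_urls=[], process_count=1)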
+
+def clean_metadata(metadata):
+ """
+ Prepares metadata dictionary for insertion by removing inconsistent entries
+ and performing escaping of attribute names
+
+ Parameters
+ ----------
+ metadata : dict
+ dictionary of attribute names and their associated data
+ """
+
+ for key in list(metadata):
+ val = metadata[key]
+
+ # delete inconsistent items
+ if not isinstance(val, np.ndarray) and isinstance(val, bool) and not val:
+ del metadata[key]
+ elif key == '_FillValue':
+ del metadata[key]
+
+ # escape '/' to '_'
+ # https://www.unidata.ucar.edu/mailing_lists/archives/netcdfgroup/2012/msg00098.html
+ if '/' in key:
+ new_key = key.replace('/', '_')
+ metadata[new_key] = val
+ del metadata[key]
+
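+# Self-contained sketch of the cleanup rules above (the attributes are hypothetical):
+example_attrs = {
+    'units': 'm',          # consistent attribute, kept as-is
+    'platform': False,     # marked inconsistent during preprocessing, so dropped
+    '_FillValue': -9999,   # always dropped; fill values are set via createVariable
+    'foo/bar': 'baz',      # '/' is escaped to '_'
+}
+clean_metadata(example_attrs)
+assert example_attrs == {'units': 'm', 'foo_bar': 'baz'}
+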
+
+def init_dataset(merged_dataset, groups, var_info, max_dims, input_files):
+ """
+ Initialize the dataset utilizing data gathered from preprocessing
+
+ Parameters
+ ----------
+ merged_dataset : nc.Dataset
+ the dataset to be initialized
+ groups : list
+ list of group names
+ var_info : dict
+ dictionary of variable names and VariableInfo objects
+ max_dims : dict
+ dictionary of dimension names (including path) and their sizes
+ input_files : list
+ list of file paths to be merged
+ """
+
+ # Create groups
+ for group in groups:
+ if group == '/':
+ continue # Skip root
+
+ merged_dataset.createGroup(group)
+
+ # Create dims
+ merged_dataset.createDimension('subset_index', len(input_files))
+ for dim in max_dims.items():
+ group = resolve_group(merged_dataset, dim[0])
+ group[0].createDimension(group[1], dim[1])
+
+ # Generate filelist
+ subset_files = merged_dataset.createVariable('subset_files', np.str_, ['subset_index'])
+ subset_files.long_name = 'List of subsetted files used to create this merge product.'
+ for i, file in enumerate(input_files):
+ subset_files[i] = file.name
+
+ # Recreate variables
+ for var in var_info.items():
+ dims = ['subset_index'] + list(var[1].dim_order)
+ group = resolve_group(merged_dataset, var[0])
+
+ # Holdover from old merging code - not sure if needed, but kept for legacy
+ chunk_sizes = [1] + [resolve_dim(max_dims, group[0].path, key) for key in var[1].dim_order]
+
+ group[0].createVariable(
+ varname=var[1].name,
+ datatype=var[1].datatype,
+ dimensions=dims,
+ chunksizes=chunk_sizes,
+ fill_value=var[1].fill_value,
+ zlib=True
+ )
+
+"""A simple CLI wrapper around the main merge function"""
+
+from argparse import ArgumentParser
+import logging
+from pathlib import Path
+
+from podaac.merger.merge import merge_netcdf_files
+
+
+def main():
+ """Main CLI entrypoint"""
+
+ parser = ArgumentParser(
+ prog='merge',
+ description='Simple CLI wrapper around the granule merge module.')
+ parser.add_argument(
+ 'data_dir',
+ help='The directory containing the files to be merged.')
+ parser.add_argument(
+ 'output_path',
+ help='The output filename for the merged output.')
+ parser.add_argument(
+ '-v', '--verbose',
+ help='Enable verbose output to stdout; useful for debugging',
+ action='store_true'
+ )
+ parser.add_argument(
+ '-c', '--cores',
+ help='Override the number of cores to be utilized during multithreaded/multiprocess operations. Defaults to cpu_count',
+ type=int,
+ default=None
+ )
+
+ args = parser.parse_args()
+
+ if args.verbose:
+ logging.basicConfig(level=logging.DEBUG)
+
+ input_files = list(Path(args.data_dir).resolve().iterdir())
+ granule_urls = []
+ merge_netcdf_files(input_files, args.output_path, granule_urls, process_count=args.cores)
+
+
+if __name__ == '__main__':
+ main()
+
+"""Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes"""
+
+import math
+import multiprocessing
+from multiprocessing.shared_memory import SharedMemory
+import queue
+import time
+import os
+import shutil
+import netCDF4 as nc
+import numpy as np
+
+from podaac.merger.path_utils import resolve_dim, resolve_group
+
+
+
+
+
+def max_var_memory(file_list, var_info, max_dims):
+ """
+ Get the maximum amount of shared memory that any single variable will require
+
+ Parameters
+ ----------
+ file_list : list
+ List of file paths to be processed
+ var_info : dict
+ Dictionary of variable paths and associated VariableInfo
+ max_dims : dict
+ Dictionary of dimension paths and maximum dimensions found during preprocessing
+ """
+
+ max_var_mem = 0
+ for file in file_list:
+ with nc.Dataset(file, 'r') as origin_dataset:
+
+ for var_path, var_meta in var_info.items():
+ ds_group, var_name = resolve_group(origin_dataset, var_path)
+ ds_var = ds_group.variables.get(var_name)
+
+ if ds_var is None:
+ target_shape = tuple(max_dims[f'/{dim}'] for dim in var_meta.dim_order)
+ var_size = math.prod(target_shape) * var_meta.datatype.itemsize
+ max_var_mem = max(var_size, max_var_mem)
+ else:
+ var_size = math.prod(ds_var.shape) * var_meta.datatype.itemsize
+ max_var_mem = max(var_size, max_var_mem)
+
+ return max_var_mem
+
+
+def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logger):
+ """
+ Automagically run merging in an optimized mode determined by the environment
+
+ Parameters
+ ----------
+ merged_dataset : nc.Dataset
+ Destination dataset of the merge operation
+ file_list : list
+ List of file paths to be processed
+ var_info : dict
+ Dictionary of variable paths and associated VariableInfo
+ max_dims : dict
+ Dictionary of dimension paths and maximum dimensions found during preprocessing
+ process_count : int
+ Number of worker processes to run (expected >= 1)
+ logger : logging.Logger
+ logger used for status output
+ """
+
+ if process_count == 1:
+ _run_single_core(merged_dataset, file_list, var_info, max_dims, logger)
+ else:
+ # Merging is bottlenecked at the write process, which is single-threaded,
+ # so spinning up more than 2 processes (one reader plus one writer) won't
+ # improve performance
+
+ max_var_mem = max_var_memory(file_list, var_info, max_dims)
+ max_memory_size = round(shared_memory_size() * .95)
+
+ if max_var_mem < max_memory_size:
+ _run_multi_core(merged_dataset, file_list, var_info, max_dims, 2, logger)
+ else:
+ _run_single_core(merged_dataset, file_list, var_info, max_dims, logger)
+
+
+def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger):
+ """
+ Run the variable merge in the current thread/single-core mode
+
+ Parameters
+ ----------
+ merged_dataset : nc.Dataset
+ Destination dataset of the merge operation
+ file_list : list
+ List of file paths to be processed
+ var_info : dict
+ Dictionary of variable paths and associated VariableInfo
+ max_dims : dict
+ Dictionary of dimension paths and maximum dimensions found during preprocessing
+ """
+
+ logger.info("Running single core ......")
+ for i, file in enumerate(file_list):
+ with nc.Dataset(file, 'r') as origin_dataset:
+ origin_dataset.set_auto_maskandscale(False)
+
+ for var_path, var_meta in var_info.items():
+ ds_group, var_name = resolve_group(origin_dataset, var_path)
+ merged_group = resolve_group(merged_dataset, var_path)
+ ds_var = ds_group.variables.get(var_name)
+
+ merged_var = merged_group[0].variables[var_name]
+
+ if ds_var is None:
+ fill_value = var_meta.fill_value
+ target_shape = tuple(max_dims[f'/{dim}'] for dim in var_meta.dim_order)
+ merged_var[i] = np.full(target_shape, fill_value)
+ continue
+
+ resized = resize_var(ds_var, var_meta, max_dims)
+ merged_var[i] = resized
+
+
+def _run_multi_core(merged_dataset, file_list, var_info, max_dims, process_count, logger): # pylint: disable=too-many-locals
+ """
+ Run the variable merge in multi-core mode. This method creates (process_count - 1)
+ read processes which read data from an origin granule, resize it, then queue it
+ for the write process to write to disk. The write process is run in the current
+ thread
+
+ # of write processes (1) + # of read processes (process_count - 1) = process_count
+
+ Parameters
+ ----------
+ merged_dataset : nc.Dataset
+ Destination dataset of the merge operation
+ file_list : list
+ List of file paths to be processed
+ var_info : dict
+ Dictionary of variable paths and associated VariableInfo
+ max_dims : dict
+ Dictionary of dimension paths and maximum dimensions found during preprocessing
+ process_count : int
+ Number of worker processes to run (expected >= 2)
+ """
+
+ logger.info("Running multicore ......")
+ total_variables = len(file_list) * len(var_info)
+ logger.info(f"total variables {total_variables}")
+
+ # Ensure SharedMemory doesn't get cleaned up before being processed
+ context = multiprocessing.get_context('forkserver')
+
+ with context.Manager() as manager:
+ in_queue = manager.Queue(len(file_list))
+ out_queue = manager.Queue((process_count - 1) * len(var_info)) # Store (process_count - 1) granules in buffer
+ memory_limit = manager.Value('i', 0)
+ lock = manager.Lock()
+
+ logger.info(file_list)
+ for i, file in enumerate(file_list):
+ in_queue.put((i, file))
+
+ processes = []
+
+ logger.info("creating read processes")
+ for _ in range(process_count - 1):
+ process = context.Process(target=_run_worker, args=(in_queue, out_queue, max_dims, var_info, memory_limit, lock))
+ processes.append(process)
+ process.start()
+
+ processed_variables = 0
+
+ logger.info("Start processing variables in main process")
+ while processed_variables < total_variables:
+ try:
+ i, var_path, shape, memory_name = out_queue.get_nowait()
+ except queue.Empty:
+ _check_exit(processes)
+ continue
+
+ merged_var = merged_dataset[var_path]
+ var_meta = var_info[var_path]
+ shared_memory = SharedMemory(name=memory_name, create=False)
+ resized_arr = np.ndarray(shape, var_meta.datatype, shared_memory.buf)
+
+ merged_var[i] = resized_arr # The write operation itself
+ shared_memory.unlink()
+ shared_memory.close()
+ with lock:
+ memory_limit.value = memory_limit.value - resized_arr.nbytes
+ processed_variables = processed_variables + 1
+
+ for process in processes:
+ # Ensure that child processes properly exit before manager context
+ # gets GCed. Solves EOFError
+ process.join()
+
+
+def _run_worker(in_queue, out_queue, max_dims, var_info, memory_limit, lock):
+ """
+ A method to be executed in a separate process which reads variables from a
+ granule, performs resizing, and queues the processed data up for the writer
+ process.
+
+ Parameters
+ ----------
+ in_queue : Queue
+ Input queue of tuples of subset indexes and granule file paths respectively
+ out_queue : Queue
+ Output queue of tuples of subset indexes, variable path, variable shape, and shared memory name
+ max_dims : dict
+ Dictionary of dimension paths and maximum dimensions found during preprocessing
+ var_info : dict
+ Dictionary of variable paths and associated VariableInfo
+ """
+
+ # use at most 95% of the shared memory filesystem size for buffered variables
+ max_memory_size = round(shared_memory_size() * .95)
+
+ while not in_queue.empty():
+
+ try:
+ i, file = in_queue.get_nowait()
+ except queue.Empty:
+ break
+
+ with nc.Dataset(file, 'r') as origin_dataset:
+ origin_dataset.set_auto_maskandscale(False)
+
+ for var_path, var_meta in var_info.items():
+
+ ds_group, var_name = resolve_group(origin_dataset, var_path)
+ ds_var = ds_group.variables.get(var_name)
+
+ if ds_var is None:
+ fill_value = var_meta.fill_value
+ target_shape = tuple(max_dims[f'/{dim}'] for dim in var_meta.dim_order)
+ resized_arr = np.full(target_shape, fill_value)
+ else:
+ resized_arr = resize_var(ds_var, var_meta, max_dims)
+
+ if resized_arr.nbytes > max_memory_size:
+ raise RuntimeError(f'Merging failed - MAX MEMORY REACHED: {resized_arr.nbytes}')
+
+ # Wait for shared memory to be freed before allocating, but only if this array can ever fit under the limit
+ while (memory_limit.value + resized_arr.nbytes) > max_memory_size > resized_arr.nbytes:
+ time.sleep(.5)
+
+ # Copy resized array to shared memory
+ shared_mem = SharedMemory(create=True, size=resized_arr.nbytes)
+ shared_arr = np.ndarray(resized_arr.shape, resized_arr.dtype, buffer=shared_mem.buf)
+ np.copyto(shared_arr, resized_arr)
+ with lock:
+ memory_limit.value = memory_limit.value + resized_arr.nbytes
+
+ out_queue.put((i, var_path, shared_arr.shape, shared_mem.name))
+ shared_mem.close()
+
+
+def _check_exit(processes):
+ """
+ Ensure that all processes have exited without error by checking their exitcode
+ if they're no longer running. Processes that have exited properly are removed
+ from the list
+
+ Parameters
+ ----------
+ processes : list
+ List of processes to check
+ """
+
+ for process in processes.copy():
+ if not process.is_alive():
+ if process.exitcode == 0:
+ processes.remove(process)
+ else:
+ raise RuntimeError(f'Merging failed - exit code: {process.exitcode}')
+
+
+def resize_var(var, var_info, max_dims):
+ """
+ Resizes a variable's data to the maximum dimensions found in preprocessing.
+ This method will never downscale a variable; it only pads at the end of each
+ dimension (bottom/right padding), as in the original Java implementation
+
+ Parameters
+ ----------
+ var : nc.Variable
+ variable to be resized
+ var_info : VariableInfo
+ VariableInfo for the variable being resized
+ max_dims : dict
+ dictionary of maximum dimensions found during preprocessing
+
+ Returns
+ -------
+ np.ndarray
+ An ndarray containing the resized data
+ """
+ # special case for 0d variables
+ if var.ndim == 0:
+ return var[:]
+
+ # generate ordered array of new widths
+ dims = [resolve_dim(max_dims, var_info.group_path, dim.name) - dim.size for dim in var.get_dims()]
+ widths = [[0, dim] for dim in dims]
+
+ # Legacy merger doesn't explicitly define this behavior, but its resizer
+ # fills its resized arrays with 0s upon initialization. Sources:
+ # https://github.com/Unidata/netcdf-java/blob/87f37eb82b6f862f71e0d5767470500b27af5d1e/cdm-core/src/main/java/ucar/ma2/Array.java#L52
+ fill_value = 0 if var_info.fill_value is None else var_info.fill_value
+
+ resized = np.pad(var, widths, mode='constant', constant_values=fill_value)
+ return resized
+
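+# Self-contained sketch of the padding behavior above: a 2x2 variable is padded out
+# to the 3x4 maximum "found during preprocessing". All names are hypothetical and the
+# data lives in an in-memory (diskless) dataset; only group_path and fill_value are
+# read from the VariableInfo here, so a SimpleNamespace stands in for it.
+from types import SimpleNamespace
+
+_demo_ds = nc.Dataset('resize_demo.nc', 'w', diskless=True)
+_demo_ds.createDimension('y', 2)
+_demo_ds.createDimension('x', 2)
+_demo_var = _demo_ds.createVariable('tiny', np.int32, ('y', 'x'), fill_value=-1)
+_demo_var[:] = [[1, 2], [3, 4]]
+
+_resized = resize_var(_demo_var, SimpleNamespace(group_path='/', fill_value=-1), {'/y': 3, '/x': 4})
+assert _resized.shape == (3, 4) and _resized[2, 3] == -1
+_demo_ds.close()
+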
+"""
+Utilities used throughout the merging implementation to simplify group path resolution
+and generation
+"""
+
+
+def get_group_path(group, resource):
+ """
+ Generates a Unix-like path from a group and resource to be accessed
+
+ Parameters
+ ----------
+ group: nc.Group
+ NetCDF4 group that contains the resource
+ resource: str
+ name of the resource being accessed
+
+ Returns
+ -------
+ str
+ Unix-like path to the resource
+ """
+
+ if group.path == '/':
+ return '/' + resource
+
+ return group.path + '/' + resource
+
+
+def resolve_group(dataset, path):
+ """
+ Resolves a group path into two components: the group and the resource's name
+
+ Parameters
+ ----------
+ dataset: nc.Dataset
+ NetCDF4 Dataset used as the root for all groups
+ path: str
+ the path to the resource
+
+ Returns
+ -------
+ tuple
+ a tuple of the resolved group and the final path component str respectively
+ """
+
+ components = path.rsplit('/', 1)
+ group = dataset
+
+ if len(components[0]) > 0:
+ group = dataset[components[0]]
+
+ return (group, components[1])
+
+
+def resolve_dim(dims, group_path, dim_name):
+ """
+ Attempt to resolve a dimension name, starting from the deepest group and walking up toward the root group
+
+ Parameters
+ ----------
+ dims: dict
+ Dictionary of dimensions to be traversed
+ group_path: str
+ the group path from which to start resolving the specific dimension
+ dim_name: str
+ the name of the dim to be resolved
+
+ Returns
+ -------
+ int
+ the size of the dimension requested
+ """
+ group_tree = group_path.split('/')
+
+ for i in range(len(group_tree), 0, -1):
+ path = '/'.join(group_tree[:i]) + '/' + dim_name
+
+ if path in dims:
+ return dims[path]
+
+ # Attempt to find dim in root node
+ return dims[dim_name]
+
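+# Self-contained sketch of dimension resolution above (the dimension map is hypothetical):
+example_dims = {'/time': 100, '/group_a/time': 250}
+assert resolve_dim(example_dims, '/group_a', 'time') == 250  # resolved within the group itself
+assert resolve_dim(example_dims, '/group_b', 'time') == 100  # falls back toward the root group
+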
+"""Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes"""
+
+import json
+import queue
+from copy import deepcopy
+from datetime import datetime, timezone
+from multiprocessing import Manager, Process
+
+import importlib_metadata
+import netCDF4 as nc
+import numpy as np
+
+from podaac.merger.path_utils import get_group_path
+from podaac.merger.variable_info import VariableInfo
+
+
+def run_preprocess(file_list, process_count, granule_urls):
+ """
+ Automagically run preprocessing in an optimized mode determined by the environment
+
+ Parameters
+ ----------
+ file_list : list
+ List of file paths to be processed
+ process_count : int
+ Number of worker processes to run (expected >= 1)
+ granule_urls : list
+ list of source granule URLs used when constructing the history_json entry
+ """
+
+ if process_count == 1:
+ return _run_single_core(file_list, granule_urls)
+
+ return _run_multi_core(file_list, process_count, granule_urls)
+
+
+def merge_max_dims(merged_max_dims, subset_max_dims):
+ """
+ Perform aggregation of max_dims. Intended for use in multithreaded
+ mode only
+
+ Parameters
+ ----------
+ merged_max_dims : dict
+ Dictionary of the aggregated max_dims
+ subset_max_dims : dict
+ Dictionary of max_dims from one of the worker processes
+ """
+
+ for dim_name, subset_dim_size in subset_max_dims.items():
+ if dim_name not in merged_max_dims or subset_dim_size > merged_max_dims[dim_name]:
+ merged_max_dims[dim_name] = subset_dim_size
+
+
+def merge_metadata(merged_metadata, subset_metadata):
+ """
+ Perform aggregation of metadata. Intended for use in multithreaded
+ mode only
+
+ Parameters
+ ----------
+ merged_metadata : dict
+ Dictionary of the aggregated metadata
+ subset_metadata : dict
+ Dictionary of metadata from one of the worker processes
+ """
+
+ for var_path, subset_attrs in subset_metadata.items():
+ if var_path not in merged_metadata:
+ merged_metadata[var_path] = {}
+
+ merged_attrs = merged_metadata[var_path]
+ for attr_name, subset_attr in subset_attrs.items():
+ if attr_name not in merged_attrs:
+ merged_attrs[attr_name] = subset_attr
+ elif not attr_eq(merged_attrs[attr_name], subset_attr):
+ merged_attrs[attr_name] = False # mark as inconsistent
+
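+# Self-contained sketch of the two accumulators above (all values hypothetical):
+example_max_dims = {'/time': 10}
+merge_max_dims(example_max_dims, {'/time': 25, '/lat': 180})
+assert example_max_dims == {'/time': 25, '/lat': 180}
+
+example_metadata = {}
+merge_metadata(example_metadata, {'/sst': {'units': 'K', 'source': 'granule_1'}})
+merge_metadata(example_metadata, {'/sst': {'units': 'K', 'source': 'granule_2'}})
+assert example_metadata['/sst'] == {'units': 'K', 'source': False}  # inconsistent value marked False
+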
+
+def construct_history(input_files, granule_urls):
+ """
+ Construct history JSON entry for this concatenation operation
+ https://wiki.earthdata.nasa.gov/display/TRT/In-File+Provenance+Metadata+-+TRT-42
+
+ Parameters
+ ----------
+ input_files : list
+ List of input files
+ granule_urls : list
+ List of source granule URLs recorded in the history entry's 'derived_from' field
+
+ Returns
+ -------
+ dict
+ History JSON constructed for this concat operation
+ """
+
+ history_json = {
+ "date_time": datetime.now(tz=timezone.utc).isoformat(),
+ "derived_from": granule_urls,
+ "program": 'concise',
+ "version": importlib_metadata.distribution('podaac-concise').version,
+ "parameters": f'input_files={input_files}',
+ "program_ref": "https://cmr.earthdata.nasa.gov:443/search/concepts/S2153799015-POCLOUD",
+ "$schema": "https://harmony.earthdata.nasa.gov/schemas/history/0.1.0/history-v0.1.0.json"
+ }
+ return history_json
+
+
+def retrieve_history(dataset):
+ """
+ Retrieve history_json field from NetCDF dataset, if it exists
+
+ Parameters
+ ----------
+ dataset : netCDF4.Dataset
+ NetCDF Dataset representing a single granule
+
+ Returns
+ -------
+ dict
+ history_json field
+ """
+ if 'history_json' not in dataset.ncattrs():
+ return []
+ history_json = dataset.getncattr('history_json')
+ return json.loads(history_json)
+
+
+def _run_single_core(file_list, granule_urls):
+ """
+ Run the granule preprocessing in the current thread/single-core mode
+
+ Parameters
+ ----------
+ file_list : list
+ List of file paths to be processed
+
+ Returns
+ -------
+ dict
+ A dictionary containing the output from the preprocessing process
+ """
+ group_list = []
+ var_info = {}
+ max_dims = {}
+ var_metadata = {}
+ group_metadata = {}
+ history_json = []
+
+ for file in file_list:
+ with nc.Dataset(file, 'r') as dataset:
+ dataset.set_auto_maskandscale(False)
+ process_groups(dataset, group_list, max_dims, group_metadata, var_metadata, var_info)
+ history_json.extend(retrieve_history(dataset))
+
+ group_list.sort() # Ensure insertion order doesn't matter between granules
+
+ history_json.append(construct_history(file_list, granule_urls))
+ group_metadata[group_list[0]]['history_json'] = json.dumps(
+ history_json,
+ default=str
+ )
+
+ return {
+ 'group_list': group_list,
+ 'max_dims': max_dims,
+ 'var_info': var_info,
+ 'var_metadata': var_metadata,
+ 'group_metadata': group_metadata
+ }
+
+
+def _run_multi_core(file_list, process_count, granule_urls):
+ """
+ Run the granule preprocessing in multi-core mode. This method spins up
+ the number of processes defined by process_count which process granules
+ in the input queue until empty. When all processes are done, the method
+ merges all the preprocessing results together and returns the final
+ results
+
+ Parameters
+ ----------
+ file_list : list
+ List of file paths to be processed
+ process_count : int
+ Number of worker processes to run (expected >= 2)
+
+ Returns
+ -------
+ dict
+ A dictionary containing the output from the preprocessing process
+ """
+ with Manager() as manager:
+ in_queue = manager.Queue(len(file_list))
+ results = manager.list()
+
+ for file in file_list:
+ in_queue.put(file)
+
+ processes = []
+ for _ in range(process_count):
+ process = Process(target=_run_worker, args=(in_queue, results))
+ processes.append(process)
+ process.start()
+
+ # Explicitly check for all processes to successfully exit
+ # before attempting to merge results
+ for process in processes:
+ process.join()
+
+ if process.exitcode != 0:
+ raise RuntimeError(f'Preprocessing failed - exit code: {process.exitcode}')
+
+ results = deepcopy(results) # ensure GC can cleanup multiprocessing
+
+ # -- Merge final results --
+ group_list = None
+ var_info = None
+ max_dims = {}
+ var_metadata = {}
+ group_metadata = {}
+ history_json = []
+
+ for result in results:
+ # The following data should be consistent between granules and
+ # require no special treatment to merge. Sanity checks added
+ # just for verification.
+ if group_list is None:
+ group_list = result['group_list']
+ elif group_list != result['group_list']:
+ raise RuntimeError('Groups are inconsistent between granules')
+
+ if var_info is None:
+ var_info = result['var_info']
+ elif var_info != result['var_info']:
+ if set(var_info.keys()).difference(result['var_info']):
+ # If not all variables match, only compare variables that intersect
+ intersecting_vars = set(var_info).intersection(result['var_info'])
+ if list(
+ map(var_info.get, intersecting_vars)
+ ) != list(map(result['var_info'].get, intersecting_vars)):
+ raise RuntimeError('Variable schemas are inconsistent between granules')
+ var_info.update(result['var_info'])
+
+ # The following data requires accumulation methods
+ merge_max_dims(max_dims, result['max_dims'])
+ merge_metadata(var_metadata, result['var_metadata'])
+ merge_metadata(group_metadata, result['group_metadata'])
+
+ # Merge history_json entries from input files
+ history_json.extend(result['history_json'])
+
+ history_json.append(construct_history(file_list, granule_urls))
+ group_metadata[group_list[0]]['history_json'] = json.dumps(
+ history_json,
+ default=str
+ )
+
+ return {
+ 'group_list': group_list,
+ 'max_dims': max_dims,
+ 'var_info': var_info,
+ 'var_metadata': var_metadata,
+ 'group_metadata': group_metadata
+ }
+
+
+def _run_worker(in_queue, results):
+ """
+ A method to be executed in a separate process which runs preprocessing on granules
+ from the input queue and stores the results internally. When the queue is empty
+ (processing is complete), the local results are transferred to the external results
+ list to be merged by the main process. If the process never processed any granules,
+ which is possible if the input queue is underfilled, the process just exits without
+ appending to the list
+
+ Parameters
+ ----------
+ in_queue : Queue
+ Input queue of granule file paths to be preprocessed
+ results : list
+ An array which stores the results of preprocessing from all workers
+ """
+ empty = True
+ group_list = []
+ max_dims = {}
+ var_info = {}
+ var_metadata = {}
+ group_metadata = {}
+ history_json = []
+
+ while not in_queue.empty():
+ try:
+ file = in_queue.get_nowait()
+ except queue.Empty:
+ break
+
+ empty = False
+ with nc.Dataset(file, 'r') as dataset:
+ dataset.set_auto_maskandscale(False)
+ process_groups(dataset, group_list, max_dims, group_metadata, var_metadata, var_info)
+ history_json.extend(retrieve_history(dataset))
+
+ group_list.sort() # Ensure insertion order doesn't matter between granules
+
+ if not empty:
+ results.append({
+ 'group_list': group_list,
+ 'max_dims': max_dims,
+ 'var_info': var_info,
+ 'var_metadata': var_metadata,
+ 'group_metadata': group_metadata,
+ 'history_json': history_json
+ })
+
+
+def process_groups(parent_group, group_list, max_dims, group_metadata, var_metadata, var_info):
+ """
+ Perform preprocessing of a group and recursively process each child group
+
+ Parameters
+ ----------
+ parent_group: nc.Dataset, nc.Group
+ current group to be processed
+ group_list: list
+ list of group paths
+ max_dims: dict
+ dictionary which stores dimension paths and associated dimension sizes
+ group_metadata: dict
+ dictionary which stores group paths and their associated attributes
+ var_metadata: dict
+ dictionary of dictionaries which stores variable paths and their associated attributes
+ var_info: dict
+ dictionary of variable paths and associated VariableInfo data
+ """
+
+ if parent_group.path not in group_metadata:
+ group_metadata[parent_group.path] = {}
+
+ if parent_group.path not in group_list:
+ group_list.append(parent_group.path)
+
+ get_max_dims(parent_group, max_dims)
+ get_metadata(parent_group, group_metadata[parent_group.path])
+ get_variable_data(parent_group, var_info, var_metadata)
+
+ for child_group in parent_group.groups.values():
+ process_groups(child_group, group_list, max_dims, group_metadata, var_metadata, var_info)
+
+
+def get_max_dims(group, max_dims):
+ """
+ Aggregates dimensions from each group and creates a dictionary
+ of the largest dimension sizes for each group
+
+ Parameters
+ ----------
+ group: nc.Dataset, nc.Group
+ group to process dimensions from
+ max_dims: dict
+ dictionary which stores dimension paths and associated dimension sizes
+ """
+
+ for dim in group.dimensions.values():
+ dim_path = get_group_path(group, dim.name)
+
+ if dim_path not in max_dims or max_dims[dim_path] < dim.size:
+ max_dims[dim_path] = dim.size
+
+
+def get_metadata(group, metadata):
+ """
+ Aggregates metadata from various NetCDF4 objects into a dictionary
+
+ Parameters
+ ----------
+ group : nc.Dataset, nc.Group, nc.Variable
+ the NetCDF4 object to aggregate metadata from
+ metadata : dict
+ a dictionary containing the object name and associated metadata
+ """
+
+ for attr_name in group.ncattrs():
+ attr = group.getncattr(attr_name)
+
+ if attr_name not in metadata:
+ metadata[attr_name] = attr
+ elif not attr_eq(metadata[attr_name], attr):
+ metadata[attr_name] = False # mark as inconsistent
+
+
+def attr_eq(attr_1, attr_2):
+ """
+ Helper function to check if one attribute value is equal to another
+ (no, a simple == was not working)
+
+ Parameters
+ ----------
+ attr_1 : obj
+ An attribute value
+ attr_2 : obj
+ An attribute value
+ """
+
+ if isinstance(attr_1, np.ndarray) or isinstance(attr_2, np.ndarray):
+ if not np.array_equal(attr_1, attr_2):
+ return False
+ elif type(attr_1) != type(attr_2) or attr_1 != attr_2: # pylint: disable=unidiomatic-typecheck # noqa: E721
+ return False
+
+ return True
+
+
+def get_variable_data(group, var_info, var_metadata):
+ """
+ Aggregate variable metadata and attributes. Primarily utilized in process_groups
+
+ Parameters
+ ----------
+ group : nc.Dataset, nc.Group
+ group associated with this variable
+ var_info : dict
+ dictionary of variable paths and associated VariableInfo
+ var_metadata : dict
+ dictionary of variable paths and associated attribute dictionary
+ """
+
+ for var in group.variables.values():
+
+ # Generate VariableInfo map
+ info = VariableInfo(var)
+ var_path = get_group_path(group, var.name)
+
+ if var_path not in var_info:
+ var_info[var_path] = info
+ elif var_info[var_path] != info:
+ # Check to ensure datasets are consistent
+ raise RuntimeError('Inconsistent variable schemas')
+
+ # Generate variable attribute map
+ if var_path not in var_metadata:
+ var_metadata[var_path] = {}
+
+ get_metadata(var, var_metadata[var_path])
+
+"""Wrapper used to manage variable metadata"""
+import numpy as np
+
+
+class VariableInfo:
+ """
+ Lightweight wrapper class utilized in granule preprocessing to simplify comparisons between
+ different variables from different granule sets
+
+ Attributes
+ ----------
+ name: str
+ name of the variable
+ dim_order: list
+ list of dimension names in order
+ datatype: numpy.dtype
+ the numpy datatype for the data held in the variable
+ group_path: str
+ Unix-like group path to the variable
+ fill_value: object
+ Value used to fill missing/empty values in variable's data
+ """
+
+ def __init__(self, var):
+ self.name = var.name
+ self.dim_order = var.dimensions
+ self.datatype = var.datatype
+ self.group_path = var.group().path
+
+ if hasattr(var, '_FillValue'):
+ self.fill_value = var._FillValue
+ elif hasattr(var, 'missing_value'):
+ self.fill_value = var.missing_value
+ else:
+ self.fill_value = None
+
+ self.init = True # Finalize object values
+
+ def __setattr__(self, name, value):
+ if hasattr(self, 'init') and self.init:
+ raise AttributeError('VariableInfo is immutable')
+
+ self.__dict__[name] = value
+
+ def __str__(self):
+ return f"name:{self.name} dim_order:{self.dim_order} fill_value:{self.fill_value} datatype:{self.datatype} group_path:{self.group_path}"
+
+ def __eq__(self, other):
+ return (
+ self.dim_order == other.dim_order and
+ self.datatype == other.datatype and
+ self.name == other.name and
+ (
+ self.fill_value == other.fill_value or
+ np.array_equal(self.fill_value, other.fill_value, equal_nan=True)
+ ) and
+ self.group_path == other.group_path
+ )
+
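+# Self-contained sketch of the wrapper above, built against an in-memory (diskless)
+# dataset; the variable names and shapes are hypothetical:
+import netCDF4 as nc
+
+_demo_ds = nc.Dataset('varinfo_demo.nc', 'w', diskless=True)
+_demo_ds.createDimension('x', 4)
+_var_a = _demo_ds.createVariable('sst', 'f4', ('x',), fill_value=-9999.0)
+_var_b = _demo_ds.createVariable('sst_copy', 'f4', ('x',), fill_value=-9999.0)
+
+info_a = VariableInfo(_var_a)
+assert info_a == VariableInfo(_var_a)      # identical schema compares equal
+assert info_a != VariableInfo(_var_b)      # different name, so not equal
+try:
+    info_a.name = 'renamed'                # instances are immutable once initialized
+except AttributeError:
+    pass
+_demo_ds.close()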