diff --git a/docs/celestial.rst b/docs/celestial.rst index 20c6f6528..8191df2e4 100644 --- a/docs/celestial.rst +++ b/docs/celestial.rst @@ -80,8 +80,8 @@ of the package, e.g.:: >>> from reproject import reproject_exact All functions share a common set of arguments, as well as including -algorithm-dependent arguments. In this section, we take a look at the common -arguments. +algorithm-dependent arguments. In this section, we take a look at some of the +common arguments. The reprojection functions take two main arguments. The first argument is the image to reproject, together with WCS information about the image. This can be @@ -109,19 +109,6 @@ also be specified, and be given the shape of the output image using the Numpy :class:`~astropy.io.fits.Header`, does not contain information about image size). -For the interpolation and adaptive algorithms, an optional third argument, -``roundtrip_coords`` is accepted. By default, after coordinates are transformed -from the output plane to the input plane, the input-plane coordinates are -transformed back to the output plane to ensure that the transformation is -defined in both directions. This doubles the amount of -coordinate-transformation work to be done. In speed-critical situations, where -it is known that the coordinate transformation is defined everywhere, this -extra work can be disabled by setting ``roundtrip_coords=False``. (Note that -this is not a question of whether each output pixel maps to an existing *pixel* -in the input image and vice-versa, but whether it maps to a valid *coordinate* -in the coordinate system of the input image---regardless of whether that -coordinate falls within the bounds of the input image.) - As an example, we start off by opening a FITS file using Astropy:: >>> from astropy.io import fits @@ -403,89 +390,3 @@ details). .. warning:: The :func:`~reproject.reproject_exact` is currently known to have precision issues for images with resolutions <0.05". For now it is therefore best to avoid using it with such images. - -Very large datasets -=================== - -If you have a very large dataset to reproject, i.e., any normal IFU or radio -spectral cube with many individual spectral channels - you may not be able to -hold two copies of the dataset in memory. In this case, you can specify an -output memory mapped array to store the data. For now this only works with the -interpolation reprojection methods. - -.. doctest-skip:: - - >>> outhdr = fits.Header.fromtextfile('cube_header_gal.hdr') - >>> shape = (outhdr['NAXIS3'], outhdr['NAXIS2'], outhdr['NAXIS1']) - >>> outarray = np.memmap(filename='output.np', mode='w+', shape=shape, dtype='float32') - >>> hdu = fits.open('cube_file.fits') - >>> rslt = reproject.reproject_interp(hdu, outhdr, output_array=outarray, - ... return_footprint=False, - ... independent_celestial_slices=True) - >>> newhdu = fits.PrimaryHDU(data=outarray, header=outhdr) - >>> newhdu.writeto('new_cube_file.fits') - -Or if you're dealing with FITS files, you can skip the numpy memmap step and use `FITS large file creation -`_. - -.. 
doctest-skip:: - - >>> outhdr = fits.Header.fromtextfile('cube_header_gal.hdr') - >>> outhdr.tofile('new_cube.fits') - >>> shape = tuple(outhdr['NAXIS{0}'.format(ii)] for ii in range(1, outhdr['NAXIS']+1)) - >>> with open('new_cube.fits', 'rb+') as fobj: - >>> fobj.seek(len(outhdr.tostring()) + (np.product(shape) * np.abs(outhdr['BITPIX']//8)) - 1) - >>> fobj.write(b'\0') - >>> outhdu = fits.open('new_cube.fits', mode='update') - >>> rslt = reproject.reproject_interp(hdu, outhdr, output_array=outhdu[0].data, - ... return_footprint=False, - ... independent_celestial_slices=True) - >>> outhdu.flush() - -Multiple images with the same coordinates -========================================= - -If you have multiple images with the exact same coordinate system (e.g. a raw -image and a corresponding processed image) and want to reproject both to the -same output frame, it is faster to compute the coordinate mapping between input -and output pixels only once and re-use this mapping for each reprojection. This -is supported by passing multiple input images as an additional dimension in the -input data array. When the input array contains more dimensions than the input -WCS describes, the extra leading dimensions are taken to represent separate -images with the same coordinates, and the reprojection loops over those -dimensions after computing the pixel mapping. For example: - -.. doctest-skip:: - >>> raw_image, header_in = fits.getdata('raw_image.fits', header=True) - >>> bg_subtracted_image = fits.getdata('background_subtracted_image.fits') - >>> header_out = # Prepare your desired output projection here - >>> # Combine the two images into one array - >>> image_stack = np.stack((raw_image, bg_subtracted_image)) - >>> # We provide a header that describes 2 WCS dimensions, but our input - >>> # array shape is (2, ny, nx)---the 'extra' first dimension represents - >>> # separate images sharing the same coordinates. - >>> reprojected, footprint = reproject.reproject_adaptive( - ... (image_stack, header_in), header_out) - >>> # The shape of `reprojected` is (2, ny', nx') - >>> reprojected_raw, reprojected_bg_subtracted = reprojected[0], reprojected[1] - -For :func:`~reproject.reproject_interp` and -:func:`~reproject.reproject_adaptive`, this is approximately twice as fast as -reprojecting the two images separately. For :func:`~reproject.reproject_exact` -the savings are much less significant, as producing the coordinate mapping is a -much smaller portion of the total runtime for this algorithm. - -When the output coordinates are provided as a WCS and a ``shape_out`` tuple, -``shape_out`` may describe the output shape of a single image, in which case -the extra leading dimensions are prepended automatically, or it may include the -extra dimensions, in which case the size of the extra dimensions must match -those of the input data exactly. - -While the reproject functions can accept the name of a FITS file as input, from -which the input data and coordinates are loaded automatically, this multi-image -reprojection feature does not support loading multiple images automatically -from multiple HDUs within one FITS file, as it would be difficult to verify -automatically that the HDUs contain the same exact coordinates. If multiple -HDUs do share coordinates and are to be reprojected together, they must be -separately loaded and combined into a single input array (e.g. using -``np.stack`` as in the above example). 
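For reference, the calling pattern described by the common arguments above looks as follows. This is a minimal sketch rather than text from the diff: ``array_in``, ``wcs_in``, ``wcs_out`` and the output shape are placeholder names and values.

.. doctest-skip::

    >>> from reproject import reproject_interp
    >>> array_out, footprint = reproject_interp((array_in, wcs_in), wcs_out,
    ...                                         shape_out=(1024, 1024))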
diff --git a/docs/dask.rst b/docs/dask.rst deleted file mode 100644 index e23ed67fb..000000000 --- a/docs/dask.rst +++ /dev/null @@ -1,108 +0,0 @@ -Integration with dask and parallel processing -============================================= - -The following functions all integrate well with the `dask `_ library. - -* :func:`~reproject.reproject_interp` -* :func:`~reproject.reproject_adaptive` -* :func:`~reproject.reproject_exact` - -This integration has several aspects that we will discuss in the following sections. - -.. testsetup:: - - >>> import numpy as np - >>> import dask.array as da - >>> input_array = np.random.random((1024, 1024)) - >>> dask_array = da.from_array(input_array, chunks=(128, 128)) - >>> from astropy.wcs import WCS - >>> wcs_in = WCS(naxis=2) - >>> wcs_out = WCS(naxis=2) - -Input dask arrays ------------------ - -The three reprojection functions mentioned above can accept dask arrays as -inputs, e.g. assuming you have already constructed a dask array named -``dask_array``:: - - >>> dask_array - dask.array - -you can pass this in as part of the first argument to one of the reprojection -functions:: - - >>> from reproject import reproject_interp - >>> array, footprint = reproject_interp((dask_array, wcs_in), wcs_out, - ... shape_out=(2048, 2048)) - -In general however, we cannot benefit much from the chunking of the input arrays -because any input pixel might in principle contribute to any output pixel. -Therefore, for now, when a dask array is passed as input, it is computed using -the current default scheduler and converted to a Numpy memory-mapped array. This -is done efficiently in terms of memory and never results in the whole dataset -being loaded into memory at any given time. However, this does require -sufficient space on disk to store the array. - -Chunk by chunk reprojection and parallel processing ---------------------------------------------------- - -Regardless of whether a dask or Numpy array is passed in as input to the -reprojection functions, you can specify a block size to use for the -reprojection, and this is used to iterate over chunks in the output array in -chunks. For instance, if you pass in a (1024, 1024) array and specify that the -shape of the output should be a (2048, 2048) array (e.g., via ``shape_out``), -then if you set ``block_size=(256, 256)``:: - - >>> input_array.shape - (1024, 1024) - >>> array, footprint = reproject_interp((input_array, wcs_in), wcs_out, - ... shape_out=(2048, 2048), block_size=(256, 256)) - -the reprojection will be done in 64 separate output chunks. Note however that -this does not break up the input array into chunks since in the general case any -input pixel may contribute to any output pixel. - -By default, the iteration over the output chunks is done in a single -process/thread, but you may specify ``parallel=True`` to process these in -parallel. If you do this, reproject will use multiple processes (rather than -threads) to parallelize the computation (this is because the core reprojection -algorithms we use are not currently thread-safe). If you specify -``parallel=True``, then ``block_size`` will be automatically set to a sensible -default, but you can also set ``block_size`` manually for more control. Note -that you can also set ``parallel=`` to an integer to indicate the number of -processes to use. 
- -Output dask arrays ------------------- - -By default, the reprojection functions will do the computation immmediately and -return Numpy arrays for the reprojected array and optionally the footprint (this -is regardless of whether dask or Numpy arrays were passed in, or any of the -parallelization options above). However, by setting ``return_type='dask'``, you -can make the functions delay any computation and return dask arrays:: - - >>> array, footprint = reproject_interp((input_array, wcs_in), wcs_out, - ... shape_out=(2048, 2048), block_size=(256, 256), - ... return_type='dask') - >>> array - dask.array - -You can then compute the array or a section of the array yourself whenever you need, or use the -result in further dask expressions. - -.. warning:: The reprojection does not currently work reliably when using multiple threads, so - it is important to make sure you use a dask scheduler that is not multi-threaded. - At the time of writing, the default dask scheduler is ``threads``, so the scheduler - needs to be explicitly set to a different one. - -Using dask.distributed ----------------------- - -The `dask.distributed `_ package makes it -possible to use distributed schedulers for dask. In order to compute -reprojections with dask.distributed, you should make use of the -``return_type='dask'`` option mentioned above so that you can compute the dask -array once the distributed scheduler has been set up. As mentioned in `Output -dask arrays`_, you should make sure that you limit any cluster to have one -thread per process or the results may be unreliable. diff --git a/docs/index.rst b/docs/index.rst index 6552ad26d..eddf73456 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -19,7 +19,7 @@ You can install *reproject* with `pip ` or with `conda `_:: - conda install -c astropy reproject + conda install -c conda-forge reproject More detailed installation instructions can be found in :ref:`installation`. @@ -137,7 +137,7 @@ that you want to reproject. 
    noncelestial
    footprints
    mosaicking
-   dask
+   performance
 
 Reference/API
 =============
 
diff --git a/docs/installation.rst b/docs/installation.rst
index 317c564a5..06a5ff48a 100644
--- a/docs/installation.rst
+++ b/docs/installation.rst
@@ -7,7 +7,7 @@ Installing reproject
 Requirements
 ============
 
-This package has the following hard run time dependencies:
+This package has the following dependencies:
 
 * `Python `__ 3.8 or later
 
@@ -19,20 +19,15 @@ This package has the following hard run time dependencies:
 * `astropy-healpix `_ 0.6 or later for HEALPIX image reprojection
 
-and the following optional dependencies:
-
-* `shapely `_ 1.6 or later for some of the mosaicking functionality
-
-and to run the tests, you will also need:
+* `dask `_ 2021.8 or later
 
-* `Matplotlib `__
+* `zarr `_
 
-* `pytest-arraydiff `__
+* `fsspec `_
 
-* `pytest-astropy `__
-
-* `pytest-doctestplus `__
+and the following optional dependencies:
 
+* `shapely `_ 1.6 or later for some of the mosaicking functionality
 
 Installation
 ============
@@ -40,14 +35,14 @@ Installation
 Using pip
 ---------
 
-To install *reproject* with `pip `_,
-simply run:
+To install *reproject* with `pip `_,
+run::
 
     pip install reproject
 
 Using conda
 -----------
 
-To install *reproject* with `anaconda `_, simply run::
+To install *reproject* with `conda `_, run::
 
     conda install -c conda-forge reproject
diff --git a/docs/performance.rst b/docs/performance.rst
new file mode 100644
index 000000000..0db200753
--- /dev/null
+++ b/docs/performance.rst
@@ -0,0 +1,234 @@
+**********************
+Optimizing performance
+**********************
+
+Disabling coordinate transformation round-tripping
+==================================================
+
+For the interpolation and adaptive algorithms, an optional third argument,
+``roundtrip_coords``, is accepted. By default, after coordinates are transformed
+from the output plane to the input plane, the input-plane coordinates are
+transformed back to the output plane to ensure that the transformation is
+defined in both directions. This doubles the amount of
+coordinate-transformation work to be done. In speed-critical situations, where
+it is known that the coordinate transformation is defined everywhere, this
+extra work can be disabled by setting ``roundtrip_coords=False``. (Note that
+this is not a question of whether each output pixel maps to an existing *pixel*
+in the input image and vice-versa, but whether it maps to a valid *coordinate*
+in the coordinate system of the input image---regardless of whether that
+coordinate falls within the bounds of the input image.)
+
+Disabling returning the footprint
+=================================
+
+If you don't need the output footprint after reprojection, you can set
+``return_footprint=False`` to return only the reprojected array. This can save
+memory and in some cases computing time:
+
+.. doctest-skip::
+
+    >>> array = reproject_interp(..., return_footprint=False)
+
+Using memory-mapped output arrays
+=================================
+
+If you are dealing with a large dataset to reproject, you may want to write
+the reprojected array (and optionally the footprint) to an array of your
+choice, such as a memory-mapped array. For example:
+
+.. doctest-skip::
+
+    >>> header_out = fits.Header.fromtextfile('cube_header_gal.hdr')
+    >>> shape = (header_out['NAXIS3'], header_out['NAXIS2'], header_out['NAXIS1'])
+    >>> array_out = np.memmap(filename='output.np', mode='w+',
+    ...                       shape=shape, dtype='float32')
+    >>> hdu = fits.open('cube_file.fits')
+    >>> reproject_interp(hdu, header_out, output_array=array_out,
+    ...                  return_footprint=False)
+
+After the call to :func:`~reproject.reproject_interp`, ``array_out`` will
+contain the reprojected values. If you have also set up a memory-mapped array
+for the footprint, you can do:
+
+.. doctest-skip::
+
+    >>> reproject_interp(hdu, header_out, output_array=array_out,
+    ...                  output_footprint=footprint_out)
+
+If you are dealing with FITS files, you can skip the Numpy memmap step and use
+`FITS large file creation `_:
+
+.. doctest-skip::
+
+    >>> header_out = fits.Header.fromtextfile('cube_header_gal.hdr')
+    >>> header_out.tofile('new_cube.fits')
+    >>> shape = tuple(header_out['NAXIS{0}'.format(ii)] for ii in range(1, header_out['NAXIS']+1))
+    >>> with open('new_cube.fits', 'rb+') as fobj:
+    ...     fobj.seek(len(header_out.tostring()) + (np.prod(shape) * np.abs(header_out['BITPIX']//8)) - 1)
+    ...     fobj.write(b'\0')
+    >>> hdu_out = fits.open('new_cube.fits', mode='update')
+    >>> rslt = reproject.reproject_interp(hdu, header_out, output_array=hdu_out[0].data,
+    ...                                   return_footprint=False)
+    >>> hdu_out.flush()
+
+.. _broadcasting:
+
+Multiple images with the same coordinates
+=========================================
+
+If you have multiple images with the exact same coordinate system (e.g. a raw
+image and a corresponding processed image) and want to reproject both to the
+same output frame, it is faster to compute the coordinate mapping between input
+and output pixels only once and re-use this mapping for each reprojection. This
+is supported by passing multiple input images as an additional dimension in the
+input data array. When the input array contains more dimensions than the input
+WCS describes, the extra leading dimensions are taken to represent separate
+images with the same coordinates, and the reprojection loops over those
+dimensions after computing the pixel mapping. For example:
+
+.. doctest-skip::
+
+    >>> raw_image, header_in = fits.getdata('raw_image.fits', header=True)
+    >>> bg_subtracted_image = fits.getdata('background_subtracted_image.fits')
+    >>> header_out = # Prepare your desired output projection here
+    >>> # Combine the two images into one array
+    >>> image_stack = np.stack((raw_image, bg_subtracted_image))
+    >>> # We provide a header that describes 2 WCS dimensions, but our input
+    >>> # array shape is (2, ny, nx)---the 'extra' first dimension represents
+    >>> # separate images sharing the same coordinates.
+    >>> reprojected, footprint = reproject.reproject_adaptive(
+    ...     (image_stack, header_in), header_out)
+    >>> # The shape of `reprojected` is (2, ny', nx')
+    >>> reprojected_raw, reprojected_bg_subtracted = reprojected[0], reprojected[1]
+
+For :func:`~reproject.reproject_interp` and
+:func:`~reproject.reproject_adaptive`, this is approximately twice as fast as
+reprojecting the two images separately. For :func:`~reproject.reproject_exact`
+the savings are much less significant, as producing the coordinate mapping is a
+much smaller portion of the total runtime for this algorithm.
+
+When the output coordinates are provided as a WCS and a ``shape_out`` tuple,
+``shape_out`` may describe the output shape of a single image, in which case
+the extra leading dimensions are prepended automatically, or it may include the
+extra dimensions, in which case the size of the extra dimensions must match
+those of the input data exactly.
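+For instance, continuing the ``image_stack`` sketch above and assuming the
+output frame is described by a 2D ``wcs_out`` (the output shape used here is
+purely illustrative):
+
+.. doctest-skip::
+
+    >>> # shape_out for a single image: the stack dimension is prepended
+    >>> reprojected, footprint = reproject.reproject_adaptive(
+    ...     (image_stack, header_in), wcs_out, shape_out=(1024, 1024))
+    >>> reprojected.shape
+    (2, 1024, 1024)
+    >>> # shape_out including the stack dimension: it must match the input
+    >>> reprojected, footprint = reproject.reproject_adaptive(
+    ...     (image_stack, header_in), wcs_out, shape_out=(2, 1024, 1024))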
+
+While the reproject functions can accept the name of a FITS file as input, from
+which the input data and coordinates are loaded automatically, this multi-image
+reprojection feature does not support loading multiple images automatically
+from multiple HDUs within one FITS file, as it would be difficult to verify
+automatically that the HDUs contain the same exact coordinates. If multiple
+HDUs do share coordinates and are to be reprojected together, they must be
+separately loaded and combined into a single input array (e.g. using
+``np.stack`` as in the above example).
+
+Chunk by chunk reprojection
+===========================
+
+.. testsetup::
+
+    >>> import numpy as np
+    >>> import dask.array as da
+    >>> input_array = np.random.random((1024, 1024))
+    >>> dask_array = da.from_array(input_array, chunks=(128, 128))
+    >>> from astropy.wcs import WCS
+    >>> wcs_in = WCS(naxis=2)
+    >>> wcs_out = WCS(naxis=2)
+
+When calling one of the reprojection functions, you can specify a block size to
+use for the reprojection; this is used to iterate over the output array in
+chunks. For instance, if you pass in a (1024, 1024) array, specify that the
+shape of the output should be (2048, 2048) (e.g., via ``shape_out``), and set
+``block_size=(256, 256)``::
+
+    >>> from reproject import reproject_interp
+    >>> input_array.shape
+    (1024, 1024)
+    >>> array, footprint = reproject_interp((input_array, wcs_in), wcs_out,
+    ...                                     shape_out=(2048, 2048), block_size=(256, 256))
+
+the reprojection will be done in 64 separate output chunks. Note however that
+this does not break up the input array into chunks since in the general case any
+input pixel may contribute to any output pixel.
+
+Multi-process reprojection
+==========================
+
+By default, the iteration over the output chunks is done in a single
+process/thread, but you may specify ``parallel=True`` to process these in
+parallel. If you do this, reproject will use multiple processes (rather than
+threads) to parallelize the computation (this is because the core reprojection
+algorithms we use are not currently thread-safe). If you specify
+``parallel=True``, then ``block_size`` will be automatically set to a sensible
+default, but you can also set ``block_size`` manually for more control. Note
+that you can also set ``parallel=`` to an integer to indicate the number of
+processes to use.
+
+By default, in parallel mode, the entire input array is written to a temporary
+file that is then memory-mapped; this avoids loading the whole input array into
+memory in each process. If you are specifying a WCS with fewer dimensions than
+the data to be reprojected, as described in :ref:`broadcasting`, you can set the
+block size so that the blocks cover the whole image along the dimensions being
+reprojected, while the other dimensions can be smaller. For example, if you are
+reprojecting a spectral cube with dimensions (500, 2048, 2048), where 500 is the
+number of spectral channels and (2048, 2048) is the celestial plane, and you are
+reprojecting just the celestial part of the WCS, you can specify a block size of
+(N, 2048, 2048). This enables a separate reprojection mode in which the input
+array is not written to disk and the reprojection is done in truly independent
+chunks of size (N, 2048, 2048).
+
+Input dask arrays
+=================
+
+The three main reprojection functions can accept dask arrays as inputs, e.g.
+assuming you have already constructed a dask array named ``dask_array``::
+
+    >>> dask_array
+    dask.array
+
+you can pass this in as part of the first argument to one of the reprojection
+functions::
+
+    >>> array, footprint = reproject_interp((dask_array, wcs_in), wcs_out,
+    ...                                     shape_out=(2048, 2048))
+
+In general however, we cannot benefit much from the chunking of the input arrays
+because any input pixel might in principle contribute to any output pixel.
+Therefore, for now, when a dask array is passed as input, it is computed using
+the current default scheduler and converted to a Numpy memory-mapped array. This
+is done efficiently in terms of memory and never results in the whole dataset
+being loaded into memory at any given time. However, this does require
+sufficient space on disk to store the array.
+
+Output dask arrays
+==================
+
+By default, the reprojection functions will do the computation immediately and
+return Numpy arrays for the reprojected array and optionally the footprint (this
+is regardless of whether dask or Numpy arrays were passed in, or any of the
+parallelization options above). However, by setting ``return_type='dask'``, you
+can make the functions delay any computation and return dask arrays::
+
+    >>> array, footprint = reproject_interp((input_array, wcs_in), wcs_out,
+    ...                                     shape_out=(2048, 2048), block_size=(256, 256),
+    ...                                     return_type='dask')
+    >>> array
+    dask.array
+
+You can then compute the array or a section of the array yourself whenever you
+need to, or use the result in further dask expressions.
+
+.. warning:: The reprojection does not currently work reliably when using multiple threads, so
+             it is important to make sure you use a dask scheduler that is not multi-threaded.
+             At the time of writing, the default dask scheduler is ``threads``, so the scheduler
+             needs to be explicitly set to a different one.
+
+Using dask.distributed
+======================
+
+The `dask.distributed `_ package makes it
+possible to use distributed schedulers for dask. In order to compute
+reprojections with dask.distributed, you should make use of the
+``return_type='dask'`` option mentioned above so that you can compute the dask
+array once the distributed scheduler has been set up. As mentioned in `Output
+dask arrays`_, you should make sure that you limit any cluster to have one
+thread per process or the results may be unreliable.
diff --git a/reproject/common.py b/reproject/common.py
index d53884af4..ca3ab54d7 100644
--- a/reproject/common.py
+++ b/reproject/common.py
@@ -101,6 +101,23 @@ def _reproject_dispatcher(
     if reproject_func_kwargs is None:
         reproject_func_kwargs = {}
 
+    # Determine whether any broadcasting is taking place
+    broadcasting = wcs_in.low_level_wcs.pixel_n_dim < len(shape_out)
+
+    # Determine whether block size indicates we should parallelize over broadcasted dimension
+    broadcasted_parallelization = False
+    if broadcasting and block_size:
+        if len(block_size) == len(shape_out):
+            if (
+                block_size[-wcs_in.low_level_wcs.pixel_n_dim :]
+                == shape_out[-wcs_in.low_level_wcs.pixel_n_dim :]
+            ):
+                broadcasted_parallelization = True
+                block_size = (
+                    block_size[: -wcs_in.low_level_wcs.pixel_n_dim]
+                    + (-1,) * wcs_in.low_level_wcs.pixel_n_dim
+                )
+
     # We set up a global temporary directory since this will be used e.g. to
     # store memory mapped Numpy arrays and zarr arrays.
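As a rough illustration of the condition implemented above, with hypothetical values (``pixel_n_dim`` standing in for ``wcs_in.low_level_wcs.pixel_n_dim``)::

    >>> pixel_n_dim = 2                  # celestial-only WCS
    >>> shape_out = (500, 2048, 2048)    # spectral cube output shape
    >>> block_size = (8, 2048, 2048)     # blocks cover the full celestial plane
    >>> block_size[-pixel_n_dim:] == shape_out[-pixel_n_dim:]
    True

When this holds, ``broadcasted_parallelization`` is enabled and the trailing entries of ``block_size`` are replaced by ``-1`` so that dask only chunks along the broadcast dimensions.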
@@ -158,32 +175,6 @@ def _reproject_dispatcher( shape_in = array_in.shape - # When in parallel mode, we want to make sure we avoid having to copy the - # input array to all processes for each chunk, so instead we write out - # the input array to a Numpy memory map and load it in inside each process - # as a memory-mapped array. We need to be careful how this gets passed to - # reproject_single_block so we pass a variable that can be either a string - # or the array itself (for synchronous mode). If the input array is a dask - # array we should always write it out to a memmap even in synchronous mode - # otherwise map_blocks gets confused if it gets two dask arrays and tries - # to iterate over both. - - if isinstance(array_in, da.core.Array) or parallel: - # If return_type=='dask', - if return_type == "dask": - # We should use a temporary directory that will persist beyond - # the call to the reproject function. - tmp_dir = tempfile.mkdtemp() - else: - tmp_dir = local_tmp_dir - array_in_or_path = as_delayed_memmap_path(array_in, tmp_dir) - else: - # Here we could set array_in_or_path to array_in_path if it - # has been set previously, but in synchronous mode it is better to - # simply pass a reference to the memmap array itself to avoid having - # to load the memmap inside each reproject_single_block call. - array_in_or_path = array_in - def reproject_single_block(a, array_or_path, block_info=None): if a.ndim == 0 or block_info is None or block_info == []: return np.array([a, a]) @@ -217,40 +208,77 @@ def reproject_single_block(a, array_or_path, block_info=None): return np.array([array, footprint]) - # NOTE: the following array is just used to set up the iteration in map_blocks - # but isn't actually used otherwise - this is deliberate. - - if block_size is not None and block_size != "auto": - if wcs_in.low_level_wcs.pixel_n_dim < len(shape_out): - if len(block_size) < len(shape_out): - block_size = [-1] * (len(shape_out) - len(block_size)) + list(block_size) - else: - for i in range(len(shape_out) - wcs_in.low_level_wcs.pixel_n_dim): - if block_size[i] != -1 and block_size[i] != shape_out[i]: - raise ValueError( - "block shape for extra broadcasted dimensions should cover entire array along those dimensions" - ) + if broadcasted_parallelization: array_out_dask = da.empty(shape_out, chunks=block_size) + array_in = array_in.rechunk(block_size) + + result = da.map_blocks( + reproject_single_block, + array_out_dask, + array_in, + dtype=float, + new_axis=0, + chunks=(2,) + array_out_dask.chunksize, + ) + else: - if wcs_in.low_level_wcs.pixel_n_dim < len(shape_out): - chunks = (-1,) * (len(shape_out) - wcs_in.low_level_wcs.pixel_n_dim) - chunks += ("auto",) * wcs_in.low_level_wcs.pixel_n_dim - rechunk_kwargs = {"chunks": chunks} + # When in parallel mode, we want to make sure we avoid having to copy the + # input array to all processes for each chunk, so instead we write out + # the input array to a Numpy memory map and load it in inside each process + # as a memory-mapped array. We need to be careful how this gets passed to + # reproject_single_block so we pass a variable that can be either a string + # or the array itself (for synchronous mode). If the input array is a dask + # array we should always write it out to a memmap even in synchronous mode + # otherwise map_blocks gets confused if it gets two dask arrays and tries + # to iterate over both. 
+ + if isinstance(array_in, da.core.Array) or parallel: + # If return_type=='dask', + if return_type == "dask": + # We should use a temporary directory that will persist beyond + # the call to the reproject function. + tmp_dir = tempfile.mkdtemp() + else: + tmp_dir = local_tmp_dir + array_in_or_path = as_delayed_memmap_path(array_in, tmp_dir) else: - rechunk_kwargs = {} - array_out_dask = da.empty(shape_out) - array_out_dask = array_out_dask.rechunk( - block_size_limit=8 * 1024**2, **rechunk_kwargs - ) + # Here we could set array_in_or_path to array_in_path if it + # has been set previously, but in synchronous mode it is better to + # simply pass a reference to the memmap array itself to avoid having + # to load the memmap inside each reproject_single_block call. + array_in_or_path = array_in + + if block_size is not None and block_size != "auto": + if broadcasting: + if len(block_size) < len(shape_out): + block_size = [-1] * (len(shape_out) - len(block_size)) + list(block_size) + else: + for i in range(len(shape_out) - wcs_in.low_level_wcs.pixel_n_dim): + if block_size[i] != -1 and block_size[i] != shape_out[i]: + raise ValueError( + "block shape for extra broadcasted dimensions should cover entire array along those dimensions" + ) + array_out_dask = da.empty(shape_out, chunks=block_size) + else: + if broadcasting: + chunks = (-1,) * (len(shape_out) - wcs_in.low_level_wcs.pixel_n_dim) + chunks += ("auto",) * wcs_in.low_level_wcs.pixel_n_dim + rechunk_kwargs = {"chunks": chunks} + else: + rechunk_kwargs = {} + array_out_dask = da.empty(shape_out) + array_out_dask = array_out_dask.rechunk( + block_size_limit=8 * 1024**2, **rechunk_kwargs + ) - result = da.map_blocks( - reproject_single_block, - array_out_dask, - array_in_or_path, - dtype=float, - new_axis=0, - chunks=(2,) + array_out_dask.chunksize, - ) + result = da.map_blocks( + reproject_single_block, + array_out_dask, + array_in_or_path, + dtype=float, + new_axis=0, + chunks=(2,) + array_out_dask.chunksize, + ) # Ensure that there are no more references to Numpy memmaps array_in = None
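Putting the documentation and code changes together, a call that exercises the broadcasted block mode would look roughly as follows. This is a sketch only: ``cube``, the WCS objects, the shapes and the process count are hypothetical and not taken from the diff.

.. doctest-skip::

    >>> from reproject import reproject_interp
    >>> # cube has shape (500, 2048, 2048); wcs_in and wcs_out are 2D celestial WCSes
    >>> array, footprint = reproject_interp(
    ...     (cube, wcs_in), wcs_out, shape_out=(500, 2048, 2048),
    ...     block_size=(8, 2048, 2048), parallel=8)

Because the block size covers the whole celestial plane, each (8, 2048, 2048) chunk is reprojected independently and the input cube is not first written to a temporary memory-mapped file.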