Skip to content

Commit

Permalink
Split kerchunk reader up (#261)
Browse files Browse the repository at this point in the history
* standardize zarr v3 and dmrpp readers behind dedicated open_virtual_dataset functions

* refactor hdf5 reader behind open_virtual_dataset function

* refactor netcdf3

* refactor tiff

* refactor fits

* refactored so create VirtualBackends

* restore backend.py, but keep readers/common.py

* oops I deleted a file

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* standardize open_virtual_dataset method signature, and raise NotImplemented

* fix bug with zarr reader

* remove todo

* make open_virtual_dataset a staticmethod

* try to fix mypy error about importing DataTree from versions of xarray where it doesn't exist

* mypy

* sanitize drop_variables and loadable_variables

* implement drop_variables for kerchunk reader

* sanitize drmpp args

* pass all arguments to kerchunk reader

* coerce kerchunk refs to our types

* make sure all readers are passed the same set of args

* fix bad merge, and refactor determine_chunk_grid_shape a bit

* ensure decode_times is passed to each reader

* remove match case statement in favour of mapping

* ensure optional dependencies aren't imported

* release note

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
TomNicholas and pre-commit-ci[bot] authored Oct 19, 2024
1 parent 7053bc0 commit 29ca4ac
Show file tree
Hide file tree
Showing 21 changed files with 1,090 additions and 687 deletions.
2 changes: 2 additions & 0 deletions docs/releases.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ Internal Changes

- Refactored internal structure significantly to split up everything to do with reading references from that to do with writing references.
(:issue:`229`) (:pull:`231`) By `Tom Nicholas <https://github.com/TomNicholas>`_.
- Refactored readers to consider every filetype as a separate reader, all standardized to present the same `open_virtual_dataset` interface internally.
(:pull:`261`) By `Tom Nicholas <https://github.com/TomNicholas>`_.

.. _v1.0.0:

Expand Down
311 changes: 92 additions & 219 deletions virtualizarr/backend.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,39 @@
import os
import warnings
from collections.abc import Iterable, Mapping, MutableMapping
from collections.abc import Iterable, Mapping
from enum import Enum, auto
from io import BufferedIOBase
from pathlib import Path
from typing import (
Any,
Hashable,
Optional,
cast,
)

import xarray as xr
from xarray.backends import AbstractDataStore, BackendArray
from xarray.core.indexes import Index, PandasIndex
from xarray.core.variable import IndexVariable
from xarray import Dataset
from xarray.core.indexes import Index

from virtualizarr.manifests import ManifestArray
from virtualizarr.types.kerchunk import KerchunkStoreRefs
from virtualizarr.utils import _FsspecFSFromFilepath

XArrayOpenT = str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore
from virtualizarr.readers import (
DMRPPVirtualBackend,
FITSVirtualBackend,
HDF5VirtualBackend,
KerchunkVirtualBackend,
NetCDF3VirtualBackend,
TIFFVirtualBackend,
ZarrV3VirtualBackend,
)
from virtualizarr.utils import _FsspecFSFromFilepath, check_for_collisions

# TODO add entrypoint to allow external libraries to add to this mapping
VIRTUAL_BACKENDS = {
"kerchunk": KerchunkVirtualBackend,
"zarr_v3": ZarrV3VirtualBackend,
"dmrpp": DMRPPVirtualBackend,
# all the below call one of the kerchunk backends internally (https://fsspec.github.io/kerchunk/reference.html#file-format-backends)
"netcdf3": NetCDF3VirtualBackend,
"hdf5": HDF5VirtualBackend,
"netcdf4": HDF5VirtualBackend, # note this is the same as for hdf5
"tiff": TIFFVirtualBackend,
"fits": FITSVirtualBackend,
}


class AutoName(Enum):
Expand All @@ -43,10 +57,49 @@ class FileType(AutoName):
kerchunk = auto()


class ManifestBackendArray(ManifestArray, BackendArray):
"""Using this prevents xarray from wrapping the KerchunkArray in ExplicitIndexingAdapter etc."""
def automatically_determine_filetype(
*,
filepath: str,
reader_options: Optional[dict[str, Any]] = {},
) -> FileType:
"""
Attempt to automatically infer the correct reader for this filetype.
Uses magic bytes and file / directory suffixes.
"""

...
# TODO this should ideally handle every filetype that we have a reader for, not just kerchunk

# TODO how do we handle kerchunk json / parquet here?
if Path(filepath).suffix == ".zarr":
# TODO we could imagine opening an existing zarr store, concatenating it, and writing a new virtual one...
raise NotImplementedError()

# Read magic bytes from local or remote file
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
).open_file()
magic_bytes = fpath.read(8)
fpath.close()

if magic_bytes.startswith(b"CDF"):
filetype = FileType.netcdf3
elif magic_bytes.startswith(b"\x0e\x03\x13\x01"):
raise NotImplementedError("HDF4 formatted files not supported")
elif magic_bytes.startswith(b"\x89HDF"):
filetype = FileType.hdf5
elif magic_bytes.startswith(b"GRIB"):
filetype = FileType.grib
elif magic_bytes.startswith(b"II*"):
filetype = FileType.tiff
elif magic_bytes.startswith(b"SIMPLE"):
filetype = FileType.fits
else:
raise NotImplementedError(
f"Unrecognised file based on header bytes: {magic_bytes}"
)

return filetype


def open_virtual_dataset(
Expand All @@ -61,15 +114,14 @@ def open_virtual_dataset(
indexes: Mapping[str, Index] | None = None,
virtual_array_class=ManifestArray,
reader_options: Optional[dict] = None,
) -> xr.Dataset:
) -> Dataset:
"""
Open a file or store as an xarray Dataset wrapping virtualized zarr arrays.
No data variables will be loaded unless specified in the ``loadable_variables`` kwarg (in which case they will be xarray lazily indexed arrays).
Xarray indexes can optionally be created (the default behaviour). To avoid creating any xarray indexes pass ``indexes={}``.
Parameters
----------
filepath : str, default None
Expand Down Expand Up @@ -112,217 +164,38 @@ def open_virtual_dataset(
stacklevel=2,
)

loadable_vars: dict[str, xr.Variable]
virtual_vars: dict[str, xr.Variable]
vars: dict[str, xr.Variable]

if drop_variables is None:
drop_variables = []
elif isinstance(drop_variables, str):
drop_variables = [drop_variables]
else:
drop_variables = list(drop_variables)
if loadable_variables is None:
loadable_variables = []
elif isinstance(loadable_variables, str):
loadable_variables = [loadable_variables]
else:
loadable_variables = list(loadable_variables)
common = set(drop_variables).intersection(set(loadable_variables))
if common:
raise ValueError(f"Cannot both load and drop variables {common}")
drop_variables, loadable_variables = check_for_collisions(
drop_variables,
loadable_variables,
)

if virtual_array_class is not ManifestArray:
raise NotImplementedError()

# if filetype is user defined, convert to FileType
if reader_options is None:
reader_options = {}

if filetype is not None:
# if filetype is user defined, convert to FileType
filetype = FileType(filetype)

if filetype == FileType.kerchunk:
from virtualizarr.readers.kerchunk import dataset_from_kerchunk_refs

fs = _FsspecFSFromFilepath(filepath=filepath, reader_options=reader_options)

# The kerchunk .parquet storage format isn't actually a parquet, but a directory that contains named parquets for each group/variable.
if fs.filepath.endswith("ref.parquet"):
from fsspec.implementations.reference import LazyReferenceMapper

lrm = LazyReferenceMapper(filepath, fs.fs)

# build reference dict from KV pairs in LazyReferenceMapper
# is there a better / more preformant way to extract this?
array_refs = {k: lrm[k] for k in lrm.keys()}

full_reference = {"refs": array_refs}

return dataset_from_kerchunk_refs(KerchunkStoreRefs(full_reference))

# JSON has no magic bytes, but the Kerchunk version 1 spec starts with 'version':
# https://fsspec.github.io/kerchunk/spec.html
elif fs.read_bytes(9).startswith(b'{"version'):
import ujson

with fs.open_file() as of:
refs = ujson.load(of)

return dataset_from_kerchunk_refs(KerchunkStoreRefs(refs))

else:
raise ValueError(
"The input Kerchunk reference did not seem to be in Kerchunk's JSON or Parquet spec: https://fsspec.github.io/kerchunk/spec.html. The Kerchunk format autodetection is quite flaky, so if your reference matches the Kerchunk spec feel free to open an issue: https://github.com/zarr-developers/VirtualiZarr/issues"
)

if filetype == FileType.zarr_v3:
# TODO is there a neat way of auto-detecting this?
from virtualizarr.readers.zarr import open_virtual_dataset_from_v3_store

return open_virtual_dataset_from_v3_store(
storepath=filepath, drop_variables=drop_variables, indexes=indexes
)
elif filetype == FileType.dmrpp:
from virtualizarr.readers.dmrpp import DMRParser

if loadable_variables != [] or indexes is None:
raise NotImplementedError(
"Specifying `loadable_variables` or auto-creating indexes with `indexes=None` is not supported for dmrpp files."
)

fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
).open_file()
parser = DMRParser(fpath.read(), data_filepath=filepath.strip(".dmrpp"))
vds = parser.parse_dataset()
vds.drop_vars(drop_variables)
return vds
else:
# we currently read every other filetype using kerchunks various file format backends
from virtualizarr.readers.kerchunk import (
fully_decode_arr_refs,
read_kerchunk_references_from_file,
virtual_vars_from_kerchunk_refs,
)

if reader_options is None:
reader_options = {}

# this is the only place we actually always need to use kerchunk directly
# TODO avoid even reading byte ranges for variables that will be dropped later anyway?
vds_refs = read_kerchunk_references_from_file(
filepath=filepath,
filetype=filetype,
group=group,
reader_options=reader_options,
)
virtual_vars = virtual_vars_from_kerchunk_refs(
vds_refs,
drop_variables=drop_variables + loadable_variables,
virtual_array_class=virtual_array_class,
)
ds_attrs = fully_decode_arr_refs(vds_refs["refs"]).get(".zattrs", {})
coord_names = ds_attrs.pop("coordinates", [])

if indexes is None or len(loadable_variables) > 0:
# TODO we are reading a bunch of stuff we know we won't need here, e.g. all of the data variables...
# TODO it would also be nice if we could somehow consolidate this with the reading of the kerchunk references
# TODO really we probably want a dedicated xarray backend that iterates over all variables only once
fpath = _FsspecFSFromFilepath(
filepath=filepath, reader_options=reader_options
).open_file()

# fpath can be `Any` thanks to fsspec.filesystem(...).open() returning Any.
# We'll (hopefully safely) cast it to what xarray is expecting, but this might let errors through.

ds = xr.open_dataset(
cast(XArrayOpenT, fpath),
drop_variables=drop_variables,
group=group,
decode_times=decode_times,
)

if indexes is None:
warnings.warn(
"Specifying `indexes=None` will create in-memory pandas indexes for each 1D coordinate, but concatenation of ManifestArrays backed by pandas indexes is not yet supported (see issue #18)."
"You almost certainly want to pass `indexes={}` to `open_virtual_dataset` instead."
)

# add default indexes by reading data from file
indexes = {name: index for name, index in ds.xindexes.items()}
elif indexes != {}:
# TODO allow manual specification of index objects
raise NotImplementedError()
else:
indexes = dict(**indexes) # for type hinting: to allow mutation

loadable_vars = {
str(name): var
for name, var in ds.variables.items()
if name in loadable_variables
}

# if we only read the indexes we can just close the file right away as nothing is lazy
if loadable_vars == {}:
ds.close()
else:
loadable_vars = {}
indexes = {}

vars = {**virtual_vars, **loadable_vars}

data_vars, coords = separate_coords(vars, indexes, coord_names)

vds = xr.Dataset(
data_vars,
coords=coords,
# indexes={}, # TODO should be added in a later version of xarray
attrs=ds_attrs,
filetype = automatically_determine_filetype(
filepath=filepath, reader_options=reader_options
)

# TODO we should probably also use vds.set_close() to tell xarray how to close the file we opened

return vds


def separate_coords(
vars: Mapping[str, xr.Variable],
indexes: MutableMapping[str, Index],
coord_names: Iterable[str] | None = None,
) -> tuple[dict[str, xr.Variable], xr.Coordinates]:
"""
Try to generate a set of coordinates that won't cause xarray to automatically build a pandas.Index for the 1D coordinates.
Currently requires this function as a workaround unless xarray PR #8124 is merged.
Will also preserve any loaded variables and indexes it is passed.
"""

if coord_names is None:
coord_names = []

# split data and coordinate variables (promote dimension coordinates)
data_vars = {}
coord_vars: dict[
str, tuple[Hashable, Any, dict[Any, Any], dict[Any, Any]] | xr.Variable
] = {}
for name, var in vars.items():
if name in coord_names or var.dims == (name,):
# use workaround to avoid creating IndexVariables described here https://github.com/pydata/xarray/pull/8107#discussion_r1311214263
if len(var.dims) == 1:
dim1d, *_ = var.dims
coord_vars[name] = (dim1d, var.data, var.attrs, var.encoding)
backend_cls = VIRTUAL_BACKENDS.get(filetype.name.lower())

if isinstance(var, IndexVariable):
# unless variable actually already is a loaded IndexVariable,
# in which case we need to keep it and add the corresponding indexes explicitly
coord_vars[str(name)] = var
# TODO this seems suspect - will it handle datetimes?
indexes[name] = PandasIndex(var, dim1d)
else:
coord_vars[name] = var
else:
data_vars[name] = var
if backend_cls is None:
raise NotImplementedError(f"Unsupported file type: {filetype.name}")

coords = xr.Coordinates(coord_vars, indexes=indexes)
vds = backend_cls.open_virtual_dataset(
filepath,
group=group,
drop_variables=drop_variables,
loadable_variables=loadable_variables,
decode_times=decode_times,
indexes=indexes,
reader_options=reader_options,
)

return data_vars, coords
return vds
13 changes: 8 additions & 5 deletions virtualizarr/manifests/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@

import numpy as np

from ..types.kerchunk import KerchunkArrRefs
from ..zarr import ZArray
from .array_api import MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS, _isnan
from .manifest import ChunkManifest
from virtualizarr.manifests.array_api import (
MANIFESTARRAY_HANDLED_ARRAY_FUNCTIONS,
_isnan,
)
from virtualizarr.manifests.manifest import ChunkManifest
from virtualizarr.types.kerchunk import KerchunkArrRefs
from virtualizarr.zarr import ZArray


class ManifestArray:
Expand Down Expand Up @@ -61,7 +64,7 @@ def __init__(

@classmethod
def _from_kerchunk_refs(cls, arr_refs: KerchunkArrRefs) -> "ManifestArray":
from virtualizarr.readers.kerchunk import (
from virtualizarr.translators.kerchunk import (
fully_decode_arr_refs,
parse_array_refs,
)
Expand Down
Loading

0 comments on commit 29ca4ac

Please sign in to comment.