Inline loaded variables into kerchunk references #73

Merged: 25 commits merged into main from inline_chunkdata on May 16, 2024

Changes from all commits (25 commits)
d27ba38
add test vs kerchunk inlining
TomNicholas Apr 5, 2024
5733185
Merge branch 'main' into inline_chunkdata
TomNicholas Apr 5, 2024
af46998
encode inlined data correctly (at least for standard numpy arrays)
TomNicholas Apr 5, 2024
490007b
don't test time variable for now
TomNicholas Apr 5, 2024
9041d53
store fill_value as np.NaN, and always coerce it
TomNicholas Apr 6, 2024
00a9b62
test passing for lat and lon variables
TomNicholas Apr 6, 2024
3973c72
Merge branch 'main' of https://github.com/TomNicholas/VirtualiZarr in…
TomNicholas Apr 6, 2024
cd6ca47
Merge branch 'main' into inline_chunkdata
TomNicholas Apr 6, 2024
65fb3d1
formatting
TomNicholas Apr 8, 2024
aa754f4
Merge branch 'main' into inline_chunkdata
TomNicholas May 2, 2024
a4c1f1b
encode numpy types
TomNicholas May 2, 2024
e59428e
tidy internal import
TomNicholas May 3, 2024
7c02f19
parametrize test to test inlining different variables
TomNicholas May 3, 2024
1e46efb
raise when encoding encountered during serialization of numpy arrays
TomNicholas May 3, 2024
5071ae9
see if closing the netcdf files in the fixtures fixes the kerchunk error
TomNicholas May 3, 2024
654851c
update docs
TomNicholas May 3, 2024
f1e5eab
ensure inline_threshold is an int
TomNicholas May 6, 2024
98c4f8f
Merge branch 'main' into inline_chunkdata
TomNicholas May 8, 2024
6135e14
formatting
TomNicholas May 8, 2024
7970021
Merge branch 'main' into inline_chunkdata
TomNicholas May 15, 2024
ec7528c
Merge branch 'main' into inline_chunkdata
TomNicholas May 15, 2024
8c24af6
specified NetCDF4 for netcdf4_file fixture & added netcdf4 to pyproje…
norlandrhagen May 15, 2024
744cc6d
Merge branch 'main' into inline_chunkdata
TomNicholas May 16, 2024
f997b79
organize tests
TomNicholas May 16, 2024
0baee1a
dont unnecessarily slice dataset
TomNicholas May 16, 2024
4 changes: 3 additions & 1 deletion docs/usage.md
@@ -333,8 +333,10 @@ These references can now be interpreted like they were a Zarr store by [fsspec](
combined_ds = xr.open_dataset('combined.json', engine="kerchunk")
```

In-memory ("loadable") variables backed by numpy arrays can also be written out to kerchunk reference files, with the values serialized as bytes. This is equivalent to kerchunk's concept of "inlining", but done on a per-array basis using the `loadable_variables` kwarg rather than a per-chunk basis using kerchunk's `inline_threshold` kwarg.

```{note}
Currently you can only serialize virtual variables backed by `ManifestArray` objects to kerchunk reference files, not real in-memory numpy-backed variables.
Currently you can only serialize in-memory variables to kerchunk references if they do not have any encoding.
```
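
For illustration, a minimal sketch of this per-array inlining (the filename and variable names here are placeholders, not taken from the PR):

```python
import xarray as xr
from virtualizarr import open_virtual_dataset

# load the small coordinate variables into memory; everything else stays virtual
vds = open_virtual_dataset("air.nc", loadable_variables=["lat", "lon"], indexes={})

# the loaded variables are written inline (as base64-encoded bytes) in the references
vds.virtualize.to_kerchunk("refs.json", format="json")

# the references then open like any other kerchunk store
roundtrip = xr.open_dataset("refs.json", engine="kerchunk")
```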

When you have many chunks, the reference file can get large enough to be unwieldy as json. In that case the references can be instead stored as parquet. Again this uses kerchunk internally.
7 changes: 4 additions & 3 deletions pyproject.toml
@@ -35,15 +35,16 @@ dependencies = [
test = [
"codecov",
"pre-commit",
"ruff",
"pytest-mypy",
"pytest-cov",
"pytest",
"fsspec",
"pooch",
"scipy",
"ruff",
"netcdf4",
"fsspec",
"s3fs",
"fastparquet",
"s3fs"
]


83 changes: 66 additions & 17 deletions virtualizarr/kerchunk.py
@@ -1,9 +1,14 @@
import base64
import json
from enum import Enum, auto
from pathlib import Path
from typing import NewType, Optional, cast

import numpy as np
import ujson # type: ignore
import xarray as xr

from virtualizarr.manifests.manifest import join
from virtualizarr.utils import _fsspec_openfile_from_filepath
from virtualizarr.zarr import ZArray, ZAttrs

@@ -19,9 +24,6 @@
) # lower-level dict containing just the information for one zarr array


from enum import Enum, auto


class AutoName(Enum):
# Recommended by official Python docs for auto naming:
# https://docs.python.org/3/library/enum.html#using-automatic-values
@@ -38,6 +40,16 @@ class FileType(AutoName):
zarr = auto()


class NumpyEncoder(json.JSONEncoder):
# TODO I don't understand how kerchunk gets around this problem of encoding numpy types (in the zattrs) whilst only using ujson
def default(self, obj):
if isinstance(obj, np.ndarray):
return obj.tolist() # Convert NumPy array to Python list
elif isinstance(obj, np.generic):
return obj.item() # Convert NumPy scalar to Python scalar
return json.JSONEncoder.default(self, obj)
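
As a quick illustration of what this encoder does (a sketch, not part of the diff, assuming the `NumpyEncoder` class above is in scope):

```python
import json

import numpy as np

# np.generic scalars and np.ndarray are not JSON-serializable by default;
# NumpyEncoder converts them to plain Python floats/ints and lists first
payload = {"scale": np.float32(0.01), "shape": np.array([25, 53])}
print(json.dumps(payload, cls=NumpyEncoder))
```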


def read_kerchunk_references_from_file(
filepath: str,
filetype: FileType | None,
@@ -194,7 +206,7 @@ def dataset_to_kerchunk_refs(ds: xr.Dataset) -> KerchunkStoreRefs:

all_arr_refs = {}
for var_name, var in ds.variables.items():
arr_refs = variable_to_kerchunk_arr_refs(var)
arr_refs = variable_to_kerchunk_arr_refs(var, var_name)

prepended_with_var_name = {
f"{var_name}/{key}": val for key, val in arr_refs.items()
@@ -206,38 +218,75 @@ def dataset_to_kerchunk_refs(ds: xr.Dataset) -> KerchunkStoreRefs:
"version": 1,
"refs": {
".zgroup": '{"zarr_format":2}',
".zattrs": ujson.dumps(ds.attrs),
**all_arr_refs,
},
}

return cast(KerchunkStoreRefs, ds_refs)


def variable_to_kerchunk_arr_refs(var: xr.Variable) -> KerchunkArrRefs:
def variable_to_kerchunk_arr_refs(var: xr.Variable, var_name: str) -> KerchunkArrRefs:
"""
Create a dictionary containing kerchunk-style array references from a single xarray.Variable (which wraps a ManifestArray).
Create a dictionary containing kerchunk-style array references from a single xarray.Variable (which wraps either a ManifestArray or a numpy array).

Partially encodes the inner dicts to json to match kerchunk behaviour (see https://github.com/fsspec/kerchunk/issues/415).
"""
from virtualizarr.manifests import ManifestArray

marr = var.data
if isinstance(var.data, ManifestArray):
marr = var.data

if not isinstance(marr, ManifestArray):
raise TypeError(
f"Can only serialize wrapped arrays of type ManifestArray, but got type {type(marr)}"
)
arr_refs: dict[str, str | list[str | int]] = {
str(chunk_key): chunk_entry.to_kerchunk()
for chunk_key, chunk_entry in marr.manifest.entries.items()
}

arr_refs: dict[str, str | list[str | int]] = {
str(chunk_key): chunk_entry.to_kerchunk()
for chunk_key, chunk_entry in marr.manifest.entries.items()
}
zarray = marr.zarray

else:
try:
np_arr = var.to_numpy()
except AttributeError as e:
raise TypeError(
f"Can only serialize wrapped arrays of type ManifestArray or numpy.ndarray, but got type {type(var.data)}"
) from e

if var.encoding:
if "scale_factor" in var.encoding:
raise NotImplementedError(
f"Cannot serialize loaded variable {var_name}, as it is encoded with a scale_factor"
)
if "offset" in var.encoding:
raise NotImplementedError(
f"Cannot serialize loaded variable {var_name}, as it is encoded with an offset"
)
if "calendar" in var.encoding:
raise NotImplementedError(
f"Cannot serialize loaded variable {var_name}, as it is encoded with a calendar"
)

# This encoding is what kerchunk does when it "inlines" data, see https://github.com/fsspec/kerchunk/blob/a0c4f3b828d37f6d07995925b324595af68c4a19/kerchunk/hdf.py#L472
byte_data = np_arr.tobytes()
# TODO do I really need to encode then decode like this?
inlined_data = (b"base64:" + base64.b64encode(byte_data)).decode("utf-8")

# TODO can this be generalized to save individual chunks of a dask array?
# TODO will this fail for a scalar?
arr_refs = {join(0 for _ in np_arr.shape): inlined_data}

zarray = ZArray(
chunks=np_arr.shape,
shape=np_arr.shape,
dtype=np_arr.dtype,
order="C",
)

zarray_dict = marr.zarray.to_kerchunk_json()
zarray_dict = zarray.to_kerchunk_json()
arr_refs[".zarray"] = zarray_dict

zattrs = var.attrs
zattrs["_ARRAY_DIMENSIONS"] = list(var.dims)
arr_refs[".zattrs"] = ujson.dumps(zattrs)
arr_refs[".zattrs"] = json.dumps(zattrs, separators=(",", ":"), cls=NumpyEncoder)

return cast(KerchunkArrRefs, arr_refs)
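
A self-contained sketch of the base64 round trip used in the inlining branch above (illustrative values, not part of the diff):

```python
import base64

import numpy as np

# encode: the same scheme as the inlining above
arr = np.array([15.0, 17.5, 20.0])
inlined = (b"base64:" + base64.b64encode(arr.tobytes())).decode("utf-8")

# decode: strip the prefix, base64-decode, reinterpret the raw bytes
raw = base64.b64decode(inlined.removeprefix("base64:"))
restored = np.frombuffer(raw, dtype=arr.dtype).reshape(arr.shape)
assert (restored == arr).all()
```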
5 changes: 4 additions & 1 deletion virtualizarr/tests/conftest.py
@@ -9,7 +9,8 @@ def netcdf4_file(tmpdir):

# Save it to disk as netCDF (in temporary directory)
filepath = f"{tmpdir}/air.nc"
ds.to_netcdf(filepath)
ds.to_netcdf(filepath, format="NETCDF4")
ds.close()

return filepath

@@ -28,5 +29,7 @@ def netcdf4_files(tmpdir):
filepath2 = f"{tmpdir}/air2.nc"
ds1.to_netcdf(filepath1)
ds2.to_netcdf(filepath2)
ds1.close()
ds2.close()

return filepath1, filepath2
132 changes: 69 additions & 63 deletions virtualizarr/tests/test_integration.py
@@ -1,68 +1,95 @@
import fsspec
import pytest
import xarray as xr
import xarray.testing as xrt

from virtualizarr import open_virtual_dataset


def test_kerchunk_roundtrip_no_concat(tmpdir):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=False)

# save it to disk as netCDF (in temporary directory)
ds.to_netcdf(f"{tmpdir}/air.nc")
@pytest.mark.parametrize(
"inline_threshold, vars_to_inline",
[
(5e2, ["lat", "lon"]),
pytest.param(
5e4, ["lat", "lon", "time"], marks=pytest.mark.xfail(reason="time encoding")
),
pytest.param(
5e7,
["lat", "lon", "time", "air"],
marks=pytest.mark.xfail(reason="scale factor encoding"),
),
],
)
def test_numpy_arrays_to_inlined_kerchunk_refs(
netcdf4_file, inline_threshold, vars_to_inline
):
from kerchunk.hdf import SingleHdf5ToZarr

# inline_threshold is chosen to test inlining only the variables listed in vars_to_inline
expected = SingleHdf5ToZarr(
netcdf4_file, inline_threshold=int(inline_threshold)
).translate()

# loading the variables should produce same result as inlining them using kerchunk
vds = open_virtual_dataset(
netcdf4_file, loadable_variables=vars_to_inline, indexes={}
)
refs = vds.virtualize.to_kerchunk(format="dict")

# use open_dataset_via_kerchunk to read it as references
vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={})
# TODO I would just compare the entire dicts but kerchunk returns inconsistent results - see https://github.com/TomNicholas/VirtualiZarr/pull/73#issuecomment-2040931202
# assert refs == expected
assert refs["refs"]["air/0.0.0"] == expected["refs"]["air/0.0.0"]
assert refs["refs"]["lon/0"] == expected["refs"]["lon/0"]
assert refs["refs"]["lat/0"] == expected["refs"]["lat/0"]
assert refs["refs"]["time/0"] == expected["refs"]["time/0"]

# write those references to disk as kerchunk json
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.json", format="json")

# use fsspec to read the dataset from disk via the zarr store
fs = fsspec.filesystem("reference", fo=f"{tmpdir}/refs.json")
m = fs.get_mapper("")
@pytest.mark.parametrize("format", ["json", "parquet"])
class TestKerchunkRoundtrip:
def test_kerchunk_roundtrip_no_concat(self, tmpdir, format):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=False)

roundtrip = xr.open_dataset(m, engine="kerchunk")
# save it to disk as netCDF (in temporary directory)
ds.to_netcdf(f"{tmpdir}/air.nc")

# assert equal to original dataset
xrt.assert_equal(roundtrip, ds)
# use open_dataset_via_kerchunk to read it as references
vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={})

# write those references to disk as kerchunk json
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)

def test_kerchunk_roundtrip_concat(tmpdir):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=False).isel(
time=slice(None, 2000)
)
# use fsspec to read the dataset from disk via the zarr store
roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk")

# split into two datasets
ds1, ds2 = ds.isel(time=slice(None, 1000)), ds.isel(time=slice(1000, None))
# assert equal to original dataset
xrt.assert_equal(roundtrip, ds)

# save it to disk as netCDF (in temporary directory)
ds1.to_netcdf(f"{tmpdir}/air1.nc")
ds2.to_netcdf(f"{tmpdir}/air2.nc")
def test_kerchunk_roundtrip_concat(self, tmpdir, format):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=False)

# use open_dataset_via_kerchunk to read it as references
vds1 = open_virtual_dataset(f"{tmpdir}/air1.nc", indexes={})
vds2 = open_virtual_dataset(f"{tmpdir}/air2.nc", indexes={})
# split into two datasets
ds1, ds2 = ds.isel(time=slice(None, 1460)), ds.isel(time=slice(1460, None))

# concatenate virtually along time
vds = xr.concat([vds1, vds2], dim="time", coords="minimal", compat="override")
print(vds["air"].variable._data)
# save it to disk as netCDF (in temporary directory)
ds1.to_netcdf(f"{tmpdir}/air1.nc")
ds2.to_netcdf(f"{tmpdir}/air2.nc")

# write those references to disk as kerchunk json
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.json", format="json")
# use open_dataset_via_kerchunk to read it as references
vds1 = open_virtual_dataset(f"{tmpdir}/air1.nc", indexes={})
vds2 = open_virtual_dataset(f"{tmpdir}/air2.nc", indexes={})

# use fsspec to read the dataset from disk via the zarr store
fs = fsspec.filesystem("reference", fo=f"{tmpdir}/refs.json")
m = fs.get_mapper("")
# concatenate virtually along time
vds = xr.concat([vds1, vds2], dim="time", coords="minimal", compat="override")

roundtrip = xr.open_dataset(m, engine="kerchunk")
# write those references to disk as kerchunk json
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)

# user does analysis here
# use fsspec to read the dataset from disk via the zarr store
roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk")

# assert equal to original dataset
xrt.assert_equal(roundtrip, ds)
# assert equal to original dataset
xrt.assert_equal(roundtrip, ds)


def test_open_scalar_variable(tmpdir):
@@ -73,24 +100,3 @@ def test_open_scalar_variable(tmpdir):

vds = open_virtual_dataset(f"{tmpdir}/scalar.nc")
assert vds["a"].shape == ()


@pytest.mark.parametrize("format", ["json", "parquet"])
def test_kerchunk_roundtrip(tmpdir, format):
# set up example xarray dataset
ds = xr.tutorial.open_dataset("air_temperature", decode_times=False)

# save it to disk as netCDF (in temporary directory)
ds.to_netcdf(f"{tmpdir}/air.nc")

# use open_virtual_dataset to read it as references
vds = open_virtual_dataset(f"{tmpdir}/air.nc", indexes={})

# write those references to disk as kerchunk json
vds.virtualize.to_kerchunk(f"{tmpdir}/refs.{format}", format=format)

# read the dataset from disk via the zarr store
roundtrip = xr.open_dataset(f"{tmpdir}/refs.{format}", engine="kerchunk")

# assert equal to original dataset
xrt.assert_equal(roundtrip, ds)