
fsspec integration #63

Merged: 31 commits, merged Nov 13, 2024
Changes from 10 commits
Commits
0a6d056 Initial fsspec integration (kylebarron, Oct 21, 2024)
ed0073e Add _cat_ranges (kylebarron, Oct 21, 2024)
18bde17 Add failing test (kylebarron, Oct 21, 2024)
a9b0d51 Add pipe_file (kylebarron, Oct 22, 2024)
bf4f5e0 Add test fixtures (martindurant, Oct 30, 2024)
fdd2a79 Merge branch 'kyle/fsspec-integration' of https://github.com/developm… (martindurant, Oct 30, 2024)
6495836 Merge branch 'main' into kyle/fsspec-integration (martindurant, Oct 31, 2024)
68a2d1f Simple file override (martindurant, Oct 31, 2024)
ec9c559 silghtly more (martindurant, Oct 31, 2024)
956b1c0 lint (martindurant, Oct 31, 2024)
a5a8e82 Update obstore/python/obstore/fsspec.py (martindurant, Oct 31, 2024)
0a0a2fc add conftest (martindurant, Oct 31, 2024)
032c976 tests and docstrings (martindurant, Nov 1, 2024)
d26a651 make fs not cachable (martindurant, Nov 1, 2024)
bf1e368 Merge branch 'main' into kyle/fsspec-integration (martindurant, Nov 1, 2024)
3072e4e start/end (martindurant, Nov 1, 2024)
71d9ef7 in cat also (martindurant, Nov 1, 2024)
31fbb58 Try mixed ranges (martindurant, Nov 5, 2024)
79449fd Allow None ranges (martindurant, Nov 5, 2024)
abe7a44 overwrite test (martindurant, Nov 6, 2024)
4a05dcb fix (martindurant, Nov 8, 2024)
2f70443 revive subclass (martindurant, Nov 8, 2024)
d8bba78 Merge branch 'main' into kyle/fsspec-integration (martindurant, Nov 8, 2024)
fca4619 xfails (martindurant, Nov 8, 2024)
3fc017e xfails didn't stick (martindurant, Nov 8, 2024)
f02fa2b give reason (martindurant, Nov 8, 2024)
c1ee71d Update obstore/python/obstore/fsspec.py (kylebarron, Nov 11, 2024)
cdfa2ab Update obstore/python/obstore/fsspec.py (kylebarron, Nov 11, 2024)
5b9b0f7 update for signature (martindurant, Nov 13, 2024)
45868c9 Merge branch 'kyle/fsspec-integration' of https://github.com/martindu… (martindurant, Nov 13, 2024)
ecc9399 Update obstore/python/obstore/fsspec.py (kylebarron, Nov 13, 2024)
159 changes: 159 additions & 0 deletions obstore/python/obstore/fsspec.py
@@ -0,0 +1,159 @@
"""Fsspec integration.

The underlying `object_store` Rust crate cautions against relying too strongly on stateful filesystem representations of object stores:

> The ObjectStore interface is designed to mirror the APIs of object stores and not filesystems, and thus has stateless APIs instead of cursor based interfaces such as Read or Seek available in filesystems.
>
> This design provides the following advantages:
>
> - All operations are atomic, and readers cannot observe partial and/or failed writes
> - Methods map directly to object store APIs, providing both efficiency and predictability
> - Abstracts away filesystem and operating system specific quirks, ensuring portability
> - Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads

Where possible, implementations should use the underlying `object-store-rs` APIs
directly. Only where this is not possible should users fall back to this fsspec
integration.
"""

from __future__ import annotations

import asyncio
from collections import defaultdict
from typing import Any, Coroutine, Dict, List, Tuple

import fsspec.asyn
import fsspec.spec

import obstore as obs


class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
def __init__(
self,
store,
*args,
asynchronous=False,
loop=None,
Member:

What's the right type for loop?

Member:

   loop: since both fsspec/python and tokio/rust may be using loops, this should
        be kept None for now

If None is the only valid argument to loop, then we should remove it from __init__ and manually pass loop=None to super().__init__ below.

Member:

Remove loop too?

Contributor Author:

I would suggest leaving it here even if we don't use it, to match the normal AsyncFileSystem signature.

batch_size=None,
**kwargs,
):
self.store = store
super().__init__(
*args, asynchronous=asynchronous, loop=loop, batch_size=batch_size, **kwargs
)

async def _rm_file(self, path, **kwargs):
return await obs.delete_async(self.store, path)

async def _cp_file(self, path1, path2, **kwargs):
return await obs.copy_async(self.store, path1, path2)

async def _pipe_file(self, path, value, **kwargs):
return await obs.put_async(self.store, path, value)

async def _cat_file(self, path, start=None, end=None, **kwargs):
if start is None and end is None:
resp = await obs.get_async(self.store, path)
return await resp.bytes_async()

if start is not None and end is not None:
return await obs.get_range_async(
self.store, path, offset=start, length=end - start
)

raise NotImplementedError("todo: handle open-ended ranges")

async def _cat_ranges(
Member:

Would you be able to add some tests for this? I think this function is the least stable of everything because it relies on custom code to split the ranges into per-file ranges and piece the outputs together again.

We should have some tests for multiple ranges in a single file, single range for multiple files, multiple ranges for multiple files, and maybe overlapping ranges too.

Contributor Author:

I'll look into this. I strongly suspect that it will "just work" with the upstream fsspec code, actually, rather than having to carve out the per-file requests and repeat the code in rust. The only downside of that would be bytes copies in python.

Member:

I strongly suspect that it will "just work" with the upstream fsspec code, actually,

Yes, it would just work with the upstream code, but the benefit here is that we get request merging in the underlying Rust code. We should absolutely use the underlying get_ranges if we can; we just have to map between multi-file and single-file ranges.

Contributor Author:

request merging in the underlying Rust code

as opposed to doing it in python, you mean? I wonder if it makes a practical difference.

Member:

True. My position is mostly: we should use whatever tools object_store provides us, because I'd guess the Rust code will be slightly faster, which could make a difference for large-scale data loading with get_ranges, like in zarr.

I don't think it's too much work to have this helper function, and it would also be useful in places that don't depend on fsspec, like in the zarr PR: https://github.com/zarr-developers/zarr-python/pull/1661/files#diff-6f6bc4f5bf8c4e9eb8f9bb486d3699e17e4a4d6efc90b922c63a75d45cd7a9a6R47-R51

Contributor Author:

OK, I can agree with that reasoning.

self,
paths: List[str],
starts: List[int],
ends: List[int],
Member:

Upstream says that the type here is int | list[int]: https://github.com/fsspec/filesystem_spec/blob/9a161714f0bbfe44ee769f259420f2f7db975471/fsspec/asyn.py#L495-L497

We should update here for symmetry, and then we'll need to fix line 89 as well.

Member:

Can any element of starts or ends be None? Or these are always bounded ranges?

I updated the zarr PR to handle non-bounded ranges in the multi-request path.

Contributor Author:

Can any element of starts or ends be None?

We can choose to support that. For kerchunk use, we will only have full files (called via cat()) and all known ranges (called via cat_ranges()). Having all that and partial/suffix ranges in one place would be convenient, but I don't think it's urgent.

Member:

Ok, we can handle that in a follow up.

max_gap=None,
batch_size=None,
on_error="return",
**kwargs,
):
# TODO: need to go through this again and test it
per_file_requests: Dict[str, List[Tuple[int, int, int]]] = defaultdict(list)
for idx, (path, start, end) in enumerate(zip(paths, starts, ends)):
per_file_requests[path].append((start, end, idx))

futs: List[Coroutine[Any, Any, List[bytes]]] = []
for path, ranges in per_file_requests.items():
offsets = [r[0] for r in ranges]
lengths = [r[1] - r[0] for r in ranges]
fut = obs.get_ranges_async(
self.store, path, offsets=offsets, lengths=lengths
)
futs.append(fut)

result = await asyncio.gather(*futs)

output_buffers: List[bytes] = [b""] * len(paths)
for per_file_request, buffers in zip(per_file_requests.items(), result):
path, ranges = per_file_request
for buffer, ranges_ in zip(buffers, ranges):
initial_index = ranges_[2]
output_buffers[initial_index] = buffer

return output_buffers

async def _put_file(self, lpath, rpath, **kwargs):
with open(lpath, "rb") as f:
await obs.put_async(self.store, rpath, f)

async def _get_file(self, rpath, lpath, **kwargs):
with open(lpath, "wb") as f:
resp = await obs.get_async(self.store, rpath)
async for buffer in resp.stream():
f.write(buffer)

async def _info(self, path, **kwargs):
head = await obs.head_async(self.store, path)
return {
# Required of `info`: (?)
"name": head["path"],
"size": head["size"],
"type": "directory" if head["path"].endswith("/") else "file",
# Implementation-specific keys
"e_tag": head["e_tag"],
"last_modified": head["last_modified"],
"version": head["version"],
}

async def _ls(self, path, detail=True, **kwargs):
result = await obs.list_with_delimiter_async(self.store, path)
objects = result["objects"]
prefs = result["common_prefixes"]
if detail:
return [
{
"name": object["path"],
"size": object["size"],
"type": "file",
"ETag": object["e_tag"],
}
for object in objects
] + [{"name": object, "size": 0, "type": "directory"} for object in prefs]
else:
return sorted([object["path"] for object in objects] + prefs)

def _open(self, path, mode="rb", **kwargs):
"""Return raw bytes-mode file-like from the file-system"""
out = BufferedFileSimple(self, path, mode)
return out
Member:

Is this what's going to have a default implementation when fsspec/filesystem_spec#1732 is merged? Or this is separate?

Contributor Author:

It could have been done with that PR, once released.

I was worried about passing Buffers around rather than bytes, and/or whether read buffering might be implemented on the rust side (without copies). I can confirm that AbstractBufferedFile works directly, including the fsspec change.

However, we will end up making a class to deal with writing data unless we only ever write whole memory buffers in a single go as opposed to multi-part.

Member:

However, we will end up making a class to deal with writing data unless we only ever write whole memory buffers in a single go as opposed to multi-part.

We can wrap a BufWriter similarly to what we do now with BufReader, which will use a multipart upload under the hood.



class BufferedFileSimple(fsspec.spec.AbstractBufferedFile):
def __init__(self, fs, path, mode="rb", cache_type="none", **kwargs):
super().__init__(fs, path, mode, cache_type=cache_type, **kwargs)

def read(self, length=-1):
if length < 0:
data = self.fs.cat_file(self.path, self.loc, self.size)
self.loc = self.size
else:
data = self.fs.cat_file(self.path, self.loc, self.loc + length)
self.loc += length
return data
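The per-file grouping and index-restoring step that `_cat_ranges` performs above can be modeled standalone. The sketch below is illustrative only: `cat_ranges_model` and `fake_fetch` are hypothetical names, and the synchronous `fetch` callback stands in for `obs.get_ranges_async`.

```python
from collections import defaultdict

def cat_ranges_model(paths, starts, ends, fetch):
    # Group (start, end, original_index) triples by path so each file
    # can be fetched with a single multi-range request.
    per_file = defaultdict(list)
    for idx, (path, start, end) in enumerate(zip(paths, starts, ends)):
        per_file[path].append((start, end, idx))

    out = [b""] * len(paths)
    for path, ranges in per_file.items():
        offsets = [r[0] for r in ranges]
        lengths = [r[1] - r[0] for r in ranges]
        # Stand-in for the per-file obs.get_ranges_async call.
        buffers = fetch(path, offsets, lengths)
        # Scatter results back into the order the caller asked for.
        for buf, (_, _, idx) in zip(buffers, ranges):
            out[idx] = buf
    return out

# A stub fetch that slices an in-memory blob per file.
data = {"a": b"abcdefgh", "b": b"01234567"}

def fake_fetch(path, offsets, lengths):
    return [data[path][o:o + n] for o, n in zip(offsets, lengths)]

result = cat_ranges_model(["a", "b", "a"], [0, 2, 4], [3, 5, 8], fake_fetch)
# result == [b"abc", b"234", b"efgh"]: file "a" is fetched once for two
# ranges, yet outputs land at the caller's original positions.
```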
6 changes: 6 additions & 0 deletions pyproject.toml
@@ -11,6 +11,7 @@ dev-dependencies = [
"arro3-core>=0.4.2",
"black>=24.10.0",
"boto3>=1.35.38",
"fsspec>=2024.10.0",
"griffe-inherited-docstrings>=1.0.1",
"ipykernel>=6.29.5",
"maturin>=1.7.4",
@@ -21,6 +22,7 @@ dev-dependencies = [
"moto[s3,server]>=5.0.18",
"pandas>=2.2.3",
"pip>=24.2",
"pyarrow>=17.0.0",
"pytest-asyncio>=0.24.0",
"pytest>=8.3.3",
]
@@ -41,3 +43,7 @@ select = [
"F401", # Allow unused imports in __init__.py files
"F403", # unable to detect undefined names
]

[tool.pytest.ini_options]
addopts = "-v"
testpaths = ["tests"]
65 changes: 4 additions & 61 deletions tests/store/test_s3.py
@@ -1,74 +1,17 @@
import boto3
import pytest
from botocore import UNSIGNED
from botocore.client import Config
from moto.moto_server.threaded_moto_server import ThreadedMotoServer

import obstore as obs
from obstore.store import S3Store

TEST_BUCKET_NAME = "test"


# See docs here: https://docs.getmoto.org/en/latest/docs/server_mode.html
@pytest.fixture(scope="module")
def moto_server_uri():
"""Fixture to run a mocked AWS server for testing."""
# Note: pass `port=0` to get a random free port.
server = ThreadedMotoServer(ip_address="localhost", port=0)
server.start()
host, port = server.get_host_and_port()
uri = f"http://{host}:{port}"
yield uri
server.stop()


@pytest.fixture()
def s3(moto_server_uri: str):
client = boto3.client(
"s3",
config=Config(signature_version=UNSIGNED),
region_name="us-east-1",
endpoint_url=moto_server_uri,
)
client.create_bucket(Bucket=TEST_BUCKET_NAME, ACL="public-read")
client.put_object(Bucket=TEST_BUCKET_NAME, Key="afile", Body=b"hello world")
return moto_server_uri


# @pytest.fixture(autouse=True)
# def reset_s3_fixture(moto_server_uri):
# import requests

# # We reuse the MotoServer for all tests
# # But we do want a clean state for every test
# try:
# requests.post(f"{moto_server_uri}/moto-api/reset")
# except:
# pass


@pytest.fixture()
def store(s3):
return S3Store.from_url(
f"s3://{TEST_BUCKET_NAME}/",
config={
"AWS_ENDPOINT_URL": s3,
"AWS_REGION": "us-east-1",
"AWS_SKIP_SIGNATURE": "True",
"AWS_ALLOW_HTTP": "true",
},
)


@pytest.mark.asyncio
-async def test_list_async(store: S3Store):
-    list_result = await obs.list(store).collect_async()
+async def test_list_async(s3_store: S3Store):
+    list_result = await obs.list(s3_store).collect_async()
assert any("afile" in x["path"] for x in list_result)


@pytest.mark.asyncio
-async def test_get_async(store: S3Store):
-    resp = await obs.get_async(store, "afile")
+async def test_get_async(s3_store: S3Store):
+    resp = await obs.get_async(s3_store, "afile")
buf = await resp.bytes_async()
assert buf == b"hello world"
30 changes: 30 additions & 0 deletions tests/test_fsspec.py
@@ -0,0 +1,30 @@
import pytest

pytest.importorskip("moto")
import pyarrow.parquet as pq

import obstore as obs
from obstore.fsspec import AsyncFsspecStore


@pytest.fixture()
def fs(s3_store):
return AsyncFsspecStore(s3_store)


def test_list(fs):
out = fs.ls("", detail=False)
assert out == ["afile"]
fs.pipe_file("dir/bfile", b"data")
out = fs.ls("", detail=False)
assert out == ["afile", "dir"]
out = fs.ls("", detail=True)
assert out[0]["type"] == "file"
assert out[1]["type"] == "directory"


def test_remote_parquet():
store = obs.store.HTTPStore.from_url("https://github.com")
fs = AsyncFsspecStore(store)
url = "opengeospatial/geoparquet/raw/refs/heads/main/examples/example.parquet"
pq.read_metadata(url, filesystem=fs)
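The `cat_ranges` test matrix the reviewer asked for (multiple ranges in a single file, single ranges across files, overlapping ranges) can first be pinned down against a toy in-memory model. `DictFS` below is a hypothetical stand-in, not part of obstore: it applies plain-slicing semantics, which is the behavior `AsyncFsspecStore.cat_ranges` should reproduce once wired to `get_ranges_async`.

```python
class DictFS:
    """Toy stand-in for AsyncFsspecStore with plain-slicing cat_ranges semantics."""

    def __init__(self):
        self.files = {}

    def pipe_file(self, path, data):
        self.files[path] = data

    def cat_ranges(self, paths, starts, ends):
        # Each (path, start, end) triple is independent; results keep caller order.
        return [self.files[p][s:e] for p, s, e in zip(paths, starts, ends)]


fs = DictFS()
fs.pipe_file("one", b"abcdefgh")
fs.pipe_file("two", b"01234567")

# Multiple ranges in a single file.
assert fs.cat_ranges(["one", "one"], [0, 4], [2, 8]) == [b"ab", b"efgh"]

# Single ranges across several files, in interleaved caller order.
assert fs.cat_ranges(["two", "one", "two"], [0, 2, 6], [3, 5, 8]) == [b"012", b"cde", b"67"]

# Overlapping ranges in one file come back independently.
assert fs.cat_ranges(["one", "one"], [0, 1], [4, 5]) == [b"abcd", b"bcde"]
```

The same assertions, rewritten against the real `fs` fixture, would exercise the per-file grouping path directly.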