Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fsspec integration #63

Merged
merged 31 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
0a6d056
Initial fsspec integration
kylebarron Oct 21, 2024
ed0073e
Add _cat_ranges
kylebarron Oct 21, 2024
18bde17
Add failing test
kylebarron Oct 21, 2024
a9b0d51
Add pipe_file
kylebarron Oct 22, 2024
bf4f5e0
Add test fixtures
martindurant Oct 30, 2024
fdd2a79
Merge branch 'kyle/fsspec-integration' of https://github.com/developm…
martindurant Oct 30, 2024
6495836
Merge branch 'main' into kyle/fsspec-integration
martindurant Oct 31, 2024
68a2d1f
Simple file override
martindurant Oct 31, 2024
ec9c559
silghtly more
martindurant Oct 31, 2024
956b1c0
lint
martindurant Oct 31, 2024
a5a8e82
Update obstore/python/obstore/fsspec.py
martindurant Oct 31, 2024
0a0a2fc
add conftest
martindurant Oct 31, 2024
032c976
tests and docstrings
martindurant Nov 1, 2024
d26a651
make fs not cachable
martindurant Nov 1, 2024
bf1e368
Merge branch 'main' into kyle/fsspec-integration
martindurant Nov 1, 2024
3072e4e
start/end
martindurant Nov 1, 2024
71d9ef7
in cat also
martindurant Nov 1, 2024
31fbb58
Try mixed ranges
martindurant Nov 5, 2024
79449fd
Allow None ranges
martindurant Nov 5, 2024
abe7a44
overwrite test
martindurant Nov 6, 2024
4a05dcb
fix
martindurant Nov 8, 2024
2f70443
revive subclass
martindurant Nov 8, 2024
d8bba78
Merge branch 'main' into kyle/fsspec-integration
martindurant Nov 8, 2024
fca4619
xfails
martindurant Nov 8, 2024
3fc017e
xfails didn't stick
martindurant Nov 8, 2024
f02fa2b
give reason
martindurant Nov 8, 2024
c1ee71d
Update obstore/python/obstore/fsspec.py
kylebarron Nov 11, 2024
cdfa2ab
Update obstore/python/obstore/fsspec.py
kylebarron Nov 11, 2024
5b9b0f7
update for signature
martindurant Nov 13, 2024
45868c9
Merge branch 'kyle/fsspec-integration' of https://github.com/martindu…
martindurant Nov 13, 2024
ecc9399
Update obstore/python/obstore/fsspec.py
kylebarron Nov 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions obstore/python/obstore/fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
"""Fsspec integration.

The underlying `object_store` Rust crate [cautions](https://docs.rs/object_store/latest/object_store/#why-not-a-filesystem-interface) against relying too strongly on stateful filesystem representations of object stores:

> The ObjectStore interface is designed to mirror the APIs of object stores and not filesystems, and thus has stateless APIs instead of cursor based interfaces such as Read or Seek available in filesystems.
>
> This design provides the following advantages:
>
> - All operations are atomic, and readers cannot observe partial and/or failed writes
> - Methods map directly to object store APIs, providing both efficiency and predictability
> - Abstracts away filesystem and operating system specific quirks, ensuring portability
> - Allows for functionality not native to filesystems, such as operation preconditions and atomic multipart uploads

Where possible, implementations should use the underlying `obstore` APIs
directly. Only where this is not possible should users fall back to this fsspec
integration.
"""

from __future__ import annotations

import asyncio
from collections import defaultdict
from typing import Any, Coroutine, Dict, List, Tuple

import fsspec.asyn
import fsspec.spec

import obstore as obs


class AsyncFsspecStore(fsspec.asyn.AsyncFileSystem):
martindurant marked this conversation as resolved.
Show resolved Hide resolved
"""An fsspec implementation based on a obstore Store"""

cachable = False

def __init__(
self,
store: obs.store.ObjectStore,
*args,
asynchronous: bool = False,
loop=None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the right type for loop?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

   loop: since both fsspec/python and tokio/rust may be using loops, this should
        be kept None for now

If None is the only valid argument to loop, then we should remove it from __init__ and manually pass loop=None to super().__init__ below.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove loop too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest leaving it here even if we don't use it, to match the normal AsyncFileSystem signature.

batch_size: int | None = None,
):
"""Construct a new AsyncFsspecStore

store: a configured instance of one of the store classes in objstore.store
asynchronous: id this instance meant to be be called using the async API? This
should only be set to true when running within a coroutine
loop: since both fsspec/python and tokio/rust may be using loops, this should
be kept `None` for now, and will not be used.
batch_size: some operations on many files will batch their requests; if you
are seeing timeouts, you may want to set this number smaller than the defaults,
which are determined in fsspec.asyn._get_batch_size
"""

self.store = store
super().__init__(
*args, asynchronous=asynchronous, loop=loop, batch_size=batch_size
)

async def _rm_file(self, path, **kwargs):
return await obs.delete_async(self.store, path)

async def _cp_file(self, path1, path2, **kwargs):
return await obs.copy_async(self.store, path1, path2)

async def _pipe_file(self, path, value, **kwargs):
return await obs.put_async(self.store, path, value)

async def _cat_file(self, path, start=None, end=None, **kwargs):
if start is None and end is None:
resp = await obs.get_async(self.store, path)
return await resp.bytes_async()

return await obs.get_range_async(self.store, path, start=start, end=end)

async def _cat_ranges(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you be able to add some tests for this? I think this function is the least stable of everything because it relies on custom code to split the ranges into per-file ranges and piece the outputs together again.

We should have some tests for multiple ranges in a single file, single range for multiple files, multiple ranges for multiple files, and maybe overlapping ranges too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll look into this. I strongly suspect that it will "just work" with the upstream fsspec code, actually, rather than having to carve out the per-file requests and repeat the code in rust. The only downside of that would be bytes copies in python.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strongly suspect that it will "just work" with the upstream fsspec code, actually,

Yes, it would just work with the upstream code, but the benefit here is that we get request merging in the underlying Rust code. We should absolutely use the underlying get_ranges if we can; we just have to map between multi-file and single-file ranges.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

request merging in the underlying Rust code

as_opposed to doing it in python, you mean? I wonder if it makes a practical difference.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. My position is mostly: we should use whatever tools object_store provides us, because the Rust code I'd guess will be slightly faster, which could make a difference for large-scale data loading with get_ranges, like in zarr.

I don't think it's too much work to have this helper function, and it would also be useful in places that don't depend on fsspec, like in the zarr PR: https://github.com/zarr-developers/zarr-python/pull/1661/files#diff-6f6bc4f5bf8c4e9eb8f9bb486d3699e17e4a4d6efc90b922c63a75d45cd7a9a6R47-R51

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I can agree with that reasoning.

self,
paths: List[str],
starts: List[int] | int,
ends: List[int] | int,
max_gap=None,
batch_size=None,
on_error="return",
**kwargs,
):
if isinstance(starts, int):
starts = [starts] * len(paths)
if isinstance(ends, int):
ends = [ends] * len(paths)
if not len(paths) == len(starts) == len(ends):
raise ValueError

per_file_requests: Dict[str, List[Tuple[int, int, int]]] = defaultdict(list)
for idx, (path, start, end) in enumerate(zip(paths, starts, ends)):
per_file_requests[path].append((start, end, idx))

futs: List[Coroutine[Any, Any, List[bytes]]] = []
for path, ranges in per_file_requests.items():
offsets = [r[0] for r in ranges]
ends = [r[1] for r in ranges]
fut = obs.get_ranges_async(self.store, path, starts=offsets, ends=ends)
futs.append(fut)

result = await asyncio.gather(*futs)

output_buffers: List[bytes] = [b""] * len(paths)
for per_file_request, buffers in zip(per_file_requests.items(), result):
path, ranges = per_file_request
for buffer, ranges_ in zip(buffers, ranges):
initial_index = ranges_[2]
output_buffers[initial_index] = buffer.as_bytes()

return output_buffers

async def _put_file(self, lpath, rpath, **kwargs):
with open(lpath, "rb") as f:
await obs.put_async(self.store, rpath, f)

async def _get_file(self, rpath, lpath, **kwargs):
with open(lpath, "wb") as f:
resp = await obs.get_async(self.store, rpath)
async for buffer in resp.stream():
f.write(buffer)

async def _info(self, path, **kwargs):
head = await obs.head_async(self.store, path)
return {
# Required of `info`: (?)
"name": head["path"],
"size": head["size"],
"type": "directory" if head["path"].endswith("/") else "file",
# Implementation-specific keys
"e_tag": head["e_tag"],
kylebarron marked this conversation as resolved.
Show resolved Hide resolved
"last_modified": head["last_modified"],
"version": head["version"],
}

async def _ls(self, path, detail=True, **kwargs):
result = await obs.list_with_delimiter_async(self.store, path)
objects = result["objects"]
prefs = result["common_prefixes"]
if detail:
return [
{
"name": object["path"],
"size": object["size"],
"type": "file",
"e_tag": object["e_tag"],
}
for object in objects
] + [{"name": object, "size": 0, "type": "directory"} for object in prefs]
else:
return sorted([object["path"] for object in objects] + prefs)

def _open(self, path, mode="rb", **kwargs):
"""Return raw bytes-mode file-like from the file-system"""
return BufferedFileSimple(self, path, mode, **kwargs)


class BufferedFileSimple(fsspec.spec.AbstractBufferedFile):
def __init__(self, fs, path, mode="rb", **kwargs):
if mode != "rb":
raise ValueError("Only 'rb' mode is currently supported")
super().__init__(fs, path, mode, **kwargs)

def read(self, length: int = -1):
"""Return bytes from the remote file

length: if positive, returns up to this many bytes; if negative, return all
remaining byets.
"""
if length < 0:
data = self.fs.cat_file(self.path, self.loc, self.size)
self.loc = self.size
else:
data = self.fs.cat_file(self.path, self.loc, self.loc + length)
self.loc += length
return data
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ dev-dependencies = [
"arro3-core>=0.4.2",
"black>=24.10.0",
"boto3>=1.35.38",
"fsspec>=2024.10.0",
"griffe-inherited-docstrings>=1.0.1",
"ipykernel>=6.29.5",
"maturin>=1.7.4",
Expand All @@ -21,6 +22,7 @@ dev-dependencies = [
"moto[s3,server]>=5.0.18",
"pandas>=2.2.3",
"pip>=24.2",
"pyarrow>=17.0.0",
"pytest-asyncio>=0.24.0",
"pytest>=8.3.3",
]
Expand All @@ -41,3 +43,7 @@ select = [
"F401", # Allow unused imports in __init__.py files
"F403", # unable to detect undefined names
]

[tool.pytest.ini_options]
addopts = "-v"
testpaths = ["tests"]
53 changes: 53 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import boto3
import pytest
import urllib3
from botocore import UNSIGNED
from botocore.client import Config
from moto.moto_server.threaded_moto_server import ThreadedMotoServer

from obstore.store import S3Store

TEST_BUCKET_NAME = "test"


# See docs here: https://docs.getmoto.org/en/latest/docs/server_mode.html
@pytest.fixture()
def moto_server_uri():
"""Fixture to run a mocked AWS server for testing."""
# Note: pass `port=0` to get a random free port.
server = ThreadedMotoServer(ip_address="localhost", port=0)
server.start()
if hasattr(server, "get_host_and_port"):
host, port = server.get_host_and_port()
else:
host, port = server._server.server_address
uri = f"http://{host}:{port}"
yield uri
server.stop()


@pytest.fixture()
def s3(moto_server_uri: str):
client = boto3.client(
"s3",
config=Config(signature_version=UNSIGNED),
region_name="us-east-1",
endpoint_url=moto_server_uri,
)
client.create_bucket(Bucket=TEST_BUCKET_NAME, ACL="public-read")
client.put_object(Bucket=TEST_BUCKET_NAME, Key="afile", Body=b"hello world")
yield moto_server_uri
urllib3.request(method="post", url=f"{moto_server_uri}/moto-api/reset")


@pytest.fixture()
def s3_store(s3):
return S3Store.from_url(
f"s3://{TEST_BUCKET_NAME}/",
config={
"AWS_ENDPOINT_URL": s3,
"AWS_REGION": "us-east-1",
"AWS_SKIP_SIGNATURE": "True",
"AWS_ALLOW_HTTP": "true",
},
)
65 changes: 4 additions & 61 deletions tests/store/test_s3.py
Original file line number Diff line number Diff line change
@@ -1,74 +1,17 @@
import boto3
import pytest
from botocore import UNSIGNED
from botocore.client import Config
from moto.moto_server.threaded_moto_server import ThreadedMotoServer

import obstore as obs
from obstore.store import S3Store

TEST_BUCKET_NAME = "test"


# See docs here: https://docs.getmoto.org/en/latest/docs/server_mode.html
@pytest.fixture(scope="module")
def moto_server_uri():
"""Fixture to run a mocked AWS server for testing."""
# Note: pass `port=0` to get a random free port.
server = ThreadedMotoServer(ip_address="localhost", port=0)
server.start()
host, port = server.get_host_and_port()
uri = f"http://{host}:{port}"
yield uri
server.stop()


@pytest.fixture()
def s3(moto_server_uri: str):
client = boto3.client(
"s3",
config=Config(signature_version=UNSIGNED),
region_name="us-east-1",
endpoint_url=moto_server_uri,
)
client.create_bucket(Bucket=TEST_BUCKET_NAME, ACL="public-read")
client.put_object(Bucket=TEST_BUCKET_NAME, Key="afile", Body=b"hello world")
return moto_server_uri


# @pytest.fixture(autouse=True)
# def reset_s3_fixture(moto_server_uri):
# import requests

# # We reuse the MotoServer for all tests
# # But we do want a clean state for every test
# try:
# requests.post(f"{moto_server_uri}/moto-api/reset")
# except:
# pass


@pytest.fixture()
def store(s3):
return S3Store.from_url(
f"s3://{TEST_BUCKET_NAME}/",
config={
"AWS_ENDPOINT_URL": s3,
"AWS_REGION": "us-east-1",
"AWS_SKIP_SIGNATURE": "True",
"AWS_ALLOW_HTTP": "true",
},
)


@pytest.mark.asyncio
async def test_list_async(store: S3Store):
list_result = await obs.list(store).collect_async()
async def test_list_async(s3_store: S3Store):
list_result = await obs.list(s3_store).collect_async()
assert any("afile" in x["path"] for x in list_result)


@pytest.mark.asyncio
async def test_get_async(store: S3Store):
resp = await obs.get_async(store, "afile")
async def test_get_async(s3_store: S3Store):
resp = await obs.get_async(s3_store, "afile")
buf = await resp.bytes_async()
assert buf == b"hello world"
Loading