From 5cc09827bdc0b9b2a76a4e023c26d77ac79459a3 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Mon, 30 Oct 2023 09:02:26 -0500 Subject: [PATCH] Use PyArrow dataset for metadata --- dask_expr/io/parquet.py | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 401e4cd0..3e0a56cd 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -395,13 +395,17 @@ def filesystem(self): import boto3 from pyarrow.fs import S3FileSystem + bucket = self.path[5:].split("/")[0] session = boto3.session.Session() credentials = session.get_credentials() + region = session.client("s3").get_bucket_location(Bucket=bucket)[ + "LocationConstraint" + ] return S3FileSystem( secret_key=credentials.secret_key, access_key=credentials.access_key, - region="us-east-2", # TODO + region=region, session_token=credentials.token, ) else: @@ -510,8 +514,27 @@ def meta_and_filenames(path): if str(path).startswith("s3://"): import s3fs - s3 = s3fs.S3FileSystem() - filenames = s3.ls(path) + filenames = s3fs.S3FileSystem().ls(path) + + import boto3 + from pyarrow.fs import S3FileSystem + + session = boto3.session.Session() + credentials = session.get_credentials() + + bucket = path[5:].split("/")[0] + region = session.client("s3").get_bucket_location(Bucket=bucket)[ + "LocationConstraint" + ] + + filesystem = S3FileSystem( + secret_key=credentials.secret_key, + access_key=credentials.access_key, + region=region, + session_token=credentials.token, + ) + path = path[5:] + else: import glob import os @@ -521,14 +544,11 @@ def meta_and_filenames(path): else: filenames = [path] # TODO: split by row group -<<<<<<< HEAD - import dask.dataframe as dd - meta = dd.read_parquet(path)._meta -======= - ds = pq.ParquetDataset(path) + filesystem = None + + ds = pq.ParquetDataset(path, filesystem=filesystem) t = pa.Table.from_pylist([], schema=ds.schema) meta = t.to_pandas(types_mapper=types_mapper) ->>>>>>> d037c82 (Grab meta from arrow rather than dask.dataframe) return meta, filenames