diff --git a/crates/polars-io/src/cloud/credential_provider.rs b/crates/polars-io/src/cloud/credential_provider.rs index e6de837488c1..5cd366fb1a88 100644 --- a/crates/polars-io/src/cloud/credential_provider.rs +++ b/crates/polars-io/src/cloud/credential_provider.rs @@ -547,6 +547,7 @@ mod python_impl { #[cfg(feature = "azure")] fn into_azure_provider(self) -> object_store::azure::AzureCredentialProvider { + use object_store::azure::AzureAccessKey; use polars_error::{to_compute_err, PolarsResult}; use crate::cloud::credential_provider::{ @@ -556,8 +557,10 @@ mod python_impl { CredentialProviderFunction(Arc::new(move || { let func = self.0.clone(); Box::pin(async move { - let mut credentials = - object_store::azure::AzureCredential::BearerToken(String::new()); + let mut credentials = None; + + static VALID_KEYS_MSG: &str = + "valid configuration keys are: account_key, bearer_token"; let expiry = Python::with_gil(|py| { let v = func.0.call0(py)?.into_bound(py); @@ -568,17 +571,23 @@ mod python_impl { let k = k.extract::()?; let v = v.extract::()?; - // We only support bearer for now match k.as_ref() { + "account_key" => { + credentials = + Some(object_store::azure::AzureCredential::AccessKey( + AzureAccessKey::try_new(v.as_str()).map_err(|e| { + PyValueError::new_err(e.to_string()) + })?, + )) + }, "bearer_token" => { credentials = - object_store::azure::AzureCredential::BearerToken(v) + Some(object_store::azure::AzureCredential::BearerToken(v)) }, v => { return pyo3::PyResult::Err(PyValueError::new_err(format!( - "unknown configuration key for azure: {}, \ - valid configuration keys are: {}", - v, "bearer_token", + "unknown configuration key for azure: {}, {}", + v, VALID_KEYS_MSG ))) }, } @@ -588,16 +597,15 @@ mod python_impl { }) .map_err(to_compute_err)?; - let object_store::azure::AzureCredential::BearerToken(bearer) = &credentials - else { - unreachable!() - }; - - if bearer.is_empty() { + let Some(credentials) = credentials else { return Err(PolarsError::ComputeError( - "bearer was empty or not given".into(), + format!( + "did not find a valid configuration key for azure, {}", + VALID_KEYS_MSG + ) + .into(), )); - } + }; PolarsResult::Ok((ObjectStoreCredential::Azure(Arc::new(credentials)), expiry)) }) diff --git a/py-polars/docs/source/reference/io.rst b/py-polars/docs/source/reference/io.rst index bdfe93ee9ebe..034a597cd04e 100644 --- a/py-polars/docs/source/reference/io.rst +++ b/py-polars/docs/source/reference/io.rst @@ -127,4 +127,5 @@ Configuration for cloud credential provisioning. CredentialProvider CredentialProviderAWS + CredentialProviderAzure CredentialProviderGCP diff --git a/py-polars/polars/__init__.py b/py-polars/polars/__init__.py index be9cdc3d7640..67b4413d2b61 100644 --- a/py-polars/polars/__init__.py +++ b/py-polars/polars/__init__.py @@ -182,6 +182,7 @@ from polars.io.cloud import ( CredentialProvider, CredentialProviderAWS, + CredentialProviderAzure, CredentialProviderFunction, CredentialProviderFunctionReturn, CredentialProviderGCP, @@ -280,6 +281,7 @@ # polars.io.cloud "CredentialProvider", "CredentialProviderAWS", + "CredentialProviderAzure", "CredentialProviderFunction", "CredentialProviderFunctionReturn", "CredentialProviderGCP", diff --git a/py-polars/polars/io/cloud/__init__.py b/py-polars/polars/io/cloud/__init__.py index f5ef9c5fd0bf..0dfa3717184e 100644 --- a/py-polars/polars/io/cloud/__init__.py +++ b/py-polars/polars/io/cloud/__init__.py @@ -1,6 +1,7 @@ from polars.io.cloud.credential_provider import ( CredentialProvider, CredentialProviderAWS, + CredentialProviderAzure, CredentialProviderFunction, CredentialProviderFunctionReturn, CredentialProviderGCP, @@ -9,6 +10,7 @@ __all__ = [ "CredentialProvider", "CredentialProviderAWS", + "CredentialProviderAzure", "CredentialProviderFunction", "CredentialProviderFunctionReturn", "CredentialProviderGCP", diff --git a/py-polars/polars/io/cloud/_utils.py b/py-polars/polars/io/cloud/_utils.py index 91ad65a4ca94..4821ab4dcb91 100644 --- a/py-polars/polars/io/cloud/_utils.py +++ b/py-polars/polars/io/cloud/_utils.py @@ -37,5 +37,9 @@ def _is_aws_cloud(scheme: str) -> bool: return any(scheme == x for x in ["s3", "s3a"]) +def _is_azure_cloud(scheme: str) -> bool: + return any(scheme == x for x in ["az", "azure", "adl", "abfs", "abfss"]) + + def _is_gcp_cloud(scheme: str) -> bool: return any(scheme == x for x in ["gs", "gcp", "gcs"]) diff --git a/py-polars/polars/io/cloud/credential_provider.py b/py-polars/polars/io/cloud/credential_provider.py index dd98683f316e..e851a76ae958 100644 --- a/py-polars/polars/io/cloud/credential_provider.py +++ b/py-polars/polars/io/cloud/credential_provider.py @@ -2,7 +2,9 @@ import abc import importlib.util +import json import os +import subprocess import sys import zoneinfo from typing import IO, TYPE_CHECKING, Any, Callable, Literal, Optional, TypedDict, Union @@ -139,6 +141,147 @@ def _check_module_availability(cls) -> None: raise ImportError(msg) +class CredentialProviderAzure(CredentialProvider): + """ + Azure Credential Provider. + + Using this requires the `azure-identity` Python package to be installed. + + .. warning:: + This functionality is considered **unstable**. It may be changed + at any point without it being considered a breaking change. + """ + + def __init__( + self, + *, + scopes: list[str] | None = None, + storage_account: str | None = None, + _verbose: bool = False, + ) -> None: + """ + Initialize a credential provider for Microsoft Azure. + + This uses `azure.identity.DefaultAzureCredential()`. + + Parameters + ---------- + scopes + Scopes to pass to `get_token` + storage_account + If specified, an attempt will be made to retrieve the account keys + for this account using the Azure CLI. If this is successful, the + account keys will be used instead of + `DefaultAzureCredential.get_token()` + """ + msg = "`CredentialProviderAzure` functionality is considered unstable" + issue_unstable_warning(msg) + + self._check_module_availability() + + self.account_name = storage_account + # Done like this to bypass mypy, we don't have stubs for azure.identity + self.credential = importlib.import_module("azure.identity").__dict__[ + "DefaultAzureCredential" + ]() + self.scopes = scopes if scopes is not None else ["https://storage.azure.com/"] + self._verbose = _verbose + + if self._verbose: + print( + ( + "CredentialProviderAzure " + f"{self.account_name = } " + f"{self.scopes = } " + ), + file=sys.stderr, + ) + + def __call__(self) -> CredentialProviderFunctionReturn: + """Fetch the credentials.""" + if self.account_name is not None: + try: + creds = { + "account_key": self._get_azure_storage_account_key_az_cli( + self.account_name + ) + } + + if self._verbose: + print( + "[CredentialProviderAzure]: retrieved account keys from Azure CLI", + file=sys.stderr, + ) + except Exception as e: + if self._verbose: + print( + f"[CredentialProviderAzure]: failed to retrieve account keys from Azure CLI: {e}", + file=sys.stderr, + ) + else: + return creds, None # type: ignore[return-value] + + token = self.credential.get_token(*self.scopes) + + return { + "bearer_token": token.token, + }, token.expires_on + + @classmethod + def _check_module_availability(cls) -> None: + if importlib.util.find_spec("azure.identity") is None: + msg = "azure-identity must be installed to use `CredentialProviderAzure`" + raise ImportError(msg) + + @staticmethod + def _extract_adls_uri_storage_account(uri: str) -> str | None: + # "abfss://{CONTAINER}@{STORAGE_ACCOUNT}.dfs.core.windows.net/" + # ^^^^^^^^^^^^^^^^^ + try: + return ( + uri.split("://", 1)[1] + .split("/", 1)[0] + .split("@", 1)[1] + .split(".dfs.core.windows.net", 1)[0] + ) + + except IndexError: + return None + + @staticmethod + def _get_azure_storage_account_key_az_cli(account_name: str) -> str: + az_cmd = [ + "az", + "storage", + "account", + "keys", + "list", + "--output", + "json", + "--account-name", + account_name, + ] + + cmd = az_cmd if sys.platform != "win32" else ["cmd", "/C", *az_cmd] + + # [ + # { + # "creationTime": "1970-01-01T00:00:00.000000+00:00", + # "keyName": "key1", + # "permissions": "FULL", + # "value": "..." + # }, + # { + # "creationTime": "1970-01-01T00:00:00.000000+00:00", + # "keyName": "key2", + # "permissions": "FULL", + # "value": "..." + # } + # ] + + return json.loads(subprocess.check_output(cmd))[0]["value"] + + class CredentialProviderGCP(CredentialProvider): """ GCP Credential Provider. @@ -165,7 +308,7 @@ def __init__( ---------- Parameters are passed to `google.auth.default()` """ - msg = "`CredentialProviderAWS` functionality is considered unstable" + msg = "`CredentialProviderGCP` functionality is considered unstable" issue_unstable_warning(msg) self._check_module_availability() @@ -194,7 +337,7 @@ def __init__( self.creds = creds def __call__(self) -> CredentialProviderFunctionReturn: - """Fetch the credentials for the configured profile name.""" + """Fetch the credentials.""" import google.auth.transport.requests self.creds.refresh(google.auth.transport.requests.__dict__["Request"]()) @@ -238,6 +381,7 @@ def _maybe_init_credential_provider( _first_scan_path, _get_path_scheme, _is_aws_cloud, + _is_azure_cloud, _is_gcp_cloud, ) @@ -265,6 +409,13 @@ def _maybe_init_credential_provider( provider = ( CredentialProviderAWS() if _is_aws_cloud(scheme) + else CredentialProviderAzure( + storage_account=( + CredentialProviderAzure._extract_adls_uri_storage_account(str(path)) + ), + _verbose=verbose, + ) + if _is_azure_cloud(scheme) else CredentialProviderGCP() if _is_gcp_cloud(scheme) else None diff --git a/py-polars/polars/meta/versions.py b/py-polars/polars/meta/versions.py index f9f631dac5fd..d734f5904b9a 100644 --- a/py-polars/polars/meta/versions.py +++ b/py-polars/polars/meta/versions.py @@ -68,6 +68,7 @@ def _get_dependency_list() -> list[str]: return [ "adbc_driver_manager", "altair", + "azure.identity", "boto3", "cloudpickle", "connectorx",