Source code for foundry_dev_tools.clients.s3_client

"""S3Client for the S3 compatible dataset API."""

from __future__ import annotations

import time
from typing import TYPE_CHECKING

from foundry_dev_tools.errors.handling import raise_foundry_api_error
from foundry_dev_tools.utils.misc import parse_iso

if TYPE_CHECKING:
    import aiobotocore
    import boto3
    import requests

    from foundry_dev_tools.config.context import FoundryContext


class S3Client:
    """The S3 compatible dataset API."""

    def __init__(self, context: FoundryContext):
        self.context = context
        # parsed credentials from the last AssumeRoleWithWebIdentity call, None until first fetch
        self._credentials: dict | None = None
        # POSIX timestamp at which the cached credentials expire (0.0 while nothing is cached)
        self._credentials_expiration = 0.0

    def get_url(self) -> str:
        """Return the s3 endpoint url."""
        return self.context.host.url + "/io/s3"

    def get_s3fs_storage_options(self) -> dict:
        """Get the foundry s3 credentials in the s3fs storage_options format.

        Example:
            >>> ctx = FoundryContext()
            >>> storage_options = ctx.s3.get_s3fs_storage_options()
            >>> df = pd.read_parquet(
            ...     "s3://ri.foundry.main.dataset.<uuid>/spark", storage_options=storage_options
            ... )

        Returns:
            dict with an aiobotocore ``session`` (foundry credentials applied) and the ``endpoint_url``
        """
        return {
            "session": self._get_aiobotocore_session(),
            "endpoint_url": self.get_url(),
        }

    def get_polars_storage_options(self) -> dict:
        """Get the foundry s3 credentials in the format that polars expects.

        https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html

        Example:
            >>> ctx = FoundryContext()
            >>> storage_options = ctx.s3.get_polars_storage_options()
            >>> df = pl.read_parquet(
            ...     "s3://ri.foundry.main.dataset.<uuid>/**/*.parquet", storage_options=storage_options
            ... )

        Returns:
            dict of object_store AWS config keys built from the cached credentials
        """
        credentials = self.get_credentials()
        return {
            "aws_access_key_id": credentials["access_key"],
            "aws_secret_access_key": credentials["secret_key"],
            "aws_session_token": credentials["token"],
            "aws_region": "foundry",
            "aws_endpoint": self.get_url(),
            # dataset rids are used as bucket names, so path-style addressing is required
            "aws_virtual_hosted_style_request": "false",
        }

    def get_duckdb_create_secret_string(self) -> str:
        """Returns a CREATE SECRET SQL String with Foundry Configuration.

        https://duckdb.org/docs/extensions/httpfs/s3api.html#config-provider

        Example:
            >>> ctx = FoundryContext()
            >>> con.execute(ctx.s3.get_duckdb_create_secret_string())
            >>> df = con.execute(
            ...     "SELECT * FROM read_parquet('s3://ri.foundry.main.dataset.<uuid>/**/*.parquet') LIMIT 1;"
            ... ).df()

        Returns:
            a ``CREATE OR REPLACE SECRET foundryConnection`` statement; every interpolated
            value is escaped so it stays inside its single-quoted SQL string literal
        """
        credentials = self.get_credentials()
        # duckdb expects the endpoint without a scheme, hence host.domain instead of get_url()
        endpoint = self.context.host.domain + "/io/s3"

        def sql_literal(value: object) -> str:
            # Double any single quote so the value cannot terminate the SQL string literal early.
            return str(value).replace("'", "''")

        return """
CREATE OR REPLACE SECRET foundryConnection (
    TYPE S3,
    KEY_ID '{access_key}',
    SECRET '{secret_key}',
    SESSION_TOKEN '{token}',
    ENDPOINT '{endpoint}',
    URL_STYLE 'path',
    REGION 'foundry'
);
""".format(
            access_key=sql_literal(credentials["access_key"]),
            secret_key=sql_literal(credentials["secret_key"]),
            token=sql_literal(credentials["token"]),
            endpoint=sql_literal(endpoint),
        )

    def _get_boto3_session(self) -> boto3.Session:
        """Returns the boto3 session with foundry s3 credentials applied.

        See :py:meth:`foundry_dev_tools.clients.s3_client.S3Client.api_assume_role_with_webidentity`.
        """
        import boto3
        import botocore.session

        from foundry_dev_tools.utils.s3 import CustomFoundryCredentialProvider

        session = botocore.session.Session()
        session.set_config_variable("region", "foundry")
        cred_provider = session.get_component("credential_provider")
        # insert before "env" so the foundry provider is consulted ahead of environment credentials
        cred_provider.insert_before("env", CustomFoundryCredentialProvider(self, session))
        return boto3.Session(botocore_session=session)

    def _get_aiobotocore_session(self) -> aiobotocore.AioSession:
        """Returns an aiobotocore session with foundry s3 credentials applied.

        See :py:meth:`foundry_dev_tools.clients.s3_client.S3Client.api_assume_role_with_webidentity`.
        """
        import aiobotocore.session

        from foundry_dev_tools.utils.async_s3 import (
            CustomAsyncFoundryCredentialProvider,
        )

        session = aiobotocore.session.AioSession()
        session.set_config_variable("region", "foundry")
        cred_provider = session.get_component("credential_provider")
        # insert before "env" so the foundry provider is consulted ahead of environment credentials
        cred_provider.insert_before("env", CustomAsyncFoundryCredentialProvider(self, session))
        return session

    def get_boto3_client(self, **kwargs):  # noqa: ANN201
        """Returns the boto3 s3 client with credentials applied and endpoint url set.

        See :py:meth:`foundry_dev_tools.clients.s3_client.S3Client.api_assume_role_with_webidentity`.

        Example:
            >>> from foundry_dev_tools import FoundryContext
            >>> ctx = FoundryContext()
            >>> s3_client = ctx.s3.get_boto3_client()
            >>> s3_client

        Args:
            **kwargs: gets passed to :py:meth:`boto3.session.Session.client`, `endpoint_url` will be overwritten
        """
        kwargs["endpoint_url"] = self.get_url()
        return self._get_boto3_session().client("s3", **kwargs)

    def get_boto3_resource(self, **kwargs):  # noqa: ANN201
        """Returns boto3 s3 resource with credentials applied and endpoint url set.

        Args:
            **kwargs: gets passed to :py:meth:`boto3.session.Session.resource`, `endpoint_url` will be overwritten
        """
        kwargs["endpoint_url"] = self.get_url()
        return self._get_boto3_session().resource("s3", **kwargs)

    def get_credentials(self, expiration_duration: int = 3600) -> dict:
        """Parses the AssumeRoleWithWebIdentity response and caches the credentials.

        The cached credentials are reused unless one of these holds, in which case new ones
        are fetched via :py:meth:`api_assume_role_with_webidentity`:

        * nothing has been cached yet,
        * the cached credentials have expired or expire within the refresh margin,
        * the cached credentials remain valid longer than ``expiration_duration``
          (the caller asked for shorter-lived credentials than the cached ones).

        The refresh margin is 15 minutes (recommended by the boto library), capped at half
        of ``expiration_duration``. Without the cap, credentials requested for 15 minutes or
        less would fall inside the margin right after being fetched, and every call would
        hit the API again.

        Args:
            expiration_duration: seconds the credentials should be valid, defaults to 3600

        Returns:
            the dict produced by ``foundry_dev_tools.utils.s3.parse_s3_credentials_response``
            (this class reads its ``access_key``, ``secret_key``, ``token`` and ``expiry_time`` keys)
        """
        remaining = self._credentials_expiration - time.time()
        refresh_margin = min(900, expiration_duration / 2)
        if (
            self._credentials is None
            or remaining <= 0
            or remaining < refresh_margin
            or remaining > expiration_duration
        ):
            from foundry_dev_tools.utils.s3 import parse_s3_credentials_response

            self._credentials = parse_s3_credentials_response(
                self.api_assume_role_with_webidentity(expiration_duration).text,
            )
            self._credentials_expiration = parse_iso(self._credentials["expiry_time"]).timestamp()
        return self._credentials

    def api_assume_role_with_webidentity(self, expiration_duration: int = 3600) -> requests.Response:
        """Calls the AssumeRoleWithWebIdentity API to get temporary S3 credentials.

        Args:
            expiration_duration: seconds the credentials should be valid, defaults to 3600 (the upper bound)

        Returns:
            the raw response; its ``.text`` is parsed by :py:meth:`get_credentials`

        Raises:
            whatever ``raise_foundry_api_error`` raises for an unsuccessful response
            (presumably a Foundry API error subclass — confirm in foundry_dev_tools.errors.handling)
        """
        # does not call the api_request method, as this is not a regular api
        resp = self.context.client.post(
            self.get_url(),
            params={
                "Action": "AssumeRoleWithWebIdentity",
                "WebIdentityToken": self.context.token_provider.token,
                "DurationSeconds": expiration_duration,
            },
            auth=lambda x: x,  # stub auth function that is normally set by the token provider
        )
        raise_foundry_api_error(resp)
        return resp