Source code for foundry_dev_tools.utils.s3
"""Custom foundry boto3 credential provider used by the FoundryRestClient."""
from __future__ import annotations
from typing import TYPE_CHECKING
import botocore.client
import botocore.credentials
import botocore.session
if TYPE_CHECKING:
from foundry_dev_tools.clients.s3_client import S3Client
[docs]
class CustomFoundryCredentialProvider(botocore.credentials.CredentialProvider):
"""Boto3 credential provider for s3 credentials."""
METHOD = "foundry"
CANONICAL_NAME = "foundry"
[docs]
def __init__(
self,
s3_client: S3Client,
session: botocore.session.Session | None = None,
):
self.s3_client = s3_client
super().__init__(session)
[docs]
def load(self) -> botocore.credentials.DeferredRefreshableCredentials:
"""Return the credentials from FoundryRestClient."""
return botocore.credentials.DeferredRefreshableCredentials(
self.s3_client.get_credentials,
method="sts-assume-role",
)
[docs]
def parse_s3_credentials_response(requests_response_text: str) -> dict:
"""Parses the AssumeRoleWithWebIdentity XML response."""
return {
"access_key": requests_response_text[
requests_response_text.find("<AccessKeyId>") + len("<AccessKeyId>") : requests_response_text.rfind(
"</AccessKeyId>",
)
],
"secret_key": requests_response_text[
requests_response_text.find("<SecretAccessKey>") + len("<SecretAccessKey>") : requests_response_text.rfind(
"</SecretAccessKey>",
)
],
"token": requests_response_text[
requests_response_text.find("<SessionToken>") + len("<SessionToken>") : requests_response_text.rfind(
"</SessionToken>",
)
],
"expiry_time": requests_response_text[
requests_response_text.find("<Expiration>") + len("<Expiration>") : requests_response_text.rfind(
"</Expiration>",
)
],
}