Source code for foundry_dev_tools.utils.compat
"""This module contains utility functions for compatibility between the v1 and v2 config."""
from __future__ import annotations
import os
import warnings
from urllib.parse import urlparse
from foundry_dev_tools.config.config import TokenProvider, get_config_dict, parse_credentials_config
from foundry_dev_tools.config.config_types import DEFAULT_SCHEME
[docs]
def v1_to_v2_config_dict(config: dict, env: bool = True, get_config: bool = True) -> dict: # noqa: C901
"""Converts the `config` argument to the new v2 config.
Args:
config: the v1 dictionary to convert
env: whether to include the environment variables in the conversion
get_config: wether to get the config from files and merge it
"""
_config = (get_config_dict(env=env) if get_config else None) or {}
jwt, client_id, client_secret, foundry_url, grant_type, scopes = (
config.pop("jwt", False),
config.pop("client_id", False),
config.pop("client_secret", False),
config.pop("foundry_url", None),
config.pop("grant_type", False),
config.pop("scopes", False),
)
domain = None
scheme = None
if jwt or foundry_url or client_id is not False:
_config.setdefault("credentials", {})
if foundry_url:
parsed_foundry_url = urlparse(foundry_url)
if not parsed_foundry_url.scheme:
msg = f"{foundry_url} is not a valid URL <scheme>://<domain>"
raise AttributeError(msg)
domain = parsed_foundry_url.netloc
scheme = parsed_foundry_url.scheme
_config["credentials"]["domain"] = domain
if scheme:
_config["credentials"]["scheme"] = scheme
if "credentials" in _config and (scheme := _config["credentials"].get("scheme")) and scheme == DEFAULT_SCHEME:
del _config["credentials"]["scheme"]
if jwt:
_config["credentials"]["jwt"] = jwt
elif client_id is not False:
_config["credentials"].setdefault("oauth", {})
_config["credentials"]["oauth"]["client_id"] = client_id
if "oauth" in _config.get("credentials", {}):
if client_secret is not False:
_config["credentials"]["oauth"]["client_secret"] = client_secret
if grant_type is not False:
_config["credentials"]["oauth"]["grant_type"] = grant_type
if scopes is not False:
_config["credentials"]["oauth"]["scopes"] = scopes
return _config
[docs]
def v1_to_v2_config(config: dict) -> tuple[TokenProvider, dict]:
"""Converts a version 1 configuration to a version 2 configuration.
Args:
config (dict): The version 1 configuration to be converted.
Returns:
tuple[TokenProvider, dict]: A tuple containing the TokenProvider and the converted configuration.
Raises:
AttributeError: If the TokenProvider cannot be created due to missing jwt or client_id(/client_secret).
"""
_config = v1_to_v2_config_dict(config)
tp = parse_credentials_config(_config)
if c := _config.get("config"):
c.update(config)
_config["config"] = c
else:
_config["config"] = config
return tp, _config
[docs]
def get_v1_environment_variables() -> dict:
"""Parses the v1 environment variables and converts them to a v2 dictionary.
Returns:
dict: A dictionary containing the processed v1 environment variables.
"""
v1_env_config = {}
for k, v in os.environ.items():
if k.startswith("FOUNDRY_DEV_TOOLS_"):
v1_env_config[k.removeprefix("FOUNDRY_DEV_TOOLS_").lower()] = v
if len(v1_env_config) > 0:
warnings.warn(
"The v1 environment variables are deprecated, please use the v2 environment variables instead",
DeprecationWarning,
)
return v1_to_v2_config_dict(v1_env_config, env=False, get_config=False)