Token provider implementation#

A token provider always needs to have the attributes host and token and implement the class TokenProvider And a token provider always has a requests_auth_handler method.

The simplest implementation of a token provider is the JWTTokenProvider:

class JWTTokenProvider(TokenProvider):
    """Provides Host and Token."""

    def __init__(self, host: Host | str, jwt: Token) -> None:
        """Initialize the JWTTokenProvider.

        Args:
            host: the foundry host
            jwt: the jwt token
        """
        super().__init__(host)
        self._jwt = jwt

    @cached_property
    def token(self) -> Token:
        """Returns the token supplied when creating this Provider."""
        return self._jwt

Note

Every variable in the token provider that does not start with a _ will be displayed in the config cli in plaintext. For secrets like a jwt token, prefix it with a _, these will be shown as not set if they are None or otherwise shown as set, but the value will not be displayed If it shouldn’t be displayed at all prefix it with __, for example like __cached in the OAuthTokenProvider.

The token property just returns the supplied jwt token via the jwt parameter. And host is set to the host parameter from the constructor.

requests auth handler#

The requests auth handler is a method that takes a PreparedRequest as an argument, modifies it and returns it.

This method will be set as the auth attribute by default in the ContextHTTPClient

Cached token provider#

There is also a CachedTokenProvider implementation, which does not work on its own but is a building block for other token providers. The builtin OAuthTokenProvider is based on it.

class CachedTokenProvider(TokenProvider):
    """Parent class for token providers which get their token dynamically and need caching."""

    _cached: Token | None = None
    _valid_until: float = -1
    # time to remove from expiry
    # e.g. it will request a new token if your token expires in 5 seconds
    _clock_skew: int = 10

    def invalidate_cache(self):
        """Invalidates the token cache."""
        self._cached = None
        self._valid_until = -1

    def _request_token(self) -> tuple[Token, float]:
        """Requests the token from the dynamic source."""
        msg = "This needs to be implemented by a class, this is just the meta class."
        raise NotImplementedError(msg)

    @property
    def token(self) -> Token:
        """Returns the token from a dynamic source and caches it."""
        if not self._cached or self._valid_until < time.time() + 10:
            self._cached, self._valid_until = self._request_token()
        return self._cached

A simple example implementation could look like this:

import time

from foundry_dev_tools.config.token_provider import CachedTokenProvider


class ExampleCachedTokenProvider(CachedTokenProvider):
    def _request_token(self):
        # expire in 100 seconds
        return "token", time.time() + 100

To implement from CachedTokenProvider you only need to change the _request_token method. The method only needs to return a tuple with a token and the timestamp when the token will expire.

More configuration details#

These steps happen when the credentials configuration gets parsed by the parse_credentials_config method:

def parse_credentials_config(config_dict: dict | None) -> TokenProvider:
    """Parses the credentials config dictionary and returns a TokenProvider object."""
    # check if there is a credentials config present
    if config_dict is not None and (credentials_config := config_dict.get("credentials")):
        # a domain must always be provided
        if "domain" not in credentials_config:
            raise MissingFoundryHostError

        # create a host object with the domain and the optional scheme setting
        host = Host(credentials_config.pop("domain"), credentials_config.pop("scheme", None))
        # get the token provider config setting, if it does not exist use an empty dict
        try:
            tp_name, tp_config = credentials_config.popitem()
            # make it possible to do jwt = "eyJ" instead of jwt = {jwt="eyJ"}
            if tp_config is None or len(tp_config) == 0:
                tp_config = {}
            elif not isinstance(tp_config, dict):
                tp_config = {tp_name: tp_config}
        except KeyError:
            tp_name, tp_config = None, None

        if tp_name:
            if mapped_class := TOKEN_PROVIDER_MAPPING.get(tp_name):
                # check the config kwargs and pass the valid kwargs to the mapped class
                return mapped_class(**check_init(mapped_class, "credentials", {"host": host, **tp_config}))

            # if the token_provider name was set but not present in the mapping
            msg = f"The token provider implementation {tp_name} does not exist."
            raise TokenProviderConfigError(msg)

        # use flask/dash/streamlit provider when used in the app service
        if "APP_SERVICE_TS" in os.environ:
            return AppServiceTokenProvider(host=host)
        raise MISSING_TP_ERROR
    raise MissingCredentialsConfigError