Token provider implementation#
A token provider must subclass TokenProvider and always has the attributes host and token as well as a requests_auth_handler method.
The simplest implementation of a token provider is the JWTTokenProvider:
class JWTTokenProvider(TokenProvider):
    """Provides Host and Token."""

    def __init__(self, host: Host | str, jwt: Token) -> None:
        """Initialize the JWTTokenProvider.

        Args:
            host: the foundry host
            jwt: the jwt token
        """
        super().__init__(host)
        self._jwt = jwt

    @cached_property
    def token(self) -> Token:
        """Returns the token supplied when creating this Provider."""
        return self._jwt
Note
Every variable in the token provider that does not start with a _ is displayed in plaintext by the config CLI.
For secrets such as a JWT, prefix the name with a single _; the CLI then shows them as "not set" if they are None and as "set" otherwise, but never displays the value.
If a variable should not be displayed at all, prefix it with __, as is done for __cached in the OAuthTokenProvider.
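The naming rule from the note can be pictured with a small sketch. The helper `display_value` is hypothetical and not part of the library; it only mirrors the three cases described in the note, assuming the decision is made purely from the attribute name's prefix.

```python
from typing import Optional


def display_value(name: str, value: object) -> Optional[str]:
    """Return what a config listing might show for one attribute (hypothetical helper)."""
    if name.startswith("__"):
        # Double underscore: the attribute is not listed at all.
        return None
    if name.startswith("_"):
        # Single underscore: only reveal whether a value is present.
        return "not set" if value is None else "set"
    # No underscore: the value is shown in plaintext.
    return str(value)


public = display_value("host", "foundry.example.com")
secret = display_value("_jwt", "eyJ")
hidden = display_value("__cached", "eyJ")
```

Here `public` holds the plain value, `secret` only reports that a value is set, and `hidden` is `None` because the attribute would be skipped entirely.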
The token property simply returns the JWT that was passed to the constructor as the jwt parameter.
Likewise, host is set from the constructor's host parameter.
requests auth handler#
The requests auth handler is a method that takes a PreparedRequest as an argument, modifies it, and returns it.
See also
Requests documentation: https://requests.readthedocs.io/en/latest/user/authentication/#new-forms-of-authentication
This method is set as the auth attribute of the ContextHTTPClient by default.
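As a rough illustration of the "take a request, modify it, return it" contract, the sketch below uses a tiny stand-in for `requests.PreparedRequest` so that it runs without third-party imports. `FakePreparedRequest` and `ExampleProvider` are hypothetical names, and sending the token as a bearer `Authorization` header is an assumption about what such a handler typically does, not a statement about the library's actual implementation.

```python
from dataclasses import dataclass, field


@dataclass
class FakePreparedRequest:
    """Stand-in for requests.PreparedRequest (hypothetical, headers only)."""

    headers: dict = field(default_factory=dict)


class ExampleProvider:
    """Hypothetical provider that holds a fixed token."""

    def __init__(self, token: str) -> None:
        self.token = token

    def requests_auth_handler(self, request: FakePreparedRequest) -> FakePreparedRequest:
        # Assumption: the token is attached as a bearer token.
        request.headers["Authorization"] = f"Bearer {self.token}"
        # The handler returns the same, now modified, request object.
        return request


prepared = ExampleProvider("eyJ").requests_auth_handler(FakePreparedRequest())
```

With the real requests library, a callable of this shape can be passed as the `auth` argument of a session or request.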
Cached token provider#
There is also a CachedTokenProvider implementation, which does not work on its own but serves as a building block for other token providers.
The built-in OAuthTokenProvider is based on it.
class CachedTokenProvider(TokenProvider):
    """Parent class for token providers which get their token dynamically and need caching."""

    _cached: Token | None = None
    _valid_until: float = -1
    # time to remove from expiry
    # e.g. it will request a new token if your token expires in 5 seconds
    _clock_skew: int = 10

    def invalidate_cache(self):
        """Invalidates the token cache."""
        self._cached = None
        self._valid_until = -1

    def _request_token(self) -> tuple[Token, float]:
        """Requests the token from the dynamic source."""
        msg = "This needs to be implemented by a class, this is just the meta class."
        raise NotImplementedError(msg)

    @property
    def token(self) -> Token:
        """Returns the token from a dynamic source and caches it."""
        if not self._cached or self._valid_until < time.time() + 10:
            self._cached, self._valid_until = self._request_token()
        return self._cached
A simple example implementation could look like this:
import time

from foundry_dev_tools.config.token_provider import CachedTokenProvider


class ExampleCachedTokenProvider(CachedTokenProvider):
    def _request_token(self):
        # expire in 100 seconds
        return "token", time.time() + 100
To build on CachedTokenProvider, you only need to override the _request_token method.
The method must return a tuple containing the token and the timestamp at which the token expires, expressed in seconds since the epoch as returned by time.time().
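To see the effect of the cache, the following self-contained sketch copies the token logic shown earlier into a stand-in class, so it runs without the library installed. `SketchCachedProvider` and `CountingProvider` are hypothetical names used only for this illustration.

```python
import time
from typing import Optional, Tuple


class SketchCachedProvider:
    """Stand-in mirroring the caching logic shown earlier (not the library class)."""

    _cached: Optional[str] = None
    _valid_until: float = -1

    def _request_token(self) -> Tuple[str, float]:
        raise NotImplementedError

    @property
    def token(self) -> str:
        # Refresh when nothing is cached or the token expires within 10 seconds.
        if not self._cached or self._valid_until < time.time() + 10:
            self._cached, self._valid_until = self._request_token()
        return self._cached


class CountingProvider(SketchCachedProvider):
    """Hypothetical subclass that counts how often a token is requested."""

    def __init__(self, lifetime: float) -> None:
        self.lifetime = lifetime
        self.calls = 0

    def _request_token(self) -> Tuple[str, float]:
        self.calls += 1
        return f"token-{self.calls}", time.time() + self.lifetime


long_lived = CountingProvider(lifetime=100)
first, second = long_lived.token, long_lived.token  # second access hits the cache

short_lived = CountingProvider(lifetime=5)
short_lived.token
short_lived.token  # lifetime is inside the 10 second margin, so each access refreshes
```

A token that lives well beyond the safety margin is requested once and then served from the cache, while a token whose lifetime is shorter than the margin is requested again on every access.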
More configuration details#
See also
These steps happen when the credentials configuration is parsed by the parse_credentials_config function:
def parse_credentials_config(config_dict: dict | None) -> TokenProvider:
    """Parses the credentials config dictionary and returns a TokenProvider object."""
    # check if there is a credentials config present
    if config_dict is not None and (credentials_config := config_dict.get("credentials")):
        # a domain must always be provided
        if "domain" not in credentials_config:
            raise MissingFoundryHostError
        # create a host object with the domain and the optional scheme setting
        host = Host(credentials_config.pop("domain"), credentials_config.pop("scheme", None))

        # get the token provider config setting, if it does not exist use an empty dict
        try:
            tp_name, tp_config = credentials_config.popitem()
            # make it possible to do jwt = "eyJ" instead of jwt = {jwt="eyJ"}
            if tp_config is None or len(tp_config) == 0:
                tp_config = {}
            elif not isinstance(tp_config, dict):
                tp_config = {tp_name: tp_config}
        except KeyError:
            tp_name, tp_config = None, None

        if tp_name:
            if mapped_class := TOKEN_PROVIDER_MAPPING.get(tp_name):
                # check the config kwargs and pass the valid kwargs to the mapped class
                return mapped_class(**check_init(mapped_class, "credentials", {"host": host, **tp_config}))

            # if the token_provider name was set but not present in the mapping
            msg = f"The token provider implementation {tp_name} does not exist."
            raise TokenProviderConfigError(msg)
        # use flask/dash/streamlit provider when used in the app service
        if "APP_SERVICE_TS" in os.environ:
            return AppServiceTokenProvider(host=host)
        raise MISSING_TP_ERROR
    raise MissingCredentialsConfigError
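The shorthand handling in the middle of that function is easy to miss, so here is a minimal sketch of just that step. `normalize_tp_config` is a hypothetical helper that mirrors the branch logic above, and the domain `foundry.example.com` is a placeholder; neither comes from the library.

```python
from typing import Any, Dict


def normalize_tp_config(tp_name: str, tp_config: Any) -> Dict[str, Any]:
    """Mirror the shorthand handling of the parsing step (hypothetical helper)."""
    if tp_config is None or len(tp_config) == 0:
        # No settings given: fall back to an empty config.
        return {}
    if not isinstance(tp_config, dict):
        # Shorthand: jwt = "eyJ" becomes {"jwt": "eyJ"}.
        return {tp_name: tp_config}
    return tp_config


# A credentials section as it might look after the config file has been loaded.
credentials = {"domain": "foundry.example.com", "jwt": "eyJ"}
domain = credentials.pop("domain")
scheme = credentials.pop("scheme", None)  # optional, so None when absent
tp_name, raw_config = credentials.popitem()  # the remaining entry names the provider

shorthand = normalize_tp_config(tp_name, raw_config)
explicit = normalize_tp_config("jwt", {"jwt": "eyJ"})
```

Both spellings end up as the same keyword arguments, which is why the mapped provider class can be constructed the same way in either case.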