"""Contains FoundryRestClient and FoundrySqlClient and exception classes.
One of the goals of this module is to be self-contained so that it can be
dropped into any Python installation, with 'requests' as the only required dependency.
Optional dependencies for the SQL functionality are pandas and pyarrow.
"""
from __future__ import annotations
import logging
import os
import shutil
import tempfile
import warnings
from contextlib import contextmanager
from os import PathLike
from pathlib import Path
from typing import IO, TYPE_CHECKING, AnyStr, Literal, overload
from foundry_dev_tools.config.config import (
get_config_dict,
parse_credentials_config,
parse_general_config,
path_from_path_or_str,
)
from foundry_dev_tools.config.context import FoundryContext
from foundry_dev_tools.errors.compass import ResourceNotFoundError
from foundry_dev_tools.errors.dataset import (
DatasetHasNoSchemaError,
DatasetHasNoTransactionsError,
DatasetNoReadAccessError,
DatasetNotFoundError,
)
from foundry_dev_tools.errors.handling import ErrorHandlingConfig
from foundry_dev_tools.utils import api_types
from foundry_dev_tools.utils.api_types import assert_in_literal
from foundry_dev_tools.utils.compat import v1_to_v2_config
if TYPE_CHECKING:
from collections.abc import Iterator
import pandas as pd
import pyarrow as pa
import pyspark
import requests
LOGGER = logging.getLogger(__name__)
class FoundryRestClient:
"""Foundry RestClient compatibility shim for Foundry DevTools v2."""
def __init__(self, config: dict | None = None, ctx: FoundryContext | None = None):
"""Create an instance of FoundryRestClient.
Args:
config: config dictionary which is parsed into the v2 configuration, for backwards compatibility
ctx: alternatively, pass a v2 FoundryContext directly instead of the 'old' configuration;
in that case the config dict is ignored
Examples:
>>> fc = FoundryRestClient()
>>> fc = FoundryRestClient(config={"jwt": "<token>"})
>>> fc = FoundryRestClient(config={"client_id": "<client_id>"})
>>> ctx = FoundryContext()
>>> fc = FoundryRestClient(ctx=ctx)
"""
if ctx:
self.ctx = ctx
else:
if config:
tp, _config = v1_to_v2_config(config)
else:
_config = get_config_dict()
tp = parse_credentials_config(_config)
self.ctx = FoundryContext(parse_general_config(_config), tp)
def create_dataset(self, dataset_path: api_types.FoundryPath) -> dict:
"""Creates an empty dataset in Foundry.
Args:
dataset_path: Path in Foundry, where this empty dataset should be created
for example: /Global/Foundry Operations/Foundry Support/iris_new
Returns:
dict:
with keys rid and fileSystemId.
The key rid contains the dataset_rid which is the unique identifier of a dataset.
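Example:
A minimal sketch; the path and the returned rid are illustrative:
>>> fc = FoundryRestClient()
>>> ds = fc.create_dataset("/Global/sandbox/my_new_dataset")
>>> ds["rid"]
'ri.foundry.main.dataset.<uuid>'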
"""
return self.ctx.catalog.api_create_dataset(dataset_path).json()
def get_dataset(self, dataset_rid: api_types.DatasetRid) -> dict:
"""Gets dataset_rid and fileSystemId.
Args:
dataset_rid (str): Dataset rid
Returns:
dict:
with the keys rid and fileSystemId
Raises:
DatasetNotFoundError: if dataset does not exist
"""
return self.ctx.catalog.api_get_dataset(dataset_rid).json()
def delete_dataset(self, dataset_rid: api_types.DatasetRid):
"""Deletes a dataset in Foundry and moves it to trash.
Args:
dataset_rid (str): Unique identifier of the dataset
Raises:
DatasetNotFoundError: if dataset does not exist
"""
self.ctx.catalog.api_delete_dataset(dataset_rid)
self.move_resource_to_trash(rid=dataset_rid)
def move_resource_to_trash(self, rid: api_types.Rid):
"""Moves a Compass resource (e.g. dataset or folder) to trash.
Args:
rid (str): rid of the resource
"""
self.ctx.compass.api_add_to_trash({rid})
def create_branch(
self,
dataset_rid: api_types.DatasetRid,
branch: str,
parent_branch_id: str | None = None,
parent_branch: api_types.TransactionRid | None = None,
) -> dict:
"""Creates a new branch in a dataset.
If the dataset is new, only the parameters dataset_rid and branch are required.
Args:
dataset_rid: Unique identifier of the dataset
branch: The branch name to create
parent_branch: The transaction rid to branch off from
parent_branch_id: The name of the parent branch; if empty, creates a new root branch
Returns:
dict:
the response as a json object
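Example:
A minimal sketch creating a root branch on a freshly created dataset; the rid is illustrative:
>>> fc.create_branch("ri.foundry.main.dataset.<uuid>", "master")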
"""
return self.ctx.catalog.api_create_branch(dataset_rid, branch, parent_branch, parent_branch_id).json()
def update_branch(
self,
dataset_rid: api_types.Rid,
branch: str,
parent_branch: str | api_types.TransactionRid | None = None,
) -> dict:
"""Updates the latest transaction of branch 'branch' to the latest transaction of branch 'parent_branch'.
Args:
dataset_rid: Unique identifier of the dataset
branch: The branch to update (e.g. master)
parent_branch: the name of the branch to copy the last transaction from or a transaction rid
Returns:
dict:
example below for the branch response
.. code-block:: python
{
"id": "..",
"rid": "ri.foundry.main.branch...",
"ancestorBranchIds": [],
"creationTime": "",
"transactionRid": "ri.foundry.main.transaction....",
}
"""
return self.ctx.catalog.api_update_branch(dataset_rid, branch, parent_branch).json()
def get_branches(self, dataset_rid: api_types.DatasetRid) -> list[str]:
"""Returns a list of branches available a dataset.
Args:
dataset_rid: Unique identifier of the dataset
Returns:
list[str]:
list of dataset branch names
"""
return self.ctx.catalog.api_get_branches(dataset_rid).json()
def get_branch(self, dataset_rid: api_types.DatasetRid, branch: api_types.DatasetBranch) -> dict:
"""Returns branch information.
Args:
dataset_rid: Unique identifier of the dataset
branch: Branch name
Returns:
dict:
with keys id (name) and rid (unique id) of the branch.
Raises:
BranchNotFoundError: if branch does not exist.
"""
return self.ctx.catalog.api_get_branch(dataset_rid, branch).json()
def open_transaction(self, dataset_rid: str, mode: str = "SNAPSHOT", branch: str = "master") -> str:
"""Opens a new transaction on a dataset.
Args:
dataset_rid (str): Unique identifier of the dataset
mode (str):
APPEND: append files,
SNAPSHOT: replace all,
UPDATE: replace file if exists, keep existing files
branch (str): dataset branch
Returns:
str:
the transaction ID
Raises:
BranchNotFoundError: if branch does not exist
DatasetNotFoundError: if dataset does not exist
DatasetHasOpenTransactionError: if dataset has an open transaction
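Example:
A minimal sketch of the typical open/upload/commit cycle; `dataset_rid` is assumed to hold an existing dataset rid:
>>> transaction_id = fc.open_transaction(dataset_rid, mode="SNAPSHOT", branch="master")
>>> fc.upload_dataset_file(dataset_rid, transaction_id, "/tmp/data.csv", "data.csv")
>>> fc.commit_transaction(dataset_rid, transaction_id)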
"""
assert_in_literal(mode, api_types.FoundryTransaction, "mode")
transaction_type = mode
transaction_id = self.ctx.catalog.api_start_transaction(
dataset_rid, branch, start_transaction_type=transaction_type
).json()["rid"]
# update type of transaction, default is APPEND
if mode != "APPEND":
self.ctx.catalog.api_set_transaction_type(
dataset_rid,
transaction_id,
transaction_type=transaction_type,
)
return transaction_id
def remove_dataset_file(
self,
dataset_rid: str,
transaction_id: str,
logical_path: str,
recursive: bool = False,
):
"""Removes the given file from an open transaction.
If the logical path matches a file exactly then only that file
will be removed, regardless of the value of recursive.
If the logical path represents a directory, then all
files prefixed with the logical path followed by '/'
will be removed when recursive is true and no files will be
removed when recursive is false.
If the given logical path does not match a file or directory then this call
is ignored and does not throw an exception.
Args:
dataset_rid (str): Unique identifier of the dataset
transaction_id (str): transaction rid
logical_path (str): logical path in the backing filesystem
recursive (bool): recurse into subdirectories
"""
self.ctx.catalog.api_remove_dataset_file(dataset_rid, transaction_id, logical_path, recursive)
def add_files_to_delete_transaction(self, dataset_rid: str, transaction_id: str, logical_paths: list[str]):
"""Adds files in an open DELETE transaction.
Files added to DELETE transactions affect
the dataset view by removing files from the view.
Args:
dataset_rid (str): Unique identifier of the dataset
transaction_id (str): transaction rid
logical_paths (List[str]): files in the dataset to delete
"""
self.ctx.catalog.api_add_files_to_delete_transaction(dataset_rid, transaction_id, logical_paths)
def commit_transaction(self, dataset_rid: str, transaction_id: str):
"""Commits a transaction, should be called after file upload is complete.
Args:
dataset_rid (str): Unique identifier of the dataset
transaction_id (str): transaction id
Raises:
KeyError: when there was an issue with committing
"""
self.ctx.catalog.api_commit_transaction(dataset_rid, transaction_id)
def abort_transaction(self, dataset_rid: str, transaction_id: str):
"""Aborts a transaction. Dataset will remain on transaction N-1.
Args:
dataset_rid (str): Unique identifier of the dataset
transaction_id (str): transaction id
Raises:
KeyError: When abort transaction fails
"""
self.ctx.catalog.api_abort_transaction(dataset_rid, transaction_id)
def get_dataset_transactions(
self,
dataset_rid: str,
branch: str = "master",
last: int = 50,
include_open_exclusive_transaction: bool = False,
) -> list[api_types.SecuredTransaction]:
"""Returns the transactions of a dataset / branch combination.
Returns last 50 transactions by default, pagination not implemented.
Args:
dataset_rid (str): Unique identifier of the dataset
branch (str): Branch
last (int): last
include_open_exclusive_transaction (bool): include_open_exclusive_transaction
Returns:
list:
of transaction information.
Raises:
BranchNotFoundError: if branch not found
DatasetHasNoTransactionsError: if the dataset has no transactions
"""
response = self.ctx.catalog.api_get_reverse_transactions2(
dataset_rid,
branch,
last,
include_open_exclusive_transaction=include_open_exclusive_transaction,
)
if values := response.json().get("values"):
return values
raise DatasetHasNoTransactionsError(response, dataset_rid=dataset_rid, branch=branch)
def get_dataset_last_transaction(
self,
dataset_rid: str,
branch: str = "master",
) -> api_types.SecuredTransaction | None:
"""Returns the last transaction of a dataset / branch combination.
Args:
dataset_rid (str): Unique identifier of the dataset
branch (str): Branch
Returns:
dict | None:
response from transaction API or None if dataset has no transaction.
"""
try:
return self.get_dataset_transactions(dataset_rid, branch, last=1)[0]
except DatasetHasNoTransactionsError:
return None
def get_dataset_last_transaction_rid(self, dataset_rid: str, branch: str = "master") -> str | None:
"""Returns the last transaction rid of a dataset / branch combination.
Args:
dataset_rid (str): Unique identifier of the dataset
branch (str): Branch
Returns:
str | None:
transaction rid or None if dataset has no transaction.
"""
last_transaction = self.get_dataset_last_transaction(dataset_rid, branch)
if last_transaction:
return last_transaction["rid"]
return None
def upload_dataset_file(
self,
dataset_rid: str,
transaction_rid: str,
path_or_buf: str | Path | IO[AnyStr],
path_in_foundry_dataset: str,
) -> requests.Response:
"""Uploads a file like object to a path in a foundry dataset.
Args:
dataset_rid: Unique identifier of the dataset
transaction_rid: transaction id
path_or_buf: a local file path (str or Path) or a file-like object/buffer
path_in_foundry_dataset: The path in the dataset, to which the file
is uploaded.
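Example:
A minimal sketch uploading an in-memory buffer; rid variables are assumed to be defined:
>>> import io
>>> buf = io.BytesIO(b"hello foundry")
>>> fc.upload_dataset_file(dataset_rid, transaction_rid, buf, "spark/hello.txt")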
"""
if isinstance(path_or_buf, str):
path_or_buf = Path(path_or_buf)
if isinstance(path_or_buf, Path):
return self.ctx.data_proxy.upload_dataset_file(
dataset_rid,
transaction_rid,
path_or_buf,
path_in_foundry_dataset,
)
return self.ctx.data_proxy.api_put_file(dataset_rid, transaction_rid, path_in_foundry_dataset, path_or_buf)
def upload_dataset_files(
self,
dataset_rid: str,
transaction_rid: str,
path_file_dict: dict,
parallel_processes: int | None = None,
):
"""Uploads multiple local files to a foundry dataset.
Args:
dataset_rid (str): Unique identifier of the dataset
transaction_rid (str): transaction id
parallel_processes (int | None): Set number of threads for upload
path_file_dict (dict): A dictionary with the following structure:
.. code-block:: python
{
'<path_in_foundry_dataset>': '<local_file_path>',
...
}
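Example:
A minimal sketch with illustrative paths; rid variables are assumed to be defined:
>>> fc.upload_dataset_files(
...     dataset_rid,
...     transaction_rid,
...     {"spark/data.parquet": "/tmp/data.parquet"},
...     parallel_processes=4,
... )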
"""
path_file_dict_pathlib = {k: Path(v) for k, v in path_file_dict.items()}
self.ctx.data_proxy.upload_dataset_files(
dataset_rid,
transaction_rid,
path_file_dict_pathlib,
max_workers=parallel_processes,
)
def get_dataset_details(self, dataset_path_or_rid: str) -> dict:
"""Returns the resource information of a dataset.
Args:
dataset_path_or_rid (str): The full path or rid to the dataset
Returns:
dict:
the json response of the api
Raises:
DatasetNotFoundError: if dataset not found
"""
try:
if "ri.foundry.main.dataset" in dataset_path_or_rid:
response = self.ctx.compass.api_get_resource(dataset_path_or_rid, decoration={"path"})
else:
response = self.ctx.compass.api_get_resource_by_path(dataset_path_or_rid, decoration={"path"})
except ResourceNotFoundError as rnfe:
raise DatasetNotFoundError(rnfe.response, rnfe.info, **rnfe.kwargs) from rnfe
as_json = response.json()
if as_json["directlyTrashed"]:
warnings.warn(f"Dataset '{dataset_path_or_rid}' is in foundry trash.")
return as_json
def get_child_objects_of_folder(self, folder_rid: str, page_size: int | None = None) -> Iterator[dict]:
"""Returns the child objects of a compass folder.
Args:
folder_rid (str): Compass folder rid,
e.g. ri.compass.main.folder.f549ae09-9534-44c7-967a-6c86b2339231
page_size (int): to control the pageSize manually
Yields:
dict:
information about child objects
Raises:
FolderNotFoundError: if folder not found
"""
yield from self.ctx.compass.get_child_objects_of_folder(folder_rid, limit=page_size)
def create_folder(self, name: str, parent_id: str) -> dict:
"""Creates an empty folder in compass.
Args:
name (str): name of the new folder
parent_id (str): rid of the parent folder,
e.g. ri.compass.main.folder.aca0cce9-2419-4978-bb18-d4bc6e50bd7e
Returns:
dict:
with keys rid and name and other properties.
"""
return self.ctx.compass.api_create_folder(name, parent_id).json()
def get_dataset_rid(self, dataset_path: str) -> str:
"""Returns the rid of a dataset, uses dataset_path as input.
Args:
dataset_path (str): The full path to the dataset
Returns:
str:
the dataset_rid
"""
return self.get_dataset_details(dataset_path)["rid"]
def get_dataset_path(self, dataset_rid: str) -> str:
"""Returns the path of a dataset as str.
Args:
dataset_rid (str): The rid of the dataset
Returns:
str:
the dataset_path
Raises:
DatasetNotFoundError: if dataset was not found
"""
return self.ctx.compass.api_get_path(
dataset_rid,
error_handling=ErrorHandlingConfig({204: DatasetNotFoundError}, dataset_rid=dataset_rid),
).json()
def get_dataset_paths(self, dataset_rids: list) -> dict:
"""Returns a list of paths for a list of passed rid's of a dataset.
Args:
dataset_rids (list): The rid's of datasets
Returns:
dict:
the dataset_path as dict of string
"""
return self.ctx.compass.get_paths(dataset_rids)
def is_dataset_in_trash(self, dataset_path: str) -> bool:
"""Returns true if dataset is in compass trash.
Args:
dataset_path (str): The path to the dataset
Returns:
bool:
true if dataset is in trash
"""
in_trash = self.get_dataset_details(dataset_path)["directlyTrashed"]
if in_trash is None:
in_trash = False
return in_trash
def get_dataset_schema(self, dataset_rid: str, transaction_rid: str | None = None, branch: str = "master") -> dict:
"""Returns the foundry dataset schema for a dataset, transaction, branch combination.
Args:
dataset_rid (str): The rid of the dataset
transaction_rid (str): The rid of the transaction
branch (str): The branch
Returns:
dict:
with foundry dataset schema
Raises:
DatasetNotFoundError: if dataset was not found
DatasetHasNoSchemaError: if dataset has no schema
BranchNotFoundError: if branch was not found
KeyError: if the combination of dataset_rid, transaction_rid and branch was not found
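Example:
A minimal sketch; the field names are illustrative and depend on your dataset:
>>> schema = fc.get_dataset_schema(dataset_rid, branch="master")
>>> [field["name"] for field in schema["fieldSchemaList"]]
['col_a', 'col_b']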
"""
try:
return self.ctx.metadata.api_get_dataset_schema(
dataset_rid=dataset_rid,
branch=branch,
transaction_rid=transaction_rid,
).json()["schema"]
except DatasetHasNoSchemaError as e:
# we don't know if the branch does not exist, or if there is just no schema
# this will raise an error if the branch does not exist
self.get_branch(dataset_rid, branch)
# otherwise raise no schema
raise e from None
def upload_dataset_schema(
self,
dataset_rid: str,
transaction_rid: str,
schema: dict,
branch: str = "master",
) -> requests.Response:
"""Uploads the foundry dataset schema for a dataset, transaction, branch combination.
Args:
dataset_rid (str): The rid of the dataset
transaction_rid (str): The rid of the transaction
schema (dict): The foundry schema
branch (str): The branch
"""
return self.ctx.metadata.api_upload_dataset_schema(dataset_rid, transaction_rid, schema, branch)
def infer_dataset_schema(self, dataset_rid: str, branch: str = "master") -> dict:
"""Calls the foundry-schema-inference service to infer the dataset schema.
Returns dict with foundry schema, if status == SUCCESS
Args:
dataset_rid (str): The dataset rid
branch (str): The branch
Returns:
dict:
with dataset schema, that can be used to call upload_dataset_schema
Raises:
ValueError: if foundry schema inference failed
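Example:
A minimal sketch chaining inference with schema upload; rid variables are assumed to be defined:
>>> schema = fc.infer_dataset_schema(dataset_rid, branch="master")
>>> fc.upload_dataset_schema(dataset_rid, transaction_rid, schema, branch="master")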
"""
return self.ctx.schema_inference.infer_dataset_schema(dataset_rid, branch)
def get_dataset_identity(
self,
dataset_path_or_rid: str,
branch: api_types.DatasetBranch = "master",
check_read_access: bool = True,
) -> api_types.DatasetIdentity:
"""Returns the identity of this dataset (dataset_path, dataset_rid, last_transaction_rid, last_transaction).
Args:
dataset_path_or_rid (str): Path to dataset (e.g. /Global/...)
or rid of dataset (e.g. ri.foundry.main.dataset...)
branch (str): branch of the dataset
check_read_access (bool): default is True, checks if the user has read access ('gatekeeper:view-resource')
to the dataset; otherwise an exception is thrown
Returns:
dict:
with the keys 'dataset_path', 'dataset_rid', 'last_transaction_rid', 'last_transaction'
Raises:
DatasetNoReadAccessError: if you have no read access for that dataset
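Example:
A minimal sketch; path and rid are illustrative:
>>> identity = fc.get_dataset_identity("/Global/sandbox/my_dataset")
>>> identity["dataset_rid"]
'ri.foundry.main.dataset.<uuid>'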
"""
dataset_details = self.get_dataset_details(dataset_path_or_rid)
dataset_rid = dataset_details["rid"]
dataset_path = dataset_details["path"]
if check_read_access and "gatekeeper:view-resource" not in dataset_details["operations"]:
raise DatasetNoReadAccessError(
dataset_path,
)
last_transaction = self.get_dataset_last_transaction(dataset_rid, branch)
return {
"dataset_path": dataset_path,
"dataset_rid": dataset_rid,
"last_transaction_rid": last_transaction["rid"] if last_transaction else None,
"last_transaction": last_transaction,
}
def list_dataset_files(
self,
dataset_rid: str,
exclude_hidden_files: bool = True,
view: str = "master",
logical_path: str | None = None,
detail: bool = False,
*,
include_open_exclusive_transaction: bool = False,
) -> list:
"""Returns list of internal filenames of a dataset.
Args:
dataset_rid (str): the dataset rid
exclude_hidden_files (bool): if hidden files should be excluded (e.g. _log files)
view (str): branch or transaction rid of the dataset
logical_path (str): If logical_path is absent, returns all files in the view.
If logical_path matches a file exactly, returns just that file.
Otherwise, returns all files in the "directory" of logical_path:
(a slash is added to the end of logicalPath if necessary and a prefix-match is performed)
detail (bool): if passed as True, returns complete response from catalog API, otherwise only
returns logicalPath
include_open_exclusive_transaction (bool): if files added in open transaction should be returned
as well in the response
Returns:
list:
filenames
Raises:
DatasetNotFoundError: if dataset was not found
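Example:
A minimal sketch; the file names are illustrative:
>>> fc.list_dataset_files(dataset_rid, view="master")
['spark/part-00000.parquet', 'spark/part-00001.parquet']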
"""
files = self.ctx.catalog.list_dataset_files(
dataset_rid,
end_ref=view,
page_size=1000,
logical_path=logical_path,
exclude_hidden_files=exclude_hidden_files,
include_open_exclusive_transaction=include_open_exclusive_transaction,
)
if detail:
return files
return [f["logicalPath"] for f in files]
def get_dataset_stats(self, dataset_rid: str, view: str = "master") -> dict:
"""Returns response from foundry catalogue stats endpoint.
Args:
dataset_rid (str): the dataset rid
view (str): branch or transaction rid of the dataset
Returns:
dict:
sizeInBytes, numFiles, hiddenFilesSizeInBytes, numHiddenFiles, numTransactions
"""
return self.ctx.catalog.api_get_dataset_stats(dataset_rid, view).json()
def foundry_stats(self, dataset_rid: str, end_transaction_rid: str, branch: str = "master") -> dict:
"""Returns row counts and size of the dataset/view.
Args:
dataset_rid (str): The dataset RID.
end_transaction_rid (str): The specific transaction RID,
which will be used to return the statistics.
branch (str): The branch to query
Returns:
dict:
With the following structure:
.. code-block:: python
{
datasetRid: str,
branch: str,
endTransactionRid: str,
schemaId: str,
computedDatasetStats: {
rowCount: str | None,
sizeInBytes: str,
columnStats: { "...": {nullCount: str | None, uniqueCount: str | None, avgLength: str | None, maxLength: str | None,} },
},
}
""" # noqa: E501
return self.ctx.foundry_stats.api_foundry_stats(dataset_rid, end_transaction_rid, branch).json()
def download_dataset_file(
self,
dataset_rid: str,
output_directory: str | None,
foundry_file_path: str,
view: str = "master",
) -> str | bytes:
"""Downloads a single foundry dataset file into a directory.
Creates sub folder if necessary.
Args:
dataset_rid (str): the dataset rid
output_directory (str | None): the local output directory for the file or None
if None is passed, byte content of file is returned
foundry_file_path (str): the file_path on the foundry file system
view (str): branch or transaction rid of the dataset
Returns:
str | bytes:
local file path in case output_directory was passed
or file content as bytes
Raises:
ValueError: If download failed
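Example:
A minimal sketch; paths are illustrative and `dataset_rid` is assumed to hold an existing dataset rid:
>>> local_path = fc.download_dataset_file(dataset_rid, "/tmp/out", "spark/part-00000.parquet")
>>> file_bytes = fc.download_dataset_file(dataset_rid, None, "spark/part-00000.parquet")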
"""
if output_directory:
return os.fspath(
self.ctx.data_proxy.download_dataset_file(dataset_rid, Path(output_directory), foundry_file_path),
)
return self.ctx.data_proxy.api_get_file_in_view(
dataset_rid,
view,
foundry_file_path,
stream=True,
).content
def download_dataset_files(
self,
dataset_rid: str,
output_directory: PathLike[str] | str,
files: list | None = None,
view: str = "master",
parallel_processes: int | None = None,
) -> list[str]:
"""Downloads files of a dataset (in parallel) to a local output directory.
Args:
dataset_rid (str): the dataset rid
files (list | None): list of files or None, in which case all files are downloaded
output_directory (str): the output directory for the files
view (str): branch or transaction rid of the dataset
parallel_processes (int | None): Set number of threads for download,
default value is calculated: multiprocessing.cpu_count() - 1
Returns:
List[str]:
path to downloaded files
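Example:
A minimal sketch downloading two illustrative files:
>>> fc.download_dataset_files(
...     dataset_rid,
...     "/tmp/out",
...     files=["spark/part-00000.parquet", "spark/part-00001.parquet"],
... )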
"""
return [
os.fspath(p)
for p in self.ctx.data_proxy.download_dataset_files(
dataset_rid,
path_from_path_or_str(output_directory),
set(files) if files else None,
view,
parallel_processes,
)
]
@contextmanager
def download_dataset_files_temporary(
self,
dataset_rid: str,
files: list | None = None,
view: str = "master",
parallel_processes: int | None = None,
) -> Iterator[str]:
"""Downloads all files of a dataset to a temporary directory.
Which is deleted when the context is closed. Function returns the temporary directory.
Example usage:
>>> import parquet
>>> import pandas as pd
>>> from pathlib import Path
>>> with client.download_dataset_files_temporary(dataset_rid='ri.foundry.main.dataset.1', view='master') as \
>>> temp_folder:
>>> # Read using Pandas
>>> df = pd.read_parquet(temp_folder)
>>> # Read using pyarrow, here we pass only the files, which are normally in subfolder 'spark'
>>> pq = parquet.ParquetDataset(path_or_paths=[x for x in Path(temp_dir).glob('**/*') if x.is_file()])
Args:
dataset_rid (str): the dataset rid
files (List[str]): list of files or None, in which case all files are downloaded
view (str): branch or transaction rid of the dataset
parallel_processes (int): Set number of threads for download
Yields:
Iterator[str]:
path to temporary folder containing root of dataset files
"""
temp_output_directory = tempfile.mkdtemp(suffix=f"foundry_dev_tools-{dataset_rid}")
_ = self.download_dataset_files(
dataset_rid=dataset_rid,
output_directory=temp_output_directory,
files=files,
view=view,
parallel_processes=parallel_processes,
)
try:
yield temp_output_directory
finally:
shutil.rmtree(temp_output_directory)
def get_dataset_as_raw_csv(self, dataset_rid: str, branch: api_types.DatasetBranch = "master") -> requests.Response:
"""Uses csv API to download a dataset as csv.
Args:
dataset_rid (str): the dataset rid
branch (str): branch of the dataset
Returns:
:external+requests:py:class:`~requests.Response`:
with the csv stream.
Can be converted to a pandas DataFrame
>>> pd.read_csv(io.BytesIO(response.content))
"""
return self.ctx.data_proxy.api_get_dataset_as_csv2(dataset_rid, branch)
@overload
def query_foundry_sql_legacy(
self,
query: str,
return_type: Literal["pandas"],
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> pd.core.frame.DataFrame: ...
@overload
def query_foundry_sql_legacy(
self,
query: str,
return_type: Literal["spark"],
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> pyspark.sql.DataFrame: ...
@overload
def query_foundry_sql_legacy(
self,
query: str,
return_type: Literal["arrow"],
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> pa.Table: ...
@overload
def query_foundry_sql_legacy(
self,
query: str,
return_type: Literal["raw"],
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> tuple[dict, list[list]]: ...
@overload
def query_foundry_sql_legacy(
self,
query: str,
return_type: api_types.SQLReturnType = ...,
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
def query_foundry_sql_legacy(
self,
query: str,
return_type: api_types.SQLReturnType = "raw",
branch: api_types.Ref = "master",
sql_dialect: api_types.SqlDialect = "SPARK",
timeout: int = 600,
) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame:
"""Queries the dataproxy query API with spark SQL.
Example:
query_foundry_sql_legacy(query="SELECT * FROM `/Global/Foundry Operations/Foundry Support/iris`",
branch="master")
Args:
query: the sql query
branch: the branch of the dataset / query
return_type: See :py:class:`foundry_dev_tools.utils.api_types.SQLReturnType`
sql_dialect: the SQL dialect used for the query
timeout: the query request timeout
Returns:
tuple (dict, list):
(foundry_schema, data)
data: contains the data matrix,
foundry_schema: the foundry schema (fieldSchemaList key).
Can be converted to a pandas DataFrame, see below
.. code-block:: python
foundry_schema, data = self.query_foundry_sql_legacy(query, branch)
df = pd.DataFrame(
data=data, columns=[e["name"] for e in foundry_schema["fieldSchemaList"]]
)
Raises:
ValueError: if return_type is not in :py:class:`foundry_dev_tools.utils.api_types.SQLReturnType`
DatasetHasNoSchemaError: if dataset has no schema
BranchNotFoundError: if branch was not found
"""
return self.ctx.data_proxy.query_foundry_sql_legacy(
query,
return_type=return_type,
branch=branch,
sql_dialect=sql_dialect,
timeout=timeout,
)
@overload
def query_foundry_sql(
self,
query: str,
return_type: Literal["pandas"],
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> pd.core.frame.DataFrame: ...
@overload
def query_foundry_sql(
self,
query: str,
return_type: Literal["spark"],
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> pyspark.sql.DataFrame: ...
@overload
def query_foundry_sql(
self,
query: str,
return_type: Literal["arrow"],
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> pa.Table: ...
@overload
def query_foundry_sql(
self,
query: str,
return_type: Literal["raw"],
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> tuple[dict, list[list]]: ...
@overload
def query_foundry_sql(
self,
query: str,
return_type: api_types.SQLReturnType = ...,
branch: api_types.Ref = ...,
sql_dialect: api_types.SqlDialect = ...,
timeout: int = ...,
) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ...
def query_foundry_sql(
self,
query: str,
return_type: api_types.SQLReturnType = "pandas",
branch: api_types.Ref = "master",
sql_dialect: api_types.SqlDialect = "SPARK",
timeout: int = 600,
) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame:
"""Queries the Foundry SQL server with spark SQL dialect.
Uses Arrow IPC to communicate with the Foundry SQL Server Endpoint.
Falls back to query_foundry_sql_legacy in case pyarrow is not installed or the query does not return
Arrow Format.
Example:
df1 = client.query_foundry_sql("SELECT * FROM `/Global/Foundry Operations/Foundry Support/iris`")
query = ("SELECT col1 FROM `{start_transaction_rid}:{end_transaction_rid}@{branch}`.`{dataset_path_or_rid}` WHERE filterColumns = 'value1' LIMIT 1")
df2 = client.query_foundry_sql(query)
Args:
query: The SQL Query in Foundry Spark Dialect (use backticks instead of quotes)
branch: the dataset branch
sql_dialect: the sql dialect
return_type: See :py:class:`foundry_dev_tools.utils.api_types.SQLReturnType`
timeout: Query Timeout, default value is 600 seconds
Returns:
:external+pandas:py:class:`~pandas.DataFrame` | :external+pyarrow:py:class:`~pyarrow.Table` | :external+spark:py:class:`~pyspark.sql.DataFrame`:
A pandas DataFrame, Spark DataFrame or pyarrow.Table with the result.
Raises:
ValueError: Only direct read eligible queries can be returned as arrow Table.
""" # noqa: E501
return self.ctx.foundry_sql_server.query_foundry_sql(
query,
return_type=return_type,
branch=branch,
sql_dialect=sql_dialect,
timeout=timeout,
)
def get_user_info(self) -> dict:
"""Returns the multipass user info.
Returns:
dict:
.. code-block:: python
{
"id": "<multipass-id>",
"username": "<username>",
"attributes": {
"multipass:email:primary": ["<email>"],
"multipass:given-name": ["<given-name>"],
"multipass:organization": ["<your-org>"],
"multipass:organization-rid": ["ri.multipass..organization. ..."],
"multipass:family-name": ["<family-name>"],
"multipass:upn": ["<upn>"],
"multipass:realm": ["<your-company>"],
"multipass:realm-name": ["<your-org>"],
},
}
"""
return self.ctx.multipass.api_me().json()
def get_group(self, group_id: str) -> dict:
"""Returns the multipass group information.
Returns:
dict:
The API response
.. code-block:: python
{
'id': '<id>',
'name': '<groupname>',
'attributes': {
'multipass:realm': ['palantir-internal-realm'],
'multipass:organization': ['<your-org>'],
'multipass:organization-rid': ['ri.multipass..organization.<...>'],
'multipass:realm-name': ['Palantir Internal']
},
}
"""
return self.ctx.multipass.api_get_group(group_id).json()
def delete_group(self, group_id: str) -> requests.Response:
"""Deletes multipass group.
Args:
group_id (str): the group id to delete
"""
return self.ctx.multipass.api_delete_group(group_id)
def create_third_party_application(
self,
client_type: api_types.MultipassClientType,
display_name: str,
description: str | None,
grant_types: list[api_types.MultipassGrantType],
redirect_uris: list | None,
logo_uri: str | None,
organization_rid: str,
allowed_organization_rids: list | None = None,
resources: list[api_types.Rid] | None = None,
operations: list[str] | None = None,
marking_ids: list[str] | None = None,
role_set_id: str | None = None,
role_grants: dict[str, list[str]] | None = None,
**kwargs,
) -> dict:
"""Creates Foundry Third Party application (TPA).
https://www.palantir.com/docs/foundry/platform-security-third-party/third-party-apps-overview/
User must have 'Manage OAuth 2.0 clients' workflow permissions.
Args:
client_type: Server Application (CONFIDENTIAL) or
Native or single-page application (PUBLIC)
display_name: Display name of the TPA
description: Long description of the TPA
grant_types: Usually, ["AUTHORIZATION_CODE", "REFRESH_TOKEN"] (authorization code grant)
or ["REFRESH_TOKEN", "CLIENT_CREDENTIALS"] (client credentials grant)
redirect_uris: Redirect URLs of TPA, used in combination with AUTHORIZATION_CODE grant
logo_uri: URI or embedded image 'data:image/png;base64,<...>'
organization_rid: Parent Organization of this TPA
allowed_organization_rids: Passing None or empty list means TPA is activated for all
Foundry organizations
resources: Resources the client is allowed to access, otherwise no resource restrictions
operations: Operations the client can be granted, otherwise no operation restrictions
marking_ids: Markings the client is allowed to access, otherwise no marking restrictions
role_set_id: roles allowed for this client, defaults to `oauth2-client`
role_grants: mapping between roles and principal ids dict[role id,list[principal id]]
**kwargs: gets passed to :py:meth:`APIClient.api_request`
Returns:
dict:
See below for the structure
.. code-block:: python
{
"clientId":"<...>",
"clientSecret":"<...>",
"clientType":"<CONFIDENTIAL/PUBLIC>",
"organizationRid":"<...>",
"displayName":"<...>",
"description":null,
"logoUri":null,
"grantTypes":[<"AUTHORIZATION_CODE","REFRESH_TOKEN","CLIENT_CREDENTIALS">],
"redirectUris":[],
"allowedOrganizationRids":[]
}
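Example:
A hedged sketch creating a client-credentials TPA; the organization rid is illustrative:
>>> tpa = fc.create_third_party_application(
...     client_type="CONFIDENTIAL",
...     display_name="my-service",
...     description=None,
...     grant_types=["REFRESH_TOKEN", "CLIENT_CREDENTIALS"],
...     redirect_uris=None,
...     logo_uri=None,
...     organization_rid="ri.multipass..organization.<uuid>",
... )
>>> client_id, client_secret = tpa["clientId"], tpa["clientSecret"]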
"""
return self.ctx.multipass.api_create_third_party_application(
client_type=client_type,
display_name=display_name,
description=description,
grant_types=grant_types,
redirect_uris=redirect_uris,
logo_uri=logo_uri,
organization_rid=organization_rid,
allowed_organization_rids=allowed_organization_rids,
resources=resources,
operations=operations,
marking_ids=marking_ids,
role_set_id=role_set_id,
role_grants=role_grants,
**kwargs,
).json()
def delete_third_party_application(self, client_id: str) -> requests.Response:
"""Deletes a Third Party Application.
Args:
client_id (str): The unique identifier of the TPA.
"""
return self.ctx.multipass.api_delete_third_party_application(client_id)
def update_third_party_application(
self,
client_id: str,
client_type: api_types.MultipassClientType,
display_name: str,
description: str | None,
grant_types: list[api_types.MultipassGrantType],
redirect_uris: list | None,
logo_uri: str | None,
organization_rid: str,
allowed_organization_rids: list | None = None,
resources: list[api_types.Rid] | None = None,
operations: list[str] | None = None,
marking_ids: list[str] | None = None,
role_set_id: str | None = None,
**kwargs,
) -> dict:
"""Updates Foundry Third Party application (TPA).
https://www.palantir.com/docs/foundry/platform-security-third-party/third-party-apps-overview/
User must have 'Manage OAuth 2.0 clients' workflow permissions.
Args:
client_id: The unique identifier of the TPA.
client_type: Server Application (CONFIDENTIAL) or
Native or single-page application (PUBLIC)
display_name: Display name of the TPA
description: Long description of the TPA
grant_types: Usually, ["AUTHORIZATION_CODE", "REFRESH_TOKEN"] (authorization code grant)
or ["REFRESH_TOKEN", "CLIENT_CREDENTIALS"] (client credentials grant)
redirect_uris: Redirect URLs of TPA, used in combination with AUTHORIZATION_CODE grant
logo_uri: URI or embedded image 'data:image/png;base64,<...>'
organization_rid: Parent Organization of this TPA
allowed_organization_rids: Passing None or empty list means TPA is activated for all
Foundry organizations
resources: Resources the client is allowed to access, otherwise no resource restrictions
operations: Operations the client can be granted, otherwise no operation restrictions
marking_ids: Markings the client is allowed to access, otherwise no marking restrictions
role_set_id: roles allowed for this client, defaults to `oauth2-client`
**kwargs: gets passed to :py:meth:`APIClient.api_request`
Returns:
dict:
Response in the following structure:
.. code-block:: python
{
"clientId":"<...>",
"clientType":"<CONFIDENTIAL/PUBLIC>",
"organizationRid":"<...>",
"displayName":"<...>",
"description":null,
"logoUri":null,
"grantTypes":[<"AUTHORIZATION_CODE","REFRESH_TOKEN","CLIENT_CREDENTIALS">],
"redirectUris":[],
"allowedOrganizationRids":[]
}
"""
return self.ctx.multipass.api_update_third_party_application(
client_id=client_id,
client_type=client_type,
display_name=display_name,
description=description,
grant_types=grant_types,
redirect_uris=redirect_uris,
logo_uri=logo_uri,
organization_rid=organization_rid,
allowed_organization_rids=allowed_organization_rids,
resources=resources,
operations=operations,
marking_ids=marking_ids,
role_set_id=role_set_id,
**kwargs,
).json()
def rotate_third_party_application_secret(
self,
client_id: str,
) -> dict:
"""Rotates Foundry Third Party application (TPA) secret.
Args:
client_id (str): The unique identifier of the TPA.
Returns:
dict:
See below for the structure
.. code-block:: python
{
"clientId":"<...>",
"clientSecret": "<...>",
"clientType":"<CONFIDENTIAL/PUBLIC>",
"organizationRid":"<...>",
"displayName":"<...>",
"description":null,
"logoUri":null,
"grantTypes":[<"AUTHORIZATION_CODE","REFRESH_TOKEN","CLIENT_CREDENTIALS">],
"redirectUris":[],
"allowedOrganizationRids":[]
}
"""
return self.ctx.multipass.api_rotate_third_party_application_secret(client_id).json()
def enable_third_party_application(
self,
client_id: str,
operations: list | None = None,
resources: list | None = None,
marking_ids: list[str] | None = None,
grant_types: list[api_types.MultipassGrantType] | None = None,
require_consent: bool = True,
**kwargs,
) -> dict:
"""Enables Foundry Third Party application (TPA).
Args:
client_id: The unique identifier of the TPA.
operations: Scopes that this TPA is allowed to use (To be confirmed)
if None or empty list is passed, all scopes will be activated.
resources: Compass Project RID's that this TPA is allowed to access,
if None or empty list is passed, unrestricted access will be given.
marking_ids: Marking Ids that this TPA is allowed to access,
if None or empty list is passed, unrestricted access will be given.
grant_types: Grant types that this TPA is allowed to use to access resources,
if None is passed, no grant type restrictions
if an empty list is passed, no grant types are allowed for this TPA
require_consent: Whether users need to provide consent for this application to act on their behalf,
defaults to true
**kwargs: gets passed to :py:meth:`APIClient.api_request`
Returns:
dict:
Response with the following structure:
.. code-block:: python
{
"client": {
"clientId": "<...>",
"organizationRid": "ri.multipass..organization.<...>",
"displayName": "<...>",
"description": None,
"logoUri": None,
},
"installation": {"resources": [], "operations": [], "markingIds": None},
}
"""
return self.ctx.multipass.api_enable_third_party_application(
client_id,
operations=operations,
resources=resources,
marking_ids=marking_ids,
grant_types=grant_types,
require_consent=require_consent,
**kwargs,
).json()
def start_checks_and_build(
self,
repository_id: str,
ref_name: str,
commit_hash: str,
file_paths: list[str],
) -> dict:
"""Starts checks and builds.
Args:
repository_id (str): the repository id where the transform is located
ref_name (str): the git ref_name for the branch
commit_hash (str): the git commit hash
file_paths (List[str]): a list of python transform files
Returns:
dict: the JSON API response
"""
return self.ctx.jemma.start_checks_and_builds(repository_id, ref_name, commit_hash, set(file_paths))
def get_build(self, build_rid: str) -> dict:
"""Get information about the build.
Args:
build_rid (str): the build RID
Returns:
dict: the JSON API response
"""
return self.ctx.build2.api_get_build_report(build_rid).json()
def get_job_report(self, job_rid: str) -> dict:
"""Get the report for a job.
Args:
job_rid (str): the job RID
Returns:
dict: the job report response
"""
return self.ctx.build2.api_get_job_report(job_rid).json()
def get_s3fs_storage_options(self) -> dict:
"""Get the foundry s3 credentials in the s3fs storage_options format.
Example:
>>> fc = FoundryRestClient()
>>> storage_options = fc.get_s3fs_storage_options()
>>> df = pd.read_parquet(
... "s3://ri.foundry.main.dataset.<uuid>/spark", storage_options=storage_options
... )
"""
return self.ctx.s3.get_s3fs_storage_options()
def get_boto3_s3_client(self, **kwargs): # noqa: ANN201
"""Returns the boto3 s3 client with credentials applied and endpoint url set.
See :py:attr:`foundry_dev_tools.clients.s3_client.api_assume_role_with_webidentity`.
Example:
>>> from foundry_dev_tools import FoundryRestClient
>>> fc = FoundryRestClient()
>>> s3_client = fc.get_boto3_s3_client()
>>> s3_client
Args:
**kwargs: gets passed to :py:meth:`boto3.session.Session.client`, `endpoint_url` will be overwritten
"""
return self.ctx.s3.get_boto3_client(**kwargs)
def get_boto3_s3_resource(self, **kwargs): # noqa: ANN201
"""Returns boto3 s3 resource with credentials applied and endpoint url set.
Args:
**kwargs: gets passed to :py:meth:`boto3.session.Session.resource`, `endpoint_url` will be overwritten
"""
return self.ctx.s3.get_boto3_resource(**kwargs)
def get_s3_credentials(self, expiration_duration: int = 3600) -> dict:
"""Parses the AssumeRoleWithWebIdentity response and caches the credentials.
See :py:attr:`foundry_dev_tools.clients.s3_client.api_assume_role_with_webidentity`.
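Example:
A minimal sketch requesting credentials valid for one hour:
>>> credentials = fc.get_s3_credentials(expiration_duration=3600)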
"""
return self.ctx.s3.get_credentials(expiration_duration)