Source code for foundry_dev_tools.clients.catalog

"""Implementation of the foundry-catalog API."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any
from urllib.parse import quote_plus

from foundry_dev_tools.clients.api_client import APIClient
from foundry_dev_tools.errors.dataset import BranchNotFoundError, DatasetNotFoundError
from foundry_dev_tools.errors.handling import ErrorHandlingConfig
from foundry_dev_tools.utils import api_types
from foundry_dev_tools.utils.api_types import assert_in_literal

if TYPE_CHECKING:
    import requests


class CatalogClient(APIClient):
    """To be implemented/transferred."""

    # Service name consumed by APIClient to build request URLs
    # (all paths below are relative to the foundry-catalog service root).
    api_name = "foundry-catalog"
[docs] def list_dataset_files( self, dataset_rid: api_types.DatasetRid, end_ref: api_types.View = "master", page_size: int = 1000, logical_path: api_types.PathInDataset | None = None, page_start_logical_path: api_types.PathInDataset | None = None, start_transaction_rid: api_types.TransactionRid | None = None, include_open_exclusive_transaction: bool = False, exclude_hidden_files: bool = False, temporary_credentials_auth_token: str | None = None, ) -> list: """Same as :py:meth:`CatalogClient.api_get_dataset_view_files3`, but iterates through all pages. Args: dataset_rid: the dataset rid end_ref: branch or transaction rid of the dataset page_size: the maximum page size returned logical_path: If logical_path is absent, returns all files in the view. If logical_path matches a file exactly, returns just that file. Otherwise, returns all files in the "directory" of logical_path: (a slash is added to the end of logicalPath if necessary and a prefix-match is performed) page_start_logical_path: if specified page starts at the given path, otherwise at the beginning of the file list start_transaction_rid: if a startTransactionRid is given, the view starting at the startTransactionRid and ending at the endRef is returned include_open_exclusive_transaction: if files added in open transaction should be returned as well in the response exclude_hidden_files: if hidden files should be excluded (e.g. _log files) temporary_credentials_auth_token: to generate temporary credentials for presigned URLs Returns: list[FileResourcesPage]: .. 
code-block:: python [ { "logicalPath": "..", "pageStartLogicalPath": "..", "includeOpenExclusiveTransaction": "..", "excludeHiddenFiles": "..", }, ] """ def _inner_get(page_start_logical_path: str | None = None) -> dict: return self.api_get_dataset_view_files3( dataset_rid=dataset_rid, end_ref=end_ref, page_size=page_size, logical_path=logical_path, page_start_logical_path=page_start_logical_path, include_open_exclusive_transaction=include_open_exclusive_transaction, exclude_hidden_files=exclude_hidden_files, start_transaction_rid=start_transaction_rid, temporary_credentials_auth_token=temporary_credentials_auth_token, ).json() result: list[dict] = [] first_result = _inner_get(page_start_logical_path=page_start_logical_path) result.extend(first_result["values"]) next_page_token = first_result.get("nextPageToken", None) while next_page_token is not None: batch_result = _inner_get(page_start_logical_path=next_page_token) next_page_token = batch_result.get("nextPageToken", None) result.extend(batch_result["values"]) # type: ignore[arg-type] return result
[docs] def api_get_dataset_view_files3( self, dataset_rid: api_types.DatasetRid, end_ref: api_types.View, page_size: int, logical_path: api_types.PathInDataset | None = None, page_start_logical_path: api_types.PathInDataset | None = None, include_open_exclusive_transaction: bool = False, exclude_hidden_files: bool = False, start_transaction_rid: api_types.View | None = None, temporary_credentials_auth_token: str | None = None, **kwargs, ) -> requests.Response: """Returns files in the dataset view matching the specified parameters. Args: dataset_rid: the dataset rid end_ref: branch or transaction rid of the dataset page_size: the maximum page size returned logical_path: If logical_path is absent, returns all files in the view. If logical_path matches a file exactly, returns just that file. Otherwise, returns all files in the "directory" of logical_path: (a slash is added to the end of logicalPath if necessary and a prefix-match is performed) page_start_logical_path: if specified page starts at the given path, otherwise at the beginning of the file list include_open_exclusive_transaction: if files added in open transaction should be returned as well in the response exclude_hidden_files: if hidden files should be excluded (e.g. 
_log files) start_transaction_rid: if a startTransactionRid is given, the view starting at the startTransactionRid and ending at the endRef is returned temporary_credentials_auth_token: to generate temporary credentials for presigned URLs **kwargs: gets passed to :py:meth:`APIClient.api_request` Returns: response: the response contains a json dict with the following keys: values: an array of file resource objects nextPageToken: which can be used for the next request as `start_transaction_rid` """ params = {"pageSize": page_size} if start_transaction_rid: params["startTransactionRid"] = start_transaction_rid get_dataset_view_files_request = {} if logical_path: get_dataset_view_files_request["logicalPath"] = logical_path if page_start_logical_path: get_dataset_view_files_request["pageStartLogicalPath"] = page_start_logical_path if include_open_exclusive_transaction: get_dataset_view_files_request["includeOpenExclusiveTransaction"] = include_open_exclusive_transaction if exclude_hidden_files: get_dataset_view_files_request["excludeHiddenFiles"] = exclude_hidden_files return self.api_request( "PUT", f"catalog/datasets/{dataset_rid}/views/{quote_plus(end_ref)}/files3", params=params, headers={"Temporary-Credentials-Authorization": temporary_credentials_auth_token}, json=get_dataset_view_files_request, error_handling=ErrorHandlingConfig( DatasetNotFoundError, dataset_rid=dataset_rid, logical_path=logical_path, ), **kwargs, )
[docs] def api_get_events( self, types: set[str], limit: int | None = None, page_token: str | None = None, **kwargs, ) -> requests.Response: """Returns a page of events filtered by the :any:`types` parameter. Args: types: types to filter for limit: limit the maximum numbers of events per page page_token: for pagination **kwargs: gets passed to :py:meth:`APIClient.api_request` """ return self.api_request( "GET", "catalog/events", params={"types": types, "limit": limit, "pageToken": page_token}, **kwargs, )
[docs] def api_create_dataset( self, dataset_path: api_types.FoundryPath, **kwargs, ) -> requests.Response: """Creates dataset at specified path. Args: dataset_path: path on foundry for the to be created dataset **kwargs: gets passed to :py:meth:`APIClient.api_request` """ return self.api_request( "POST", "catalog/datasets", json={"path": dataset_path}, error_handling=ErrorHandlingConfig(dataset_path=dataset_path), **kwargs, )
[docs] def api_get_dataset(self, dataset_rid: api_types.DatasetRid, **kwargs) -> requests.Response: """Returns rid and fileSystemId of dataset. Args: dataset_rid: the dataset rid **kwargs: gets passed to :py:meth:`APIClient.api_request` """ return self.api_request( "GET", f"catalog/datasets/{dataset_rid}", error_handling=ErrorHandlingConfig({204: DatasetNotFoundError}, dataset_rid=dataset_rid), **kwargs, )
[docs] def api_delete_dataset(self, dataset_rid: api_types.DatasetRid, **kwargs) -> requests.Response: """Deletes the dataset. Args: dataset_rid: the dataset rid **kwargs: gets passed to :py:meth:`APIClient.api_request` """ return self.api_request( "DELETE", "catalog/datasets", json={"rid": dataset_rid}, **kwargs, )
[docs] def api_set_transaction_type( self, dataset_rid: api_types.DatasetRid, transaction_rid: api_types.TransactionRid, transaction_type: api_types.FoundryTransaction, **kwargs, ) -> requests.Response: """Set transaction type. Args: dataset_rid: dataset rid transaction_rid: transaction rid transaction_type: foundry transaction type, see :py:class:`api_types.FoundryTransaction` **kwargs: gets passed to :py:meth:`APIClient.api_request` """ assert_in_literal(transaction_type, api_types.FoundryTransaction, "transaction_type") return self.api_request( "POST", f"catalog/datasets/{dataset_rid}/transactions/{transaction_rid}", data=f'"{transaction_type}"', **kwargs, )
[docs] def api_start_transaction( self, dataset_rid: api_types.DatasetRid, branch_id: api_types.DatasetBranch, record: dict[str, Any] | None = None, provenance: dict | None = None, user_id: str | None = None, start_transaction_type: api_types.FoundryTransaction | None = None, **kwargs, ) -> requests.Response: """Start a transaction on a dataset. Args: dataset_rid: dataset rid to start transaction on branch_id: the dataset branch record: record provenance: provenance for transaction user_id: start transaction as another user, needs `foundry:set-user-id` permissions start_transaction_type: transaction type, default is `APPEND` **kwargs: gets passed to :py:meth:`APIClient.api_request` """ post_json = {"branchId": branch_id, "record": record or {}} if provenance is not None: post_json["provenance"] = provenance if user_id is not None: post_json["userId"] = user_id if start_transaction_type is not None: assert_in_literal(start_transaction_type, api_types.FoundryTransaction, "start_transaction_type") post_json["startTransactionType"] = start_transaction_type return self.api_request( "POST", f"catalog/datasets/{dataset_rid}/transactions", json=post_json, error_handling=ErrorHandlingConfig( {"Default:InvalidArgument": DatasetNotFoundError}, dataset_rid=dataset_rid, branch_id=branch_id, ), **kwargs, )
[docs] def api_commit_transaction( self, dataset_rid: api_types.DatasetRid, transaction_rid: api_types.TransactionRid, record: dict[str, Any] | None = None, provenance: dict | None = None, do_sever_inherited_permissions: bool | None = None, **kwargs, ) -> requests.Response: """Commit a transaction on a dataset. Args: dataset_rid: dataset rid to start transaction on transaction_rid: the transaction to commit record: record provenance: provenance for transaction do_sever_inherited_permissions: wether dependant conditions are removed on the transaction **kwargs: gets passed to :py:meth:`APIClient.api_request` """ post_json = {"transactionRid": transaction_rid, "record": record or {}} if provenance is not None: post_json["provenance"] = provenance if do_sever_inherited_permissions is not None: post_json["doSeverInheritedPermissions"] = do_sever_inherited_permissions # type: ignore[assignment] return self.api_request( "POST", f"catalog/datasets/{dataset_rid}/transactions/{transaction_rid}/commit", json=post_json, error_handling=ErrorHandlingConfig(dataset_rid=dataset_rid, transaction_rid=transaction_rid), **kwargs, )
[docs] def api_abort_transaction( self, dataset_rid: api_types.DatasetRid, transaction_rid: api_types.TransactionRid, record: dict[str, Any] | None = None, provenance: dict | None = None, do_sever_inherited_permissions: bool | None = None, **kwargs, ) -> requests.Response: """Abort a transaction on a dataset. Args: dataset_rid: dataset rid to start transaction on transaction_rid: the transaction to commit record: record provenance: provenance for transaction do_sever_inherited_permissions: wether dependant conditions are removed on the transaction **kwargs: gets passed to :py:meth:`APIClient.api_request` """ post_json = {"transactionRid": transaction_rid, "record": record or {}} if provenance is not None: post_json["provenance"] = provenance if do_sever_inherited_permissions is not None: post_json["doSeverInheritedPermissions"] = do_sever_inherited_permissions # type: ignore[assignment] return self.api_request( "POST", f"catalog/datasets/{dataset_rid}/transactions/{transaction_rid}/abortWithMetadata", json=post_json, **kwargs, )
[docs] def api_get_transaction(self, dataset_rid: api_types.DatasetRid, ref: api_types.Ref, **kwargs) -> requests.Response: """Get the transaction for a given ref.""" return self.api_request("GET", f"catalog/datasets/{dataset_rid}/transactions/{ref}", **kwargs)
[docs] def api_get_reverse_transactions2( self, dataset_rid: api_types.DatasetRid, start_ref: api_types.View, page_size: int, end_transaction_rid: api_types.TransactionRid | None = None, include_open_exclusive_transaction: bool | None = False, allow_deleted_dataset: bool | None = None, **kwargs, ) -> requests.Response: """Get reverse transactions. Args: dataset_rid: dataset rid to get transactions start_ref: at what ref to start listing page_size: response page entry size end_transaction_rid: at what transaction to stop listing include_open_exclusive_transaction: include open exclusive transaction allow_deleted_dataset: respond even if dataset was deleted **kwargs: gets passed to :py:meth:`APIClient.api_request` """ params = {"pageSize": page_size} if end_transaction_rid is not None: params["endTransactionRid"] = end_transaction_rid # type: ignore[assignment] if include_open_exclusive_transaction is not None: params["includeOpenExclusiveTransaction"] = include_open_exclusive_transaction if allow_deleted_dataset is not None: params["allowDeletedDataset"] = allow_deleted_dataset return self.api_request( "GET", f"catalog/datasets/{dataset_rid}/reverse-transactions2/{quote_plus(start_ref)}", params=params, **kwargs, )
[docs] def api_create_branch( self, dataset_rid: api_types.DatasetRid, branch_id: api_types.DatasetBranch, parent_ref: api_types.TransactionRid | None = None, parent_branch_id: api_types.DatasetBranch | None = None, **kwargs, ) -> requests.Response: """Creates a branch on a dataset. Args: dataset_rid: the dataset branch_id: the branch to create parent_ref: optionally the transaction off which the branch will be based parent_branch_id: optionally a parent branch name, otherwise a root branch **kwargs: gets passed to :py:meth:`APIClient.api_request` """ return self.api_request( "POST", f"catalog/datasets/{dataset_rid}/branchesUnrestricted2/{quote_plus(branch_id)}", json={"parentRef": parent_ref, "parentBranchId": parent_branch_id}, **kwargs, )
[docs] def api_update_branch( self, dataset_rid: api_types.DatasetRid, branch: api_types.DatasetBranch, parent_ref: api_types.View | None = None, **kwargs, ) -> requests.Response: """Updates the latest transaction of branch 'branch' to the latest transaction of branch 'parent_branch'. Args: dataset_rid: Unique identifier of the dataset branch: The branch to update (e.g. master) parent_ref: the name of the branch to copy the last transaction from or a transaction rid **kwargs: gets passed to :py:meth:`APIClient.api_request` Returns: dict: example below for the branch response .. code-block:: python { "id": "..", "rid": "ri.foundry.main.branch...", "ancestorBranchIds": [], "creationTime": "", "transactionRid": "ri.foundry.main.transaction....", } """ return self.api_request( "POST", f"catalog/datasets/{dataset_rid}/branchesUpdate2/{quote_plus(branch)}", data=f'"{parent_ref}"', **kwargs, )
[docs] def api_get_branch( self, dataset_rid: api_types.DatasetRid, branch: api_types.DatasetBranch, **kwargs, ) -> requests.Response: """Returns branch information. Args: dataset_rid: Unique identifier of the dataset branch: Branch name **kwargs: gets passed to :py:meth:`APIClient.api_request` Returns: dict: with keys id (name) and rid (unique id) of the branch. """ return self.api_request( "GET", f"catalog/datasets/{dataset_rid}/branches2/{quote_plus(branch)}", error_handling=ErrorHandlingConfig({204: BranchNotFoundError}, dataset_rid=dataset_rid, branch=branch), **kwargs, )
[docs] def api_get_branches( self, dataset_rid: api_types.DatasetRid, **kwargs, ) -> requests.Response: """Returns branch names of dataset. Args: dataset_rid: Unique identifier of the dataset **kwargs: gets passed to :py:meth:`APIClient.api_request` Returns: list[str]: list of dataset branch names """ return self.api_request( "GET", f"catalog/datasets/{dataset_rid}/branches", **kwargs, )
[docs] def api_remove_dataset_file( self, dataset_rid: api_types.DatasetRid, transaction_id: api_types.TransactionRid, logical_path: api_types.FoundryPath, recursive: bool = False, **kwargs, ) -> requests.Response: """Removes the given file from an open transaction. If the logical path matches a file exactly then only that file will be removed, regardless of the value of recursive. If the logical path represents a directory, then all files prefixed with the logical path followed by '/' will be removed when recursive is true and no files will be removed when recursive is false. If the given logical path does not match a file or directory then this call is ignored and does not throw an exception. Args: dataset_rid: Unique identifier of the dataset transaction_id: transaction rid logical_path: logical path in the backing filesystem recursive: recurse into subdirectories **kwargs: gets passed to :py:meth:`APIClient.api_request` """ return self.api_request( "POST", f"catalog/datasets/{dataset_rid}/transactions/{transaction_id}/files/remove", params={"logicalPath": logical_path, "recursive": recursive}, **kwargs, )
[docs] def api_add_files_to_delete_transaction( self, dataset_rid: api_types.DatasetRid, transaction_id: api_types.TransactionRid, logical_paths: list[api_types.PathInDataset], **kwargs, ) -> requests.Response: """Adds files in an open DELETE transaction. Files added to DELETE transactions affect the dataset view by removing files from the view. Args: dataset_rid: Unique identifier of the dataset transaction_id: transaction rid logical_paths: files in the dataset to delete **kwargs: gets passed to :py:meth:`APIClient.api_request` """ return self.api_request( "POST", f"catalog/datasets/{dataset_rid}/transactions/{transaction_id}/files/addToDeleteTransaction", json={"logicalPaths": logical_paths}, **kwargs, )
[docs] def api_get_dataset_stats( self, dataset_rid: api_types.DatasetRid, end_ref: api_types.View = "master", **kwargs, ) -> requests.Response: """Returns response from foundry catalogue stats endpoint. Args: dataset_rid: the dataset rid end_ref: branch or transaction rid of the dataset **kwargs: gets passed to :py:meth:`APIClient.api_request` dict: sizeInBytes, numFiles, hiddenFilesSizeInBytes, numHiddenFiles, numTransactions """ return self.api_request( "GET", f"catalog/datasets/{dataset_rid}/views/{quote_plus(end_ref)}/stats", **kwargs, )