Source code for foundry_dev_tools.resources.dataset

"""Dataset helper class."""

from __future__ import annotations

import io
import os
import tempfile
import time
from contextlib import contextmanager
from pathlib import Path
from typing import TYPE_CHECKING, ClassVar, overload

from foundry_dev_tools.errors.compass import ResourceNotFoundError
from foundry_dev_tools.errors.dataset import (
    BranchNotFoundError,
    DatasetHasNoOpenTransactionError,
    DatasetNotFoundError,
    TransactionTypeMismatchError,
)
from foundry_dev_tools.resources import resource

if TYPE_CHECKING:
    import sys
    from collections.abc import Iterator
    from typing import IO, AnyStr, Literal

    if sys.version_info < (3, 11):
        from typing_extensions import Self
    else:
        from typing import Self

    import pandas as pd
    import pandas.core.frame
    import polars.dataframe.frame
    import pyarrow as pa
    import pyspark.sql

    from foundry_dev_tools.config.context import FoundryContext
    from foundry_dev_tools.utils import api_types


class Dataset(resource.Resource):
    """Helper class for datasets."""

    _branch: api_types.Branch | None = None
    _transaction: api_types.Transaction | None = None
    __created__: bool | None = None
    rid_start: ClassVar[str] = "ri.foundry.main.dataset"

    def __init__(self, *args, **kwargs) -> None:
        if self.__created__ is None:
            msg = (
                "This class cannot be instantiated directly."
                " Please use Dataset.create, Dataset.from_path or Dataset.from_rid."
            )
            raise RuntimeError(msg)
        super().__init__(*args, **kwargs)

    @property
    def branch(self) -> api_types.Branch:
        """The branch of the dataset.

        If `self._branch` is not set, switches to the default "master" branch,
        creating it if it does not exist.
        """
        if not self._branch:
            # create branch with default name, if the branch is not set
            # (e.g. when created via get_resource or directly instantiated)
            self.switch_branch("master", create_branch_if_not_exists=True)
        return self._branch

    @branch.setter
    def branch(self, branch: api_types.DatasetBranch):
        self._branch = branch

    @classmethod
    def from_rid(
        cls,
        context: FoundryContext,
        rid: api_types.Rid,
        /,
        *,
        branch: api_types.Ref | api_types.Branch = "master",
        create_branch_if_not_exists: bool = True,
        parent_ref: api_types.TransactionRid | None = None,
        parent_branch_id: api_types.DatasetBranch | None = None,
        **kwargs,
    ) -> Dataset:
        """Returns the dataset with the given rid.

        Args:
            context: the foundry context for the dataset
            rid: the rid of the dataset
            branch: the branch of the dataset
            create_branch_if_not_exists: create branch if branch does not exist
            parent_ref: optionally the transaction off which the branch will be based
                (only used if branch needs to be created)
            parent_branch_id: optionally a parent branch name, otherwise a root branch
                (only used if branch needs to be created)
            **kwargs: passed to :py:meth:`foundry_dev_tools.resources.resource.Resource.from_rid`
        """
        inst = super().from_rid(context, rid, **kwargs)
        if isinstance(branch, dict):
            inst.branch = branch
        else:
            inst.switch_branch(
                branch=branch,
                create_branch_if_not_exists=create_branch_if_not_exists,
                parent_ref=parent_ref,
                parent_branch_id=parent_branch_id,
            )
        return inst

    @classmethod
    def from_path(
        cls,
        context: FoundryContext,
        path: api_types.FoundryPath,
        /,
        *,
        branch: api_types.Ref = "master",
        create_if_not_exist: bool = False,
        create_branch_if_not_exists: bool = True,
        parent_ref: api_types.TransactionRid | None = None,
        parent_branch_id: api_types.DatasetBranch | None = None,
        **kwargs,
    ) -> Dataset:
        """Returns dataset at path.

        Args:
            context: the foundry context for the dataset
            path: the path where the dataset is located on foundry
            branch: the branch of the dataset
            create_if_not_exist: if the dataset does not exist, create it and the branch
            create_branch_if_not_exists: create branch if branch does not exist,
                the branch is always created if the dataset does not exist
            parent_ref: optionally the transaction off which the branch will be based
                (only used if branch needs to be created)
            parent_branch_id: optionally a parent branch name, otherwise a root branch
                (only used if branch needs to be created)
            **kwargs: passed to :py:meth:`foundry_dev_tools.resources.resource.Resource.from_path`
        """
        try:
            inst = super().from_path(context, path, **kwargs)
            inst.switch_branch(
                branch=branch,
                create_branch_if_not_exists=create_branch_if_not_exists,
                parent_ref=parent_ref,
                parent_branch_id=parent_branch_id,
            )
            inst.__created__ = False
        except ResourceNotFoundError:
            if create_if_not_exist:
                return cls.create(context, path, branch, parent_ref=parent_ref, parent_branch_id=parent_branch_id)
            raise
        else:
            return inst

    @classmethod
    def create(
        cls,
        context: FoundryContext,
        path: api_types.FoundryPath,
        branch: api_types.Ref,
        parent_ref: api_types.Ref | None = None,
        parent_branch_id: api_types.Ref | None = None,
    ) -> Dataset:
        """Create a foundry dataset.

        See :py:meth:`~foundry_dev_tools.clients.catalog.CatalogClient.api_create_dataset`.
        """
        rid = context.catalog.api_create_dataset(path).json()["rid"]
        inst = cls.from_rid(context, rid)
        inst.switch_branch(branch=branch, parent_ref=parent_ref, parent_branch_id=parent_branch_id)
        inst.__created__ = True
        return inst

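    # Usage sketch (illustrative only, not part of the library): the supported
    # ways to obtain a Dataset handle. The context setup and the dataset
    # path/rid below are hypothetical.
    #
    #   from foundry_dev_tools.config.context import FoundryContext
    #
    #   ctx = FoundryContext()
    #   ds = Dataset.from_path(ctx, "/Global/project/my_dataset", create_if_not_exist=True)
    #   same = Dataset.from_rid(ctx, ds.rid)
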
    def create_branch(
        self,
        branch: api_types.DatasetBranch,
        parent_ref: api_types.TransactionRid | None = None,
        parent_branch_id: api_types.DatasetBranch | None = None,
    ) -> Self:
        """Creates a branch on a dataset and switches to it.

        Args:
            branch: the branch to create
            parent_ref: optionally the transaction off which the branch will be based
            parent_branch_id: optionally a parent branch name, otherwise a root branch
        """
        self.branch = self._context.catalog.api_create_branch(
            dataset_rid=self.rid,
            branch_id=branch,
            parent_ref=parent_ref,
            parent_branch_id=parent_branch_id,
        ).json()
        return self

    def switch_branch(
        self,
        branch: api_types.DatasetBranch,
        create_branch_if_not_exists: bool = False,
        parent_ref: api_types.TransactionRid | None = None,
        parent_branch_id: api_types.DatasetBranch | None = None,
    ) -> Self:
        """Switch to another branch.

        Args:
            branch: the name of the branch to switch to
            create_branch_if_not_exists: create the branch if it does not exist
            parent_ref: optionally the transaction off which the branch will be based
                (only used if branch needs to be created)
            parent_branch_id: optionally a parent branch name, otherwise a root branch
                (only used if branch needs to be created)
        """
        try:
            self.branch = self.get_branch(branch)
        except BranchNotFoundError:
            if create_branch_if_not_exists:
                self.create_branch(branch=branch, parent_ref=parent_ref, parent_branch_id=parent_branch_id)
            else:
                raise
        return self

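    # Usage sketch (illustrative only): switching an existing handle to a
    # feature branch, creating it off `master` if needed. The branch name is
    # hypothetical.
    #
    #   ds.switch_branch("feature/my-change", create_branch_if_not_exists=True, parent_branch_id="master")
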
    def get_branch(self, branch: api_types.DatasetBranch) -> api_types.Branch:
        """Returns the branch resource."""
        return self._context.catalog.api_get_branch(
            self.rid,
            branch,
        ).json()

    def get_transactions(
        self,
        page_size: int,
        end_transaction_rid: api_types.TransactionRid | None = None,
        include_open_exclusive_transaction: bool | None = False,
        allow_deleted_dataset: bool | None = None,
        **kwargs,
    ) -> list[api_types.Transaction]:
        """Get reverse transactions.

        Args:
            page_size: response page entry size
            end_transaction_rid: at what transaction to stop listing
            include_open_exclusive_transaction: include open exclusive transaction
            allow_deleted_dataset: respond even if dataset was deleted
            **kwargs: gets passed to :py:meth:`APIClient.api_request`
        """
        return [
            v["transaction"]
            for v in self._context.catalog.api_get_reverse_transactions2(
                self.rid,
                self.branch["id"],
                page_size=page_size,
                end_transaction_rid=end_transaction_rid,
                include_open_exclusive_transaction=include_open_exclusive_transaction,
                allow_deleted_dataset=allow_deleted_dataset,
                **kwargs,
            ).json()["values"]
        ]

    def get_last_transaction(self) -> api_types.Transaction | None:
        """Returns the last transaction or None if there are no transactions."""
        v = self.get_transactions(
            page_size=1,
            include_open_exclusive_transaction=True,
        )
        if v is not None and len(v) > 0:
            return v[0]
        return None

    def get_open_transaction(self) -> api_types.Transaction | None:
        """Gets the open transaction or None."""
        last = self.get_last_transaction()
        if last is not None and last["status"] == "OPEN":
            return last
        return None

    def start_transaction(self, start_transaction_type: api_types.FoundryTransaction | None = None) -> Self:
        """Start a transaction on the dataset."""
        self._transaction = self._context.catalog.api_start_transaction(
            self.rid,
            self.branch["id"],
            record={},
            start_transaction_type=start_transaction_type,
        ).json()
        if start_transaction_type is not None and start_transaction_type != "APPEND":
            self._context.catalog.api_set_transaction_type(
                self.rid,
                self.transaction["rid"],
                start_transaction_type,
            )
        return self

    @property
    def transaction(self) -> api_types.Transaction:
        """Get the current transaction or raise Error if there is no open transaction."""
        if self._transaction is not None:
            return self._transaction  # type: ignore[return-value]
        self._transaction = self.get_open_transaction()
        if self._transaction is not None:
            return self._transaction
        raise DatasetHasNoOpenTransactionError(f"{self.path=} {self.rid=}")  # noqa: EM102,TRY003

    def commit_transaction(self) -> Self:
        """Commit the transaction on the dataset."""
        if self.transaction is not None:
            self._context.catalog.api_commit_transaction(self.rid, self.transaction["rid"])
            self._transaction = None
        return self

    def abort_transaction(self) -> Self:
        """Abort the transaction on the dataset."""
        if self.transaction is not None:
            self._context.catalog.api_abort_transaction(self.rid, self.transaction["rid"])
            self._transaction = None
        return self

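    # Usage sketch (illustrative only): the manual transaction lifecycle.
    # transaction_context (further below) is usually preferable, since it
    # commits or aborts automatically.
    #
    #   ds.start_transaction("UPDATE")
    #   try:
    #       ds.put_file("folder/file.txt", b"content")
    #       ds.commit_transaction()
    #   except Exception:
    #       ds.abort_transaction()
    #       raise
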
    def upload_files(
        self,
        path_file_dict: dict[api_types.PathInDataset, Path],
        max_workers: int | None = None,
        **kwargs,
    ) -> Self:
        """Uploads multiple local files to a foundry dataset.

        Args:
            path_file_dict: A dictionary which maps the path in the dataset -> local file path
            max_workers: Set number of threads for upload
            **kwargs: get passed to :py:meth:`foundry_dev_tools.resources.dataset.Dataset.transaction_context`
        """
        with self.transaction_context(**kwargs):
            self._context.data_proxy.upload_dataset_files(
                dataset_rid=self.rid,
                transaction_rid=self.transaction["rid"],
                path_file_dict=path_file_dict,
                max_workers=max_workers,
            )
        return self

    def upload_file(
        self,
        file_path: Path,
        path_in_foundry_dataset: api_types.PathInDataset,
        **kwargs,
    ) -> Self:
        """Upload a file to the dataset.

        Args:
            file_path: local file path to upload
            path_in_foundry_dataset: file path inside the foundry dataset
            **kwargs: get passed to :py:meth:`foundry_dev_tools.resources.dataset.Dataset.transaction_context`,
                e.g. pass transaction_type to open a transaction with that type if none is open
        """
        with self.transaction_context(**kwargs):
            self._context.data_proxy.upload_dataset_file(
                dataset_rid=self.rid,
                transaction_rid=self.transaction["rid"],
                path=file_path,
                path_in_foundry_dataset=path_in_foundry_dataset,
            )
        return self

    def upload_folder(
        self,
        folder_path: Path,
        max_workers: int | None = None,
        **kwargs,
    ) -> Self:
        """Uploads all files contained in the folder to the dataset.

        The default transaction type is UPDATE.

        Args:
            folder_path: the folder to upload
            max_workers: Set number of threads for upload
            **kwargs: get passed to :py:meth:`foundry_dev_tools.resources.dataset.Dataset.transaction_context`
        """
        kwargs.setdefault("transaction_type", "UPDATE")
        self.upload_files(
            {str(f.relative_to(folder_path)): f for f in folder_path.rglob("*") if f.is_file()},
            max_workers=max_workers,
            **kwargs,
        )
        return self

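    # Usage sketch (illustrative only): mirroring a local folder into the
    # dataset; the local path is hypothetical.
    #
    #   ds.upload_folder(Path("/tmp/exports"), max_workers=4, transaction_type="SNAPSHOT")
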
    def delete_files(self, logical_paths: list[api_types.PathInDataset], **kwargs) -> Self:
        """Adds files in an open DELETE transaction.

        Files added to DELETE transactions affect the dataset view by removing files from the view.

        Args:
            logical_paths: files in the dataset to delete
            **kwargs: get passed to :py:meth:`foundry_dev_tools.resources.dataset.Dataset.transaction_context`
                (transaction_type is forced to DELETE)
        """
        with self.transaction_context(transaction_type="DELETE", **kwargs):
            self._context.catalog.api_add_files_to_delete_transaction(
                self.rid,
                self.transaction["rid"],
                logical_paths=logical_paths,
            )
        return self

    def remove_file(
        self,
        logical_path: api_types.FoundryPath,
        recursive: bool = False,
    ) -> Self:
        """Removes the given file from an open transaction.

        If the logical path matches a file exactly then only that file
        will be removed, regardless of the value of recursive.

        If the logical path represents a directory, then all files prefixed
        with the logical path followed by '/' will be removed when
        recursive is true and no files will be removed when recursive is false.

        If the given logical path does not match a file or directory then this call
        is ignored and does not throw an exception.

        Args:
            logical_path: logical path in the backing filesystem
            recursive: recurse into subdirectories
        """
        self._context.catalog.api_remove_dataset_file(
            self.rid,
            self.transaction["rid"],
            logical_path=logical_path,
            recursive=recursive,
        )
        return self

    def put_file(
        self,
        logical_path: api_types.PathInDataset,
        file_data: str | bytes | IO[AnyStr],
        overwrite: bool | None = None,
        **kwargs,
    ) -> Self:
        """Opens, writes, and closes a file under the specified dataset and transaction.

        Args:
            logical_path: file path in dataset
            file_data: content of the file
            overwrite: defaults to false, if true -> Overwrite the file if it already exists in the transaction.
            **kwargs: get passed to :py:meth:`foundry_dev_tools.resources.dataset.Dataset.transaction_context`
        """
        with self.transaction_context(**kwargs):
            self._context.data_proxy.api_put_file(
                self.rid,
                self.transaction["rid"],
                logical_path=logical_path,
                file_data=file_data,
                overwrite=overwrite,
            )
        return self

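    # Usage sketch (illustrative only): writing an in-memory payload without a
    # local file, e.g. a small CSV assembled at runtime; the logical path is
    # hypothetical.
    #
    #   ds.put_file("config/settings.csv", b"key,value\nretries,3\n", transaction_type="UPDATE")
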
    def download_files(
        self,
        output_directory: Path,
        paths_in_dataset: set[api_types.PathInDataset] | None = None,
        max_workers: int | None = None,
    ) -> list[Path]:
        """Downloads multiple files from a dataset and saves them in the output directory.

        Args:
            output_directory: the directory where the files will be saved
            paths_in_dataset: the files to download, if None (the default) it will download all files
            max_workers: how many connections to use in parallel to download the files
        """
        return self._context.data_proxy.download_dataset_files(
            dataset_rid=self.rid,
            output_directory=output_directory,
            view=self.branch["id"],
            files=paths_in_dataset,
            max_workers=max_workers,
        )

    def get_file(
        self,
        path_in_dataset: api_types.PathInDataset,
        start_transaction_rid: api_types.TransactionRid | None = None,
        range_header: str | None = None,
    ) -> bytes:
        """Get bytes of a file in a Dataset.

        Args:
            path_in_dataset: the file to get
            start_transaction_rid: start transaction rid
            range_header: HTTP range header
        """
        return self._context.data_proxy.api_get_file_in_view(
            dataset_rid=self.rid,
            end_ref=self.branch["id"],
            logical_path=path_in_dataset,
            start_transaction_rid=start_transaction_rid,
            range_header=range_header,
        ).content

    def download_file(self, output_directory: Path, path_in_dataset: api_types.PathInDataset) -> Path:
        """Downloads the file to the output directory.

        Args:
            output_directory: the directory where the file will be saved
            path_in_dataset: the file to download
        """
        return self._context.data_proxy.download_dataset_file(
            dataset_rid=self.rid,
            output_directory=output_directory,
            foundry_file_path=path_in_dataset,
            view=self.branch["id"],
        )

    @contextmanager
    def download_files_temporary(
        self,
        paths_in_dataset: list[api_types.PathInDataset] | None = None,
        max_workers: int | None = None,
    ) -> Iterator[Path]:
        """Downloads dataset files to a temporary directory and cleans it up afterwards.

        A wrapper around :py:meth:`foundry_dev_tools.resources.dataset.Dataset.download_files`
        together with :py:class:`tempfile.TemporaryDirectory` as a contextmanager.

        Args:
            paths_in_dataset: the files to download, if None (the default) it will download all files
            max_workers: how many connections to use in parallel to download the files
        """
        with tempfile.TemporaryDirectory(suffix=f"foundry_dev_tools-{self.rid}") as tmp_dir:
            path = Path(tmp_dir)
            _ = self.download_files(output_directory=path, paths_in_dataset=paths_in_dataset, max_workers=max_workers)
            yield path

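    # Usage sketch (illustrative only, the file path is hypothetical): the
    # downloaded files disappear with the temporary directory once the
    # with-block exits.
    #
    #   with ds.download_files_temporary(paths_in_dataset=["spark/dataset.parquet"]) as tmp:
    #       data = (tmp / "spark" / "dataset.parquet").read_bytes()
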
    def save_dataframe(
        self,
        df: pandas.core.frame.DataFrame | polars.dataframe.frame.DataFrame | pyspark.sql.DataFrame,
        transaction_type: api_types.FoundryTransaction = "SNAPSHOT",
        foundry_schema: api_types.FoundrySchema | None = None,
    ) -> Self:
        """Saves a dataframe to Foundry.

        Creates SNAPSHOT transactions by default.

        Args:
            df: A pyspark, pandas or polars DataFrame to upload
            transaction_type: Foundry Transaction type, see
                :py:class:`foundry_dev_tools.utils.api_types.FoundryTransaction`
            foundry_schema: use a custom foundry schema instead of the inferred one
        """
        with self.transaction_context(transaction_type=transaction_type):
            from foundry_dev_tools._optional.pandas import pd
            from foundry_dev_tools._optional.polars import pl

            # TODO needed?
            # to be backwards compatible to most readers, that expect files
            # to be under spark/
            folder = str(round(time.time() * 1000)) if transaction_type == "APPEND" else "spark"
            parquet_compression = "snappy"
            use_pandas = not pd.__fake__ and isinstance(df, pd.DataFrame)
            use_polars = not pl.__fake__ and isinstance(df, pl.DataFrame)
            if use_pandas or use_polars:
                buf = io.BytesIO()
                schema_flavor = "spark"
                if use_pandas:
                    # write pandas dataframe as parquet to buffer
                    df.to_parquet(
                        buf,
                        compression=parquet_compression,
                        flavor=schema_flavor,
                        index=False,
                    )
                else:
                    # write polars dataframe as parquet to buffer
                    df.write_parquet(
                        buf,
                        compression=parquet_compression,
                        use_pyarrow=True,
                        pyarrow_options={"flavor": schema_flavor},
                    )
                buf.seek(0)  # go to beginning of file
                self.put_file(
                    folder + "/dataset.parquet",
                    buf,
                )
            else:
                with tempfile.TemporaryDirectory() as path:
                    p_path = Path(path)
                    df.write.format("parquet").option("compression", parquet_compression).save(
                        path=path,
                        mode="overwrite",
                    )
                    filenames = list(
                        filter(lambda file: not file.endswith(".crc"), os.listdir(path)),
                    )
                    path_file_dict = {f"{folder}/{file}": p_path.joinpath(file) for file in filenames}
                    self._context.data_proxy.upload_dataset_files(
                        self.rid,
                        self.transaction["rid"],
                        path_file_dict=path_file_dict,
                    )
            transaction = self.transaction
        if foundry_schema is None:
            foundry_schema = self.infer_schema()
        self.upload_schema(transaction["rid"], foundry_schema)
        return self

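    # Usage sketch (illustrative only, assumes pandas is installed): appending
    # rows as an APPEND transaction instead of the default SNAPSHOT.
    #
    #   import pandas as pd
    #
    #   ds.save_dataframe(pd.DataFrame({"a": [1, 2]}), transaction_type="APPEND")
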
    def infer_schema(self) -> dict:
        """Returns the inferred dataset schema."""
        return self._context.schema_inference.infer_dataset_schema(self.rid, self.branch["id"])

    def upload_schema(self, transaction_rid: api_types.TransactionRid, schema: api_types.FoundrySchema) -> Self:
        """Uploads the foundry dataset schema for a dataset, transaction, branch combination.

        Args:
            transaction_rid: The rid of the transaction
            schema: The foundry schema
        """
        self._context.metadata.api_upload_dataset_schema(
            dataset_rid=self.rid,
            transaction_rid=transaction_rid,
            schema=schema,
            branch=self.branch["id"],
        )
        return self

    def list_files(
        self,
        end_ref: api_types.View | None = None,
        page_size: int = 1000,
        logical_path: api_types.PathInDataset | None = None,
        page_start_logical_path: api_types.PathInDataset | None = None,
        include_open_exclusive_transaction: bool = False,
        exclude_hidden_files: bool = False,
        temporary_credentials_auth_token: str | None = None,
    ) -> list:
        """Wraps :py:meth:`foundry_dev_tools.clients.catalog.CatalogClient.list_dataset_files`.

        Args:
            end_ref: branch or transaction rid of the dataset, defaults to the current branch
            page_size: the maximum page size returned
            logical_path: If logical_path is absent, returns all files in the view.
                If logical_path matches a file exactly, returns just that file.
                Otherwise, returns all files in the "directory" of logical_path:
                (a slash is added to the end of logicalPath if necessary and a prefix-match is performed)
            page_start_logical_path: if specified page starts at the given path,
                otherwise at the beginning of the file list
            include_open_exclusive_transaction: if files added in open transaction
                should be returned as well in the response
            exclude_hidden_files: if hidden files should be excluded (e.g. _log files)
            temporary_credentials_auth_token: to generate temporary credentials for presigned URLs

        Returns:
            list[FileResourcesPage]:

        .. code-block:: python

            [
                {
                    "logicalPath": "..",
                    "pageStartLogicalPath": "..",
                    "includeOpenExclusiveTransaction": "..",
                    "excludeHiddenFiles": "..",
                },
            ]

        """
        return self._context.catalog.list_dataset_files(
            self.rid,
            end_ref=end_ref or self.branch["id"],
            page_size=page_size,
            logical_path=logical_path,
            page_start_logical_path=page_start_logical_path,
            include_open_exclusive_transaction=include_open_exclusive_transaction,
            exclude_hidden_files=exclude_hidden_files,
            temporary_credentials_auth_token=temporary_credentials_auth_token,
        )

    @overload
    def query_foundry_sql(
        self,
        query: str,
        return_type: Literal["pandas"],
        sql_dialect: api_types.SqlDialect = ...,
        timeout: int = ...,
    ) -> pd.core.frame.DataFrame: ...

    @overload
    def query_foundry_sql(
        self,
        query: str,
        return_type: Literal["spark"],
        sql_dialect: api_types.SqlDialect = ...,
        timeout: int = ...,
    ) -> pyspark.sql.DataFrame: ...

    @overload
    def query_foundry_sql(
        self,
        query: str,
        return_type: Literal["arrow"],
        sql_dialect: api_types.SqlDialect = ...,
        timeout: int = ...,
    ) -> pa.Table: ...

    @overload
    def query_foundry_sql(
        self,
        query: str,
        return_type: Literal["raw"],
        sql_dialect: api_types.SqlDialect = ...,
        timeout: int = ...,
    ) -> tuple[dict, list[list]]: ...

    @overload
    def query_foundry_sql(
        self,
        query: str,
        return_type: api_types.SQLReturnType = ...,
        sql_dialect: api_types.SqlDialect = ...,
        timeout: int = ...,
    ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ...

    def query_foundry_sql(
        self,
        query: str,
        return_type: api_types.SQLReturnType = "pandas",
        sql_dialect: api_types.SqlDialect = "SPARK",
        timeout: int = 600,
    ) -> tuple[dict, list[list]] | pd.DataFrame | pa.Table | pyspark.sql.DataFrame:
        """Wrapper around :py:meth:`foundry_dev_tools.clients.foundry_sql_server.FoundrySqlServerClient.query_foundry_sql`.

        But it automatically prepends the dataset location, so instead of:

        >>> ctx.foundry_sql_server.query_foundry_sql("SELECT * FROM `/path/to/dataset` WHERE a=1")

        You can just use:

        >>> ds = ctx.get_dataset_by_path("/path/to/dataset")
        >>> ds.query_foundry_sql("SELECT * WHERE a=1")
        """  # noqa: E501
        # This is a quick fix. Foundry Sql Server seems to decide that
        # "FROM `rid` SELECT *" is not direct read eligible.
        # tracking: ri.issues.main.issue.da8272ca-9e78-4100-af3f-6ac40aaecf51
        query_string = f"SELECT * FROM `{self.rid}`" if query == "SELECT *" else f"FROM `{self.rid}` {query}"  # noqa: S608
        return self._context.foundry_sql_server.query_foundry_sql(
            query=query_string,
            return_type=return_type,
            branch=self.branch["id"],
            sql_dialect=sql_dialect,
            timeout=timeout,
        )

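    # Usage sketch (illustrative only): the dataset reference is prepended
    # automatically, so the query only carries the projection/filter part.
    #
    #   table = ds.query_foundry_sql("SELECT a, b WHERE a > 1", return_type="arrow")
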
    def to_spark(self) -> pyspark.sql.DataFrame:
        """Get dataset as :py:class:`pyspark.sql.DataFrame`.

        Via :py:meth:`foundry_dev_tools.resources.dataset.Dataset.query_foundry_sql`
        """
        return self.query_foundry_sql("SELECT *", return_type="spark")

    def to_arrow(self) -> pa.Table:
        """Get dataset as a :py:class:`pyarrow.Table`.

        Via :py:meth:`foundry_dev_tools.resources.dataset.Dataset.query_foundry_sql`
        """
        return self.query_foundry_sql("SELECT *", return_type="arrow")

    def to_pandas(self) -> pandas.core.frame.DataFrame:
        """Get dataset as a :py:class:`pandas.DataFrame`.

        Via :py:meth:`foundry_dev_tools.resources.dataset.Dataset.query_foundry_sql`
        """
        return self.query_foundry_sql("SELECT *", return_type="pandas")

    def to_polars(self) -> polars.dataframe.frame.DataFrame:
        """Get dataset as a :py:class:`polars.DataFrame`.

        Via :py:meth:`foundry_dev_tools.resources.dataset.Dataset.query_foundry_sql`
        """
        try:
            import polars as pl
        except ImportError as e:
            msg = "The optional 'polars' package is not installed. Please install it to use the 'to_polars' method"
            raise ImportError(msg) from e
        return pl.from_arrow(self.to_arrow())

    @contextmanager
    def transaction_context(
        self,
        transaction_type: api_types.FoundryTransaction | None = None,
        abort_on_error: bool = True,
    ):
        """Handles transactions for dataset functions.

        If there is no open transaction it will start one.
        If there is already an open transaction it will check if the transaction_type is correct.

        If this context manager started the transaction it will also commit it,
        or abort it (if abort_on_error is True).

        Args:
            transaction_type: if there is no open transaction it will open a transaction with this type,
                if there is already an open transaction and it does not match this transaction_type
                it will raise an Error
            abort_on_error: if an error happens while in transaction_context and this is set to true
                it will abort the transaction instead of committing it.
                Only takes effect if this context manager also started the transaction.
        """
        created = False
        # first check if there is already an open transaction
        if self._transaction is None:
            self._transaction = self.get_open_transaction()
        if self._transaction is None:
            self.start_transaction(transaction_type)
            created = True
        elif transaction_type is not None and self.transaction["type"] != transaction_type:
            raise TransactionTypeMismatchError(
                requested_transaction_type=transaction_type,
                open_transaction_type=self.transaction["type"],
            )
        try:
            yield
        except:
            if abort_on_error and created:
                self.abort_transaction()
            raise
        else:
            if created:
                self.commit_transaction()

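    # Usage sketch (illustrative only): grouping several writes into a single
    # UPDATE transaction that is committed on success and aborted on error.
    #
    #   with ds.transaction_context(transaction_type="UPDATE"):
    #       ds.put_file("data/part-1.csv", b"a,b\n1,2\n")
    #       ds.put_file("data/part-2.csv", b"a,b\n3,4\n")
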
    def sync(self) -> Self:
        """Fetches the attributes again + the dataset branch information."""
        # update branch information
        self.switch_branch(self.branch["id"])
        try:
            return super().sync()
        except ResourceNotFoundError as e:
            raise DatasetNotFoundError from e

    def _get_repr_dict(self) -> dict:
        d = super()._get_repr_dict()
        d["branch"] = self.branch["id"]
        return d


resource.RID_CLASS_REGISTRY[Dataset.rid_start] = Dataset