Source code for foundry_dev_tools.clients.tables

"""Implementation of the tables API."""

from __future__ import annotations

from typing import TYPE_CHECKING

from foundry_dev_tools.clients.api_client import APIClient

if TYPE_CHECKING:
    import requests

    from foundry_dev_tools.utils.api_types import FolderRid, SourceRid


[docs] class TablesClient(APIClient): """TablesClient class that implements the 'tables' api.""" api_name = "tables"
[docs] def api_create_table( self, name: str, parent_rid: FolderRid, upstream_config: dict, skip_validation: bool = False, **kwargs ) -> requests.Response: """Low level method to create a virtual table. Use a high level method like :py:meth:`TablesClient.create_glue_table` for easier usage. Args: name: The name of the table. This will be the name of the compass resource. parent_rid: The parent Rid of the compass resource. upstream_config: Individual table config. skip_validation: If true, skips the validation of the table. Default is False. **kwargs: gets passed to :py:meth:`APIClient.api_request` """ return self.api_request( "POST", api_path="tables", json={"name": name, "parentRid": parent_rid, "upstream": upstream_config}, params={"skipValidation": skip_validation}, **kwargs, )
[docs] def create_glue_table( self, name: str, parent_rid: FolderRid, connection_rid: SourceRid, table: str, database: str, skip_validation: bool = False, ) -> dict: """Creates a Virtual Table backed by the Glue Catalog. Args: name: The name of the table. This will be the name of the compass resource. parent_rid: The parent Rid of the compass resource. connection_rid: The rid of the compass Source. table: The name of the table in glue. database: The name of the database in glue. skip_validation: If true, skips the validation of the table. Default is False. """ upstream_config = { "type": "glue", "glue": {"connectionRid": connection_rid, "relation": {"table": table, "database": database}}, } return self.api_create_table( name=name, parent_rid=parent_rid, upstream_config=upstream_config, skip_validation=skip_validation ).json()
[docs] def create_snowflake_table( self, name: str, parent_rid: FolderRid, connection_rid: SourceRid, database: str, schema: str, table: str, skip_validation: bool = False, ) -> dict: """Creates a Virtual Table on a Snowflake Source. Args: name: The name of the table. This will be the name of the compass resource. parent_rid: The parent Rid of the compass resource. connection_rid: The rid of the compass Source. table: The name of the table in snowflake. database: The name of the database Snowflake. schema: The name of the schema in Snowflake. skip_validation: If true, skips the validation of the table. Default is False. """ upstream_config = { "type": "snowflake", "snowflake": { "connectionRid": connection_rid, "relation": {"table": table, "database": database, "schema": schema}, }, } return self.api_create_table( name=name, parent_rid=parent_rid, upstream_config=upstream_config, skip_validation=skip_validation ).json()
[docs] def api_list_tables( self, connection_rid: SourceRid, limit: int | None = 1000, page_token: str | None = None, **kwargs ) -> requests.Response: """List Virtual Tables for a source.""" return self.api_request( "GET", api_path="tables", params={ "connectionRid": connection_rid, **({"limit": limit} if limit else {}), **({"pageToken": page_token} if page_token else {}), }, **kwargs, )
[docs] def list_tables(self, connection_rid: SourceRid) -> list[dict]: """Returns all tables for a connection/source by handling pagination. Args: connection_rid: The rid of the compass Source/Connection. Returns: a list of dicts with the following format: { 'rid': 'ri.tables.main.table.<...>', 'upstream': { 'type': '<type>', '<type>': { 'connectionRid': 'ri.magritte..source.<...>', 'relation': {'database': 'test', 'schema': 'test', 'table': 'test2'} } } } """ tables_response = self.api_list_tables(connection_rid=connection_rid, limit=1000) response_json = tables_response.json() tables = response_json["tables"] while next_page_token := response_json.get("nextPageToken"): response_json = self.api_list_tables(connection_rid=connection_rid, page_token=next_page_token).json() tables.extend(response_json["tables"]) return tables