Source code for foundry_dev_tools.clients.foundry_sql_server

"""Implementation of the foundry-sql-server API."""

from __future__ import annotations

import time
import warnings
from typing import TYPE_CHECKING, Literal, overload

from foundry_dev_tools.clients.api_client import APIClient
from foundry_dev_tools.errors.handling import ErrorHandlingConfig
from foundry_dev_tools.errors.sql import (
    FoundrySqlQueryClientTimedOutError,
    FoundrySqlQueryFailedError,
    FoundrySqlSerializationFormatNotImplementedError,
)
from foundry_dev_tools.utils.api_types import Ref, SqlDialect, SQLReturnType, assert_in_literal

if TYPE_CHECKING:
    import pandas as pd
    import pyarrow as pa
    import pyspark
    import requests


class FoundrySqlServerClient(APIClient):
    """FoundrySqlServerClient class that implements methods from the 'foundry-sql-server' API."""

    api_name = "foundry-sql-server"

    @overload
    def query_foundry_sql(
        self,
        query: str,
        return_type: Literal["pandas"],
        branch: Ref = ...,
        sql_dialect: SqlDialect = ...,
        timeout: int = ...,
    ) -> pd.core.frame.DataFrame: ...

    @overload
    def query_foundry_sql(
        self,
        query: str,
        return_type: Literal["spark"],
        branch: Ref = ...,
        sql_dialect: SqlDialect = ...,
        timeout: int = ...,
    ) -> pyspark.sql.DataFrame: ...

    @overload
    def query_foundry_sql(
        self,
        query: str,
        return_type: Literal["arrow"],
        branch: Ref = ...,
        sql_dialect: SqlDialect = ...,
        timeout: int = ...,
    ) -> pa.Table: ...

    @overload
    def query_foundry_sql(
        self,
        query: str,
        return_type: Literal["raw"],
        branch: Ref = ...,
        sql_dialect: SqlDialect = ...,
        timeout: int = ...,
    ) -> tuple[dict, list[list]]: ...

    @overload
    def query_foundry_sql(
        self,
        query: str,
        return_type: SQLReturnType = ...,
        branch: Ref = ...,
        sql_dialect: SqlDialect = ...,
        timeout: int = ...,
    ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame: ...

    def query_foundry_sql(
        self,
        query: str,
        return_type: SQLReturnType = "pandas",
        branch: Ref = "master",
        sql_dialect: SqlDialect = "SPARK",
        timeout: int = 600,
    ) -> tuple[dict, list[list]] | pd.core.frame.DataFrame | pa.Table | pyspark.sql.DataFrame:
        """Queries the Foundry SQL server with the Spark SQL dialect.

        Uses Arrow IPC to communicate with the Foundry SQL server endpoint.
        Falls back to query_foundry_sql_legacy if pyarrow is not installed
        or the query result is not returned in Arrow format.

        Example:
            df1 = client.query_foundry_sql("SELECT * FROM `/Global/Foundry Operations/Foundry Support/iris`")
            query = ("SELECT col1 FROM `{start_transaction_rid}:{end_transaction_rid}@{branch}`.`{dataset_path_or_rid}` WHERE filterColumns = 'value1' LIMIT 1")
            df2 = client.query_foundry_sql(query)

        Args:
            query: the SQL query in Foundry Spark dialect (use backticks instead of quotes)
            return_type: see :py:class:`foundry_dev_tools.utils.api_types.SQLReturnType`
            branch: the dataset branch
            sql_dialect: the SQL dialect
            timeout: query timeout, defaults to 600 seconds

        Returns:
            :external+pandas:py:class:`~pandas.DataFrame` | :external+pyarrow:py:class:`~pyarrow.Table` | :external+spark:py:class:`~pyspark.sql.DataFrame`:
                a pandas DataFrame, pyarrow Table or Spark DataFrame with the result

        Raises:
            ValueError: only direct read eligible queries can be returned as an arrow Table
        """  # noqa: E501
        if return_type != "raw":
            try:
                response_json = self.api_queries_execute(
                    query,
                    branch=branch,
                    dialect=sql_dialect,
                    timeout=timeout,
                ).json()
                query_id = response_json["queryId"]
                status = response_json["status"]

                if status != {"ready": {}, "type": "ready"}:
                    start_time = time.time()
                    while response_json["status"]["type"] == "running":
                        response = self.api_queries_status(query_id)
                        response_json = response.json()
                        if response_json["status"]["type"] == "failed":
                            raise FoundrySqlQueryFailedError(response)
                        if time.time() > start_time + timeout:
                            raise FoundrySqlQueryClientTimedOutError(response, timeout=timeout)
                        time.sleep(0.2)

                arrow_stream_reader = self.read_fsql_query_results_arrow(query_id=query_id)
                if return_type == "pandas":
                    return arrow_stream_reader.read_pandas()
                if return_type == "spark":
                    from foundry_dev_tools.utils.converter.foundry_spark import (
                        arrow_stream_to_spark_dataframe,
                    )

                    return arrow_stream_to_spark_dataframe(arrow_stream_reader)
                return arrow_stream_reader.read_all()
            except (
                FoundrySqlSerializationFormatNotImplementedError,
                ImportError,
            ) as exc:
                if return_type == "arrow":
                    msg = (
                        "Only direct read eligible queries can be returned as an arrow Table."
                        " Consider setting return_type to 'pandas'."
                    )
                    raise ValueError(msg) from exc
                warnings.warn("Falling back to query_foundry_sql_legacy!")

        return self.context.data_proxy.query_foundry_sql_legacy(
            query=query,
            return_type=return_type,
            branch=branch,
            sql_dialect=sql_dialect,
            timeout=timeout,
        )

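    # A minimal usage sketch (hypothetical, not part of this module): it assumes
    # a configured FoundryContext that exposes this client as `foundry_sql_server`
    # and a dataset at the path used below.
    #
    #   from foundry_dev_tools import FoundryContext
    #
    #   ctx = FoundryContext()
    #   pdf = ctx.foundry_sql_server.query_foundry_sql(
    #       "SELECT * FROM `/Global/Foundry Operations/Foundry Support/iris` LIMIT 10",
    #   )  # pandas.DataFrame (the default return_type)
    #   tbl = ctx.foundry_sql_server.query_foundry_sql(
    #       "SELECT * FROM `/Global/Foundry Operations/Foundry Support/iris` LIMIT 10",
    #       return_type="arrow",
    #   )  # pyarrow.Table; raises ValueError if the query is not direct read eligible
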
    def read_fsql_query_results_arrow(self, query_id: str) -> pa.ipc.RecordBatchStreamReader:
        """Create an Arrow stream reader if the query returned Arrow format."""
        from foundry_dev_tools._optional.pyarrow import pa

        # Couldn't get preload_content=False and gzip content-encoding to work together.
        # If no BytesIO wrapper is used, response.read(1, cache_content=True)
        # does not return the first character but an empty byte (no idea why).
        # So it is essentially a trade-off between download time and memory consumption.
        # Download time seems to be a lot faster (2x) with gzip encoding turned on, while
        # memory consumption increases by the amount of raw bytes returned from the sql server.
        # I will optimize for faster downloads, for now. This decision can be revisited later.
        #
        # 01/2022: Moving to 'requests' instead of 'urllib3', did some experiments again
        # and noticed that preloading content is significantly faster than stream=True.
        response = self.api_queries_results(query_id)

        if response.content[0] != 65:  # 65 = "A"
            # Queries are direct read eligible when:
            #
            # * The dataset files are in a supported format.
            #   The formats currently supported by direct read are Parquet, CSV, Avro, and Soho.
            # * The query does not require SQL compute. Queries which contain aggregate, join,
            #   order by, and filter predicates are not direct read eligible.
            # * The query does not select from a column with a type that is ineligible for
            #   direct read. Ineligible types are array, map, and struct.
            #
            # May 2023: ARROW_V1 seems to consistently return ARROW format and not fall back to JSON.
            raise FoundrySqlSerializationFormatNotImplementedError(response)

        return pa.ipc.RecordBatchStreamReader(response.content[1:])

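    # A short sketch of consuming the returned reader (assumes pyarrow is
    # installed; `process` is a hypothetical placeholder). A stream can be read
    # once, so pick one of the two styles:
    #
    #   reader = client.read_fsql_query_results_arrow(query_id)
    #   table = reader.read_all()     # materialize the whole result as a pa.Table
    #   # or iterate batch by batch to bound memory usage:
    #   for batch in reader:          # yields pa.RecordBatch objects
    #       process(batch)
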
    def api_queries_execute(
        self,
        query: str,
        branch: Ref,
        dialect: SqlDialect = "SPARK",
        timeout: int = 600,
        **kwargs,
    ) -> requests.Response:
        """Queries the Foundry SQL server.

        Args:
            query: the SQL query
            branch: the dataset branch
            dialect: see :py:class:`foundry_dev_tools.utils.api_types.SqlDialect`
            timeout: the query timeout
            **kwargs: gets passed to :py:meth:`APIClient.api_request`
        """
        assert_in_literal(dialect, SqlDialect, "dialect")

        return self.api_request(
            "POST",
            "queries/execute",
            json={
                "dialect": dialect,
                "fallbackBranchIds": [branch],
                "parameters": {"type": "unnamedParameterValues", "unnamedParameterValues": []},
                "query": query,
                "serializationProtocol": "ARROW_V1",
                "timeout": timeout,
            },
            error_handling=ErrorHandlingConfig(branch=branch, dialect=dialect, timeout=timeout),
            **kwargs,
        )

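    # Judging from how query_foundry_sql consumes it, the execute response body
    # is JSON shaped roughly like this (a sketch, not an exhaustive field list):
    #
    #   {
    #       "queryId": "<id used for the /status and /results endpoints>",
    #       "status": {"type": "ready", "ready": {}},  # or {"type": "running", ...}
    #   }
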
    def api_queries_status(
        self,
        query_id: str,
        **kwargs,
    ) -> requests.Response:
        """Get the status of a Foundry SQL query.

        Args:
            query_id: the query id to get the status for
            **kwargs: gets passed to :py:meth:`APIClient.api_request`
        """
        return self.api_request(
            "GET",
            f"queries/{query_id}/status",
            json={},
            **kwargs,
        )

    def api_queries_results(
        self,
        query_id: str,
        **kwargs,
    ) -> requests.Response:
        """Get the results of a Foundry SQL query.

        Args:
            query_id: the query id to get the results for
            **kwargs: gets passed to :py:meth:`APIClient.api_request`
        """
        return self.api_request(
            "GET",
            f"queries/{query_id}/results",
            headers={
                "Accept": "application/octet-stream",
            },
            **kwargs,
        )
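
# A hedged end-to-end sketch of the low-level flow that query_foundry_sql wraps,
# using only the methods defined above (`client` is assumed to be an instance of
# FoundrySqlServerClient; the 0.2s poll interval mirrors query_foundry_sql):
#
#   import time
#
#   response = client.api_queries_execute("SELECT 1", branch="master", dialect="SPARK")
#   query_id = response.json()["queryId"]
#   while client.api_queries_status(query_id).json()["status"]["type"] == "running":
#       time.sleep(0.2)
#   raw = client.api_queries_results(query_id)
#   assert raw.content[:1] == b"A"  # Arrow IPC stream results are prefixed with "A"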