Source code for foundry_dev_tools.clients.build2

"""Implementation of the build2 API."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from foundry_dev_tools.clients.api_client import APIClient
from foundry_dev_tools.errors.compass import ResourceNotFoundError
from foundry_dev_tools.errors.handling import ErrorHandlingConfig
from foundry_dev_tools.utils import api_types
from foundry_dev_tools.utils.api_types import assert_in_literal

if TYPE_CHECKING:
    import requests

JOB_SPEC_SELECTION_TYPES = {
    api_types.JobSpecJobSpecsSelection: "jobSpecs",
    api_types.UpstreamJobSpecSelection: "upstream",
    api_types.ConnectingJobSpecSelection: "connecting",
    api_types.DatasetsJobSpecSelection: "datasets",
}


def _convert_job_spec_selections(
    job_spec_selections: list[
        api_types.DatasetsJobSpecSelection
        | api_types.JobSpecJobSpecsSelection
        | api_types.UpstreamJobSpecSelection
        | api_types.ConnectingJobSpecSelection
    ],
) -> list:
    """Converts existing job spec selections based on the provided 'job_spec_selection_type'."""
    result = []

    for job_spec_selection in job_spec_selections:
        for job_spec_selection_class, job_spec_selection_type in JOB_SPEC_SELECTION_TYPES.items():
            if set(job_spec_selection.keys()) == set(job_spec_selection_class.__annotations__.keys()):
                result.append(
                    {
                        job_spec_selection_type: job_spec_selection,
                        "type": job_spec_selection_type,
                    }
                )
                continue

    return result


[docs] class Build2Client(APIClient): """Build2Client class implements the 'build2' API.""" api_name = "build2"
[docs] def api_get_build_report(self, build_rid: api_types.Rid, **kwargs) -> requests.Response: """Returns the build report. Args: build_rid: the build RID **kwargs: gets passed to :py:meth:`APIClient.api_request` """ return self.api_request( "GET", "info/builds2/" + build_rid, error_handling=ErrorHandlingConfig({204: ResourceNotFoundError}), **kwargs, )
[docs] def api_get_job_report(self, job_rid: api_types.Rid, **kwargs) -> requests.Response: """Returns the report for a job. Args: job_rid: the job RID **kwargs: gets passed to :py:meth:`APIClient.api_request` """ return self.api_request( "GET", "info/jobs3/" + job_rid, error_handling=ErrorHandlingConfig({204: ResourceNotFoundError}), **kwargs, )
[docs] def api_submit_build( self, job_spec_selections: list[ api_types.DatasetsJobSpecSelection | api_types.JobSpecJobSpecsSelection | api_types.UpstreamJobSpecSelection | api_types.ConnectingJobSpecSelection ], submission_id: str | None = None, build_group_rid: str | None = None, branch: api_types.DatasetBranch = "master", branch_fallbacks: set[api_types.DatasetBranch] | None = None, force_build: bool | None = None, ignore_build_policy: bool | None = False, finish_on_failure: bool | None = None, num_run_job_attempts: int | None = None, force_retry: bool | None = None, retry_backoff_duration: int | None = None, build_parameters: dict[str, Any] | None = None, suppress_notifications: bool | None = None, exceeded_duration_mode: api_types.ExceededDurationMode | None = None, input_failure_strategies: list[api_types.InputStrategy] | None = None, output_queue_strategy: api_types.OutputQueueStrategy | None = None, **kwargs, ) -> requests.Response: """Request to submit a build. Args: job_spec_selections: A set of job spec selection for which the build service will determine the job specs to run based off of the information provided in the selection. There are several types of job spec selections to choose from: (a) :py:class:`~foundry_dev_tools.utils.api_types.DatasetsJobSpecSelection`: The build service will determine the job specs to run, based on the dataset rids provided (b) :py:class:`~foundry_dev_tools.utils.api_types.JobSpecJobSpecsSelection`: The build service will directly run the specified job spec rids (c) :py:class:`~foundry_dev_tools.utils.api_types.UpstreamJobSpecSelection`: A list of downstream dataset rids can be defined to start from searching for job specs. A separate list of dataset rids to ignore can be specified as well in order to prevent further exploration of the dependency graph on the given dataset rids (d) :py:class:`~foundry_dev_tools.utils.api_types.ConnectingJobSpecSelection`: The build service will determine the job specs in between the list of upstream and downstream dataset rids. Additionally, a list of dataset rids to ignore can be passed along on which to prevent further exploration of the dependency graph submission_id: Optional submission identifier to uniquely identify a new build submission build_group_rid: Specify in order to join a build group which serves as a collector acting as if all the builds sharing the same build group were submitted as a single build branch: The branch that all jobs in this build are run on. Defaults to `master` if not specified branch_fallbacks: Fallback branches to use when resolving dataset properties for input specs force_build: When set to 'True', will ignore staleness checking when running this build and runs all job specs regardless of whether they are stale ignore_build_policy: When set to 'True', will allow for jobs to be run even if their build policy is not satisfied finish_on_failure: When set to 'True', will abort all other jobs and finish the build if any single job fails num_run_job_attempts: The maximum number of attempts a job should be run when it fails due to a retryable error. If a value <= 1 is specified, jobs will never be retried. force_retry: When set to 'True', all errors will be treated as retryable. The maximum number of retry attempts is strictly given by `num_run_job_attempts` retry_backoff_duration: The duration in seconds to wait before attempting to run a job again. If not present, jobs will be retried immediately build_parameters: Parameters to apply to every job in the build which typically serve as "configuration" for the workers suppress_notifications: When set to 'True', the user will not receive a notification when the build is complete exceeded_duration_mode: The action taken when job with custom expiration has expired. Defaults to 'CANCEL' if not provided at all input_failure_strategies: Overwrites the strategy for each input dataset in case its build fails. output_queue_strategy: Defines the output queueing behaviour of the jobs that are created by this build. Defaults to 'QUEUE_UP' for builds with BatchVariant job specs and 'SUPERSEDE' for builds with LongRunningVariant job specs. **kwargs: gets passed to :py:meth:`APIClient.api_request` Returns: requests.Response: the response contains a json which returns information to the build submitted previously. """ if exceeded_duration_mode: assert_in_literal(exceeded_duration_mode, api_types.ExceededDurationMode, "exceeded_duration_mode") for input_strategy in input_failure_strategies or {}: assert_in_literal( input_strategy["failureStrategy"], api_types.InputFailureStrategy, "input_failure_strategies['failureStrategy']", ) if output_queue_strategy: assert_in_literal(output_queue_strategy, api_types.OutputQueueStrategy, "output_queue_strategy") job_spec_selections = _convert_job_spec_selections(job_spec_selections) body = { "jobSpecSelections": job_spec_selections, "submissionId": submission_id, "buildGroupRid": build_group_rid, "branch": branch, "branchFallbacks": {"branches": list(branch_fallbacks) if branch_fallbacks else []}, "forceBuild": force_build, "ignoreBuildPolicy": ignore_build_policy, "finishOnFailures": finish_on_failure, "numRunJobAttempts": num_run_job_attempts, "forceRetry": force_retry, "retryBackoffDuration": retry_backoff_duration, "buildParameters": build_parameters or {}, "suppressNotifications": suppress_notifications, "exceededDurationMode": exceeded_duration_mode, "inputFailureStrategies": list(input_failure_strategies) if input_failure_strategies else [], "outputQueueStrategy": output_queue_strategy, } return self.api_request( "POST", "manager/submitBuild", json=body, **kwargs, )
[docs] def submit_dataset_build( self, dataset_rid: api_types.DatasetRid, branch: api_types.DatasetBranch = "master", force_build: bool | None = False, ) -> dict: """Submit a dataset build. Args: dataset_rid: The resource identifier of the dataset to build branch: The branch that all jobs in this build are run on. Defaults to `master` if not specified force_build: When set to 'True', will ignore staleness checking when running this build and runs all job specs regardless of whether they are stale. Defaults to 'False' Returns: dict: .. code-block:: python { "isNewSubmission": "<submission-id>", "buildRid": "<...>", "buildGroupRid": "<build-group-rid>", "jobsCreated": { "<job-spec-rid>": "<job-rid>", ... }, "jobsInOtherBuilds": { "<job-spec-rid>": "<job-rid>", ... } } """ datasets_job_spec_selections = [{"datasetRids": [dataset_rid], "isRequired": True}] return self.api_submit_build( job_spec_selections=datasets_job_spec_selections, branch=branch, force_build=force_build ).json()