
octopus.manager

OctoManager.

ExecutionStrategy

Bases: Protocol

Protocol for outersplit execution strategies.

Source code in octopus/manager/execution.py
class ExecutionStrategy(Protocol):
    """Protocol for outersplit execution strategies."""

    def execute(
        self,
        outersplit_data: OuterSplits,
        run_fn: "Callable[[int, OuterSplit], None]",
    ) -> None:
        """Execute outersplits using this strategy."""
        ...

execute(outersplit_data, run_fn)

Execute outersplits using this strategy.

Source code in octopus/manager/execution.py
def execute(
    self,
    outersplit_data: OuterSplits,
    run_fn: "Callable[[int, OuterSplit], None]",
) -> None:
    """Execute outersplits using this strategy."""
    ...
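
Because ExecutionStrategy is a Protocol, any object exposing a matching execute method satisfies it; no inheritance from the library's classes is required. The following is a minimal sketch of a hypothetical custom strategy (the DryRunStrategy name and its behavior are illustrative, not part of the library):

class DryRunStrategy:
    """Hypothetical strategy that only reports which outersplits would run."""

    def execute(
        self,
        outersplit_data: OuterSplits,
        run_fn: "Callable[[int, OuterSplit], None]",
    ) -> None:
        # Iterate without calling run_fn; useful for inspecting a study setup.
        for outersplit_id in outersplit_data:
            print(f"Would run outersplit {outersplit_id}")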

OctoManager

Orchestrates the execution of outersplits.

Source code in octopus/manager/core.py
@define
class OctoManager:
    """Orchestrates the execution of outersplits."""

    outersplit_data: OuterSplits = field(validator=[validators.instance_of(dict)])
    """Preprocessed data for each outersplit, keyed by outersplit identifier."""

    study_context: StudyContext = field(validator=[validators.instance_of(StudyContext)])
    """Frozen runtime context containing study configuration."""

    workflow: Sequence[Task] = field(validator=[validators.instance_of(list)])
    """Workflow tasks to execute."""

    outer_parallelization: bool = field(validator=[validators.instance_of(bool)])
    """Whether to run outersplits in parallel."""

    run_single_outersplit_num: int = field(validator=[validators.instance_of(int)])
    """Index of single outersplit to run (-1 for all)."""

    def run_outersplits(self) -> None:
        """Run all outersplits."""
        if not self.outersplit_data:
            raise ValueError("No outersplit data defined")

        # Initialize Ray upfront to ensure worker setup hooks are registered before any workflows execute.
        # This is critical for:
        # 1. Inner parallelization: ML modules (e.g., Octo, AutoGluon) may spawn Ray workers for their
        #    internal operations (bagging, hyperparameter tuning) even when outer_parallelization=False
        # 2. Safety checks: The worker setup hook (_check_parallelization_disabled) must be configured
        #    before any Ray workers start, to detect and prevent thread-level parallelization issues
        # 3. Lifecycle clarity: Explicit init → run → shutdown at the manager level makes the
        #    Ray lifecycle predictable and easier to reason about
        init_ray(start_local_if_missing=True)

        resources = ResourceConfig.create(
            num_outersplits=len(self.outersplit_data),
            outer_parallelization=self.outer_parallelization,
            run_single_outersplit_num=self.run_single_outersplit_num,
        )
        logger.info(f"Preparing execution | {resources}")

        try:
            runner = WorkflowTaskRunner(
                study_context=self.study_context,
                workflow=self.workflow,
                cpus_per_outersplit=resources.cpus_per_outersplit,
            )
            strategy = self._select_strategy(resources.num_workers)
            strategy.execute(self.outersplit_data, runner.run)
        finally:
            shutdown_ray()

    def _select_strategy(self, num_workers: int) -> ExecutionStrategy:
        """Select execution strategy based on configuration.

        Args:
            num_workers: Number of parallel workers for ParallelRayStrategy.

        Returns:
            Appropriate execution strategy based on configuration.
        """
        if self.run_single_outersplit_num != -1:
            return SingleOutersplitStrategy(self.run_single_outersplit_num)
        if self.outer_parallelization:
            return ParallelRayStrategy(num_workers, self.study_context.log_dir)
        return SequentialStrategy()

outer_parallelization = field(validator=[validators.instance_of(bool)]) class-attribute instance-attribute

Whether to run outersplits in parallel.

outersplit_data = field(validator=[validators.instance_of(dict)]) class-attribute instance-attribute

Preprocessed data for each outersplit, keyed by outersplit identifier.

run_single_outersplit_num = field(validator=[validators.instance_of(int)]) class-attribute instance-attribute

Index of single outersplit to run (-1 for all).

study_context = field(validator=[validators.instance_of(StudyContext)]) class-attribute instance-attribute

Frozen runtime context containing study configuration.

workflow = field(validator=[validators.instance_of(list)]) class-attribute instance-attribute

Workflow tasks to execute.

run_outersplits()

Run all outersplits.

Source code in octopus/manager/core.py
def run_outersplits(self) -> None:
    """Run all outersplits."""
    if not self.outersplit_data:
        raise ValueError("No outersplit data defined")

    # Initialize Ray upfront to ensure worker setup hooks are registered before any workflows execute.
    # This is critical for:
    # 1. Inner parallelization: ML modules (e.g., Octo, AutoGluon) may spawn Ray workers for their
    #    internal operations (bagging, hyperparameter tuning) even when outer_parallelization=False
    # 2. Safety checks: The worker setup hook (_check_parallelization_disabled) must be configured
    #    before any Ray workers start, to detect and prevent thread-level parallelization issues
    # 3. Lifecycle clarity: Explicit init → run → shutdown at the manager level makes the
    #    Ray lifecycle predictable and easier to reason about
    init_ray(start_local_if_missing=True)

    resources = ResourceConfig.create(
        num_outersplits=len(self.outersplit_data),
        outer_parallelization=self.outer_parallelization,
        run_single_outersplit_num=self.run_single_outersplit_num,
    )
    logger.info(f"Preparing execution | {resources}")

    try:
        runner = WorkflowTaskRunner(
            study_context=self.study_context,
            workflow=self.workflow,
            cpus_per_outersplit=resources.cpus_per_outersplit,
        )
        strategy = self._select_strategy(resources.num_workers)
        strategy.execute(self.outersplit_data, runner.run)
    finally:
        shutdown_ray()
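
A minimal usage sketch, assuming a StudyContext, preprocessed OuterSplits, and a workflow of Task objects have already been built elsewhere (their construction is study-specific and not shown here):

# study_context, outersplit_data, and workflow are assumed to exist already;
# how they are built depends on the study configuration.
manager = OctoManager(
    outersplit_data=outersplit_data,   # dict keyed by outersplit id
    study_context=study_context,       # frozen StudyContext
    workflow=workflow,                 # list of Task objects
    outer_parallelization=True,        # run outersplits in parallel
    run_single_outersplit_num=-1,      # -1 runs all outersplits
)
manager.run_outersplits()

run_outersplits() initializes Ray, selects the strategy via _select_strategy, and shuts Ray down in a finally block, so the caller does not need to manage the Ray lifecycle separately.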

ParallelRayStrategy

Run outersplits in parallel using Ray.

Source code in octopus/manager/execution.py
@define
class ParallelRayStrategy:
    """Run outersplits in parallel using Ray."""

    num_workers: int
    log_dir: UPath

    def execute(
        self,
        outersplit_data: OuterSplits,
        run_fn: "Callable[[int, OuterSplit], None]",
    ) -> None:
        """Execute all outersplits in parallel using Ray."""

        def wrapped_run(outersplit_id: int, outersplit: OuterSplit) -> None:
            logger.set_log_group(LogGroup.PROCESSING, f"OUTER {outersplit_id}")
            logger.info("Starting execution")
            try:
                run_fn(outersplit_id, outersplit)
                logger.set_log_group(LogGroup.PREPARE_EXECUTION, f"OUTER {outersplit_id}")
                logger.info("Completed successfully")
            except Exception as e:
                logger.exception(f"Exception in task {outersplit_id}: {e!s}")

        run_parallel_outer_ray(
            outersplit_data=outersplit_data,
            run_fn=wrapped_run,
            log_dir=self.log_dir,
            num_workers=self.num_workers,
        )

execute(outersplit_data, run_fn)

Execute all outersplits in parallel using Ray.

Source code in octopus/manager/execution.py
def execute(
    self,
    outersplit_data: OuterSplits,
    run_fn: "Callable[[int, OuterSplit], None]",
) -> None:
    """Execute all outersplits in parallel using Ray."""

    def wrapped_run(outersplit_id: int, outersplit: OuterSplit) -> None:
        logger.set_log_group(LogGroup.PROCESSING, f"OUTER {outersplit_id}")
        logger.info("Starting execution")
        try:
            run_fn(outersplit_id, outersplit)
            logger.set_log_group(LogGroup.PREPARE_EXECUTION, f"OUTER {outersplit_id}")
            logger.info("Completed successfully")
        except Exception as e:
            logger.exception(f"Exception in task {outersplit_id}: {e!s}")

    run_parallel_outer_ray(
        outersplit_data=outersplit_data,
        run_fn=wrapped_run,
        log_dir=self.log_dir,
        num_workers=self.num_workers,
    )

ResourceConfig

Immutable configuration for CPU resources.

Source code in octopus/manager/core.py
@define(frozen=True)
class ResourceConfig:
    """Immutable configuration for CPU resources."""

    num_cpus: int = field(validator=validators.instance_of(int))
    """Total available CPUs on the system."""

    num_workers: int = field(validator=validators.instance_of(int))
    """Number of parallel outer workers."""

    cpus_per_outersplit: int = field(validator=validators.instance_of(int))
    """CPUs allocated to each outersplit for inner parallelization."""

    outer_parallelization: bool = field(validator=validators.instance_of(bool))
    """Whether outer parallelization is enabled."""

    run_single_outersplit_num: int = field(validator=validators.instance_of(int))
    """Index of single outersplit to run (-1 for all). This is mainly used for testing and debugging."""

    num_outersplits: int = field(validator=validators.instance_of(int))
    """Total number of outersplits in the study."""

    @classmethod
    def create(
        cls,
        num_outersplits: int,
        outer_parallelization: bool,
        run_single_outersplit_num: int,
        num_cpus: int | None = None,
    ) -> "ResourceConfig":
        """Create ResourceConfig with computed values.

        Args:
            num_outersplits: Total number of outersplits in the study.
            outer_parallelization: Whether to run outersplits in parallel.
            run_single_outersplit_num: Index of single outersplit to run (-1 for all).
            num_cpus: Total CPUs available (auto-detected if None).

        Returns:
            ResourceConfig with computed worker and CPU allocation.

        Raises:
            ValueError: If any input parameter is invalid.
        """
        if num_outersplits <= 0:
            raise ValueError(f"num_outersplits must be positive, got {num_outersplits}")

        if run_single_outersplit_num < -1:
            raise ValueError(
                f"run_single_outersplit_num must be -1 (all outersplits) or a valid index >= 0, got {run_single_outersplit_num}"
            )
        if run_single_outersplit_num >= num_outersplits:
            raise ValueError(
                f"run_single_outersplit_num ({run_single_outersplit_num}) must be less than num_outersplits ({num_outersplits})"
            )

        # Get or validate num_cpus
        if num_cpus is None:
            num_cpus = get_available_cpus()
        elif num_cpus <= 0:
            raise ValueError(f"num_cpus must be positive, got {num_cpus}")

        # Calculate effective number of outersplits for resource allocation
        effective_num_outersplits = 1 if run_single_outersplit_num != -1 else num_outersplits

        # Calculate resource allocation
        num_workers = min(effective_num_outersplits, num_cpus)
        if num_workers == 0:
            raise ValueError(
                f"Cannot allocate resources: num_workers computed as 0 (effective_num_outersplits={effective_num_outersplits}, num_cpus={num_cpus})"
            )

        cpus_per_outersplit = max(1, math.floor(num_cpus / num_workers)) if outer_parallelization else num_cpus

        return cls(
            num_cpus=num_cpus,
            num_workers=num_workers,
            cpus_per_outersplit=cpus_per_outersplit,
            outer_parallelization=outer_parallelization,
            run_single_outersplit_num=run_single_outersplit_num,
            num_outersplits=num_outersplits,
        )

    def __str__(self) -> str:
        """Return string representation of resource configuration."""
        return (
            f"Parallelization: {self.outer_parallelization} | "
            f"Single outersplit: {self.run_single_outersplit_num} | "
            f"Outersplits: {self.num_outersplits} | "
            f"CPUs: {self.num_cpus} | "
            f"Workers: {self.num_workers} | "
            f"CPUs/outersplit: {self.cpus_per_outersplit}"
        )

cpus_per_outersplit = field(validator=(validators.instance_of(int))) class-attribute instance-attribute

CPUs allocated to each outersplit for inner parallelization.

num_cpus = field(validator=(validators.instance_of(int))) class-attribute instance-attribute

Total available CPUs on the system.

num_outersplits = field(validator=(validators.instance_of(int))) class-attribute instance-attribute

Total number of outersplits in the study.

num_workers = field(validator=(validators.instance_of(int))) class-attribute instance-attribute

Number of parallel outer workers.

outer_parallelization = field(validator=(validators.instance_of(bool))) class-attribute instance-attribute

Whether outer parallelization is enabled.

run_single_outersplit_num = field(validator=(validators.instance_of(int))) class-attribute instance-attribute

Index of single outersplit to run (-1 for all). This is mainly used for testing and debugging.

__str__()

Return string representation of resource configuration.

Source code in octopus/manager/core.py
def __str__(self) -> str:
    """Return string representation of resource configuration."""
    return (
        f"Parallelization: {self.outer_parallelization} | "
        f"Single outersplit: {self.run_single_outersplit_num} | "
        f"Outersplits: {self.num_outersplits} | "
        f"CPUs: {self.num_cpus} | "
        f"Workers: {self.num_workers} | "
        f"CPUs/outersplit: {self.cpus_per_outersplit}"
    )

create(num_outersplits, outer_parallelization, run_single_outersplit_num, num_cpus=None) classmethod

Create ResourceConfig with computed values.

Parameters:

- num_outersplits (int, required): Total number of outersplits in the study.
- outer_parallelization (bool, required): Whether to run outersplits in parallel.
- run_single_outersplit_num (int, required): Index of single outersplit to run (-1 for all).
- num_cpus (int | None, default None): Total CPUs available (auto-detected if None).

Returns:

- ResourceConfig with computed worker and CPU allocation.

Raises:

- ValueError: If any input parameter is invalid.

Source code in octopus/manager/core.py
@classmethod
def create(
    cls,
    num_outersplits: int,
    outer_parallelization: bool,
    run_single_outersplit_num: int,
    num_cpus: int | None = None,
) -> "ResourceConfig":
    """Create ResourceConfig with computed values.

    Args:
        num_outersplits: Total number of outersplits in the study.
        outer_parallelization: Whether to run outersplits in parallel.
        run_single_outersplit_num: Index of single outersplit to run (-1 for all).
        num_cpus: Total CPUs available (auto-detected if None).

    Returns:
        ResourceConfig with computed worker and CPU allocation.

    Raises:
        ValueError: If any input parameter is invalid.
    """
    if num_outersplits <= 0:
        raise ValueError(f"num_outersplits must be positive, got {num_outersplits}")

    if run_single_outersplit_num < -1:
        raise ValueError(
            f"run_single_outersplit_num must be -1 (all outersplits) or a valid index >= 0, got {run_single_outersplit_num}"
        )
    if run_single_outersplit_num >= num_outersplits:
        raise ValueError(
            f"run_single_outersplit_num ({run_single_outersplit_num}) must be less than num_outersplits ({num_outersplits})"
        )

    # Get or validate num_cpus
    if num_cpus is None:
        num_cpus = get_available_cpus()
    elif num_cpus <= 0:
        raise ValueError(f"num_cpus must be positive, got {num_cpus}")

    # Calculate effective number of outersplits for resource allocation
    effective_num_outersplits = 1 if run_single_outersplit_num != -1 else num_outersplits

    # Calculate resource allocation
    num_workers = min(effective_num_outersplits, num_cpus)
    if num_workers == 0:
        raise ValueError(
            f"Cannot allocate resources: num_workers computed as 0 (effective_num_outersplits={effective_num_outersplits}, num_cpus={num_cpus})"
        )

    cpus_per_outersplit = max(1, math.floor(num_cpus / num_workers)) if outer_parallelization else num_cpus

    return cls(
        num_cpus=num_cpus,
        num_workers=num_workers,
        cpus_per_outersplit=cpus_per_outersplit,
        outer_parallelization=outer_parallelization,
        run_single_outersplit_num=run_single_outersplit_num,
        num_outersplits=num_outersplits,
    )
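
As a worked example of the allocation arithmetic: on an 8-CPU machine with 5 outersplits and outer parallelization enabled, create() yields 5 workers with 1 CPU each (max(1, floor(8 / 5)) = 1); with outer parallelization disabled, each outersplit gets all 8 CPUs for inner parallelization. A small sketch, with num_cpus passed explicitly so the result does not depend on the host:

config = ResourceConfig.create(
    num_outersplits=5,
    outer_parallelization=True,
    run_single_outersplit_num=-1,
    num_cpus=8,  # passed explicitly; otherwise auto-detected via get_available_cpus()
)
print(config.num_workers)          # 5 -> min(5 outersplits, 8 CPUs)
print(config.cpus_per_outersplit)  # 1 -> max(1, floor(8 / 5))

serial = ResourceConfig.create(
    num_outersplits=5,
    outer_parallelization=False,
    run_single_outersplit_num=-1,
    num_cpus=8,
)
print(serial.cpus_per_outersplit)  # 8 -> all CPUs available for inner parallelization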

SequentialStrategy

Run outersplits one after another.

Source code in octopus/manager/execution.py
@define
class SequentialStrategy:
    """Run outersplits one after another."""

    def execute(
        self,
        outersplit_data: OuterSplits,
        run_fn: "Callable[[int, OuterSplit], None]",
    ) -> None:
        """Execute all outersplits sequentially."""
        logger.set_log_group(LogGroup.PROCESSING)
        for outersplit_id in outersplit_data:
            logger.info(f"Running outer split: {outersplit_id}")
            run_fn(outersplit_id, outersplit_data[outersplit_id])

execute(outersplit_data, run_fn)

Execute all outersplits sequentially.

Source code in octopus/manager/execution.py
def execute(
    self,
    outersplit_data: OuterSplits,
    run_fn: "Callable[[int, OuterSplit], None]",
) -> None:
    """Execute all outersplits sequentially."""
    logger.set_log_group(LogGroup.PROCESSING)
    for outersplit_id in outersplit_data:
        logger.info(f"Running outer split: {outersplit_id}")
        run_fn(outersplit_id, outersplit_data[outersplit_id])

SingleOutersplitStrategy

Run a single outersplit by index.

Source code in octopus/manager/execution.py
@define
class SingleOutersplitStrategy:
    """Run a single outersplit by index."""

    outersplit_index: int

    def execute(
        self,
        outersplit_data: OuterSplits,
        run_fn: "Callable[[int, OuterSplit], None]",
    ) -> None:
        """Execute only the outersplit at outersplit_index."""
        logger.set_log_group(LogGroup.PROCESSING)
        logger.info(f"Running single outersplit: {self.outersplit_index}")
        outersplit_id = self.outersplit_index
        run_fn(outersplit_id, outersplit_data[outersplit_id])

execute(outersplit_data, run_fn)

Execute only the outersplit at outersplit_index.

Source code in octopus/manager/execution.py
def execute(
    self,
    outersplit_data: OuterSplits,
    run_fn: "Callable[[int, OuterSplit], None]",
) -> None:
    """Execute only the outersplit at outersplit_index."""
    logger.set_log_group(LogGroup.PROCESSING)
    logger.info(f"Running single outersplit: {self.outersplit_index}")
    outersplit_id = self.outersplit_index
    run_fn(outersplit_id, outersplit_data[outersplit_id])

WorkflowTaskRunner

Runs workflow tasks for a single fold.

Handles the lifecycle of processing workflow tasks:

- Saving fold data
- Running tasks with dependencies
- Saving task results

Attributes:

- study_context (StudyContext): Frozen runtime context containing study configuration.
- workflow (Sequence[Task]): List of workflow tasks to execute.
- cpus_per_outersplit (int): Number of CPUs allocated to each task.

Source code in octopus/manager/workflow_runner.py
@define
class WorkflowTaskRunner:
    """Runs workflow tasks for a single fold.

    Handles the lifecycle of processing workflow tasks:
    - Saving fold data
    - Running tasks with dependencies
    - Saving task results

    Attributes:
        study_context: Frozen runtime context containing study configuration.
        workflow: List of workflow tasks to execute.
        cpus_per_outersplit: Number of CPUs allocated to each task.
    """

    study_context: StudyContext = field(validator=[validators.instance_of(StudyContext)])
    workflow: Sequence[Task] = field(validator=[validators.instance_of(list)])
    cpus_per_outersplit: int = field(validator=[validators.instance_of(int)])

    def run(self, outersplit_id: int, outersplit: OuterSplit) -> None:
        """Process all workflow tasks for a single fold.

        Args:
            outersplit_id: Current fold ID
            outersplit: OuterSplit containing traindev and test DataFrames

        Raises:
            RuntimeError: If Ray is not initialized.
        """
        if not ray.is_initialized():
            raise RuntimeError(
                "Ray is not initialized. WorkflowTaskRunner.run() must be called after Ray initialization by OctoManager.run_outersplits()."
            )

        # Save fold data
        fold_dir = self.study_context.output_path / f"outersplit{outersplit_id}"
        fold_dir.mkdir(parents=True, exist_ok=True)
        train_path = fold_dir / "data_traindev.parquet"
        parquet_save(outersplit.traindev, train_path)
        test_path = fold_dir / "data_test.parquet"
        parquet_save(outersplit.test, test_path)

        # task_results: dict[task_id -> dict[ResultType, ModuleResult]]
        task_results: dict[int, dict[ResultType, ModuleResult]] = {}

        for task in self.workflow:
            self._log_task_info(task)

            result = self._run_task(outersplit_id, outersplit, task, task_results)
            task_results[task.task_id] = result

    def _run_task(
        self,
        outersplit_id: int,
        outersplit: OuterSplit,
        task: Task,
        task_results: dict[int, dict[ResultType, ModuleResult]],
    ) -> dict[ResultType, ModuleResult]:
        """Run a single workflow task.

        Args:
            outersplit_id: Current fold ID
            outersplit: OuterSplit containing traindev and test DataFrames
            task: Task to run
            task_results: Dictionary of results from previous tasks

        Returns:
            Dict mapping ResultType to ModuleResult.

        Raises:
            ValueError: If task depends on a task that has not run yet
        """
        # Resolve upstream dependencies
        if task.depends_on is not None:
            if task.depends_on not in task_results:
                raise ValueError(f"Task {task.task_id} depends on task {task.depends_on} which has not run yet")
            upstream_results = task_results[task.depends_on]
            feature_cols = upstream_results[ResultType.BEST].selected_features
            # Build prior_results by concatenating DataFrames from all upstream ModuleResult values
            prior_results: dict[str, pd.DataFrame] = {}
            for df_name in ["scores", "predictions", "feature_importances"]:
                dfs = []
                for module_result in upstream_results.values():
                    df = getattr(module_result, df_name)
                    if isinstance(df, pd.DataFrame) and not df.empty:
                        out = df.copy()
                        out["module"] = module_result.module
                        dfs.append(out)
                prior_results[df_name] = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
            logger.info(f"Prior results keys: {prior_results.keys()}")
        else:
            feature_cols = self.study_context.feature_cols
            prior_results = {}

        # Calculate feature groups
        feature_groups = calculate_feature_groups(outersplit.traindev, feature_cols)

        # Create output directory
        output_dir = self.study_context.output_path / f"outersplit{outersplit_id}" / f"task{task.task_id}"
        output_dir.mkdir(parents=True, exist_ok=True)
        results_dir = output_dir / "results"
        results_dir.mkdir(parents=True, exist_ok=True)
        scratch_dir = output_dir / "scratch"
        scratch_dir.mkdir(parents=True, exist_ok=True)

        logger.info(f"Running task {task.task_id} for fold {outersplit_id}")

        # Create execution module from config and run fit()
        module = task.create_module()
        results = module.fit(
            data_traindev=outersplit.traindev,
            data_test=outersplit.test,
            feature_cols=feature_cols,
            study_context=self.study_context,
            outersplit_id=outersplit_id,
            results_dir=results_dir,
            scratch_dir=scratch_dir,
            num_assigned_cpus=self.cpus_per_outersplit,
            feature_groups=feature_groups,
            prior_results=prior_results,
        )

        self._save_task_context(output_dir, feature_cols, feature_groups)
        self._save_task_config(task, output_dir)
        for result_type, module_result in results.items():
            module_result.save(results_dir / result_type.value)

        # Clean up scratch directory
        rmtree(scratch_dir)

        return results

    def _save_task_config(self, task: Task, output_dir: UPath) -> None:
        """Save task configuration to JSON.

        Args:
            task: Task to save configuration for
            output_dir: Directory to save configuration in
        """
        config_path = output_dir / "config" / "task_config.json"

        config_dict = asdict(task)

        with config_path.open("w") as f:
            json.dump(config_dict, f, indent=2)

    def _save_task_context(
        self,
        output_dir: UPath,
        feature_cols: list[str],
        feature_groups: dict | None,
    ) -> None:
        """Save task runtime context to disk.

        Saves the input feature columns and correlation-based feature groups
        that were used when running this task. These are needed by
        ``TaskPredictor`` for prediction and feature importance computation.

        Files are written to a ``config/`` subdirectory to match the path
        expected by ``OuterSplitLoader``.

        Args:
            output_dir: Task output directory (e.g. outersplit0/task0/).
            feature_cols: Input feature columns used by this task.
            feature_groups: Correlation-based feature groups, or None.
        """
        config_dir = output_dir / "config"
        config_dir.mkdir(parents=True, exist_ok=True)

        with (config_dir / "feature_cols.json").open("w") as f:
            json.dump(feature_cols, f, indent=2)

        if feature_groups:
            with (config_dir / "feature_groups.json").open("w") as f:
                json.dump(feature_groups, f, indent=2)

    def _log_task_info(self, task: Task) -> None:
        """Log information about a workflow task.

        Args:
            task: Task to log information about
        """
        logger.info(
            f"Processing workflow task: {task.task_id} | Input task: {task.depends_on} | Module: {task.module} | Description: {task.description}"
        )

run(outersplit_id, outersplit)

Process all workflow tasks for a single fold.

Parameters:

- outersplit_id (int, required): Current fold ID.
- outersplit (OuterSplit, required): OuterSplit containing traindev and test DataFrames.

Raises:

- RuntimeError: If Ray is not initialized.

Source code in octopus/manager/workflow_runner.py
def run(self, outersplit_id: int, outersplit: OuterSplit) -> None:
    """Process all workflow tasks for a single fold.

    Args:
        outersplit_id: Current fold ID
        outersplit: OuterSplit containing traindev and test DataFrames

    Raises:
        RuntimeError: If Ray is not initialized.
    """
    if not ray.is_initialized():
        raise RuntimeError(
            "Ray is not initialized. WorkflowTaskRunner.run() must be called after Ray initialization by OctoManager.run_outersplits()."
        )

    # Save fold data
    fold_dir = self.study_context.output_path / f"outersplit{outersplit_id}"
    fold_dir.mkdir(parents=True, exist_ok=True)
    train_path = fold_dir / "data_traindev.parquet"
    parquet_save(outersplit.traindev, train_path)
    test_path = fold_dir / "data_test.parquet"
    parquet_save(outersplit.test, test_path)

    # task_results: dict[task_id -> dict[ResultType, ModuleResult]]
    task_results: dict[int, dict[ResultType, ModuleResult]] = {}

    for task in self.workflow:
        self._log_task_info(task)

        result = self._run_task(outersplit_id, outersplit, task, task_results)
        task_results[task.task_id] = result
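
A sketch of calling the runner directly for one outersplit, assuming Ray has already been initialized (OctoManager.run_outersplits() normally handles this) and that study_context, workflow, and outersplit_data exist:

init_ray(start_local_if_missing=True)  # run() raises RuntimeError if Ray is not up
try:
    runner = WorkflowTaskRunner(
        study_context=study_context,   # assumed to exist
        workflow=workflow,             # list of Task objects, assumed to exist
        cpus_per_outersplit=4,
    )
    # Writes data_traindev.parquet and data_test.parquet under
    # <output_path>/outersplit0/ and one task<id>/ directory per workflow task.
    runner.run(0, outersplit_data[0])
finally:
    shutdown_ray()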

get_available_cpus()

Get available CPUs on the system.

Source code in octopus/manager/core.py
def get_available_cpus() -> int:
    """Get available CPUs on the system."""
    total_cpus = os.cpu_count()
    if total_cpus is None:
        raise RuntimeError("Could not determine number of CPUs.")
    return total_cpus

init_ray(address=None, num_cpus=None, start_local_if_missing=False, **kwargs)

Initialize Ray for the current process.

Connects to an existing cluster if an address is provided or set via environment variables; otherwise, optionally starts a local Ray instance.

Parameters:

- address (str | None, default None): Ray head address (e.g., "auto", "127.0.0.1:6379"). If None, uses env vars RAY_ADDRESS or RAY_HEAD_ADDRESS if set.
- num_cpus (int | None, default None): CPU limit when starting a local Ray instance (only used if starting locally).
- start_local_if_missing (bool, default False): If True and no address is available, start a local Ray instance.
- **kwargs: Extra args forwarded to ray.init (e.g., runtime_env, log_to_driver, namespace).

Raises:

- RuntimeError: If no address is available and start_local_if_missing is False.

Source code in octopus/manager/ray_parallel.py
def init_ray(
    address: str | None = None,
    num_cpus: int | None = None,
    start_local_if_missing: bool = False,
    **kwargs,
) -> None:
    """Initialize Ray for the current process.

    Connects to an existing cluster if an address is provided or set via
    environment variables; otherwise, optionally starts a local Ray instance.

    Args:
        address: Ray head address (e.g., "auto", "127.0.0.1:6379"). If None, uses
            env vars RAY_ADDRESS or RAY_HEAD_ADDRESS if set.
        num_cpus: CPU limit when starting a local Ray instance (only used if starting locally).
        start_local_if_missing: If True and no address is available, start a local Ray instance.
        **kwargs: Extra args forwarded to ray.init (e.g., runtime_env, log_to_driver, namespace).

    Raises:
        RuntimeError: If no address is available and start_local_if_missing is False.
    """
    if ray.is_initialized():
        return

    addr = address or os.environ.get("RAY_ADDRESS") or os.environ.get("RAY_HEAD_ADDRESS")
    if addr:
        ray.init(
            address=addr,
            runtime_env={"worker_process_setup_hook": _check_parallelization_disabled},
            **kwargs,
        )
        return

    if start_local_if_missing:
        ray.init(
            num_cpus=num_cpus,
            runtime_env={"worker_process_setup_hook": _check_parallelization_disabled},
            **kwargs,
        )
        return

    raise RuntimeError(
        "No Ray address provided. Set RAY_ADDRESS env, pass address='auto', or call init_ray(..., start_local_if_missing=True) once in the driver."
    )
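
A sketch of the two initialization modes described above; the options are alternatives, and if Ray is already initialized the call returns immediately:

# Option 1: connect to an existing cluster (the address may also come from the
# RAY_ADDRESS or RAY_HEAD_ADDRESS environment variables).
init_ray(address="auto")

# Option 2: no cluster available -> start a local instance capped at 4 CPUs.
# init_ray(start_local_if_missing=True, num_cpus=4)

# With neither an address nor start_local_if_missing=True, init_ray raises RuntimeError.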

run_parallel_inner(trainings, log_dir, num_cpus=1)

Run training.fit() for each item in parallel.

Parameters:

- trainings (Iterable[Any], required): Objects with fit() method.
- log_dir (UPath, required): Directory to store individual Ray worker logs.
- num_cpus (int, default 1): CPUs per training task.

Returns:

- list[Any]: Results from each training.fit() in input order.

Raises:

- RuntimeError: If Ray is not initialized.

Source code in octopus/manager/ray_parallel.py
def run_parallel_inner(trainings: Iterable[Any], log_dir: UPath, num_cpus: int = 1) -> list[Any]:
    """Run training.fit() for each item in parallel.

    Args:
        trainings: Objects with fit() method.
        log_dir: Directory to store individual Ray worker logs.
        num_cpus: CPUs per training task.

    Returns:
        Results from each training.fit() in input order.

    Raises:
        RuntimeError: If Ray is not initialized.
    """
    if not ray.is_initialized():
        raise RuntimeError("Ray is not initialized. Call init_ray() first.")

    @ray.remote(num_cpus=num_cpus)
    def execute_training(training: Any, idx: int, log_dir: UPath) -> Any:
        _setup_worker_logging(log_dir)
        return training.fit()

    futures = [execute_training.remote(training, idx, log_dir) for idx, training in enumerate(trainings)]
    return ray.get(futures)
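
A sketch with a hypothetical training object; the only requirement on each item is a fit() method, and log_dir is assumed to be an existing UPath directory:

class DummyTraining:
    """Hypothetical stand-in for a training object; anything with fit() works."""

    def __init__(self, value: int) -> None:
        self.value = value

    def fit(self) -> int:
        return self.value * 2

init_ray(start_local_if_missing=True)
results = run_parallel_inner(
    trainings=[DummyTraining(1), DummyTraining(2), DummyTraining(3)],
    log_dir=log_dir,   # UPath directory for worker logs, assumed to exist
    num_cpus=1,        # CPUs reserved per training task
)
# results == [2, 4, 6], in the same order as the input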

run_parallel_outer_ray(outersplit_data, run_fn, log_dir, num_workers)

Execute run_fn(outersplit_id, outersplit) in parallel using Ray.

Preserves input order and limits concurrency to num_workers. Outer tasks reserve 0 CPUs so inner Ray work can use available CPUs.

Parameters:

- outersplit_data (OuterSplits, required): Dictionary mapping outersplit_id to OuterSplit(traindev, test).
- run_fn (Callable[[int, OuterSplit], None], required): Function called as run_fn(outersplit_id, outersplit).
- log_dir (UPath, required): Directory to store individual Ray worker logs.
- num_workers (int, required): Maximum number of concurrent outer tasks.

Source code in octopus/manager/ray_parallel.py
def run_parallel_outer_ray(
    outersplit_data: OuterSplits,
    run_fn: Callable[[int, OuterSplit], None],
    log_dir: UPath,
    num_workers: int,
) -> None:
    """Execute run_fn(outersplit_id, outersplit) in parallel using Ray.

    Preserves input order and limits concurrency to num_workers. Outer tasks reserve
    0 CPUs so inner Ray work can use available CPUs.

    Args:
        outersplit_data: Dictionary mapping outersplit_id to OuterSplit(traindev, test).
        run_fn: Function called as run_fn(outersplit_id, outersplit).
        log_dir: Directory to store individual Ray worker logs.
        num_workers: Maximum number of concurrent outer tasks.
    """
    # Ensure Ray is ready in the driver (connect or start local)
    init_ray(start_local_if_missing=True)

    @ray.remote(num_cpus=0)
    def outer_task(outersplit_id: int, outersplit: OuterSplit, log_dir: UPath) -> int:
        _setup_worker_logging(log_dir)
        run_fn(outersplit_id, outersplit)
        return outersplit_id

    outersplit_ids = list(outersplit_data.keys())
    n = len(outersplit_ids)
    if n == 0:
        return

    max_concurrent = max(1, min(num_workers, n))
    inflight: list[ObjectRef] = []
    next_i = 0

    # Prime up to max_concurrent tasks
    while next_i < n and len(inflight) < max_concurrent:
        outersplit_id = outersplit_ids[next_i]
        inflight.append(outer_task.remote(outersplit_id, outersplit_data[outersplit_id], log_dir))
        next_i += 1

    # Drain with backpressure
    while inflight:
        done, inflight = ray.wait(inflight, num_returns=1)
        ray.get(done[0])
        if next_i < n:
            outersplit_id = outersplit_ids[next_i]
            inflight.append(outer_task.remote(outersplit_id, outersplit_data[outersplit_id], log_dir))
            next_i += 1
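
A sketch of driving the pool directly with a simple callback; outersplit_data and log_dir are assumed to exist, and the report function is illustrative (in the library, run_fn is WorkflowTaskRunner.run wrapped with per-outersplit logging by ParallelRayStrategy):

def report(outersplit_id: int, outersplit: OuterSplit) -> None:
    # Hypothetical callback standing in for the real workflow runner.
    print(f"outersplit {outersplit_id}: {len(outersplit.traindev)} traindev rows")

run_parallel_outer_ray(
    outersplit_data=outersplit_data,  # dict[int, OuterSplit], assumed to exist
    run_fn=report,
    log_dir=log_dir,                  # UPath for worker logs, assumed to exist
    num_workers=2,                    # at most two outersplits run concurrently
)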

setup_ray_for_external_library()

Configure environment to enable external libraries to use the existing Ray instance.

Sets RAY_ADDRESS to the current Ray GCS address, preventing external libraries (e.g., AutoGluon, Ray Tune) from creating separate Ray instances that would cause resource conflicts.

Should be called before using external libraries that may use Ray.

Source code in octopus/manager/ray_parallel.py
def setup_ray_for_external_library() -> None:
    """Configure environment to enable external libraries to use the existing Ray instance.

    Sets RAY_ADDRESS to the current Ray GCS address, preventing external libraries
    (e.g., AutoGluon, Ray Tune) from creating separate Ray instances that would
    cause resource conflicts.

    Should be called before using external libraries that may use Ray.
    """
    if ray.is_initialized():
        ray_address = ray.get_runtime_context().gcs_address
        if ray_address:
            os.environ["RAY_ADDRESS"] = ray_address
    else:
        # If Ray is not initialized, clear the RAY_ADDRESS to avoid stale references
        os.environ.pop("RAY_ADDRESS", None)
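
A sketch of the intended call order; the mention of a downstream library is illustrative:

init_ray(start_local_if_missing=True)
setup_ray_for_external_library()  # exports RAY_ADDRESS so libraries reuse this Ray instance

# Any Ray-aware external library used after this point (e.g., AutoGluon or Ray Tune)
# should attach to the existing Ray instance instead of starting its own.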

shutdown_ray()

Shut down Ray if initialized. Safe to call multiple times.

Source code in octopus/manager/ray_parallel.py
def shutdown_ray() -> None:
    """Shut down Ray if initialized. Safe to call multiple times."""
    if ray.is_initialized():
        ray.shutdown()
    # Clear RAY_ADDRESS to avoid stale references after shutdown
    os.environ.pop("RAY_ADDRESS", None)
    os.environ.pop("RAY_HEAD_ADDRESS", None)