Skip to content

Manager

OctoManager.

ExecutionStrategy

Bases: Protocol

Protocol for outer split execution strategies.

Source code in octopus/manager/execution.py
class ExecutionStrategy(Protocol):
    """Protocol for outer split execution strategies.

    Any object exposing a matching ``execute`` method satisfies this
    structural interface; no explicit inheritance is required.
    """

    def execute(
        self,
        outer_split_data: OuterSplits,
        run_fn: "Callable[[int, OuterSplit, int], None]",
    ) -> None:
        """Execute outer splits using this strategy.

        Args:
            outer_split_data: Preprocessed data for each outer split.
            run_fn: Callback invoked per split as
                ``run_fn(outer_split_id, outer_split, n_cpus)``.
        """
        ...

execute(outer_split_data, run_fn)

Execute outer splits using this strategy.

Source code in octopus/manager/execution.py
def execute(
    self,
    outer_split_data: OuterSplits,
    run_fn: "Callable[[int, OuterSplit, int], None]",
) -> None:
    """Execute outer splits using this strategy.

    Args:
        outer_split_data: Preprocessed data for each outer split.
        run_fn: Callback invoked per split as
            ``run_fn(outer_split_id, outer_split, n_cpus)``.
    """
    ...

OctoManager

Orchestrates the execution of outer splits.

Source code in octopus/manager/core.py
@define
class OctoManager:
    """Orchestrates the execution of outer splits."""

    outer_split_data: OuterSplits = field(validator=[validators.instance_of(dict)])
    """Preprocessed data for each outer split, keyed by outer split identifier."""

    study_context: StudyContext = field(validator=[validators.instance_of(StudyContext)])
    """Frozen runtime context containing study configuration."""

    workflow: Sequence[Task] = field(validator=[validators.instance_of(list)])
    """Workflow tasks to execute."""

    n_cpus: int = field(validator=validators.instance_of(int))
    """Number of CPUs to use for parallel processing. n_cpus=0 uses all available CPUs.
       Negative values indicate abs(n_cpus) to leave free, e.g. -1 means use all but one CPU.
       Set to 1 to disable all parallel processing and run sequentially."""

    single_outer_split: int | None = field(
        validator=validators.optional(validators.and_(validators.instance_of(int), validators.ge(0)))
    )
    """Index of single outer split to run (None for all)."""

    def run_outer_splits(self) -> None:
        """Run all outer splits.

        Validates the configuration, brings up Ray, executes the selected
        strategy, and always tears Ray down again afterwards.

        Raises:
            ValueError: If no outer split data exists or ``single_outer_split``
                is outside the valid index range.
        """
        if not self.outer_split_data:
            raise ValueError("No outer split data defined")

        n_splits = len(self.outer_split_data)
        requested = self.single_outer_split
        if requested is not None and not 0 <= requested < n_splits:
            raise ValueError(
                f"single_outer_split must be between 0 and n_outer_splits-1"
                f" ({n_splits - 1}), got {requested}"
            )

        # Ray is initialized up front so worker setup hooks exist before any
        # workflow code runs. This matters for:
        # 1. Inner parallelization: ML modules (e.g., Tako, AutoGluon) may spawn
        #    Ray workers internally (bagging, hyperparameter tuning).
        # 2. Lifecycle clarity: explicit init -> run -> shutdown at the manager
        #    level keeps the Ray lifecycle predictable.
        resources = ray_parallel.init(
            n_cpus_user=self.n_cpus,
            n_outer_splits=n_splits,
            run_single_outer_split=requested is not None,
            namespace=f"octopus_study_{self.study_context.output_path}",
        )

        logger.info(f"Preparing execution | {resources}")

        try:
            task_runner = WorkflowTaskRunner(
                study_context=self.study_context,
                workflow=self.workflow,
            )
            self._select_strategy(resources).execute(self.outer_split_data, task_runner.run)
        finally:
            # Shutdown must happen even when a strategy raises.
            ray_parallel.shutdown()

    def _select_strategy(self, resources: ray_parallel.ResourceConfig) -> ExecutionStrategy:
        """Select execution strategy based on configuration.

        Args:
            resources: Resource configuration for execution.

        Returns:
            Appropriate execution strategy based on configuration.
        """
        cpus = resources.cpus_per_worker
        # Priority: explicit single-split request, then multi-worker parallel,
        # then plain sequential execution.
        if self.single_outer_split is not None:
            return SingleOuterSplitStrategy(
                outer_split_index=self.single_outer_split,
                n_cpus=cpus,
            )
        if resources.n_workers > 1:
            return ParallelRayStrategy(
                n_cpus_per_worker=cpus,
                log_dir=self.study_context.log_dir,
            )
        return SequentialStrategy(n_cpus=cpus)

n_cpus = field(validator=(validators.instance_of(int))) class-attribute instance-attribute

Number of CPUs to use for parallel processing. n_cpus=0 uses all available CPUs. Negative values indicate abs(n_cpus) to leave free, e.g. -1 means use all but one CPU. Set to 1 to disable all parallel processing and run sequentially.

outer_split_data = field(validator=[validators.instance_of(dict)]) class-attribute instance-attribute

Preprocessed data for each outer split, keyed by outer split identifier.

single_outer_split = field(validator=(validators.optional(validators.and_(validators.instance_of(int), validators.ge(0))))) class-attribute instance-attribute

Index of single outer split to run (None for all).

study_context = field(validator=[validators.instance_of(StudyContext)]) class-attribute instance-attribute

Frozen runtime context containing study configuration.

workflow = field(validator=[validators.instance_of(list)]) class-attribute instance-attribute

Workflow tasks to execute.

run_outer_splits()

Run all outer splits.

Source code in octopus/manager/core.py
def run_outer_splits(self) -> None:
    """Run all outer splits.

    Validates the configuration, initializes Ray, runs the selected execution
    strategy, and shuts Ray down again even when execution fails.

    Raises:
        ValueError: If no outer split data is defined or if
            ``single_outer_split`` is outside the valid index range.
    """
    if not self.outer_split_data:
        raise ValueError("No outer split data defined")

    if self.single_outer_split is not None and not (0 <= self.single_outer_split < len(self.outer_split_data)):
        raise ValueError(
            f"single_outer_split must be between 0 and n_outer_splits-1"
            f" ({len(self.outer_split_data) - 1}), got {self.single_outer_split}"
        )

    # Initialize Ray upfront to ensure worker setup hooks are registered before any workflows execute.
    # This is critical for:
    # 1. Inner parallelization: ML modules (e.g., Tako, AutoGluon) may spawn Ray workers for their
    #    internal operations (bagging, hyperparameter tuning)
    # 2. Lifecycle clarity: Explicit init → run → shutdown at the manager level makes the
    #    Ray lifecycle predictable and easier to reason about
    resources = ray_parallel.init(
        n_cpus_user=self.n_cpus,
        n_outer_splits=len(self.outer_split_data),
        run_single_outer_split=self.single_outer_split is not None,
        namespace=f"octopus_study_{self.study_context.output_path}",
    )

    logger.info(f"Preparing execution | {resources}")

    try:
        runner = WorkflowTaskRunner(
            study_context=self.study_context,
            workflow=self.workflow,
        )
        strategy = self._select_strategy(resources)
        strategy.execute(self.outer_split_data, runner.run)
    finally:
        # Shutdown runs even when strategy execution raises.
        ray_parallel.shutdown()

ParallelRayStrategy

Bases: ExecutionStrategy

Run outer splits in parallel using Ray.

This strategy starts as many parallel workers as allowed by the resource configuration set up in ray_parallel.init() and executes one outer split per worker.

Source code in octopus/manager/execution.py
@define
class ParallelRayStrategy(ExecutionStrategy):
    """Run outer splits in parallel using Ray.

    This strategy starts as many parallel workers as allowed by the resource
    configuration set up in ray_parallel.init() and executes one outer split per worker.
    """

    n_cpus_per_worker: int = field(validator=[validators.instance_of(int), validators.ge(1)])
    """Number of CPUs to use for parallel processing within each parallel worker."""
    log_dir: UPath = field(validator=validators.instance_of(UPath))
    """Directory handed to ray_parallel.run_parallel_outer for log output."""

    def execute(
        self,
        outer_split_data: OuterSplits,
        run_fn: "Callable[[int, OuterSplit, int], None]",
    ) -> None:
        """Execute all outer splits in parallel using Ray.

        Args:
            outer_split_data: Preprocessed data for each outer split.
            run_fn: Callback invoked per split as
                ``run_fn(outer_split_id, outer_split, n_cpus)``.
        """

        def wrapped_run(outer_split_id: int, outer_split: OuterSplit, n_cpus_per_worker: int) -> None:
            # Tag log output with the split id so interleaved worker logs stay readable.
            logger.set_log_group(LogGroup.PROCESSING, f"OUTER {outer_split_id}")
            logger.info(f"Starting execution for outer split {outer_split_id}")
            try:
                run_fn(outer_split_id, outer_split, n_cpus_per_worker)
                logger.set_log_group(LogGroup.PREPARE_EXECUTION, f"OUTER {outer_split_id}")
                logger.info(f"Completed successfully for outer split {outer_split_id}")
            except Exception as e:
                logger.exception(f"Exception in task {outer_split_id}: {e!s}")
                # Bare raise re-raises the active exception with its original
                # traceback intact (idiomatic; `raise e` adds a redundant frame).
                raise

        ray_parallel.run_parallel_outer(
            outer_split_data=outer_split_data,
            run_fn=wrapped_run,
            log_dir=self.log_dir,
            n_cpus_per_worker=self.n_cpus_per_worker,
        )

n_cpus_per_worker = field(validator=[validators.instance_of(int), validators.ge(1)]) class-attribute instance-attribute

Number of CPUs to use for parallel processing within each parallel worker.

execute(outer_split_data, run_fn)

Execute all outer splits in parallel using Ray.

Source code in octopus/manager/execution.py
def execute(
    self,
    outer_split_data: OuterSplits,
    run_fn: "Callable[[int, OuterSplit, int], None]",
) -> None:
    """Execute all outer splits in parallel using Ray.

    Args:
        outer_split_data: Preprocessed data for each outer split.
        run_fn: Callback invoked per split as
            ``run_fn(outer_split_id, outer_split, n_cpus)``.
    """

    def wrapped_run(outer_split_id: int, outer_split: OuterSplit, n_cpus_per_worker: int) -> None:
        # Tag log output with the split id so interleaved worker logs stay readable.
        logger.set_log_group(LogGroup.PROCESSING, f"OUTER {outer_split_id}")
        logger.info(f"Starting execution for outer split {outer_split_id}")
        try:
            run_fn(outer_split_id, outer_split, n_cpus_per_worker)
            logger.set_log_group(LogGroup.PREPARE_EXECUTION, f"OUTER {outer_split_id}")
            logger.info(f"Completed successfully for outer split {outer_split_id}")
        except Exception as e:
            logger.exception(f"Exception in task {outer_split_id}: {e!s}")
            # Bare raise re-raises the active exception with its original
            # traceback intact (idiomatic; `raise e` adds a redundant frame).
            raise

    ray_parallel.run_parallel_outer(
        outer_split_data=outer_split_data,
        run_fn=wrapped_run,
        log_dir=self.log_dir,
        n_cpus_per_worker=self.n_cpus_per_worker,
    )

SequentialStrategy

Bases: ExecutionStrategy

Run outer splits one after another.

Source code in octopus/manager/execution.py
@define
class SequentialStrategy(ExecutionStrategy):
    """Run outer splits one after another."""

    n_cpus: int = field(validator=[validators.instance_of(int), validators.ge(1)])
    """Number of CPUs to use for parallel processing in each sequential step."""

    def execute(
        self,
        outer_split_data: OuterSplits,
        run_fn: "Callable[[int, OuterSplit, int], None]",
    ) -> None:
        """Run every outer split in order, one at a time, in this process."""
        logger.set_log_group(LogGroup.PROCESSING)
        # Iterate id/split pairs directly instead of re-indexing the mapping.
        for outer_split_id, outer_split in outer_split_data.items():
            logger.info(f"Running outer split: {outer_split_id}")
            run_fn(outer_split_id, outer_split, self.n_cpus)

n_cpus = field(validator=[validators.instance_of(int), validators.ge(1)]) class-attribute instance-attribute

Number of CPUs to use for parallel processing in each sequential step.

execute(outer_split_data, run_fn)

Execute all outer splits sequentially.

Source code in octopus/manager/execution.py
def execute(
    self,
    outer_split_data: OuterSplits,
    run_fn: "Callable[[int, OuterSplit, int], None]",
) -> None:
    """Execute all outer splits sequentially.

    Args:
        outer_split_data: Preprocessed data for each outer split.
        run_fn: Callback invoked per split as
            ``run_fn(outer_split_id, outer_split, n_cpus)``.
    """
    logger.set_log_group(LogGroup.PROCESSING)
    for outer_split_id in outer_split_data:
        logger.info(f"Running outer split: {outer_split_id}")
        run_fn(outer_split_id, outer_split_data[outer_split_id], self.n_cpus)

SingleOuterSplitStrategy

Bases: ExecutionStrategy

Run a single outer split by index.

Source code in octopus/manager/execution.py
@define
class SingleOuterSplitStrategy(ExecutionStrategy):
    """Run a single outer split by index."""

    outer_split_index: int = field(validator=[validators.instance_of(int), validators.ge(0)])
    """Index of the one outer split to execute."""
    n_cpus: int = field(validator=[validators.instance_of(int), validators.ge(1)])
    """Number of CPUs to use for parallel processing within the single outer split."""

    def execute(
        self,
        outer_split_data: OuterSplits,
        run_fn: "Callable[[int, OuterSplit, int], None]",
    ) -> None:
        """Execute only the outer split at outer_split_index."""
        idx = self.outer_split_index
        logger.set_log_group(LogGroup.PROCESSING)
        logger.info(f"Running single outer split: {idx}")
        run_fn(idx, outer_split_data[idx], self.n_cpus)

n_cpus = field(validator=[validators.instance_of(int), validators.ge(1)]) class-attribute instance-attribute

Number of CPUs to use for parallel processing within the single outer split.

execute(outer_split_data, run_fn)

Execute only the outer split at outer_split_index.

Source code in octopus/manager/execution.py
def execute(
    self,
    outer_split_data: OuterSplits,
    run_fn: "Callable[[int, OuterSplit, int], None]",
) -> None:
    """Execute only the outer split at outer_split_index.

    Args:
        outer_split_data: Preprocessed data for each outer split.
        run_fn: Callback invoked as
            ``run_fn(outer_split_id, outer_split, n_cpus)``.
    """
    logger.set_log_group(LogGroup.PROCESSING)
    logger.info(f"Running single outer split: {self.outer_split_index}")
    outer_split_id = self.outer_split_index
    run_fn(outer_split_id, outer_split_data[outer_split_id], self.n_cpus)

WorkflowTaskRunner

Runs workflow tasks for a single outer split.

Handles the lifecycle of processing workflow tasks:

- Saving split data
- Running tasks with dependencies
- Saving task results

Attributes:

Name Type Description
study_context StudyContext

Frozen runtime context containing study configuration.

workflow Sequence[Task]

List of workflow tasks to execute.

Source code in octopus/manager/workflow_runner.py
@define
class WorkflowTaskRunner:
    """Runs workflow tasks for a single outer split.

    Handles the lifecycle of processing workflow tasks:
    - Saving split data
    - Running tasks with dependencies
    - Saving task results

    Attributes:
        study_context: Frozen runtime context containing study configuration.
        workflow: List of workflow tasks to execute.
    """

    study_context: StudyContext = field(validator=[validators.instance_of(StudyContext)])
    workflow: Sequence[Task] = field(validator=[validators.instance_of(list)])

    def run(self, outer_split_id: int, outer_split: OuterSplit, n_assigned_cpus: int) -> None:
        """Process all workflow tasks for a single outer split.

        Args:
            outer_split_id: Current outer split ID
            outer_split: OuterSplit containing traindev and test DataFrames
            n_assigned_cpus: Number of CPUs assigned to this outer split for inner parallel processing
        """
        # Save split row IDs (not full datasets) for reproducibility
        outer_split_dir = self.study_context.output_path / f"outersplit{outer_split_id}"
        outer_split_dir.mkdir(parents=True, exist_ok=True)
        row_id_col = self.study_context.row_id_col
        split_ids = {
            "row_id_col": row_id_col,
            "traindev_row_ids": outer_split.traindev[row_id_col].tolist(),
            "test_row_ids": outer_split.test[row_id_col].tolist(),
        }
        with (outer_split_dir / "split_row_ids.json").open("w") as f:
            json.dump(split_ids, f)

        # task_results: dict[task_id -> dict[ResultType, ModuleResult]]
        task_results: dict[int, dict[ResultType, ModuleResult]] = {}

        for task in self.workflow:
            self._log_task_info(task)
            result = self._run_task(outer_split_id, outer_split, task, n_assigned_cpus, task_results, outer_split_dir)
            task_results[task.task_id] = result

    def _run_task(
        self,
        outer_split_id: int,
        outer_split: OuterSplit,
        task: Task,
        n_assigned_cpus: int,
        task_results: dict[int, dict[ResultType, ModuleResult]],
        outer_split_dir: UPath,
    ) -> dict[ResultType, ModuleResult]:
        """Run a single workflow task.

        Args:
            outer_split_id: Current outer split ID
            outer_split: OuterSplit containing traindev and test DataFrames
            task: Task to run
            n_assigned_cpus: Number of CPUs assigned to this outer split for inner parallel processing
            task_results: Dictionary of results from previous tasks
            outer_split_dir: directory where all data/results relevant for this outer
              split reside / should be saved to

        Returns:
            Dict mapping ResultType to ModuleResult.

        Raises:
            ValueError: If task depends on a task that has not run yet
        """
        # Resolve upstream dependencies
        if task.depends_on is not None:
            if task.depends_on not in task_results:
                raise ValueError(f"Task {task.task_id} depends on task {task.depends_on} which has not run yet")
            dependency_results = task_results[task.depends_on]
            # Downstream tasks see only the features the upstream BEST result selected.
            feature_cols = dependency_results[ResultType.BEST].selected_features
        else:
            feature_cols = self.study_context.feature_cols
            dependency_results = {}

        # Calculate feature groups
        feature_groups = calculate_feature_groups(outer_split.traindev, feature_cols)

        # Create output directory
        module_output_dir = outer_split_dir / f"task{task.task_id}"
        module_output_dir.mkdir(parents=True, exist_ok=True)
        module_results_dir = module_output_dir / "results"
        module_results_dir.mkdir(parents=True, exist_ok=True)
        module_scratch_dir = module_output_dir / "scratch"
        module_scratch_dir.mkdir(parents=True, exist_ok=True)

        logger.info(f"Running task {task.task_id} for outer split {outer_split_id}")

        # Create execution module from config and run fit()
        module = task.create_module()
        results = module.fit(
            data_traindev=outer_split.traindev,
            data_test=outer_split.test,
            feature_cols=feature_cols,
            study_context=self.study_context,
            outer_split_id=outer_split_id,
            results_dir=module_results_dir,
            scratch_dir=module_scratch_dir,
            n_assigned_cpus=n_assigned_cpus,
            feature_groups=feature_groups,
            dependency_results=dependency_results,
        )

        self._save_task_context(module_output_dir, feature_cols, feature_groups)
        self._save_task_config(task, module_output_dir)
        for result_type, module_result in results.items():
            module_result.save(module_results_dir / result_type.value)

        # Clean up scratch directory.
        # NOTE(review): cleanup is skipped when fit() raises — presumably
        # intentional so scratch data survives for debugging; confirm.
        rmtree(module_scratch_dir)

        return results

    def _save_task_config(self, task: Task, output_dir: UPath) -> None:
        """Save task configuration to JSON.

        Args:
            task: Task to save configuration for
            output_dir: Directory to save configuration in
        """
        config_path = output_dir / "config" / "task_config.json"
        # Create the config directory explicitly so this method does not rely
        # on _save_task_context having been called first to create it.
        config_path.parent.mkdir(parents=True, exist_ok=True)

        config_dict = asdict(task)

        with config_path.open("w") as f:
            json.dump(config_dict, f, indent=2)

    def _save_task_context(
        self,
        output_dir: UPath,
        feature_cols: list[str],
        feature_groups: dict | None,
    ) -> None:
        """Save task runtime context to disk.

        Saves the input feature columns and correlation-based feature groups
        that were used when running this task. These are needed by
        ``OctoPredictor`` for prediction and feature importance computation.

        Files are written to a ``config/`` subdirectory to match the path
        expected by ``OuterSplitLoader``.

        Args:
            output_dir: Task output directory (e.g. outersplit0/task0/).
            feature_cols: Input feature columns used by this task.
            feature_groups: Correlation-based feature groups, or None.
        """
        config_dir = output_dir / "config"
        config_dir.mkdir(parents=True, exist_ok=True)

        with (config_dir / "feature_cols.json").open("w") as f:
            json.dump(feature_cols, f, indent=2)

        # Empty/None groups are simply not persisted.
        if feature_groups:
            with (config_dir / "feature_groups.json").open("w") as f:
                json.dump(feature_groups, f, indent=2)

    def _log_task_info(self, task: Task) -> None:
        """Log information about a workflow task.

        Args:
            task: Task to log information about
        """
        logger.info(
            f"Processing workflow task: {task.task_id} | Input task: {task.depends_on} | Module: {task.module} | Description: {task.description}"
        )

run(outer_split_id, outer_split, n_assigned_cpus)

Process all workflow tasks for a single outer split.

Parameters:

Name Type Description Default
outer_split_id int

Current outer split ID

required
outer_split OuterSplit

OuterSplit containing traindev and test DataFrames

required
n_assigned_cpus int

Number of CPUs assigned to this outer split for inner parallel processing

required
Source code in octopus/manager/workflow_runner.py
def run(self, outer_split_id: int, outer_split: OuterSplit, n_assigned_cpus: int) -> None:
    """Process all workflow tasks for a single outer split.

    Args:
        outer_split_id: Current outer split ID
        outer_split: OuterSplit containing traindev and test DataFrames
        n_assigned_cpus: Number of CPUs assigned to this outer split for inner parallel processing
    """
    # Save split row IDs (not full datasets) for reproducibility
    outer_split_dir = self.study_context.output_path / f"outersplit{outer_split_id}"
    outer_split_dir.mkdir(parents=True, exist_ok=True)
    row_id_col = self.study_context.row_id_col
    split_ids = {
        "row_id_col": row_id_col,
        "traindev_row_ids": outer_split.traindev[row_id_col].tolist(),
        "test_row_ids": outer_split.test[row_id_col].tolist(),
    }
    # JSON keeps the split description small and human-readable.
    with (outer_split_dir / "split_row_ids.json").open("w") as f:
        json.dump(split_ids, f)

    # task_results: dict[task_id -> dict[ResultType, ModuleResult]]
    task_results: dict[int, dict[ResultType, ModuleResult]] = {}

    # Tasks run in workflow order; each result is kept so later tasks can
    # resolve their depends_on reference.
    for task in self.workflow:
        self._log_task_info(task)
        result = self._run_task(outer_split_id, outer_split, task, n_assigned_cpus, task_results, outer_split_dir)
        task_results[task.task_id] = result