octopus.manager
ExecutionStrategy
Bases: Protocol
Protocol for outersplit execution strategies.
Source code in octopus/manager/execution.py
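The protocol only requires an `execute(outersplit_data, run_fn)` method, so any class with that signature satisfies it structurally. A minimal sketch (the `EchoStrategy` class and the sample data are illustrative, not part of octopus):

```python
from typing import Any, Callable, Protocol


class ExecutionStrategy(Protocol):
    """Anything with this execute signature satisfies the protocol."""

    def execute(
        self,
        outersplit_data: dict[int, Any],
        run_fn: Callable[[int, Any], None],
    ) -> None: ...


class EchoStrategy:
    """Toy strategy: calls run_fn on every outersplit, in order."""

    def execute(self, outersplit_data, run_fn):
        for outersplit_id, outersplit in outersplit_data.items():
            run_fn(outersplit_id, outersplit)


seen: list[int] = []
# Structural typing: EchoStrategy never inherits from ExecutionStrategy.
strategy: ExecutionStrategy = EchoStrategy()
strategy.execute({0: "a", 1: "b"}, lambda i, s: seen.append(i))
```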
OctoManager
Orchestrates the execution of outersplits.
Source code in octopus/manager/core.py
outer_parallelization = field(validator=[validators.instance_of(bool)])
class-attribute instance-attribute
Whether to run outersplits in parallel.
outersplit_data = field(validator=[validators.instance_of(dict)])
class-attribute instance-attribute
Preprocessed data for each outersplit, keyed by outersplit identifier.
run_single_outersplit_num = field(validator=[validators.instance_of(int)])
class-attribute instance-attribute
Index of single outersplit to run (-1 for all).
study_context = field(validator=[validators.instance_of(StudyContext)])
class-attribute instance-attribute
Frozen runtime context containing study configuration.
workflow = field(validator=[validators.instance_of(list)])
class-attribute instance-attribute
Workflow tasks to execute.
run_outersplits()
Run all outersplits.
Source code in octopus/manager/core.py
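Given the fields above, the manager plausibly dispatches to one of the three strategies below based on `run_single_outersplit_num` and `outer_parallelization`. The dispatch order in this sketch (single-outersplit mode taking precedence) is an assumption, not confirmed by the source:

```python
def choose_strategy(outer_parallelization: bool, run_single_outersplit_num: int) -> str:
    """Hypothetical strategy selection; the real logic lives in octopus/manager."""
    # -1 means "run all outersplits"; any other index selects exactly one.
    if run_single_outersplit_num != -1:
        return "single"
    if outer_parallelization:
        return "parallel-ray"
    return "sequential"
```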
ParallelRayStrategy
Run outersplits in parallel using Ray.
Source code in octopus/manager/execution.py
execute(outersplit_data, run_fn)
Execute all outersplits in parallel using Ray.
Source code in octopus/manager/execution.py
ResourceConfig
Immutable configuration for CPU resources.
Source code in octopus/manager/core.py
cpus_per_outersplit = field(validator=(validators.instance_of(int)))
class-attribute instance-attribute
CPUs allocated to each outersplit for inner parallelization.
num_cpus = field(validator=(validators.instance_of(int)))
class-attribute instance-attribute
Total available CPUs on the system.
num_outersplits = field(validator=(validators.instance_of(int)))
class-attribute instance-attribute
Total number of outersplits in the study.
num_workers = field(validator=(validators.instance_of(int)))
class-attribute instance-attribute
Number of parallel outer workers.
outer_parallelization = field(validator=(validators.instance_of(bool)))
class-attribute instance-attribute
Whether outer parallelization is enabled.
run_single_outersplit_num = field(validator=(validators.instance_of(int)))
class-attribute instance-attribute
Index of single outersplit to run (-1 for all). This is mainly used for testing and debugging.
__str__()
Return string representation of resource configuration.
Source code in octopus/manager/core.py
create(num_outersplits, outer_parallelization, run_single_outersplit_num, num_cpus=None)
classmethod
Create ResourceConfig with computed values.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| num_outersplits | int | Total number of outersplits in the study. | required |
| outer_parallelization | bool | Whether to run outersplits in parallel. | required |
| run_single_outersplit_num | int | Index of single outersplit to run (-1 for all). | required |
| num_cpus | int \| None | Total CPUs available (auto-detected if None). | None |

Returns:

| Type | Description |
|---|---|
| ResourceConfig | ResourceConfig with computed worker and CPU allocation. |

Raises:

| Type | Description |
|---|---|
| ValueError | If any input parameter is invalid. |
Source code in octopus/manager/core.py
SequentialStrategy
Run outersplits one after another.
Source code in octopus/manager/execution.py
execute(outersplit_data, run_fn)
Execute all outersplits sequentially.
Source code in octopus/manager/execution.py
SingleOutersplitStrategy
Run a single outersplit by index.
Source code in octopus/manager/execution.py
execute(outersplit_data, run_fn)
Execute only the outersplit at outersplit_index.
Source code in octopus/manager/execution.py
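A minimal sketch of this strategy, assuming the index is held in an `outersplit_index` attribute (the name mentioned in the `execute` docstring); the constructor and error handling here are illustrative, not the real class:

```python
from typing import Any, Callable


class SingleOutersplitStrategySketch:
    """Runs only the outersplit at a fixed index."""

    def __init__(self, outersplit_index: int):
        self.outersplit_index = outersplit_index

    def execute(
        self,
        outersplit_data: dict[int, Any],
        run_fn: Callable[[int, Any], None],
    ) -> None:
        if self.outersplit_index not in outersplit_data:
            raise KeyError(f"no outersplit with id {self.outersplit_index}")
        # Everything except the selected outersplit is skipped.
        run_fn(self.outersplit_index, outersplit_data[self.outersplit_index])


calls: list[tuple[int, str]] = []
SingleOutersplitStrategySketch(1).execute(
    {0: "a", 1: "b"}, lambda i, s: calls.append((i, s))
)
```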
WorkflowTaskRunner
Runs workflow tasks for a single fold.
Handles the lifecycle of processing workflow tasks:

- Saving fold data
- Running tasks with dependencies
- Saving task results
Attributes:

| Name | Type | Description |
|---|---|---|
| study_context | StudyContext | Frozen runtime context containing study configuration. |
| workflow | Sequence[Task] | List of workflow tasks to execute. |
| cpus_per_outersplit | int | Number of CPUs allocated to each task. |
Source code in octopus/manager/workflow_runner.py
run(outersplit_id, outersplit)
Process all workflow tasks for a single fold.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| outersplit_id | int | Current fold ID. | required |
| outersplit | OuterSplit | OuterSplit containing traindev and test DataFrames. | required |

Raises:

| Type | Description |
|---|---|
| RuntimeError | If Ray is not initialized. |
Source code in octopus/manager/workflow_runner.py
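The documented lifecycle (save fold data, run tasks with dependencies, save task results) can be sketched in plain Python. The dict-based tasks, `run_fold` helper, and in-memory `storage` mapping below are illustrative stand-ins, not the real Task, StudyContext, or persistence layer:

```python
def run_fold(outersplit_id, outersplit, workflow, storage):
    """Sketch of the fold lifecycle handled by WorkflowTaskRunner."""
    storage[f"fold_{outersplit_id}/data"] = outersplit        # 1. save fold data
    results = {}
    for task in workflow:                                     # 2. run tasks in order,
        deps = {name: results[name] for name in task["depends_on"]}  # resolving deps
        results[task["name"]] = task["fn"](outersplit, deps)
    for name, result in results.items():                      # 3. save task results
        storage[f"fold_{outersplit_id}/{name}"] = result
    return results


storage: dict[str, object] = {}
workflow = [
    {"name": "prep", "depends_on": [], "fn": lambda split, deps: split * 2},
    {"name": "train", "depends_on": ["prep"], "fn": lambda split, deps: deps["prep"] + 1},
]
out = run_fold(0, 10, workflow, storage)
```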
get_available_cpus()
init_ray(address=None, num_cpus=None, start_local_if_missing=False, **kwargs)
Initialize Ray for the current process.
Connects to an existing cluster if an address is provided or set via environment variables; otherwise, optionally starts a local Ray instance.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| address | str \| None | Ray head address (e.g., "auto", "127.0.0.1:6379"). If None, uses env vars RAY_ADDRESS or RAY_HEAD_ADDRESS if set. | None |
| num_cpus | int \| None | CPU limit when starting a local Ray instance (only used if starting locally). | None |
| **kwargs |  | Extra args forwarded to ray.init (e.g., runtime_env, log_to_driver, namespace). | {} |
| start_local_if_missing | bool | If True and no address is available, start a local Ray instance. | False |

Raises:

| Type | Description |
|---|---|
| RuntimeError | If no address is available and start_local_if_missing is False. |
Source code in octopus/manager/ray_parallel.py
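The documented address resolution can be sketched without touching Ray itself. `resolve_ray_address` below mirrors only the decision logic described above (explicit argument, then `RAY_ADDRESS`, then `RAY_HEAD_ADDRESS`, then optionally local); it is a hypothetical helper and never calls `ray.init`:

```python
import os


def resolve_ray_address(address=None, start_local_if_missing=False):
    """Sketch of init_ray's documented address resolution. Illustrative only."""
    address = (
        address
        or os.environ.get("RAY_ADDRESS")
        or os.environ.get("RAY_HEAD_ADDRESS")
    )
    if address:
        return address      # connect to the existing cluster
    if start_local_if_missing:
        return "local"      # placeholder for "start a local Ray instance"
    raise RuntimeError(
        "no Ray address available and start_local_if_missing is False"
    )
```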
run_parallel_inner(trainings, log_dir, num_cpus=1)
Run training.fit() for each item in parallel.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| trainings | Iterable[Any] | Objects with fit() method. | required |
| log_dir | UPath | Directory to store individual Ray worker logs. | required |
| num_cpus | int | CPUs per training task. | 1 |

Returns:

| Type | Description |
|---|---|
| list[Any] | Results from each training.fit() in input order. |

Raises:

| Type | Description |
|---|---|
| RuntimeError | If Ray is not initialized. |
Source code in octopus/manager/ray_parallel.py
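The contract above (objects with a `fit()` method in, results in input order out) can be illustrated with threads instead of Ray; `run_parallel_sketch` and the `Training` class are illustrative stand-ins, not the library API:

```python
from concurrent.futures import ThreadPoolExecutor


def run_parallel_sketch(trainings, max_workers=4):
    """Order-preserving parallel fit, sketched with threads instead of Ray."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in input order, matching the
        # documented behavior of run_parallel_inner.
        return list(pool.map(lambda t: t.fit(), trainings))


class Training:
    """Any object exposing fit() satisfies the documented contract."""

    def __init__(self, x):
        self.x = x

    def fit(self):
        return self.x * self.x
```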
run_parallel_outer_ray(outersplit_data, run_fn, log_dir, num_workers)
Execute run_fn(outersplit_id, outersplit) in parallel using Ray.
Preserves input order and limits concurrency to num_workers. Outer tasks reserve 0 CPUs so inner Ray work can use available CPUs.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| outersplit_data | OuterSplits | Dictionary mapping outersplit_id to OuterSplit(traindev, test). | required |
| run_fn | Callable[[int, OuterSplit], None] | Function called as run_fn(outersplit_id, outersplit). | required |
| log_dir | UPath | Directory to store individual Ray worker logs. | required |
| num_workers | int | Maximum number of concurrent outer tasks. | required |
Source code in octopus/manager/ray_parallel.py
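The "at most num_workers in flight" scheduling pattern can be sketched with `concurrent.futures` in place of Ray's task queue; `run_outer_sketch` is a hypothetical stand-in that shows only the concurrency-limiting loop:

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
import threading


def run_outer_sketch(outersplit_data, run_fn, num_workers):
    """Keep at most num_workers outer tasks in flight. Illustrative only."""
    pending = set()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        for outersplit_id, outersplit in outersplit_data.items():
            if len(pending) >= num_workers:
                # Block until a slot frees up before submitting more work,
                # analogous to a ray.wait-style scheduling loop.
                _, pending = wait(pending, return_when=FIRST_COMPLETED)
            pending.add(pool.submit(run_fn, outersplit_id, outersplit))
        wait(pending)  # drain remaining tasks


seen: list[int] = []
lock = threading.Lock()


def record(outersplit_id, outersplit):
    with lock:
        seen.append(outersplit_id)


run_outer_sketch({i: str(i) for i in range(6)}, record, num_workers=2)
```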
setup_ray_for_external_library()
Configure environment to enable external libraries to use the existing Ray instance.
Sets RAY_ADDRESS to the current Ray GCS address, preventing external libraries (e.g., AutoGluon, Ray Tune) from creating separate Ray instances that would cause resource conflicts.
Should be called before using external libraries that may use Ray.
Source code in octopus/manager/ray_parallel.py
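The core of the documented behavior is a single environment-variable export. In this sketch the GCS address is passed in explicitly; the real function reads it from the live Ray runtime:

```python
import os


def point_external_libraries_at(gcs_address: str) -> None:
    """Sketch: export RAY_ADDRESS so libraries like AutoGluon or Ray Tune
    attach to the existing cluster instead of starting their own."""
    os.environ["RAY_ADDRESS"] = gcs_address


point_external_libraries_at("127.0.0.1:6379")
```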
shutdown_ray()
Shut down Ray if initialized. Safe to call multiple times.