Skip to content

octopus.predict

Octopus predict package — standalone prediction and analysis from saved studies.

TaskPredictor

Ensemble model for predicting on new, unseen data.

Wraps the fitted models from a single task across all outer splits. All methods require explicit data — no test/train data is stored. All results are computed fresh from loaded models.

Parameters:

Name Type Description Default
study_path

Path to the study directory.

required
task_id

Concrete workflow task index (must be >= 0).

required
result_type

Result type for filtering results (default: 'best').

'best'

Raises:

Type Description
ValueError

If task_id is negative, out of range, or no models found.

FileNotFoundError

If expected study artifacts are missing.

Example

tp = TaskPredictor("studies/my_study", task_id=0)
predictions = tp.predict(new_data, df=True)

Source code in octopus/predict/task_predictor.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
@define(slots=False)
class TaskPredictor:
    """Ensemble model for predicting on new, unseen data.

    Wraps the fitted models from a single task across all outer splits.
    All methods require **explicit data** — no test/train data is stored.
    All results are computed fresh from loaded models.

    Args:
        study_path: Path to the study directory.
        task_id: Concrete workflow task index (must be >= 0).
        result_type: Result type for filtering results (default: 'best').

    Raises:
        ValueError: If task_id is negative, out of range, or no models found.
        FileNotFoundError: If expected study artifacts are missing.

    Example:
        >>> tp = TaskPredictor("studies/my_study", task_id=0)
        >>> predictions = tp.predict(new_data, df=True)
    """

    # Constructor inputs (attrs aliases map e.g. study_path -> _study_path).
    _study_path: UPath = field(converter=_to_upath, alias="study_path")
    _task_id: int = field(alias="task_id")
    _result_type: str = field(default="best", alias="result_type")

    # Computed fields — populated in __attrs_post_init__
    _config: dict[str, Any] = field(init=False, factory=dict, repr=False)
    _metadata: StudyMetadata = field(init=False)

    # Flattened from artifacts for fast access
    _outersplits: list[int] = field(init=False, factory=list)
    _models: dict[int, Any] = field(init=False, factory=dict, repr=False)
    _selected_features: dict[int, list[str]] = field(init=False, factory=dict, repr=False)
    _feature_cols_per_split: dict[int, list[str]] = field(init=False, factory=dict, repr=False)
    _feature_groups_per_split: dict[int, dict[str, list[str]]] = field(init=False, factory=dict, repr=False)
    _feature_cols: list[str] = field(init=False, factory=list, repr=False)

    def __attrs_post_init__(self) -> None:
        """Load config, validate, and load artifacts from the study directory."""
        loader = StudyLoader(self._study_path)
        self._config = loader.load_config()

        # Validate task_id via I/O layer
        loader.validate_task_id(self._task_id, self._config)

        # Extract metadata via I/O layer
        self._metadata = loader.extract_metadata(self._config)

        # Load all per-split artifacts via I/O layer
        artifacts = loader.load_task_artifacts(
            self._task_id,
            self._result_type,
            self._metadata.n_outersplits,
        )

        # Flatten artifacts for fast per-split access
        self._outersplits = list(artifacts.outersplit_ids)
        for split_id, sa in artifacts.splits.items():
            self._models[split_id] = sa.model
            self._selected_features[split_id] = sa.selected_features
            self._feature_cols_per_split[split_id] = sa.feature_cols
            self._feature_groups_per_split[split_id] = sa.feature_groups

        # Compute union of feature_cols across all outersplits
        all_feature_cols: set[str] = set()
        for split_id in self._outersplits:
            split_fcols = self._feature_cols_per_split.get(split_id, [])
            if split_fcols:
                all_feature_cols.update(split_fcols)

        # Fall back to the study-level column list when no split recorded any.
        if all_feature_cols:
            self._feature_cols = sorted(all_feature_cols)
        else:
            self._feature_cols = self._metadata.feature_cols

    # ── Properties ──────────────────────────────────────────────

    @property
    def ml_type(self) -> MLType:
        """Machine learning type (classification, regression, timetoevent)."""
        return self._metadata.ml_type

    @property
    def target_metric(self) -> str:
        """Target metric name."""
        return self._metadata.target_metric

    @property
    def target_col(self) -> str:
        """Target column name from config."""
        return self._metadata.target_col

    @property
    def target_assignments(self) -> dict[str, str]:
        """Target column assignments from prepared config."""
        return self._metadata.target_assignments

    @property
    def positive_class(self) -> Any:
        """Positive class label for classification."""
        return self._metadata.positive_class

    @property
    def row_id_col(self) -> str | None:
        """Row ID column name."""
        return self._metadata.row_id_col

    @property
    def feature_cols(self) -> list[str]:
        """Input feature column names from study config."""
        return self._feature_cols

    @property
    def n_outersplits(self) -> int:
        """Number of loaded outersplits."""
        return len(self._outersplits)

    @property
    def outersplits(self) -> list[int]:
        """List of loaded outersplit IDs."""
        return list(self._outersplits)

    @property
    def config(self) -> dict[str, Any]:
        """Full study configuration dictionary.

        Note:
            After ``TaskPredictor.load()``, this returns an empty dict
            because the full config is not serialized — only the metadata
            fields needed for prediction are saved.
        """
        return self._config

    @property
    def classes_(self) -> np.ndarray:
        """Class labels from the first model (classification only).

        Raises:
            AttributeError: If the model does not have a classes_ attribute.
        """
        # NOTE(review): assumes at least one outersplit is loaded; an empty
        # list would raise IndexError here rather than AttributeError.
        model = self._models[self._outersplits[0]]
        if not hasattr(model, "classes_"):
            raise AttributeError(f"Not a classification model: {type(model).__name__}")
        result: np.ndarray = model.classes_
        return result

    @property
    def feature_cols_per_split(self) -> dict[int, list[str]]:
        """Input feature columns per outersplit (loaded from disk)."""
        return self._feature_cols_per_split

    @property
    def feature_groups_per_split(self) -> dict[int, dict[str, list[str]]]:
        """Feature groups per outersplit (loaded from disk)."""
        return self._feature_groups_per_split

    # ── Per-outersplit access ───────────────────────────────────

    def get_model(self, outersplit_id: int) -> Any:
        """Get the fitted model for an outersplit.

        Args:
            outersplit_id: Outer split index.

        Returns:
            The fitted model object.

        Raises:
            KeyError: If the outersplit ID was not loaded.
        """
        return self._models[outersplit_id]

    def get_selected_features(self, outersplit_id: int) -> list[str]:
        """Get selected features for an outersplit.

        Args:
            outersplit_id: Outer split index.

        Returns:
            List of selected feature names.

        Raises:
            KeyError: If the outersplit ID was not loaded.
        """
        return self._selected_features[outersplit_id]

    # ── Prediction ──────────────────────────────────────────────

    def predict(self, data: pd.DataFrame, df: bool = False) -> np.ndarray | pd.DataFrame:
        """Predict on new data using all outer-split models.

        Args:
            data: DataFrame containing feature columns.
            df: If True, return a DataFrame with per-outersplit predictions
                and ensemble (averaged) predictions, with columns
                ``outersplit``, ``row_id``, ``prediction``.
                If False (default), return ensemble-averaged ndarray.

        Returns:
            Ensemble-averaged predictions as ndarray, or a DataFrame with
            per-split and ensemble rows when ``df=True``.
        """
        per_split_preds: list[np.ndarray] = []
        all_rows: list[pd.DataFrame] = []

        for split_id in self._outersplits:
            # Each split's model is applied only to its own selected features.
            features = self._selected_features[split_id]
            preds = self._models[split_id].predict(data[features])
            per_split_preds.append(preds)

            if df:
                split_df = pd.DataFrame(
                    {
                        "outersplit": split_id,
                        "row_id": data.index,
                        "prediction": preds,
                    }
                )
                all_rows.append(split_df)

        # Ensemble = element-wise mean of per-split predictions.
        ensemble: np.ndarray = np.mean(per_split_preds, axis=0)

        if df:
            ensemble_df = pd.DataFrame(
                {
                    "outersplit": "ensemble",
                    "row_id": data.index,
                    "prediction": ensemble,
                }
            )
            all_rows.append(ensemble_df)
            return pd.concat(all_rows, ignore_index=True)
        return ensemble

    def predict_proba(self, data: pd.DataFrame, df: bool = False) -> np.ndarray | pd.DataFrame:
        """Predict probabilities on new data (classification/multiclass only).

        Args:
            data: DataFrame containing feature columns.
            df: If True, return a DataFrame with per-outersplit probabilities
                and ensemble (averaged) probabilities, with columns
                ``outersplit``, ``row_id``, plus one column per class label.
                If False (default), return ensemble-averaged ndarray.

        Returns:
            Ensemble-averaged probabilities as ndarray, or a DataFrame with
            per-split and ensemble rows when ``df=True``.

        Raises:
            TypeError: If ml_type is not classification or multiclass.
        """
        if self.ml_type not in (MLType.BINARY, MLType.MULTICLASS):
            raise TypeError(
                f"predict_proba() is only available for classification and multiclass tasks, "
                f"but this study has ml_type='{self.ml_type}'."
            )
        per_split_probas: list[np.ndarray] = []
        all_rows: list[pd.DataFrame] = []
        class_labels = self.classes_

        for split_id in self._outersplits:
            features = self._selected_features[split_id]
            probas = self._models[split_id].predict_proba(data[features])
            # Some model wrappers return a DataFrame; normalize to ndarray.
            if isinstance(probas, pd.DataFrame):
                probas = probas.values
            per_split_probas.append(probas)

            if df:
                split_df = pd.DataFrame(probas, columns=class_labels)
                split_df.insert(0, "outersplit", split_id)
                split_df.insert(1, "row_id", data.index.values)
                all_rows.append(split_df)

        ensemble: np.ndarray = np.mean(per_split_probas, axis=0)

        if df:
            ensemble_df = pd.DataFrame(ensemble, columns=class_labels)
            ensemble_df.insert(0, "outersplit", "ensemble")
            ensemble_df.insert(1, "row_id", data.index.values)
            all_rows.append(ensemble_df)
            return pd.concat(all_rows, ignore_index=True)
        return ensemble

    # ── Scoring ─────────────────────────────────────────────────

    def performance(
        self,
        data: pd.DataFrame,
        metrics: list[str] | None = None,
        threshold: float = 0.5,
    ) -> pd.DataFrame:
        """Compute performance scores on provided data for each outer split.

        Each outer-split model is scored independently on the **same** data.
        Scores are computed fresh — never read from disk.

        Args:
            data: Data to score on; must contain feature columns + target column.
            metrics: List of metric names to compute.
                If None, uses the study target metric.
            threshold: Classification threshold for threshold-dependent metrics.

        Returns:
            DataFrame with columns: outersplit, metric, score.
        """
        if metrics is None:
            metrics = [self.target_metric]

        rows = []
        for split_id in self._outersplits:
            model = self._models[split_id]
            features = self._selected_features[split_id]

            for metric_name in metrics:
                # target_assignments supports T2E (duration/event keys)
                score = get_performance_from_model(
                    model=model,
                    data=data,
                    feature_cols=features,
                    target_metric=metric_name,
                    target_assignments=self.target_assignments,
                    threshold=threshold,
                    positive_class=self.positive_class,
                )
                rows.append({"outersplit": split_id, "metric": metric_name, "score": score})

        return pd.DataFrame(rows)

    # ── Feature Importance ──────────────────────────────────────

    def _build_pool_data(self, data: pd.DataFrame) -> dict[int, pd.DataFrame]:
        """Build per-split pool data for permutation FI.

        In study-connected mode (constructed via ``TaskPredictor(study_path,
        task_id)``), loads per-split ``data_traindev.parquet`` from the study
        directory.  This provides a richer pool of replacement values for
        permutation FI, better approximating the marginal distribution of
        each feature.

        In deployment mode (loaded via ``TaskPredictor.load(path)``), the
        original study directory is not available, so the user-provided
        ``data`` is used as the pool for all splits.

        Args:
            data: User-provided data (used as fallback for all splits).

        Returns:
            Dict mapping outersplit_id to pool DataFrame.
        """
        loader = StudyLoader(self._study_path)
        pool: dict[int, pd.DataFrame] = {}

        for split_id in self._outersplits:
            split_loader = loader.get_outersplit_loader(
                outersplit_id=split_id,
                task_id=self._task_id,
                result_type=self._result_type,
            )
            # Existence check distinguishes study-connected vs deployment mode.
            traindev_path = split_loader.fold_dir / "data_traindev.parquet"
            if traindev_path.exists():
                pool[split_id] = split_loader.load_train_data()
            else:
                pool[split_id] = data

        return pool

    def _dispatch_fi(
        self,
        test_data: dict[int, pd.DataFrame],
        train_data: dict[int, pd.DataFrame],
        fi_type: FIType,
        *,
        n_repeats: int = 10,
        feature_groups: dict[str, list[str]] | None = None,
        random_state: int = 42,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Dispatch FI calculation to the appropriate Layer 2 function.

        Shared dispatch logic for both ``TaskPredictor`` (user-provided data)
        and ``TaskPredictorTest`` (stored study data).  The caller is
        responsible for constructing the ``test_data`` and ``train_data``
        dicts; this method only handles algorithm dispatch and result
        formatting.

        Args:
            test_data: Dict mapping outersplit_id to test DataFrame.
            train_data: Dict mapping outersplit_id to train DataFrame
                (used as the sampling pool for permutation FI).
            fi_type: Feature importance type (must already be a ``FIType``).
            n_repeats: Number of permutation repeats.
            feature_groups: Feature groups for group permutation.
                If ``None`` and ``fi_type`` is ``GROUP_PERMUTATION``,
                groups are loaded from the study via
                ``_compute_feature_groups()``.
            random_state: Random seed.
            **kwargs: Additional kwargs forwarded to the FI function
                (e.g. ``shap_type``, ``max_samples`` for SHAP).

        Returns:
            DataFrame with FI results including a ``fi_type`` column
            and per-split + ensemble rows.

        Raises:
            ValueError: If ``fi_type`` is not a recognized ``FIType``.
        """
        # Local import avoids a circular dependency at module import time.
        from octopus.predict.feature_importance import (  # noqa: PLC0415
            calculate_fi_permutation,
            calculate_fi_shap,
        )

        if fi_type in (FIType.PERMUTATION, FIType.GROUP_PERMUTATION):
            resolved_groups = None
            if fi_type == FIType.GROUP_PERMUTATION:
                if feature_groups is not None:
                    resolved_groups = feature_groups
                else:
                    resolved_groups = self._compute_feature_groups()

            result = calculate_fi_permutation(
                models=self._models,
                selected_features=self._selected_features,
                test_data=test_data,
                train_data=train_data,
                target_assignments=self.target_assignments,
                target_metric=self.target_metric,
                positive_class=self.positive_class,
                n_repeats=n_repeats,
                random_state=random_state,
                feature_groups=resolved_groups,
                feature_cols=self._feature_cols,
            )
        elif fi_type == FIType.SHAP:
            result = calculate_fi_shap(
                models=self._models,
                selected_features=self._selected_features,
                test_data=test_data,
                ml_type=self.ml_type,
                feature_cols=self._feature_cols,
                **kwargs,
            )
        else:
            raise ValueError(
                f"Unknown fi_type '{fi_type}'. Use FIType.PERMUTATION, FIType.GROUP_PERMUTATION, or FIType.SHAP."
            )

        result.insert(0, "fi_type", fi_type.value)
        return result

    def calculate_fi(
        self,
        data: pd.DataFrame,
        fi_type: FIType = FIType.PERMUTATION,
        *,
        n_repeats: int = 10,
        feature_groups: dict[str, list[str]] | None = None,
        random_state: int = 42,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Calculate feature importance on provided data across all outer splits.

        Computes FI fresh from loaded models, providing p-values,
        confidence intervals, and group permutation support.

        Args:
            data: Data to compute FI on (must contain features + target).
            fi_type: Type of feature importance. One of:
                - ``FIType.PERMUTATION`` — Per-feature permutation importance.
                - ``FIType.GROUP_PERMUTATION`` — Per-feature + per-group permutation
                  importance.  Uses ``feature_groups`` (from study config or
                  explicitly provided) to also compute group-level importance.
                - ``FIType.SHAP`` — SHAP-based importance.  Pass ``shap_type`` as a
                  kwarg to select the explainer: ``"kernel"`` (default),
                  ``"permutation"``, or ``"exact"``.
            n_repeats: Number of permutation repeats (for permutation FI).
            feature_groups: Dict mapping group names to feature lists
                (for group_permutation).  If None and fi_type is
                ``FIType.GROUP_PERMUTATION``, groups are loaded from the study.
            random_state: Random seed.
            **kwargs: Additional keyword arguments passed to the FI function.
                For ``fi_type=FIType.SHAP``, supported kwargs include:
                ``shap_type`` (``"kernel"``, ``"permutation"``, ``"exact"``),
                ``max_samples``, ``background_size``.

        Returns:
            DataFrame with feature importance results including a ``fi_type``
            column and per-split + ensemble rows.

        Raises:
            ValueError: If fi_type is unknown.
        """
        # Coerce plain strings to the FIType enum for downstream comparisons.
        fi_type = FIType(fi_type)

        # Build per-split data dicts
        # All splits share the same DataFrame reference.  Safe because
        # compute_permutation_single / compute_shap_single copy data before mutating.
        test_data = dict.fromkeys(self._outersplits, data)
        train_data = self._build_pool_data(data)

        return self._dispatch_fi(
            test_data,
            train_data,
            fi_type,
            n_repeats=n_repeats,
            feature_groups=feature_groups,
            random_state=random_state,
            **kwargs,
        )

    def _compute_feature_groups(self) -> dict[str, list[str]]:
        """Compute merged feature groups from all outersplits.

        Merges the per-split feature groups loaded from disk into a single
        dict. Groups with the same name across splits are merged by taking
        the union of their features.

        Returns:
            Dict mapping group names to lists of feature names.
        """
        all_groups: dict[str, list[str]] = {}
        for split_id in self._outersplits:
            split_groups = self._feature_groups_per_split.get(split_id, {})
            for group_name, group_features in split_groups.items():
                if group_name in all_groups:
                    existing = set(all_groups[group_name])
                    existing.update(group_features)
                    all_groups[group_name] = sorted(existing)
                else:
                    all_groups[group_name] = sorted(group_features)
        return all_groups

    # ── Serialization ───────────────────────────────────────────

    def save(self, path: str | UPath) -> None:
        """Save the predictor for standalone deployment.

        Writes a self-contained directory with models + metadata only
        (no data). The saved predictor can be loaded later without the
        original study directory.

        Args:
            path: Directory path to save to. Created if it doesn't exist.
        """
        save_dir = UPath(path)
        save_dir.mkdir(parents=True, exist_ok=True)

        # Save metadata
        metadata = {
            "task_id": self._task_id,
            "ml_type": self.ml_type,
            "target_metric": self.target_metric,
            "target_col": self.target_col,
            "target_assignments": self.target_assignments,
            "positive_class": self.positive_class,
            "row_id_col": self.row_id_col,
            "feature_cols": self._feature_cols,
            "outersplits": self._outersplits,
            "result_type": self._result_type,
            # JSON keys must be strings; load() converts them back to int.
            "feature_cols_per_split": {str(k): v for k, v in self._feature_cols_per_split.items()},
            "feature_groups_per_split": {str(k): v for k, v in self._feature_groups_per_split.items()},
        }
        # NOTE(review): default=str silently stringifies non-JSON values
        # (e.g. the MLType enum, numpy scalars); load() reads them back as
        # plain strings — confirm MLType comparisons still hold after load.
        with (save_dir / "metadata.json").open("w") as f:
            json.dump(metadata, f, indent=2, default=str)

        # Save models
        models_dir = save_dir / "models"
        models_dir.mkdir(parents=True, exist_ok=True)
        for split_id in self._outersplits:
            joblib_save(self._models[split_id], models_dir / f"model_{split_id:03d}.joblib")

        # Save selected features
        features_dir = save_dir / "selected_features"
        features_dir.mkdir(parents=True, exist_ok=True)
        for split_id in self._outersplits:
            with (features_dir / f"split_{split_id:03d}.json").open("w") as f:
                json.dump(self._selected_features[split_id], f)

        # Save version info
        with (save_dir / "version.json").open("w") as f:
            json.dump({"octopus_version": get_version()}, f, indent=2)

    @classmethod
    def load(cls, path: str | UPath) -> TaskPredictor:
        """Load a previously saved predictor.

        Args:
            path: Directory path containing the saved predictor.

        Returns:
            A new TaskPredictor instance that can predict without the
            original study directory.
        """
        load_dir = UPath(path)

        # Load metadata
        with (load_dir / "metadata.json").open() as f:
            metadata_dict = json.load(f)

        # Load version and warn if mismatch
        version_path = load_dir / "version.json"
        if version_path.exists():
            with version_path.open() as f:
                version_info = json.load(f)
            saved_version = version_info.get("octopus_version", "unknown")
            current_version = get_version()
            if saved_version not in ("unknown", current_version):
                import warnings  # noqa: PLC0415

                warnings.warn(
                    f"Predictor was saved with octopus {saved_version}, "
                    f"but current version is {current_version}. "
                    f"Predictions may differ.",
                    stacklevel=2,
                )

        # Create instance without calling __init__ / __attrs_post_init__
        # Use TaskPredictor explicitly (not cls) to avoid subclass issues
        instance = TaskPredictor.__new__(TaskPredictor)

        instance._study_path = UPath(load_dir)
        instance._result_type = metadata_dict.get("result_type", "best")
        instance._task_id = metadata_dict["task_id"]
        instance._config = {}
        # NOTE(review): ml_type was serialized via json default=str, so it
        # arrives here as a raw string — confirm StudyMetadata (or downstream
        # MLType comparisons, e.g. in predict_proba) accept that form.
        instance._metadata = StudyMetadata(
            ml_type=metadata_dict["ml_type"],
            target_metric=metadata_dict["target_metric"],
            target_col=metadata_dict["target_col"],
            target_assignments=metadata_dict.get("target_assignments", {}),
            positive_class=metadata_dict.get("positive_class"),
            row_id_col=metadata_dict.get("row_id_col"),
            feature_cols=metadata_dict.get("feature_cols", []),
            n_outersplits=len(metadata_dict.get("outersplits", [])),
        )
        instance._feature_cols = metadata_dict.get("feature_cols", [])
        instance._outersplits = metadata_dict.get("outersplits", [])
        # Restore per-split data
        instance._feature_cols_per_split = {
            int(k): v for k, v in metadata_dict.get("feature_cols_per_split", {}).items()
        }
        instance._feature_groups_per_split = {
            int(k): v for k, v in metadata_dict.get("feature_groups_per_split", {}).items()
        }

        # Load models
        instance._models = {}
        models_dir = load_dir / "models"
        for split_id in instance._outersplits:
            instance._models[split_id] = joblib_load(models_dir / f"model_{split_id:03d}.joblib")

        # Load selected features
        instance._selected_features = {}
        features_dir = load_dir / "selected_features"
        for split_id in instance._outersplits:
            with (features_dir / f"split_{split_id:03d}.json").open() as f:
                instance._selected_features[split_id] = json.load(f)

        return instance

classes_ property

Class labels from the first model (classification only).

Raises:

Type Description
AttributeError

If the model does not have a classes_ attribute.

config property

Full study configuration dictionary.

Note

After TaskPredictor.load(), this returns an empty dict because the full config is not serialized — only the metadata fields needed for prediction are saved.

feature_cols property

Input feature column names from study config.

feature_cols_per_split property

Input feature columns per outersplit (loaded from disk).

feature_groups_per_split property

Feature groups per outersplit (loaded from disk).

ml_type property

Machine learning type (classification, regression, timetoevent).

n_outersplits property

Number of loaded outersplits.

outersplits property

List of loaded outersplit IDs.

positive_class property

Positive class label for classification.

row_id_col property

Row ID column name.

target_assignments property

Target column assignments from prepared config.

target_col property

Target column name from config.

target_metric property

Target metric name.

__attrs_post_init__()

Load config, validate, and load artifacts from the study directory.

Source code in octopus/predict/task_predictor.py
def __attrs_post_init__(self) -> None:
    """Load config, validate, and load artifacts from the study directory."""
    study_loader = StudyLoader(self._study_path)
    self._config = study_loader.load_config()

    # Fail fast on an invalid task_id; the check lives in the I/O layer.
    study_loader.validate_task_id(self._task_id, self._config)

    # Metadata extraction is also delegated to the I/O layer.
    self._metadata = study_loader.extract_metadata(self._config)

    # Fetch every per-split artifact in one call, then unpack into flat
    # per-split dicts so later lookups during prediction are cheap.
    artifacts = study_loader.load_task_artifacts(
        self._task_id,
        self._result_type,
        self._metadata.n_outersplits,
    )
    self._outersplits = list(artifacts.outersplit_ids)
    for sid, split_artifacts in artifacts.splits.items():
        self._models[sid] = split_artifacts.model
        self._selected_features[sid] = split_artifacts.selected_features
        self._feature_cols_per_split[sid] = split_artifacts.feature_cols
        self._feature_groups_per_split[sid] = split_artifacts.feature_groups

    # The predictor-level feature list is the union over all splits;
    # fall back to study metadata when no split reported any columns.
    union: set[str] = set()
    for sid in self._outersplits:
        union.update(self._feature_cols_per_split.get(sid) or [])
    self._feature_cols = sorted(union) if union else self._metadata.feature_cols

calculate_fi(data, fi_type=FIType.PERMUTATION, *, n_repeats=10, feature_groups=None, random_state=42, **kwargs)

Calculate feature importance on provided data across all outer splits.

Computes FI fresh from loaded models, providing p-values, confidence intervals, and group permutation support.

Parameters:

Name Type Description Default
data DataFrame

Data to compute FI on (must contain features + target).

required
fi_type FIType

Type of feature importance. One of: - FIType.PERMUTATION — Per-feature permutation importance. - FIType.GROUP_PERMUTATION — Per-feature + per-group permutation importance. Uses feature_groups (from study config or explicitly provided) to also compute group-level importance. - FIType.SHAP — SHAP-based importance. Pass shap_type as a kwarg to select the explainer: "kernel" (default), "permutation", or "exact".

PERMUTATION
n_repeats int

Number of permutation repeats (for permutation FI).

10
feature_groups dict[str, list[str]] | None

Dict mapping group names to feature lists (for group_permutation). If None and fi_type is FIType.GROUP_PERMUTATION, groups are loaded from the study.

None
random_state int

Random seed.

42
**kwargs Any

Additional keyword arguments passed to the FI function. For fi_type=FIType.SHAP, supported kwargs include: shap_type ("kernel", "permutation", "exact"), max_samples, background_size.

{}

Returns:

Type Description
DataFrame

DataFrame with feature importance results including a fi_type

DataFrame

column and per-split + ensemble rows.

Raises:

Type Description
ValueError

If fi_type is unknown.

Source code in octopus/predict/task_predictor.py
def calculate_fi(
    self,
    data: pd.DataFrame,
    fi_type: FIType = FIType.PERMUTATION,
    *,
    n_repeats: int = 10,
    feature_groups: dict[str, list[str]] | None = None,
    random_state: int = 42,
    **kwargs: Any,
) -> pd.DataFrame:
    """Compute feature importance on ``data`` for every outer split.

    Importance is always computed fresh from the loaded models, which
    yields p-values, confidence intervals, and group-permutation support.

    Args:
        data: Data to compute FI on (must contain features + target).
        fi_type: Type of feature importance. One of:
            - ``FIType.PERMUTATION`` — Per-feature permutation importance.
            - ``FIType.GROUP_PERMUTATION`` — Per-feature + per-group permutation
              importance.  Uses ``feature_groups`` (from study config or
              explicitly provided) to also compute group-level importance.
            - ``FIType.SHAP`` — SHAP-based importance.  Pass ``shap_type`` as a
              kwarg to select the explainer: ``"kernel"`` (default),
              ``"permutation"``, or ``"exact"``.
        n_repeats: Number of permutation repeats (for permutation FI).
        feature_groups: Dict mapping group names to feature lists
            (for group_permutation).  If None and fi_type is
            ``FIType.GROUP_PERMUTATION``, groups are loaded from the study.
        random_state: Random seed.
        **kwargs: Extra keyword arguments forwarded to the FI function.
            For ``fi_type=FIType.SHAP``, supported kwargs include:
            ``shap_type`` (``"kernel"``, ``"permutation"``, ``"exact"``),
            ``max_samples``, ``background_size``.

    Returns:
        DataFrame with feature importance results including a ``fi_type``
        column and per-split + ensemble rows.

    Raises:
        ValueError: If fi_type is unknown.
    """
    # Normalize strings/raw values into the enum (raises on unknown values).
    fi_type = FIType(fi_type)

    # Every split evaluates against the same DataFrame reference.  This is
    # safe because compute_permutation_single / compute_shap_single copy
    # the data before mutating it.
    per_split_test = {split_id: data for split_id in self._outersplits}
    per_split_train = self._build_pool_data(data)

    return self._dispatch_fi(
        per_split_test,
        per_split_train,
        fi_type,
        n_repeats=n_repeats,
        feature_groups=feature_groups,
        random_state=random_state,
        **kwargs,
    )

get_model(outersplit_id)

Get the fitted model for an outersplit.

Parameters:

Name Type Description Default
outersplit_id int

Outer split index.

required

Returns:

Type Description
Any

The fitted model object.

Source code in octopus/predict/task_predictor.py
def get_model(self, outersplit_id: int) -> Any:
    """Return the fitted model that belongs to one outer split.

    Args:
        outersplit_id: Outer split index.

    Returns:
        The fitted model object stored for that split.
    """
    fitted_models = self._models
    return fitted_models[outersplit_id]

get_selected_features(outersplit_id)

Get selected features for an outersplit.

Parameters:

Name Type Description Default
outersplit_id int

Outer split index.

required

Returns:

Type Description
list[str]

List of selected feature names.

Source code in octopus/predict/task_predictor.py
def get_selected_features(self, outersplit_id: int) -> list[str]:
    """Return the feature names selected for one outer split.

    Args:
        outersplit_id: Outer split index.

    Returns:
        List of selected feature names.
    """
    per_split = self._selected_features
    return per_split[outersplit_id]

load(path) classmethod

Load a previously saved predictor.

Parameters:

Name Type Description Default
path str | UPath

Directory path containing the saved predictor.

required

Returns:

Type Description
TaskPredictor

A new TaskPredictor instance that can predict without the

TaskPredictor

original study directory.

Source code in octopus/predict/task_predictor.py
@classmethod
def load(cls, path: str | UPath) -> TaskPredictor:
    """Load a previously saved predictor.

    Args:
        path: Directory path containing the saved predictor.

    Returns:
        A new TaskPredictor instance that can predict without the
        original study directory.
    """
    load_dir = UPath(path)

    # Metadata is the single source of truth for all scalar state.
    with (load_dir / "metadata.json").open() as f:
        meta = json.load(f)

    # Version check: warn (never fail) when the predictor was written by
    # a different octopus version than the one currently installed.
    version_file = load_dir / "version.json"
    if version_file.exists():
        with version_file.open() as f:
            saved_version = json.load(f).get("octopus_version", "unknown")
        current_version = get_version()
        if saved_version not in ("unknown", current_version):
            import warnings  # noqa: PLC0415

            warnings.warn(
                f"Predictor was saved with octopus {saved_version}, "
                f"but current version is {current_version}. "
                f"Predictions may differ.",
                stacklevel=2,
            )

    # Bypass __init__ / __attrs_post_init__ — all state is restored by hand.
    # TaskPredictor is used explicitly (not cls) to avoid subclass issues.
    instance = TaskPredictor.__new__(TaskPredictor)

    instance._study_path = UPath(load_dir)
    instance._result_type = meta.get("result_type", "best")
    instance._task_id = meta["task_id"]
    # The full study config is intentionally not serialized; see the
    # ``config`` property docstring.
    instance._config = {}
    instance._metadata = StudyMetadata(
        ml_type=meta["ml_type"],
        target_metric=meta["target_metric"],
        target_col=meta["target_col"],
        target_assignments=meta.get("target_assignments", {}),
        positive_class=meta.get("positive_class"),
        row_id_col=meta.get("row_id_col"),
        feature_cols=meta.get("feature_cols", []),
        n_outersplits=len(meta.get("outersplits", [])),
    )
    instance._feature_cols = meta.get("feature_cols", [])
    instance._outersplits = meta.get("outersplits", [])
    # JSON object keys are strings — convert them back to int split ids.
    instance._feature_cols_per_split = {
        int(k): v for k, v in meta.get("feature_cols_per_split", {}).items()
    }
    instance._feature_groups_per_split = {
        int(k): v for k, v in meta.get("feature_groups_per_split", {}).items()
    }

    # Per-split artifacts: fitted models and their selected features.
    models_dir = load_dir / "models"
    instance._models = {
        sid: joblib_load(models_dir / f"model_{sid:03d}.joblib")
        for sid in instance._outersplits
    }

    features_dir = load_dir / "selected_features"
    instance._selected_features = {}
    for sid in instance._outersplits:
        with (features_dir / f"split_{sid:03d}.json").open() as f:
            instance._selected_features[sid] = json.load(f)

    return instance

performance(data, metrics=None, threshold=0.5)

Compute performance scores on provided data for each outer split.

Each outer-split model is scored independently on the same data. Scores are computed fresh — never read from disk.

Parameters:

Name Type Description Default
data DataFrame

Data to score on; must contain feature columns + target column.

required
metrics list[str] | None

List of metric names to compute. If None, uses the study target metric.

None
threshold float

Classification threshold for threshold-dependent metrics.

0.5

Returns:

Type Description
DataFrame

DataFrame with columns: outersplit, metric, score.

Source code in octopus/predict/task_predictor.py
def performance(
    self,
    data: pd.DataFrame,
    metrics: list[str] | None = None,
    threshold: float = 0.5,
) -> pd.DataFrame:
    """Score each outer-split model on the provided data.

    Every model is scored independently on the **same** data, and all
    scores are computed fresh — never read from disk.

    Args:
        data: Data to score on; must contain feature columns + target column.
        metrics: List of metric names to compute.
            If None, uses the study target metric.
        threshold: Classification threshold for threshold-dependent metrics.

    Returns:
        DataFrame with columns: outersplit, metric, score.
    """
    metric_names = [self.target_metric] if metrics is None else metrics

    records: list[dict] = []
    for split_id in self._outersplits:
        model = self._models[split_id]
        features = self._selected_features[split_id]

        for name in metric_names:
            # target_assignments supports T2E (duration/event keys)
            records.append(
                {
                    "outersplit": split_id,
                    "metric": name,
                    "score": get_performance_from_model(
                        model=model,
                        data=data,
                        feature_cols=features,
                        target_metric=name,
                        target_assignments=self.target_assignments,
                        threshold=threshold,
                        positive_class=self.positive_class,
                    ),
                }
            )

    return pd.DataFrame(records)

predict(data, df=False)

Predict on new data using all outer-split models.

Parameters:

Name Type Description Default
data DataFrame

DataFrame containing feature columns.

required
df bool

If True, return a DataFrame with per-outersplit predictions and ensemble (averaged) predictions, with columns outersplit, row_id, prediction. If False (default), return ensemble-averaged ndarray.

False

Returns:

Type Description
ndarray | DataFrame

Ensemble-averaged predictions as ndarray, or a DataFrame with

ndarray | DataFrame

per-split and ensemble rows when df=True.

Source code in octopus/predict/task_predictor.py
def predict(self, data: pd.DataFrame, df: bool = False) -> np.ndarray | pd.DataFrame:
    """Predict on new data using all outer-split models.

    Args:
        data: DataFrame containing feature columns.
        df: If True, return a DataFrame with per-outersplit predictions
            and ensemble (averaged) predictions, with columns
            ``outersplit``, ``row_id``, ``prediction``.
            If False (default), return ensemble-averaged ndarray.

    Returns:
        Ensemble-averaged predictions as ndarray, or a DataFrame with
        per-split and ensemble rows when ``df=True``.
    """
    per_split_preds: list[np.ndarray] = []
    all_rows: list[pd.DataFrame] = []

    for split_id in self._outersplits:
        features = self._selected_features[split_id]
        preds = self._models[split_id].predict(data[features])
        per_split_preds.append(preds)

        if df:
            split_df = pd.DataFrame(
                {
                    "outersplit": split_id,
                    "row_id": data.index,
                    "prediction": preds,
                }
            )
            all_rows.append(split_df)

    ensemble: np.ndarray = np.mean(per_split_preds, axis=0)

    if df:
        ensemble_df = pd.DataFrame(
            {
                "outersplit": "ensemble",
                "row_id": data.index,
                "prediction": ensemble,
            }
        )
        all_rows.append(ensemble_df)
        return pd.concat(all_rows, ignore_index=True)
    return ensemble

predict_proba(data, df=False)

Predict probabilities on new data (classification/multiclass only).

Parameters:

Name Type Description Default
data DataFrame

DataFrame containing feature columns.

required
df bool

If True, return a DataFrame with per-outersplit probabilities and ensemble (averaged) probabilities, with columns outersplit, row_id, plus one column per class label. If False (default), return ensemble-averaged ndarray.

False

Returns:

Type Description
ndarray | DataFrame

Ensemble-averaged probabilities as ndarray, or a DataFrame with

ndarray | DataFrame

per-split and ensemble rows when df=True.

Raises:

Type Description
TypeError

If ml_type is not classification or multiclass.

Source code in octopus/predict/task_predictor.py
def predict_proba(self, data: pd.DataFrame, df: bool = False) -> np.ndarray | pd.DataFrame:
    """Predict probabilities on new data (classification/multiclass only).

    Args:
        data: DataFrame containing feature columns.
        df: If True, return a DataFrame with per-outersplit probabilities
            and ensemble (averaged) probabilities, with columns
            ``outersplit``, ``row_id``, plus one column per class label.
            If False (default), return ensemble-averaged ndarray.

    Returns:
        Ensemble-averaged probabilities as ndarray, or a DataFrame with
        per-split and ensemble rows when ``df=True``.

    Raises:
        TypeError: If ml_type is not classification or multiclass.
    """
    if self.ml_type not in (MLType.BINARY, MLType.MULTICLASS):
        raise TypeError(
            f"predict_proba() is only available for classification and multiclass tasks, "
            f"but this study has ml_type='{self.ml_type}'."
        )
    class_labels = self.classes_

    # Per-split probability matrices, normalized to ndarray up front.
    split_probas: dict = {}
    for sid in self._outersplits:
        raw = self._models[sid].predict_proba(data[self._selected_features[sid]])
        split_probas[sid] = raw.values if isinstance(raw, pd.DataFrame) else raw

    ensemble: np.ndarray = np.mean(list(split_probas.values()), axis=0)

    if not df:
        return ensemble

    def _frame(label, probas):
        # One column per class label, prefixed by outersplit + row_id.
        out = pd.DataFrame(probas, columns=class_labels)
        out.insert(0, "outersplit", label)
        out.insert(1, "row_id", data.index.values)
        return out

    frames = [_frame(sid, p) for sid, p in split_probas.items()]
    frames.append(_frame("ensemble", ensemble))
    return pd.concat(frames, ignore_index=True)

save(path)

Save the predictor for standalone deployment.

Writes a self-contained directory with models + metadata only (no data). The saved predictor can be loaded later without the original study directory.

Parameters:

Name Type Description Default
path str | UPath

Directory path to save to. Created if it doesn't exist.

required
Source code in octopus/predict/task_predictor.py
def save(self, path: str | UPath) -> None:
    """Save the predictor for standalone deployment.

    Writes a self-contained directory with models + metadata only
    (no data). The saved predictor can be loaded later without the
    original study directory.

    Args:
        path: Directory path to save to. Created if it doesn't exist.
    """
    save_dir = UPath(path)
    save_dir.mkdir(parents=True, exist_ok=True)

    # Metadata holds everything load() needs to rebuild the predictor.
    # Split-id keys are stringified here (JSON requires string keys) and
    # converted back to int on load.
    metadata = {
        "task_id": self._task_id,
        "ml_type": self.ml_type,
        "target_metric": self.target_metric,
        "target_col": self.target_col,
        "target_assignments": self.target_assignments,
        "positive_class": self.positive_class,
        "row_id_col": self.row_id_col,
        "feature_cols": self._feature_cols,
        "outersplits": self._outersplits,
        "result_type": self._result_type,
        "feature_cols_per_split": {str(k): v for k, v in self._feature_cols_per_split.items()},
        "feature_groups_per_split": {str(k): v for k, v in self._feature_groups_per_split.items()},
    }
    with (save_dir / "metadata.json").open("w") as f:
        # default=str keeps non-JSON types (e.g. enums) serializable.
        json.dump(metadata, f, indent=2, default=str)

    # Per-split artifacts: one model file + one feature list per split.
    models_dir = save_dir / "models"
    models_dir.mkdir(parents=True, exist_ok=True)
    features_dir = save_dir / "selected_features"
    features_dir.mkdir(parents=True, exist_ok=True)
    for sid in self._outersplits:
        joblib_save(self._models[sid], models_dir / f"model_{sid:03d}.joblib")
        with (features_dir / f"split_{sid:03d}.json").open("w") as f:
            json.dump(self._selected_features[sid], f)

    # Version stamp lets load() warn about cross-version use.
    with (save_dir / "version.json").open("w") as f:
        json.dump({"octopus_version": get_version()}, f, indent=2)

TaskPredictorTest

Bases: TaskPredictor

Predictor for analysing study results on held-out test data.

Inherits from TaskPredictor and additionally stores test and train data. Overrides predict, predict_proba, performance, and calculate_fi to use stored test data implicitly — the caller never needs to pass data.

Each outer-split model predicts only on its corresponding test data. No averaging across splits.

Parameters:

Name Type Description Default
study_path

Path to the study directory.

required
task_id

Concrete workflow task index (must be >= 0).

required
result_type

Result type for filtering results (default: 'best').

required

Raises:

Type Description
ValueError

If task_id is negative, out of range, or no models found.

FileNotFoundError

If expected study artifacts are missing.

Example

>>> tp = TaskPredictorTest("studies/my_study", task_id=0)
>>> scores = tp.performance(metrics=["AUCROC", "ACC"])

Source code in octopus/predict/task_predictor_test.py
@define(slots=False)
class TaskPredictorTest(TaskPredictor):
    """Predictor for analysing study results on held-out test data.

    Inherits from ``TaskPredictor`` and additionally stores test and train
    data.  Overrides ``predict``, ``predict_proba``, ``performance``, and
    ``calculate_fi`` to use stored test data implicitly — the caller never
    needs to pass data.

    Each outer-split model predicts **only** on its corresponding test data.
    No averaging across splits.

    Args:
        study_path: Path to the study directory.
        task_id: Concrete workflow task index (must be >= 0).
        result_type: Result type for filtering results (default: 'best').

    Raises:
        ValueError: If task_id is negative, out of range, or no models found.
        FileNotFoundError: If expected study artifacts are missing.

    Example:
        >>> tp = TaskPredictorTest("studies/my_study", task_id=0)
        >>> scores = tp.performance(metrics=["AUCROC", "ACC"])
    """

    # Additional fields for test/train data (populated in __attrs_post_init__)
    # init=False: callers never pass these — they are loaded from the study
    # directory, keyed by outersplit id.
    _test_data: dict[int, pd.DataFrame] = field(init=False, factory=dict, repr=False)
    _train_data: dict[int, pd.DataFrame] = field(init=False, factory=dict, repr=False)

    def __attrs_post_init__(self) -> None:
        """Load base artifacts via parent, then additionally load test/train data."""
        # Call parent __attrs_post_init__ to load config, validate, and load models
        super().__attrs_post_init__()

        # Additionally load test and train data per split via StudyLoader factory
        loader = StudyLoader(self._study_path)
        for split_id in self._outersplits:
            split_loader = loader.get_outersplit_loader(
                outersplit_id=split_id,
                task_id=self._task_id,
                result_type=self._result_type,
            )
            self._test_data[split_id] = split_loader.load_test_data()
            self._train_data[split_id] = split_loader.load_train_data()

    # ── Prediction (per-split on own test data) ─────────────────

    def _get_target_columns(self, test: pd.DataFrame) -> dict[str, Any]:
        """Build target column(s) for ``df=True`` output.

        Returns a dict suitable for unpacking into a DataFrame constructor.

        For single-target tasks (regression, binary, multiclass):
            ``{"target": <array>}``

        For multi-target tasks (T2E):
            ``{"target_duration": <array>, "target_event": <array>}``
            — one key per role in ``target_assignments``, prefixed with
            ``"target_"``.

        The single-target form uses the bare name ``"target"`` (no role
        suffix) to preserve backwards compatibility with existing callers.

        Args:
            test: DataFrame containing the target column(s).

        Returns:
            Dict mapping output column names to arrays of target values.
        """
        assignments = self.target_assignments
        if len(assignments) == 1:
            # Single target: keep the legacy bare "target" column name.
            col = next(iter(assignments.values()))
            return {"target": test[col].values}
        return {f"target_{role}": test[col].values for role, col in assignments.items()}

    def predict(self, df: bool = False) -> np.ndarray | pd.DataFrame:  # type: ignore[override]
        """Predict on stored test data.  Each model predicts only on its own test data.

        No ensemble averaging — results are collected per split.

        Args:
            df: If True, return a DataFrame with outersplit, row_id, prediction,
                and target columns.  For T2E tasks the target columns are
                ``target_duration`` and ``target_event`` instead of ``target``.
                If False (default), return concatenated ndarray.

        Returns:
            Per-split predictions as ndarray or DataFrame.
        """
        row_id_col = self.row_id_col

        all_preds: list[np.ndarray] = []
        all_rows: list[pd.DataFrame] = []

        for split_id in self._outersplits:
            features = self._selected_features[split_id]
            test = self._test_data[split_id]
            preds = self._models[split_id].predict(test[features])
            all_preds.append(preds)

            if df:
                # Prefer the study's row-id column; fall back to positional ids.
                row_ids = test[row_id_col] if row_id_col and row_id_col in test.columns else pd.RangeIndex(len(test))
                split_df = pd.DataFrame(
                    {
                        "outersplit": split_id,
                        # RangeIndex and Series both expose .values; the hasattr
                        # guard keeps plain sequences working too.
                        "row_id": row_ids.values if hasattr(row_ids, "values") else row_ids,
                        "prediction": preds,
                        **self._get_target_columns(test),
                    }
                )
                all_rows.append(split_df)

        if df:
            return pd.concat(all_rows, ignore_index=True)
        return np.concatenate(all_preds)

    def predict_proba(self, df: bool = False) -> np.ndarray | pd.DataFrame:  # type: ignore[override]
        """Predict probabilities on stored test data (classification/multiclass only).

        Each model predicts only on its own test data.  No averaging.

        Args:
            df: If True, return a DataFrame with outersplit, row_id, probability
                columns per class, and target column(s).  If False (default),
                return concatenated ndarray.

        Returns:
            Per-split probabilities as ndarray or DataFrame.

        Raises:
            TypeError: If ml_type is not classification or multiclass.
        """
        if self.ml_type not in (MLType.BINARY, MLType.MULTICLASS):
            raise TypeError(
                f"predict_proba() is only available for classification and multiclass tasks, "
                f"but this study has ml_type='{self.ml_type}'."
            )
        row_id_col = self.row_id_col
        class_labels = self.classes_

        all_probas: list[np.ndarray] = []
        all_rows: list[pd.DataFrame] = []

        for split_id in self._outersplits:
            features = self._selected_features[split_id]
            test = self._test_data[split_id]
            probas = self._models[split_id].predict_proba(test[features])
            # Some models return a DataFrame; normalize to ndarray.
            if isinstance(probas, pd.DataFrame):
                probas = probas.values
            all_probas.append(probas)

            if df:
                # Prefer the study's row-id column; fall back to positional ids.
                row_ids = test[row_id_col] if row_id_col and row_id_col in test.columns else pd.RangeIndex(len(test))
                split_df = pd.DataFrame(probas, columns=class_labels)
                split_df.insert(0, "outersplit", split_id)
                row_vals: Any = row_ids.values if hasattr(row_ids, "values") else row_ids
                split_df.insert(1, "row_id", row_vals)
                # Append target column(s) after the per-class probability columns.
                for col_name, col_values in self._get_target_columns(test).items():
                    split_df[col_name] = col_values
                all_rows.append(split_df)

        if df:
            return pd.concat(all_rows, ignore_index=True)
        return np.concatenate(all_probas)

    # ── Scoring (per-split on own test data) ────────────────────

    def performance(  # type: ignore[override]
        self,
        metrics: list[str] | None = None,
        threshold: float = 0.5,
    ) -> pd.DataFrame:
        """Compute performance scores on stored test data.

        Each outer-split model is scored **only on its own test data**.
        Scores are computed fresh — never read from disk.

        Args:
            metrics: List of metric names to compute.
                If None, uses the study target metric.
            threshold: Classification threshold for threshold-dependent metrics.

        Returns:
            DataFrame with columns: outersplit, metric, score.
        """
        if metrics is None:
            metrics = [self.target_metric]

        rows: list[dict[str, Any]] = []
        for split_id in self._outersplits:
            model = self._models[split_id]
            features = self._selected_features[split_id]
            test = self._test_data[split_id]

            for metric_name in metrics:
                # target_assignments supports T2E (duration/event keys).
                score = get_performance_from_model(
                    model=model,
                    data=test,
                    feature_cols=features,
                    target_metric=metric_name,
                    target_assignments=self.target_assignments,
                    threshold=threshold,
                    positive_class=self.positive_class,
                )
                rows.append({"outersplit": split_id, "metric": metric_name, "score": score})

        return pd.DataFrame(rows)

    # ── Feature Importance (per-split on own test data) ─────────

    def calculate_fi(  # type: ignore[override]
        self,
        fi_type: FIType = FIType.PERMUTATION,
        *,
        n_repeats: int = 10,
        feature_groups: dict[str, list[str]] | None = None,
        random_state: int = 42,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Calculate feature importance using stored test data and models.

        Each split's model permutes features only in its own test data.
        Delegates to ``_dispatch_fi()`` (inherited from ``TaskPredictor``)
        with stored per-split test and train data.

        Args:
            fi_type: Type of feature importance. One of:
                - ``FIType.PERMUTATION`` — Per-feature permutation importance.
                - ``FIType.GROUP_PERMUTATION`` — Per-feature + per-group permutation
                  importance.  Uses ``feature_groups`` (from study config or
                  explicitly provided) to also compute group-level importance.
                - ``FIType.SHAP`` — SHAP-based importance.  Pass ``shap_type`` as a
                  kwarg to select the explainer: ``"kernel"`` (default),
                  ``"permutation"``, or ``"exact"``.
            n_repeats: Number of permutation repeats.
            feature_groups: Dict mapping group names to feature lists.
                If None and fi_type is ``FIType.GROUP_PERMUTATION``, groups are
                loaded from the study.
            random_state: Random seed.
            **kwargs: Additional keyword arguments passed to the FI function.
                For ``fi_type=FIType.SHAP``, supported kwargs include:
                ``shap_type`` (``"kernel"``, ``"permutation"``,
                ``"exact"``),
                ``max_samples``, ``background_size``.

        Returns:
            DataFrame with feature importance results including a ``fi_type``
            column and per-split + ensemble rows.

        Raises:
            ValueError: If fi_type is unknown.
        """
        # Normalize strings/raw values into the enum (raises on unknown values).
        fi_type = FIType(fi_type)

        return self._dispatch_fi(
            self._test_data,
            self._train_data,
            fi_type,
            n_repeats=n_repeats,
            feature_groups=feature_groups,
            random_state=random_state,
            **kwargs,
        )

    # ── Serialization — not supported ───────────────────────────

    def save(self, path: str | UPath) -> None:
        """Not supported for TaskPredictorTest.

        Args:
            path: Ignored — not used.

        Raises:
            NotImplementedError: Always. The study directory is the
                persistent artifact for test predictors.
        """
        raise NotImplementedError(
            "TaskPredictorTest does not support save(). "
            "The study directory is the persistent artifact. "
            "Use TaskPredictor for standalone deployment."
        )

    @classmethod
    def load(cls, path: str | UPath) -> TaskPredictorTest:
        """Not supported for TaskPredictorTest.

        Args:
            path: Ignored — not used.

        Returns:
            Never returns — always raises.

        Raises:
            NotImplementedError: Always. Use TaskPredictor.load() for
                loading saved predictors.
        """
        raise NotImplementedError(
            "TaskPredictorTest does not support load(). "
            "Construct from a study directory instead, or use TaskPredictor.load() "
            "for standalone deployment."
        )

__attrs_post_init__()

Load base artifacts via parent, then additionally load test/train data.

Source code in octopus/predict/task_predictor_test.py
def __attrs_post_init__(self) -> None:
    """Load base artifacts via parent, then additionally load test/train data."""
    # Parent hook: loads the config, runs validation, and loads the models.
    super().__attrs_post_init__()

    # On top of that, fetch the held-out test and train frames per outer split.
    study_loader = StudyLoader(self._study_path)
    for split_id in self._outersplits:
        per_split = study_loader.get_outersplit_loader(
            outersplit_id=split_id,
            task_id=self._task_id,
            result_type=self._result_type,
        )
        self._test_data[split_id], self._train_data[split_id] = (
            per_split.load_test_data(),
            per_split.load_train_data(),
        )

calculate_fi(fi_type=FIType.PERMUTATION, *, n_repeats=10, feature_groups=None, random_state=42, **kwargs)

Calculate feature importance using stored test data and models.

Each split's model permutes features only in its own test data. Delegates to _dispatch_fi() (inherited from TaskPredictor) with stored per-split test and train data.

Parameters:

Name Type Description Default
fi_type FIType

Type of feature importance. One of: - FIType.PERMUTATION — Per-feature permutation importance. - FIType.GROUP_PERMUTATION — Per-feature + per-group permutation importance. Uses feature_groups (from study config or explicitly provided) to also compute group-level importance. - FIType.SHAP — SHAP-based importance. Pass shap_type as a kwarg to select the explainer: "kernel" (default), "permutation", or "exact".

PERMUTATION
n_repeats int

Number of permutation repeats.

10
feature_groups dict[str, list[str]] | None

Dict mapping group names to feature lists. If None and fi_type is FIType.GROUP_PERMUTATION, groups are loaded from the study.

None
random_state int

Random seed.

42
**kwargs Any

Additional keyword arguments passed to the FI function. For fi_type=FIType.SHAP, supported kwargs include: shap_type ("kernel", "permutation", "exact"), max_samples, background_size.

{}

Returns:

Type Description
DataFrame

DataFrame with feature importance results including a fi_type

DataFrame

column and per-split + ensemble rows.

Raises:

Type Description
ValueError

If fi_type is unknown.

Source code in octopus/predict/task_predictor_test.py
def calculate_fi(  # type: ignore[override]
    self,
    fi_type: FIType = FIType.PERMUTATION,
    *,
    n_repeats: int = 10,
    feature_groups: dict[str, list[str]] | None = None,
    random_state: int = 42,
    **kwargs: Any,
) -> pd.DataFrame:
    """Compute feature importance from the stored per-split test data.

    Each split's model permutes features only within its own test data.
    The actual computation is delegated to ``_dispatch_fi()`` (inherited
    from ``TaskPredictor``), called with the stored per-split test and
    train frames.

    Args:
        fi_type: Which importance flavor to compute:
            - ``FIType.PERMUTATION`` — per-feature permutation importance.
            - ``FIType.GROUP_PERMUTATION`` — per-feature plus per-group
              permutation importance, using ``feature_groups`` (explicitly
              provided or taken from the study config) for the group-level
              part.
            - ``FIType.SHAP`` — SHAP-based importance; pick the explainer
              via the ``shap_type`` kwarg: ``"kernel"`` (default),
              ``"permutation"``, or ``"exact"``.
        n_repeats: How many permutation repeats to run.
        feature_groups: Mapping of group name to feature list.  When None
            and fi_type is ``FIType.GROUP_PERMUTATION``, groups are read
            from the study.
        random_state: Seed for reproducibility.
        **kwargs: Extra keyword arguments forwarded to the FI function.
            With ``fi_type=FIType.SHAP`` supported kwargs include
            ``shap_type`` (``"kernel"``, ``"permutation"``, ``"exact"``),
            ``max_samples``, and ``background_size``.

    Returns:
        DataFrame of feature-importance results, including a ``fi_type``
        column and per-split + ensemble rows.

    Raises:
        ValueError: If fi_type is unknown.
    """
    # Coerce strings (e.g. "permutation") into the enum; unknown values
    # raise ValueError here.
    resolved_type = FIType(fi_type)

    return self._dispatch_fi(
        self._test_data,
        self._train_data,
        resolved_type,
        n_repeats=n_repeats,
        feature_groups=feature_groups,
        random_state=random_state,
        **kwargs,
    )

load(path) classmethod

Not supported for TaskPredictorTest.

Parameters:

Name Type Description Default
path str | UPath

Ignored — not used.

required

Returns:

Type Description
TaskPredictorTest

Never returns — always raises.

Raises:

Type Description
NotImplementedError

Always. Use TaskPredictor.load() for loading saved predictors.

Source code in octopus/predict/task_predictor_test.py
@classmethod
def load(cls, path: str | UPath) -> TaskPredictorTest:
    """Always raise — loading is not supported for TaskPredictorTest.

    Args:
        path: Ignored — not used.

    Returns:
        Never returns — always raises.

    Raises:
        NotImplementedError: Always. Use TaskPredictor.load() for
            loading saved predictors.
    """
    message = (
        "TaskPredictorTest does not support load(). "
        "Construct from a study directory instead, or use TaskPredictor.load() "
        "for standalone deployment."
    )
    raise NotImplementedError(message)

performance(metrics=None, threshold=0.5)

Compute performance scores on stored test data.

Each outer-split model is scored only on its own test data. Scores are computed fresh — never read from disk.

Parameters:

Name Type Description Default
metrics list[str] | None

List of metric names to compute. If None, uses the study target metric.

None
threshold float

Classification threshold for threshold-dependent metrics.

0.5

Returns:

Type Description
DataFrame

DataFrame with columns: outersplit, metric, score.

Source code in octopus/predict/task_predictor_test.py
def performance(  # type: ignore[override]
    self,
    metrics: list[str] | None = None,
    threshold: float = 0.5,
) -> pd.DataFrame:
    """Score each outer-split model on its own stored test data.

    Every score is computed fresh from the loaded models — nothing is
    read back from disk.

    Args:
        metrics: Metric names to compute.  Defaults to the study's
            target metric when None.
        threshold: Classification threshold for threshold-dependent metrics.

    Returns:
        DataFrame with columns: outersplit, metric, score.
    """
    metric_names = metrics if metrics is not None else [self.target_metric]

    # One record per (split, metric) pair, scored on that split's own test set.
    records = [
        {
            "outersplit": split_id,
            "metric": metric_name,
            "score": get_performance_from_model(
                model=self._models[split_id],
                data=self._test_data[split_id],
                feature_cols=self._selected_features[split_id],
                target_metric=metric_name,
                target_assignments=self.target_assignments,
                threshold=threshold,
                positive_class=self.positive_class,
            ),
        }
        for split_id in self._outersplits
        for metric_name in metric_names
    ]
    return pd.DataFrame(records)

predict(df=False)

Predict on stored test data. Each model predicts only on its own test data.

No ensemble averaging — results are collected per split.

Parameters:

Name Type Description Default
df bool

If True, return a DataFrame with outersplit, row_id, prediction, and target columns. For T2E tasks the target columns are target_duration and target_event instead of target. If False (default), return concatenated ndarray.

False

Returns:

Type Description
ndarray | DataFrame

Per-split predictions as ndarray or DataFrame.

Source code in octopus/predict/task_predictor_test.py
def predict(self, df: bool = False) -> np.ndarray | pd.DataFrame:  # type: ignore[override]
    """Predict on stored test data.  Each model predicts only on its own test data.

    No ensemble averaging — results are collected per split.

    Args:
        df: If True, return a DataFrame with outersplit, row_id, prediction,
            and target columns.  For T2E tasks the target columns are
            ``target_duration`` and ``target_event`` instead of ``target``.
            If False (default), return concatenated ndarray.

    Returns:
        Per-split predictions as ndarray or DataFrame.
    """
    row_id_col = self.row_id_col

    all_preds = []
    all_rows = []

    for split_id in self._outersplits:
        features = self._selected_features[split_id]
        test = self._test_data[split_id]
        preds = self._models[split_id].predict(test[features])
        all_preds.append(preds)

        if df:
            row_ids = test[row_id_col] if row_id_col and row_id_col in test.columns else pd.RangeIndex(len(test))
            split_df = pd.DataFrame(
                {
                    "outersplit": split_id,
                    "row_id": row_ids.values if hasattr(row_ids, "values") else row_ids,
                    "prediction": preds,
                    **self._get_target_columns(test),
                }
            )
            all_rows.append(split_df)

    if df:
        return pd.concat(all_rows, ignore_index=True)
    return np.concatenate(all_preds)

predict_proba(df=False)

Predict probabilities on stored test data (classification/multiclass only).

Each model predicts only on its own test data. No averaging.

Parameters:

Name Type Description Default
df bool

If True, return a DataFrame with outersplit, row_id, probability columns per class, and target column(s). If False (default), return concatenated ndarray.

False

Returns:

Type Description
ndarray | DataFrame

Per-split probabilities as ndarray or DataFrame.

Raises:

Type Description
TypeError

If ml_type is not classification or multiclass.

Source code in octopus/predict/task_predictor_test.py
def predict_proba(self, df: bool = False) -> np.ndarray | pd.DataFrame:  # type: ignore[override]
    """Predict class probabilities on stored test data (classification/multiclass only).

    Each model scores only its own split's test data; no averaging across
    splits is performed.

    Args:
        df: If True, return a DataFrame with outersplit, row_id, probability
            columns per class, and target column(s).  If False (default),
            return concatenated ndarray.

    Returns:
        Per-split probabilities as ndarray or DataFrame.

    Raises:
        TypeError: If ml_type is not classification or multiclass.
    """
    # Probabilities only make sense for classification-style tasks.
    if self.ml_type not in (MLType.BINARY, MLType.MULTICLASS):
        raise TypeError(
            f"predict_proba() is only available for classification and multiclass tasks, "
            f"but this study has ml_type='{self.ml_type}'."
        )

    row_id_col = self.row_id_col
    class_labels = self.classes_
    proba_arrays = []
    frames = []

    for split_id in self._outersplits:
        test = self._test_data[split_id]
        probas = self._models[split_id].predict_proba(
            test[self._selected_features[split_id]]
        )
        # Some model wrappers hand back a DataFrame; normalize to ndarray.
        if isinstance(probas, pd.DataFrame):
            probas = probas.values
        proba_arrays.append(probas)

        if not df:
            continue

        if row_id_col and row_id_col in test.columns:
            row_ids = test[row_id_col]
        else:
            row_ids = pd.RangeIndex(len(test))
        frame = pd.DataFrame(probas, columns=class_labels)
        frame.insert(0, "outersplit", split_id)
        row_vals: Any = row_ids.values if hasattr(row_ids, "values") else row_ids
        frame.insert(1, "row_id", row_vals)
        for col_name, col_values in self._get_target_columns(test).items():
            frame[col_name] = col_values
        frames.append(frame)

    if df:
        return pd.concat(frames, ignore_index=True)
    return np.concatenate(proba_arrays)

save(path)

Not supported for TaskPredictorTest.

Parameters:

Name Type Description Default
path str | UPath

Ignored — not used.

required

Raises:

Type Description
NotImplementedError

Always. The study directory is the persistent artifact for test predictors.

Source code in octopus/predict/task_predictor_test.py
def save(self, path: str | UPath) -> None:
    """Always raise — saving is not supported for TaskPredictorTest.

    Args:
        path: Ignored — not used.

    Raises:
        NotImplementedError: Always. The study directory is the
            persistent artifact for test predictors.
    """
    message = (
        "TaskPredictorTest does not support save(). "
        "The study directory is the persistent artifact. "
        "Use TaskPredictor for standalone deployment."
    )
    raise NotImplementedError(message)