Source code for baybe.recommenders.meta.sequential
"""Meta recommenders that switch recommenders based on the experimentation progress."""
# TODO After bayesian recommenders are enabled with no training data, a refactoring of
# this file will resolve type errors
# mypy: disable-error-code="arg-type"
import gc
from abc import abstractmethod
from collections.abc import Iterable, Iterator
from typing import Literal
import pandas as pd
from attrs import Factory, define, field, fields
from attrs.validators import deep_iterable, ge, in_, instance_of
from typing_extensions import override
from baybe.exceptions import NoRecommendersLeftError
from baybe.objectives.base import Objective
from baybe.recommenders.base import RecommenderProtocol
from baybe.recommenders.meta.base import MetaRecommender
from baybe.recommenders.pure.bayesian.botorch import BotorchRecommender
from baybe.recommenders.pure.nonpredictive.sampling import RandomRecommender
from baybe.searchspace import SearchSpace
from baybe.serialization import (
block_deserialization_hook,
block_serialization_hook,
converter,
)
from baybe.utils.conversion import to_string
@define
class TwoPhaseMetaRecommender(MetaRecommender):
"""A two-phased meta recommender that switches at a certain specified point.
The recommender is switched when a new (batch) recommendation is requested and
the training data set size (i.e., the total number of collected measurements
including those gathered before the meta recommender was active) is equal to or
greater than the number specified via the ``switch_after`` parameter.
Note:
Throughout each phase, the meta recommender reuses the **same** recommender
object, that is, no new instances are created. Therefore, special attention is
required when using the meta recommender with stateful recommenders.
"""
initial_recommender: RecommenderProtocol = field(
factory=RandomRecommender, validator=instance_of(RecommenderProtocol)
)
"""The initial recommender used by the meta recommender."""
recommender: RecommenderProtocol = field(
factory=BotorchRecommender, validator=instance_of(RecommenderProtocol)
)
"""The recommender used by the meta recommender after the switch."""
switch_after: int = field(default=1, validator=[instance_of(int), ge(1)])
"""The number of experiments required for the recommender to switch."""
remain_switched: bool = field(default=False, validator=instance_of(bool))
"""Determines if the recommender should remain switched even if the number of
experiments falls below the threshold value in subsequent calls."""
# TODO: These should **not** be exposed via the constructor but the workaround
# is currently needed for correct (de-)serialization. A proper approach would be
# to not set them via the constructor but through a custom hook in combination
# with `_cattrs_include_init_false=True`. However, the way
# `get_base_structure_hook` is currently designed prevents such a hook from
# taking action.
_has_switched: bool = field(default=False, alias="_has_switched")
"""Indicates if the switch has already occurred."""
@override
@property
def is_stateful(self) -> bool:
return self.remain_switched
@override
def select_recommender(
self,
batch_size: int | None = None,
searchspace: SearchSpace | None = None,
objective: Objective | None = None,
measurements: pd.DataFrame | None = None,
pending_experiments: pd.DataFrame | None = None,
) -> RecommenderProtocol:
n_data = len(measurements) if measurements is not None else 0
if (n_data >= self.switch_after) or (
self._has_switched and self.remain_switched
):
self._has_switched = True
return self.recommender
return self.initial_recommender
@override
def __str__(self) -> str:
fields = [
to_string("Initial recommender", self.initial_recommender),
to_string("Recommender", self.recommender),
to_string("Switch after", self.switch_after, single_line=True),
to_string("Remain switched", self.remain_switched, single_line=True),
to_string("Has switched", self._has_switched, single_line=True),
]
return to_string(self.__class__.__name__, *fields)
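# A minimal usage sketch (illustrative only, not part of the library source): it shows
# how the switch point of a ``TwoPhaseMetaRecommender`` depends on the number of
# available measurements. The helper name ``_example_two_phase_switch`` is made up for
# illustration and only uses classes imported in this module.
def _example_two_phase_switch() -> None:  # pragma: no cover - illustrative sketch
    meta = TwoPhaseMetaRecommender(switch_after=3)
    # With fewer than three measurements, the initial (random) recommender is selected.
    few = pd.DataFrame(index=range(2))
    assert isinstance(meta.select_recommender(measurements=few), RandomRecommender)
    # Once three or more measurements are available, the switch occurs and the
    # Botorch recommender takes over.
    enough = pd.DataFrame(index=range(3))
    assert isinstance(meta.select_recommender(measurements=enough), BotorchRecommender)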
@define
class BaseSequentialMetaRecommender(MetaRecommender):
"""Base class for sequential meta recommenders."""
# TODO: These should **not** be exposed via the constructor but the workaround
# is currently needed for correct (de-)serialization. A proper approach would be
# to not set them via the constructor but through a custom hook in combination
# with `_cattrs_include_init_false=True`. However, the way
# `get_base_structure_hook` is currently designed prevents such a hook from
# taking action.
_step: int = field(default=0, alias="_step", kw_only=True)
"""An index pointing to the current position in the recommender sequence."""
_was_used: bool = field(default=False, alias="_was_used", kw_only=True)
"""Boolean flag indicating if the current recommender has been used."""
_n_last_measurements: int = field(
default=0, alias="_n_last_measurements", kw_only=True
)
"""The number of measurements available at the last successful recommend call."""
@override
@property
def is_stateful(self) -> bool:
return True
@abstractmethod
def _get_recommender_at_current_step(self) -> RecommenderProtocol:
"""Get the recommender at the current sequence position."""
@override
def select_recommender(
self,
batch_size: int | None = None,
searchspace: SearchSpace | None = None,
objective: Objective | None = None,
measurements: pd.DataFrame | None = None,
pending_experiments: pd.DataFrame | None = None,
) -> RecommenderProtocol:
# If the training dataset size has decreased, something went wrong
if (
n_data := len(measurements) if measurements is not None else 0
) < self._n_last_measurements:
raise RuntimeError(
f"The training dataset size decreased from {self._n_last_measurements} "
f"to {n_data} since the last function call, which indicates that "
f"'{self.__class__.__name__}' was not used as intended."
)
# Check whether the next recommender in the sequence is required
more_data = n_data > self._n_last_measurements
if (not self._was_used) or (not more_data):
return self._get_recommender_at_current_step()
# Move on to the next recommender
self._step += 1
self._was_used = False
return self._get_recommender_at_current_step()
@override
def recommend(
self,
batch_size: int,
searchspace: SearchSpace,
objective: Objective | None = None,
measurements: pd.DataFrame | None = None,
pending_experiments: pd.DataFrame | None = None,
) -> pd.DataFrame:
recommendation = super().recommend(
batch_size, searchspace, objective, measurements, pending_experiments
)
self._was_used = True
# Remember the training dataset size for the next call
self._n_last_measurements = len(measurements) if measurements is not None else 0
return recommendation
@define
class SequentialMetaRecommender(BaseSequentialMetaRecommender):
"""A meta recommender that uses a pre-defined sequence of recommenders.
A new recommender is taken from the sequence whenever at least one new measurement
is available, until all recommenders are exhausted. More precisely, a recommender
change is triggered whenever the size of the training dataset increases; the
actual content of the dataset is ignored.
Note:
The provided sequence of recommenders will be internally pre-collected into a
list. If this is not acceptable, consider using
:class:`baybe.recommenders.meta.sequential.StreamingSequentialMetaRecommender`
instead.
Raises:
RuntimeError: If the training dataset size decreased compared to the previous
call.
NoRecommendersLeftError: If more recommenders are requested than there are
recommenders available and ``mode="raise"``.
"""
recommenders: list[RecommenderProtocol] = field(
converter=list, validator=deep_iterable(instance_of(RecommenderProtocol))
)
"""A finite-length sequence of recommenders to be used. For infinite-length
iterables, see
:class:`baybe.recommenders.meta.sequential.StreamingSequentialMetaRecommender`."""
mode: Literal["raise", "reuse_last", "cyclic"] = field(
default="raise",
validator=in_(("raise", "reuse_last", "cyclic")),
)
"""Defines what shall happen when the last recommender in the sequence has been
consumed but additional recommender changes are triggered:
* ``"raise"``: An error is raised.
* ``"reuse_last"``: The last recommender in the sequence is used indefinitely.
* ``"cycle"``: The selection restarts from the beginning of the sequence.
"""
@override
def _get_recommender_at_current_step(self) -> RecommenderProtocol:
idx = self._step
if self.mode == "reuse_last":
idx = min(idx, len(self.recommenders) - 1)
elif self.mode == "cyclic":
idx %= len(self.recommenders)
try:
return self.recommenders[idx]
except IndexError as ex:
raise NoRecommendersLeftError(
f"A total of {self._step + 1} recommender(s) was/were requested but "
f"the provided sequence contains only {self._step} element(s). "
f"Add more recommenders or adjust the "
f"'{fields(SequentialMetaRecommender).mode.name}' attribute."
) from ex
@override
def __str__(self) -> str:
fields = [
to_string("Recommenders", self.recommenders),
to_string("Mode", self.mode, single_line=True),
]
return to_string(self.__class__.__name__, *fields)
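# A minimal sketch of the exhaustion modes (illustrative only, not part of the library
# source): it pokes the private ``_step`` counter solely to show the index logic of
# ``_get_recommender_at_current_step`` without running a full campaign. The helper name
# ``_example_sequential_modes`` is made up for illustration.
def _example_sequential_modes() -> None:  # pragma: no cover - illustrative sketch
    recommenders = [RandomRecommender(), BotorchRecommender()]
    # "reuse_last": requests beyond the end of the sequence return the last entry.
    reuse = SequentialMetaRecommender(recommenders=recommenders, mode="reuse_last")
    reuse._step = 5
    assert reuse._get_recommender_at_current_step() is recommenders[-1]
    # "cyclic": the selection wraps around to the beginning of the sequence.
    cyclic = SequentialMetaRecommender(recommenders=recommenders, mode="cyclic")
    cyclic._step = 2
    assert cyclic._get_recommender_at_current_step() is recommenders[0]
    # "raise" (the default): exhausting the sequence raises an error.
    strict = SequentialMetaRecommender(recommenders=recommenders)
    strict._step = 2
    try:
        strict._get_recommender_at_current_step()
    except NoRecommendersLeftError:
        pass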
@define
class StreamingSequentialMetaRecommender(BaseSequentialMetaRecommender):
"""A meta recommender that switches between recommenders from an iterable.
Similar to :class:`baybe.recommenders.meta.sequential.SequentialMetaRecommender`
but without explicit list conversion. This enables a number of advanced use cases:
* It supports arbitrary iterables, which allows configuring recommender sequences
  of infinite length. This is useful when the total number of iterations is unknown
  in advance.
* It can be used to adaptively adjust the recommender sequence based on the
latest context available outside the class, by modifying the iterable on the fly.
The downside is that serialization is not supported.
Raises:
NoRecommendersLeftError: If more recommenders are requested than there are
recommenders available.
"""
recommenders: Iterable[RecommenderProtocol] = field()
"""An iterable providing the recommenders to be used."""
_iterator: Iterator = field(
init=False,
default=Factory(lambda self: iter(self.recommenders), takes_self=True),
)
"""The iterator used to traverse the recommenders."""
_last_recommender: RecommenderProtocol | None = field(init=False, default=None)
"""The recommender returned from the last call."""
_position_of_latest_recommender: int | None = field(init=False, default=None)
"""The position of the latest recommender fetched from the iterable."""
@override
def _get_recommender_at_current_step(self) -> RecommenderProtocol:
if self._step != self._position_of_latest_recommender:
try:
self._last_recommender = next(self._iterator)
except StopIteration as ex:
raise NoRecommendersLeftError(
f"A total of {self._step + 1} recommender(s) was/were requested "
f"but the provided iterator provided only {self._step} element(s). "
) from ex
self._position_of_latest_recommender = self._step
# By now, the first recommender has been fetched
assert self._last_recommender is not None
return self._last_recommender
@override
def __str__(self) -> str:
fields = [
to_string("Recommenders", self.recommenders),
]
return to_string(self.__class__.__name__, *fields)
# The recommender iterable cannot be serialized
converter.register_unstructure_hook(
StreamingSequentialMetaRecommender, block_serialization_hook
)
converter.register_structure_hook(
StreamingSequentialMetaRecommender, block_deserialization_hook
)
# Collect leftover original slotted classes processed by `attrs.define`
gc.collect()