Source code for baybe.recommenders.meta.sequential
"""Meta recommenders that switch recommenders based on the experimentation progress."""
# TODO After bayesian recommenders are enabled with no training data, a refactoring of
#  this file will resolve type errors
# mypy: disable-error-code="arg-type"
from collections.abc import Iterable, Iterator
from typing import Literal
import pandas as pd
from attrs import define, field
from attrs.validators import deep_iterable, in_, instance_of
from baybe.exceptions import NoRecommendersLeftError
from baybe.objectives.base import Objective
from baybe.recommenders.meta.base import MetaRecommender
from baybe.recommenders.pure.base import PureRecommender
from baybe.recommenders.pure.bayesian.botorch import BotorchRecommender
from baybe.recommenders.pure.nonpredictive.sampling import RandomRecommender
from baybe.searchspace import SearchSpace
from baybe.serialization import (
    block_deserialization_hook,
    block_serialization_hook,
    converter,
)
from baybe.utils.plotting import to_string
[docs]
@define
class TwoPhaseMetaRecommender(MetaRecommender):
    """A two-phased meta recommender that switches at a certain specified point.
    The recommender is switched when a new (batch) recommendation is requested and
    the training data set size (i.e., the total number of collected measurements
    including those gathered before the meta recommender was active) is equal to or
    greater than the number specified via the ``switch_after`` parameter.
    Note:
        Throughout each phase, the meta recommender reuses the **same** recommender
        object, that is, no new instances are created. Therefore, special attention is
        required when using the meta recommender with stateful recommenders.
    """
    initial_recommender: PureRecommender = field(factory=RandomRecommender)
    """The initial recommender used by the meta recommender."""
    recommender: PureRecommender = field(factory=BotorchRecommender)
    """The recommender used by the meta recommender after the switch."""
    switch_after: int = field(default=1)
    """The number of experiments after which the recommender is switched for the next
    requested batch."""
[docs]
    def select_recommender(  # noqa: D102
        self,
        batch_size: int,
        searchspace: SearchSpace | None = None,
        objective: Objective | None = None,
        measurements: pd.DataFrame | None = None,
        pending_experiments: pd.DataFrame | None = None,
    ) -> PureRecommender:
        # See base class.
        return (
            self.recommender
            if (measurements is not None) and (len(measurements) >= self.switch_after)
            else self.initial_recommender
        )
    def __str__(self) -> str:
        fields = [
            to_string("Initial recommender", self.initial_recommender),
            to_string("Recommender", self.recommender),
            to_string("Switch after", self.switch_after, single_line=True),
        ]
        return to_string(self.__class__.__name__, *fields)
[docs]
@define
class SequentialMetaRecommender(MetaRecommender):
    """A meta recommender that uses a pre-defined sequence of recommenders.
    A new recommender is taken from the sequence whenever at least one new measurement
    is available, until all recommenders are exhausted. More precisely, a recommender
    change is triggered whenever the size of the training dataset increases; the
    actual content of the dataset is ignored.
    Note:
        The provided sequence of recommenders will be internally pre-collected into a
        list. If this is not acceptable, consider using
        :class:`baybe.recommenders.meta.sequential.StreamingSequentialMetaRecommender`
        instead.
    Raises:
        RuntimeError: If the training dataset size decreased compared to the previous
            call.
        NoRecommendersLeftError: If more recommenders are requested than there are
            recommenders available and ``mode="raise"``.
    """
    # Exposed
    recommenders: list[PureRecommender] = field(
        converter=list, validator=deep_iterable(instance_of(PureRecommender))
    )
    """A finite-length sequence of recommenders to be used. For infinite-length
    iterables, see
    :class:`baybe.recommenders.meta.sequential.StreamingSequentialMetaRecommender`."""
    mode: Literal["raise", "reuse_last", "cyclic"] = field(
        default="raise",
        validator=in_(("raise", "reuse_last", "cyclic")),
    )
    """Defines what shall happen when the last recommender in the sequence has been
    consumed but additional recommender changes are triggered:
        * ``"raise"``: An error is raised.
        * ``"reuse_last"``: The last recommender in the sequence is used indefinitely.
        * ``"cycle"``: The selection restarts from the beginning of the sequence.
    """
    # Private
    # TODO: These should **not** be exposed via the constructor but the workaround
    #   is currently needed for correct (de-)serialization. A proper approach would be
    #   to not set them via the constructor but through a custom hook in combination
    #   with `_cattrs_include_init_false=True`. However, the way
    #   `get_base_structure_hook` is currently designed prevents such a hook from
    #   taking action.
    _step: int = field(default=-1, alias="_step")
    """Counts how often the recommender has already been switched."""
    _n_last_measurements: int = field(default=-1, alias="_n_last_measurements")
    """The number of measurements that were available at the last call."""
[docs]
    def select_recommender(  # noqa: D102
        self,
        batch_size: int,
        searchspace: SearchSpace | None = None,
        objective: Objective | None = None,
        measurements: pd.DataFrame | None = None,
        pending_experiments: pd.DataFrame | None = None,
    ) -> PureRecommender:
        # See base class.
        n_data = len(measurements) if measurements is not None else 0
        # If the training dataset size has increased, move to the next recommender
        if n_data > self._n_last_measurements:
            self._step += 1
        # If the training dataset size has decreased, something went wrong
        elif n_data < self._n_last_measurements:
            raise RuntimeError(
                f"The training dataset size decreased from {self._n_last_measurements} "
                f"to {n_data} since the last function call, which indicates that "
                f"'{self.__class__.__name__}' was not used as intended."
            )
        # Get the right index for the "next" recommender
        idx = self._step
        if self.mode == "reuse_last":
            idx = min(idx, len(self.recommenders) - 1)
        elif self.mode == "cyclic":
            idx %= len(self.recommenders)
        # Get the recommender
        try:
            recommender = self.recommenders[idx]
        except IndexError as ex:
            raise NoRecommendersLeftError(
                f"A total of {self._step+1} recommender(s) was/were requested but the "
                f"provided sequence contains only {self._step} element(s)."
            ) from ex
        # Remember the training dataset size for the next call
        self._n_last_measurements = n_data
        return recommender
    def __str__(self) -> str:
        fields = [
            to_string("Recommenders", self.recommenders),
            to_string("Mode", self.mode, single_line=True),
        ]
        return to_string(self.__class__.__name__, *fields)
[docs]
@define
class StreamingSequentialMetaRecommender(MetaRecommender):
    """A meta recommender that switches between recommenders from an iterable.
    Similar to :class:`baybe.recommenders.meta.sequential.SequentialMetaRecommender`
    but without explicit list conversion. Consequently, it supports arbitrary
    iterables, possibly of infinite length. The downside is that serialization is not
    supported.
    Raises:
        NoRecommendersLeftError: If more recommenders are requested than there are
            recommenders available.
    """
    # Exposed
    recommenders: Iterable[PureRecommender] = field()
    """An iterable providing the recommenders to be used."""
    # Private
    # TODO: See :class:`baybe.recommenders.meta.sequential.SequentialMetaRecommender`
    _step: int = field(init=False, default=-1)
    """Counts how often the recommender has already been switched."""
    _n_last_measurements: int = field(init=False, default=-1)
    """The number of measurements that were available at the last call."""
    _iterator: Iterator = field(init=False)
    """The iterator used to traverse the recommenders."""
    _last_recommender: PureRecommender | None = field(init=False, default=None)
    """The recommender returned from the last call."""
[docs]
    @_iterator.default
    def default_iterator(self):
        """Initialize the recommender iterator."""
        return iter(self.recommenders)
[docs]
    def select_recommender(  # noqa: D102
        self,
        batch_size: int,
        searchspace: SearchSpace | None = None,
        objective: Objective | None = None,
        measurements: pd.DataFrame | None = None,
        pending_experiments: pd.DataFrame | None = None,
    ) -> PureRecommender:
        # See base class.
        use_last = True
        n_data = len(measurements) if measurements is not None else 0
        # If the training dataset size has increased, move to the next recommender
        if n_data > self._n_last_measurements:
            self._step += 1
            use_last = False
        # If the training dataset size has decreased, something went wrong
        elif n_data < self._n_last_measurements:
            raise RuntimeError(
                f"The training dataset size decreased from {self._n_last_measurements} "
                f"to {n_data} since the last function call, which indicates that "
                f"'{self.__class__.__name__}' was not used as intended."
            )
        # Get the recommender
        try:
            if not use_last:
                self._last_recommender = next(self._iterator)
        except StopIteration as ex:
            raise NoRecommendersLeftError(
                f"A total of {self._step+1} recommender(s) was/were requested but the "
                f"provided iterator provided only {self._step} element(s)."
            ) from ex
        # Remember the training dataset size for the next call
        self._n_last_measurements = n_data
        return self._last_recommender  # type: ignore[return-value]
    def __str__(self) -> str:
        fields = [
            to_string("Recommenders", self.recommenders),
        ]
        return to_string(self.__class__.__name__, *fields)
# The recommender iterable cannot be serialized
converter.register_unstructure_hook(
    StreamingSequentialMetaRecommender, block_serialization_hook
)
converter.register_structure_hook(
    StreamingSequentialMetaRecommender, block_deserialization_hook
)