"""Functionality for managing DOE campaigns. Main point of interaction via Python."""
from __future__ import annotations
import gc
import json
from collections.abc import Callable, Collection
from functools import reduce
from typing import TYPE_CHECKING, Any
import cattrs
import numpy as np
import pandas as pd
from attrs import Attribute, Factory, define, evolve, field, fields
from attrs.converters import optional
from attrs.validators import instance_of
from typing_extensions import override
from baybe.constraints.base import DiscreteConstraint
from baybe.exceptions import IncompatibilityError, NotEnoughPointsLeftError
from baybe.objectives.base import Objective, to_objective
from baybe.parameters.base import Parameter
from baybe.recommenders.base import RecommenderProtocol
from baybe.recommenders.meta.base import MetaRecommender
from baybe.recommenders.meta.sequential import TwoPhaseMetaRecommender
from baybe.recommenders.pure.bayesian.base import BayesianRecommender
from baybe.recommenders.pure.nonpredictive.base import NonPredictiveRecommender
from baybe.searchspace._filtered import FilteredSubspaceDiscrete
from baybe.searchspace.core import (
SearchSpace,
SearchSpaceType,
to_searchspace,
validate_searchspace_from_config,
)
from baybe.serialization import SerialMixin, converter
from baybe.surrogates.base import SurrogateProtocol
from baybe.targets.base import Target
from baybe.telemetry import (
TELEM_LABELS,
telemetry_record_recommended_measurement_percentage,
telemetry_record_value,
)
from baybe.utils.basic import UNSPECIFIED, UnspecifiedType, is_all_instance
from baybe.utils.boolean import eq_dataframe
from baybe.utils.dataframe import filter_df, fuzzy_row_match
from baybe.utils.plotting import to_string
if TYPE_CHECKING:
from botorch.posteriors import Posterior
# Metadata columns
_RECOMMENDED = "recommended"
_MEASURED = "measured"
_EXCLUDED = "excluded"
_METADATA_COLUMNS = [_RECOMMENDED, _MEASURED, _EXCLUDED]
def _make_allow_flag_default_factory(
    default: bool,
) -> Callable[[Campaign], bool | UnspecifiedType]:
    """Create an attrs-compatible default factory for the campaign allow_* flags.

    The returned callable yields the given ``default`` for discrete search spaces
    and ``UNSPECIFIED`` for all other search space types, where the flags carry
    no meaning.
    """

    def _resolve_default(campaign: Campaign) -> bool | UnspecifiedType:
        """Resolve the flag default based on the campaign's search space type."""
        is_discrete = campaign.searchspace.type is SearchSpaceType.DISCRETE
        return default if is_discrete else UNSPECIFIED

    return _resolve_default
def _validate_allow_flag(campaign: Campaign, attribute: Attribute, value: Any) -> None:
    """Attrs-compatible validator for context-aware validation of allow_* flags.

    For discrete search spaces, the flag must be a Boolean; for all other search
    space types, it must remain unset (i.e. ``UNSPECIFIED``).
    """
    if campaign.searchspace.type == SearchSpaceType.DISCRETE:
        # Discrete spaces require an explicit Boolean setting
        if not isinstance(value, bool):
            raise ValueError(
                f"For search spaces of '{SearchSpaceType.DISCRETE}', "
                f"'{attribute.name}' must be a Boolean."
            )
    elif value is not UNSPECIFIED:
        # The flags are meaningless outside discrete spaces and must stay unset
        raise ValueError(
            f"For search spaces of type other than "
            f"'{SearchSpaceType.DISCRETE}', '{attribute.name}' cannot be set "
            f"since the flag is meaningless in such contexts.",
        )
[docs]
@define
class Campaign(SerialMixin):
    """Main class for interaction with BayBE.

    Campaigns define and record an experimentation process, i.e. the execution of a
    series of measurements and the iterative sequence of events involved.

    In particular, a campaign:
        * Defines the objective of an experimentation process.
        * Defines the search space over which the experimental parameter may vary.
        * Defines a recommender for exploring the search space.
        * Records the measurement data collected during the process.
        * Records metadata about the progress of the experimentation process.
    """

    # DOE specifications
    searchspace: SearchSpace = field(converter=to_searchspace)
    """The search space in which the experiments are conducted.

    When passing a :class:`baybe.parameters.base.Parameter`,
    a :class:`baybe.searchspace.discrete.SubspaceDiscrete`, or
    a :class:`baybe.searchspace.continuous.SubspaceContinuous`, conversion to
    :class:`baybe.searchspace.core.SearchSpace` is automatically applied."""

    objective: Objective | None = field(default=None, converter=optional(to_objective))
    """The optimization objective.

    When passing a :class:`baybe.targets.base.Target`, conversion to
    :class:`baybe.objectives.single.SingleTargetObjective` is automatically applied."""

    recommender: RecommenderProtocol = field(
        factory=TwoPhaseMetaRecommender,
        validator=instance_of(RecommenderProtocol),
    )
    """The employed recommender"""

    allow_recommending_already_measured: bool | UnspecifiedType = field(
        default=Factory(
            _make_allow_flag_default_factory(default=True), takes_self=True
        ),
        validator=_validate_allow_flag,
        kw_only=True,
    )
    """Allow to recommend experiments that were already measured earlier.

    Can only be set for discrete search spaces."""

    allow_recommending_already_recommended: bool | UnspecifiedType = field(
        default=Factory(
            _make_allow_flag_default_factory(default=False), takes_self=True
        ),
        validator=_validate_allow_flag,
        kw_only=True,
    )
    """Allow to recommend experiments that were already recommended earlier.

    Can only be set for discrete search spaces."""

    allow_recommending_pending_experiments: bool | UnspecifiedType = field(
        default=Factory(
            _make_allow_flag_default_factory(default=False), takes_self=True
        ),
        validator=_validate_allow_flag,
        kw_only=True,
    )
    """Allow pending experiments to be part of the recommendations.

    Can only be set for discrete search spaces."""

    # Metadata
    _searchspace_metadata: pd.DataFrame = field(init=False, eq=eq_dataframe)
    """Metadata tracking the experimentation status of the search space."""

    n_batches_done: int = field(default=0, init=False)
    """The number of already processed batches."""

    n_fits_done: int = field(default=0, init=False)
    """The number of fits already done."""

    # Private
    _measurements_exp: pd.DataFrame = field(
        factory=pd.DataFrame, eq=eq_dataframe, init=False
    )
    """The experimental representation of the conducted experiments."""

    _cached_recommendation: pd.DataFrame = field(
        factory=pd.DataFrame, eq=eq_dataframe, init=False
    )
    """The cached recommendations."""

    @_searchspace_metadata.default
    def _default_searchspace_metadata(self) -> pd.DataFrame:
        """Create a fresh metadata object."""
        # One row per discrete candidate, all status flags initially False ...
        df = pd.DataFrame(
            False,
            index=self.searchspace.discrete.exp_rep.index,
            columns=_METADATA_COLUMNS,
        )
        # ... except for the exclusion flag, which is inherited from the subspace
        df.loc[:, _EXCLUDED] = self.searchspace.discrete._excluded
        return df

    @override
    def __str__(self) -> str:
        # Aggregate the per-candidate metadata flags into summary counts
        recommended_count = sum(self._searchspace_metadata[_RECOMMENDED])
        measured_count = sum(self._searchspace_metadata[_MEASURED])
        excluded_count = sum(self._searchspace_metadata[_EXCLUDED])
        n_elements = len(self._searchspace_metadata)
        searchspace_fields = [
            to_string(
                "Recommended:",
                f"{recommended_count}/{n_elements}",
                single_line=True,
            ),
            to_string(
                "Measured:",
                f"{measured_count}/{n_elements}",
                single_line=True,
            ),
            to_string(
                "Excluded:",
                f"{excluded_count}/{n_elements}",
                single_line=True,
            ),
        ]
        metadata_fields = [
            to_string("Batches done", self.n_batches_done, single_line=True),
            to_string("Fits done", self.n_fits_done, single_line=True),
            to_string("Discrete Subspace Meta Data", *searchspace_fields),
        ]
        metadata = to_string("Meta Data", *metadata_fields)
        fields = [metadata, self.searchspace, self.objective, self.recommender]
        return to_string(self.__class__.__name__, *fields)

    @property
    def measurements(self) -> pd.DataFrame:
        """The experimental data added to the Campaign."""
        return self._measurements_exp

    @property
    def parameters(self) -> tuple[Parameter, ...]:
        """The parameters of the underlying search space."""
        return self.searchspace.parameters

    @property
    def targets(self) -> tuple[Target, ...]:
        """The targets of the underlying objective."""
        # An empty tuple is returned when no objective has been defined
        return self.objective.targets if self.objective is not None else ()

    @classmethod
    def from_config(cls, config_json: str) -> Campaign:
        """Create a campaign from a configuration JSON.

        Args:
            config_json: The string with the configuration JSON.

        Returns:
            The constructed campaign.
        """
        config = json.loads(config_json)
        return converter.structure(config, Campaign)

    @classmethod
    def validate_config(cls, config_json: str) -> None:
        """Validate a given campaign configuration JSON.

        Args:
            config_json: The JSON that should be validated.
        """
        config = json.loads(config_json)
        # Structuring with the validation converter raises if the config is invalid
        _validation_converter.structure(config, Campaign)

    def add_measurements(
        self,
        data: pd.DataFrame,
        numerical_measurements_must_be_within_tolerance: bool = True,
    ) -> None:
        """Add results from a dataframe to the internal database.

        Each addition of data is considered a new batch. Added results are checked for
        validity. Categorical values need to have an exact match. For numerical values,
        a campaign flag determines if values that lie outside a specified tolerance
        are accepted.
        Note that this modifies the provided data in-place.

        Args:
            data: The data to be added (with filled values for targets). Preferably
                created via :func:`baybe.campaign.Campaign.recommend`.
            numerical_measurements_must_be_within_tolerance: Flag indicating if
                numerical parameters need to be within their tolerances.

        Raises:
            ValueError: If one of the targets has missing values or NaNs in the provided
                dataframe.
            TypeError: If the target has non-numeric entries in the provided dataframe.
        """
        # Invalidate recommendation cache first (in case of uncaught exceptions below)
        self._cached_recommendation = pd.DataFrame()

        # Check if all targets have valid values
        for target in self.targets:
            if data[target.name].isna().any():
                raise ValueError(
                    f"The target '{target.name}' has missing values or NaNs in the "
                    f"provided dataframe. Missing target values are not supported."
                )
            # "iufb" = signed/unsigned integer, float, and Boolean dtype kinds
            if data[target.name].dtype.kind not in "iufb":
                raise TypeError(
                    f"The target '{target.name}' has non-numeric entries in the "
                    f"provided dataframe. Non-numeric target values are not supported."
                )

        # Check if all parameters have valid values
        for param in self.parameters:
            if data[param.name].isna().any():
                raise ValueError(
                    f"The parameter '{param.name}' has missing values or NaNs in the "
                    f"provided dataframe. Missing parameter values are not supported."
                )
            if param.is_numerical and (data[param.name].dtype.kind not in "iufb"):
                raise TypeError(
                    f"The numerical parameter '{param.name}' has non-numeric entries in"
                    f" the provided dataframe."
                )

        # Read in measurements and add them to the database
        self.n_batches_done += 1
        to_insert = data.copy()
        to_insert["BatchNr"] = self.n_batches_done
        # The fit number is filled in on the next model fit (see `recommend`)
        to_insert["FitNr"] = np.nan

        self._measurements_exp = pd.concat(
            [self._measurements_exp, to_insert], axis=0, ignore_index=True
        )

        # Update metadata: mark matched discrete candidates as measured
        if self.searchspace.type in (SearchSpaceType.DISCRETE, SearchSpaceType.HYBRID):
            idxs_matched = fuzzy_row_match(
                self.searchspace.discrete.exp_rep,
                data,
                self.parameters,
                numerical_measurements_must_be_within_tolerance,
            )
            self._searchspace_metadata.loc[idxs_matched, _MEASURED] = True

        # Telemetry
        # NOTE(review): the recommendation cache was cleared at the top of this
        # method, so an empty dataframe is passed here — confirm this is intended.
        telemetry_record_value(TELEM_LABELS["COUNT_ADD_RESULTS"], 1)
        telemetry_record_recommended_measurement_percentage(
            self._cached_recommendation,
            data,
            self.parameters,
            numerical_measurements_must_be_within_tolerance,
        )

    def toggle_discrete_candidates(  # noqa: DOC501
        self,
        constraints: Collection[DiscreteConstraint] | pd.DataFrame,
        exclude: bool,
        complement: bool = False,
        dry_run: bool = False,
    ) -> pd.DataFrame:
        """In-/exclude certain discrete points in/from the candidate set.

        Args:
            constraints: A filtering mechanism determining the candidates subset to be
                in-/excluded. Can be either a collection of
                :class:`~baybe.constraints.base.DiscreteConstraint` or a dataframe.
                For the latter, see :func:`~baybe.utils.dataframe.filter_df`
                for details.
            exclude: If ``True``, the specified candidates are excluded.
                If ``False``, the candidates are considered for recommendation.
            complement: If ``True``, the filtering mechanism is inverted so that
                the complement of the candidate subset specified by the filter is
                toggled. For details, see :func:`~baybe.utils.dataframe.filter_df`.
            dry_run: If ``True``, the target subset is only extracted but not
                affected. If ``False``, the candidate set is updated correspondingly.
                Useful for setting up the correct filtering mechanism.

        Returns:
            A new dataframe containing the discrete candidate set passing through the
            specified filter.
        """
        # Clear cache
        self._cached_recommendation = pd.DataFrame()

        df = self.searchspace.discrete.exp_rep

        if isinstance(constraints, pd.DataFrame):
            # Determine the candidate subset to be toggled
            points = filter_df(df, constraints, complement)

        elif isinstance(constraints, Collection) and is_all_instance(
            constraints, DiscreteConstraint
        ):
            # TODO: Should be taken over by upcoming `SubspaceDiscrete.filter` method,
            #   automatically choosing the appropriate backend (polars/pandas/...)

            # Filter the search space dataframe according to the given constraint:
            # only rows valid under *all* constraints remain in the index
            idx = reduce(
                lambda x, y: x.intersection(y), (c.get_valid(df) for c in constraints)
            )

            # Determine the candidate subset to be toggled
            points = df.drop(index=idx) if complement else df.loc[idx].copy()

        else:
            raise TypeError(
                "Candidate toggling is not implemented for the given type of "
                "constraint specifications."
            )

        if not dry_run:
            # Persist the new exclusion state in the metadata
            self._searchspace_metadata.loc[points.index, _EXCLUDED] = exclude

        return points

    def recommend(
        self,
        batch_size: int,
        pending_experiments: pd.DataFrame | None = None,
    ) -> pd.DataFrame:
        """Provide the recommendations for the next batch of experiments.

        Args:
            batch_size: Number of requested recommendations.
            pending_experiments: Parameter configurations specifying experiments
                that are currently pending.

        Returns:
            Dataframe containing the recommendations in experimental representation.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError(
                f"You must at least request one recommendation per batch, but provided "
                f"{batch_size=}."
            )

        # Invalidate cached recommendation if pending experiments are provided
        if (pending_experiments is not None) and (len(pending_experiments) > 0):
            self._cached_recommendation = pd.DataFrame()

        # If there are cached recommendations and the batch size of those is equal to
        # the previously requested one, we just return those
        if len(self._cached_recommendation) == batch_size:
            return self._cached_recommendation

        # Update recommendation meta data: measurements without a fit number belong
        # to the upcoming fit
        if len(self._measurements_exp) > 0:
            self.n_fits_done += 1
            self._measurements_exp.fillna({"FitNr": self.n_fits_done}, inplace=True)

        # Prepare the search space according to the current campaign state
        if self.searchspace.type is SearchSpaceType.DISCRETE:
            # TODO: This implementation should at some point be hidden behind an
            #   appropriate public interface, like `SubspaceDiscrete.filter()`

            # Combine all reasons for dropping a candidate into a single Boolean mask
            mask_todrop = self._searchspace_metadata[_EXCLUDED].copy()
            if not self.allow_recommending_already_recommended:
                mask_todrop |= self._searchspace_metadata[_RECOMMENDED]
            if not self.allow_recommending_already_measured:
                mask_todrop |= self._searchspace_metadata[_MEASURED]
            if (
                not self.allow_recommending_pending_experiments
                and pending_experiments is not None
            ):
                # A left merge with indicator flags rows that also appear in the
                # pending experiments ("both")
                mask_todrop |= pd.merge(
                    self.searchspace.discrete.exp_rep,
                    pending_experiments,
                    indicator=True,
                    how="left",
                )["_merge"].eq("both")
            searchspace = evolve(
                self.searchspace,
                discrete=FilteredSubspaceDiscrete.from_subspace(
                    self.searchspace.discrete, ~mask_todrop.to_numpy()
                ),
            )
        else:
            searchspace = self.searchspace

        # Pending experiments should not be passed to non-predictive recommenders
        # to avoid complaints about unused arguments, so we need to know of what
        # type the next recommender will be
        recommender = self.recommender
        if isinstance(recommender, MetaRecommender):
            recommender = recommender.get_non_meta_recommender(
                batch_size,
                searchspace,
                self.objective,
                self._measurements_exp,
                pending_experiments,
            )
        is_nonpredictive = isinstance(recommender, NonPredictiveRecommender)

        # Get the recommended search space entries
        try:
            # NOTE: The `recommend` call must happen on `self.recommender` to update
            #   potential inner states in case of meta recommenders!
            rec = self.recommender.recommend(
                batch_size,
                searchspace,
                self.objective,
                self._measurements_exp,
                None if is_nonpredictive else pending_experiments,
            )
        except NotEnoughPointsLeftError as ex:
            # Aliases for code compactness
            f = fields(Campaign)
            ok_m = self.allow_recommending_already_measured
            ok_r = self.allow_recommending_already_recommended
            ok_p = self.allow_recommending_pending_experiments
            ok_m_name = f.allow_recommending_already_measured.name
            ok_r_name = f.allow_recommending_already_recommended.name
            ok_p_name = f.allow_recommending_pending_experiments.name
            no_blocked_pending_points = ok_p or (pending_experiments is None)

            # If there are no candidate restrictions to be relaxed
            if ok_m and ok_r and no_blocked_pending_points:
                raise ex

            # Otherwise, extract possible relaxations to suggest to the user
            solution = [
                f"'{name}=True'"
                for name, value in [
                    (ok_m_name, ok_m),
                    (ok_r_name, ok_r),
                    (ok_p_name, no_blocked_pending_points),
                ]
                if not value
            ]
            message = solution[0] if len(solution) == 1 else " and/or ".join(solution)
            raise NotEnoughPointsLeftError(
                f"{str(ex)} Consider setting {message}."
            ) from ex

        # Cache the recommendations
        self._cached_recommendation = rec.copy()

        # Update metadata: mark the recommended discrete candidates
        if self.searchspace.type in (SearchSpaceType.DISCRETE, SearchSpaceType.HYBRID):
            self._searchspace_metadata.loc[rec.index, _RECOMMENDED] = True

        # Telemetry
        telemetry_record_value(TELEM_LABELS["COUNT_RECOMMEND"], 1)
        telemetry_record_value(TELEM_LABELS["BATCH_SIZE"], batch_size)

        return rec

    def posterior(self, candidates: pd.DataFrame) -> Posterior:
        """Get the posterior predictive distribution for the given candidates.

        Args:
            candidates: The candidate points in experimental recommendations.
                For details, see :meth:`baybe.surrogates.base.Surrogate.posterior`.

        Raises:
            IncompatibilityError: If the underlying surrogate model exposes no
                method for computing the posterior distribution.

        Returns:
            Posterior: The corresponding posterior object.
            For details, see :meth:`baybe.surrogates.base.Surrogate.posterior`.
        """
        surrogate = self.get_surrogate()
        if not hasattr(surrogate, method_name := "posterior"):
            raise IncompatibilityError(
                f"The used surrogate type '{surrogate.__class__.__name__}' does not "
                f"provide a '{method_name}' method."
            )

        # Deferred import: torch is only needed when posteriors are requested
        import torch

        # No gradients are needed for pure prediction
        with torch.no_grad():
            return surrogate.posterior(candidates)

    def get_surrogate(
        self,
        batch_size: int | None = None,
        pending_experiments: pd.DataFrame | None = None,
    ) -> SurrogateProtocol:
        """Get the current surrogate model.

        Args:
            batch_size: See :meth:`recommend`.
                Only required when using meta recommenders that demand it.
            pending_experiments: See :meth:`recommend`.
                Only required when using meta recommenders that demand it.

        Raises:
            RuntimeError: If the current recommender does not provide a surrogate model.

        Returns:
            Surrogate: The surrogate of the current recommender.

        Note:
            Currently, this method always returns the surrogate model with respect to
            the transformed target(s) / objective. This means that if you are using a
            ``SingleTargetObjective`` with a transformed target or a
            ``DesirabilityObjective``, the model's output will correspond to the
            transformed quantities and not the original untransformed target(s).
        """
        if self.objective is None:
            raise IncompatibilityError(
                f"No surrogate is available since no '{Objective.__name__}' is defined."
            )

        # Resolve meta recommenders to the concrete recommender they would use next
        recommender: RecommenderProtocol
        if isinstance(self.recommender, MetaRecommender):
            recommender = self.recommender.get_non_meta_recommender(
                batch_size,
                self.searchspace,
                self.objective,
                self.measurements,
                pending_experiments,
            )
        else:
            recommender = self.recommender

        if isinstance(recommender, BayesianRecommender):
            return recommender.get_surrogate(
                self.searchspace, self.objective, self.measurements
            )
        else:
            raise RuntimeError(
                f"The current recommender is of type "
                f"'{recommender.__class__.__name__}', which does not provide "
                f"a surrogate model. Surrogate models are only available for "
                f"recommender subclasses of '{BayesianRecommender.__name__}'."
            )
def _add_version(dict_: dict) -> dict:
    """Return a copy of the given dictionary enriched with the package version."""
    from baybe import __version__

    enriched = dict(dict_)
    enriched["version"] = __version__
    return enriched
def _drop_version(dict_: dict) -> dict:
"""Drop the package version from the given dictionary."""
dict_.pop("version", None)
return dict_
# Register (un-)structure hooks
# `_cattrs_include_init_false=True` makes the hooks also serialize the
# init=False fields (metadata, measurements, cache), so a campaign's full
# state round-trips through (de-)serialization.
unstructure_hook = cattrs.gen.make_dict_unstructure_fn(
    Campaign, converter, _cattrs_include_init_false=True
)
structure_hook = cattrs.gen.make_dict_structure_fn(
    Campaign, converter, _cattrs_include_init_false=True, _cattrs_forbid_extra_keys=True
)
# Wrap the generated hooks so that the package version is attached on
# unstructuring and stripped again before structuring.
converter.register_unstructure_hook(
    Campaign, lambda x: _add_version(unstructure_hook(x))
)
converter.register_structure_hook(
    Campaign, lambda d, cl: structure_hook(_drop_version(d), cl)
)

# Converter for config validation
# Uses a separate converter copy whose SearchSpace hook validates the config
# instead of performing the regular structuring.
_validation_converter = converter.copy()
_validation_converter.register_structure_hook(
    SearchSpace, validate_searchspace_from_config
)

# Collect leftover original slotted classes processed by `attrs.define`
gc.collect()