"""Dataframe utilities."""
from __future__ import annotations
import functools
import warnings
from collections.abc import Callable, Collection, Iterable, Sequence
from typing import TYPE_CHECKING, Any, Literal, TypeVar, overload
import numpy as np
import pandas as pd
from baybe.exceptions import SearchSpaceMatchWarning
from baybe.targets.base import Target
from baybe.targets.binary import BinaryTarget
from baybe.targets.enum import TargetMode
from baybe.utils.numerical import DTypeFloatNumpy
if TYPE_CHECKING:
from torch import Tensor
from baybe.parameters.base import Parameter
from baybe.targets.base import Target
_T = TypeVar("_T", bound=Parameter | Target)
_ArrayLike = TypeVar("_ArrayLike", np.ndarray, Tensor)
@overload
def to_tensor(x: np.ndarray | pd.DataFrame, /) -> Tensor: ...
@overload
def to_tensor(*x: np.ndarray | pd.DataFrame) -> tuple[Tensor, ...]: ...
def to_tensor(*x: np.ndarray | pd.DataFrame) -> Tensor | tuple[Tensor, ...]:
"""Convert numpy arrays and pandas dataframes to tensors.
Args:
*x: The array(s)/dataframe(s) to be converted.
Returns:
The provided array(s)/dataframe(s) represented as tensor(s).
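    Example:
        A minimal sketch, assuming PyTorch is installed (the exact dtype depends on
        the configured float precision):
        >>> import pandas as pd
        >>> df = pd.DataFrame({"x": [1.0, 2.0], "y": [3.0, 4.0]})
        >>> to_tensor(df).shape
        torch.Size([2, 2])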
"""
# FIXME This function seems to trigger a problem when some columns in either of
# the dfs have a dtype other than int or float (e.g. object, bool). This can
# weirdly happen, even if all values are numeric, e.g. when a target column is
# looked up from a df in simulation, it can have dtype object even if it's all
# floats. As a simple fix (this seems to be the most reasonable place to take
# care of this) df.values has been changed to df.values.astype(float),
# even though this seems like double casting here.
import torch
from baybe.utils.torch import DTypeFloatTorch
out = tuple(
torch.from_numpy(
(xi.values if isinstance(xi, pd.DataFrame) else xi).astype(DTypeFloatNumpy)
).to(DTypeFloatTorch)
for xi in x
)
if len(x) == 1:
out = out[0]
return out
def add_fake_measurements(
data: pd.DataFrame,
targets: Iterable[Target],
good_reference_values: dict[str, list] | None = None,
good_intervals: dict[str, tuple[float, float]] | None = None,
bad_intervals: dict[str, tuple[float, float]] | None = None,
) -> pd.DataFrame:
"""Add fake measurements to a dataframe which was the result of a recommendation.
It is possible to specify "good" values, which will be given a better
target value. With this, the algorithm can be driven towards certain optimal values
whilst still being random. Useful for testing. Note that the dataframe is changed
in-place and also returned.
Args:
data: A dataframe containing parameter configurations in experimental
representation, for instance, created via
:func:`baybe.campaign.Campaign.recommend`.
targets: The targets for which fake results should be added to the dataframe.
good_reference_values: A dictionary containing parameter names (= dict keys) and
respective parameter values (= dict values) that specify what will be
considered good parameter settings. Conditions for different parameters are
connected via "and" logic, i.e. the targets will only get good values when
all parameters have good reference values.
good_intervals: A dictionary containing target names (= dict keys) and
respective "good" target value ranges (= dict values) in the form of
2-tuples. Each target will be assigned a random value in its respective
target range whenever the corresponding parameters meet the conditions
specified through ``good_reference_values``.
bad_intervals: Analogous to ``good_intervals`` but covering the cases where
the parameters lie outside the conditions specified through
``good_reference_values``.
Returns:
The modified dataframe.
Raises:
ValueError: If good values for a parameter were specified, but this parameter
is not part of the dataframe.
ValueError: If the target mode is unrecognized when trying to add fake values.
TypeError: If the entries in ``good_reference_values`` are not lists.
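    Example:
        A minimal sketch; the ``NumericalTarget`` constructor arguments shown here
        are assumptions and may differ between versions:
        >>> import pandas as pd
        >>> from baybe.targets import NumericalTarget
        >>> target = NumericalTarget(name="Yield", mode="MAX")
        >>> df = pd.DataFrame({"Temperature": [100, 110]})
        >>> df = add_fake_measurements(df, [target])  # adds a random "Yield" column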
"""
    # By default, there are no reference values for good parameters
if good_reference_values is None:
good_reference_values = {}
# Validate input
for param, vals in good_reference_values.items():
if param not in data.columns:
raise ValueError(
f"When adding fake results you specified good "
f"values for the parameter '{param}' but this "
f"parameter is not in the dataframe."
)
if not isinstance(vals, list):
raise TypeError(
f"Entries in parameter 'good_reference_values' "
f"(which is a dictionary) must be lists, but you "
f"provided {vals}."
)
# Set defaults for good intervals
if good_intervals is None:
good_intervals = {}
for target in targets:
if isinstance(target, BinaryTarget):
continue
if target.mode is TargetMode.MAX:
lbound = target.bounds.lower if np.isfinite(target.bounds.lower) else 66
ubound = (
target.bounds.upper if np.isfinite(target.bounds.upper) else 100
)
interv = (lbound, ubound)
elif target.mode is TargetMode.MIN:
lbound = target.bounds.lower if np.isfinite(target.bounds.lower) else 0
ubound = target.bounds.upper if np.isfinite(target.bounds.upper) else 33
interv = (lbound, ubound)
elif target.mode is TargetMode.MATCH:
lbound = target.bounds.lower if np.isfinite(target.bounds.lower) else 0
ubound = (
target.bounds.upper if np.isfinite(target.bounds.upper) else 100
)
interv = (
lbound + 0.4 * (ubound - lbound),
lbound + 0.6 * (ubound - lbound),
)
else:
raise ValueError(
"Unrecognized target mode when trying to add fake values."
)
good_intervals[target.name] = interv
# Set defaults for bad intervals
if bad_intervals is None:
bad_intervals = {}
for target in targets:
if isinstance(target, BinaryTarget):
continue
if target.mode is TargetMode.MAX:
lbound = target.bounds.lower if np.isfinite(target.bounds.lower) else 0
ubound = target.bounds.upper if np.isfinite(target.bounds.upper) else 33
interv = (lbound, ubound)
elif target.mode is TargetMode.MIN:
lbound = target.bounds.lower if np.isfinite(target.bounds.lower) else 66
ubound = (
target.bounds.upper if np.isfinite(target.bounds.upper) else 100
)
interv = (lbound, ubound)
elif target.mode is TargetMode.MATCH:
lbound = target.bounds.lower if np.isfinite(target.bounds.lower) else 0
ubound = (
target.bounds.upper if np.isfinite(target.bounds.upper) else 100
)
interv = (
# Take as bad values the interval above the good interval
lbound + 0.6 * (ubound - lbound),
lbound + 1.2 * (ubound - lbound),
)
else:
raise ValueError(
"Unrecognized target mode when trying to add fake values."
)
bad_intervals[target.name] = interv
# Add the fake data for each target
for target in targets:
if isinstance(target, BinaryTarget):
# TODO: When refactoring, take into account good and bad intervals
data[target.name] = np.random.choice(
[target.failure_value, target.success_value], size=len(data)
)
continue
# Add bad values
data[target.name] = np.random.uniform(
bad_intervals[target.name][0], bad_intervals[target.name][1], len(data)
)
# Create masks that identify locations where to place good values
masks = []
for param, vals in good_reference_values.items():
mask = data[param].isin(vals)
masks.append(mask)
# Overwrite bad values with good ones using the computed masks
if len(masks) > 0:
final_mask = pd.concat(masks, axis=1).all(axis=1)
data.loc[final_mask, target.name] = np.random.uniform(
good_intervals[target.name][0],
good_intervals[target.name][1],
final_mask.sum(),
)
return data
def add_parameter_noise(
data: pd.DataFrame,
parameters: Iterable[Parameter],
noise_type: Literal["absolute", "relative_percent"] = "absolute",
noise_level: float = 1.0,
) -> pd.DataFrame:
"""Apply uniform noise to the parameter values of a recommendation frame.
The noise can be additive or multiplicative.
This can be used to simulate experimental noise or imperfect user input containing
numerical parameter values that differ from the recommendations. Note that the
dataframe is changed in-place and also returned.
Args:
data: Output of the ``recommend`` function of a ``Campaign`` object, see
:func:`baybe.campaign.Campaign.recommend`.
parameters: The parameters for which the values shall be corrupted.
noise_type: Defines whether the noise should be additive or multiplicative.
noise_level: Level/magnitude of the noise. Must be provided as numerical value
for noise type ``absolute`` and as percentage for noise type
``relative_percent``.
Returns:
The modified dataframe.
Raises:
ValueError: If ``noise_type`` is neither ``absolute`` nor
``relative_percent``.
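    Example:
        A minimal sketch; the ``NumericalDiscreteParameter`` constructor shown here
        is an assumption and may differ between versions:
        >>> import pandas as pd
        >>> from baybe.parameters import NumericalDiscreteParameter
        >>> param = NumericalDiscreteParameter(name="Temp", values=[10.0, 20.0])
        >>> df = pd.DataFrame({"Temp": [10.0, 20.0]})
        >>> df = add_parameter_noise(df, [param], noise_level=0.5)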
"""
# Validate input
if noise_type not in ("relative_percent", "absolute"):
raise ValueError(
f"Parameter 'noise_type' was {noise_type} but must be either "
"'absolute' or 'relative_percent'."
)
for param in (p for p in parameters if p.is_numerical):
# Add selected noise type
if noise_type == "relative_percent":
data[param.name] *= np.random.uniform(
1.0 - noise_level / 100.0, 1.0 + noise_level / 100.0, len(data)
)
elif noise_type == "absolute":
data[param.name] += np.random.uniform(-noise_level, noise_level, len(data))
# Respect continuous intervals
if param.is_continuous:
data[param.name] = data[param.name].clip(
param.bounds.lower, param.bounds.upper
)
return data
def df_drop_single_value_columns(
    df: pd.DataFrame, lst_exclude: list | None = None
) -> pd.DataFrame:
"""Drop dataframe columns with zero variance.
Args:
df: The dataframe to be cleaned.
lst_exclude: List of column names that are excluded from this filter.
Returns:
The cleaned dataframe.
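    Example:
        Column ``a`` is constant and therefore dropped:
        >>> import pandas as pd
        >>> df = pd.DataFrame({"a": [1, 1], "b": [1, 2]})
        >>> list(df_drop_single_value_columns(df).columns)
        ['b']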
"""
if lst_exclude is None:
lst_exclude = []
to_keep = []
for col in df.columns:
if (col in lst_exclude) or (df[col].nunique() > 1):
to_keep.append(col)
return df[to_keep]
def df_drop_string_columns(
df: pd.DataFrame, ignore_list: list[str] | None = None
) -> pd.DataFrame:
"""Drop dataframe columns with string values.
Args:
df: The dataframe to be cleaned.
ignore_list: List of columns that should not be dropped, even if they include
string values.
Returns:
The cleaned dataframe.
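    Example:
        Column ``txt`` contains strings and is therefore dropped:
        >>> import pandas as pd
        >>> df = pd.DataFrame({"num": [1, 2], "txt": ["a", "b"]})
        >>> list(df_drop_string_columns(df).columns)
        ['num']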
"""
ignore_list = ignore_list or []
no_string = ~df.applymap(lambda x: isinstance(x, str)).any()
no_string = no_string[no_string].index
to_keep = set(no_string).union(set(ignore_list))
ordered_cols = [col for col in df if col in to_keep]
return df[ordered_cols]
def add_noise_to_perturb_degenerate_rows(
df: pd.DataFrame, noise_ratio: float = 0.001
) -> pd.DataFrame:
"""Add noise to degenerate rows to make them numerically distinguishable.
Note that the dataframe is changed in-place and also returned. The dataframe is
left untouched if no rows are degenerate.
Args:
df: The dataframe to be modified.
noise_ratio: The magnitude of generated uniform noise relative to the
min-max range of values for each column.
Returns:
The modified dataframe.
Raises:
TypeError: If the provided dataframe has non-numerical content.
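    Example:
        A minimal sketch; the first two rows are duplicates and get perturbed:
        >>> import pandas as pd
        >>> df = pd.DataFrame({"x": [1.0, 1.0, 2.0], "y": [3.0, 3.0, 4.0]})
        >>> df = add_noise_to_perturb_degenerate_rows(df)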
"""
# Find degenerate rows, exit if there are none
degen_rows = df.duplicated(keep=False)
if not degen_rows.any():
return df
# Assert that the input is purely numerical
if any(df[col].dtype.kind not in "iufb" for col in df.columns):
raise TypeError(
f"'{add_noise_to_perturb_degenerate_rows.__name__}' only supports purely "
f"numerical dataframes."
)
# Find the min-max range for each column. Constant columns will be assigned a range
    # of 1 as a fallback, since otherwise they would be left untouched
column_ranges = df.max() - df.min()
column_ranges = column_ranges.replace(0, 1)
# Generate noise
noise = np.random.uniform(
-noise_ratio, noise_ratio, size=(degen_rows.sum(), df.shape[1])
)
noise_df = pd.DataFrame(noise, columns=df.columns, index=df.index[degen_rows])
# Scale noise by column ranges and add it to the original dataframe
noise_df = noise_df * column_ranges
df.loc[degen_rows] += noise_df
return df
def fuzzy_row_match(
left_df: pd.DataFrame,
right_df: pd.DataFrame,
parameters: Sequence[Parameter],
) -> pd.Index:
"""Match rows of the right dataframe to rows of the left dataframe.
This is useful for matching measurements to entries in the search space, e.g. to
detect which ones have been measured. For categorical parameters, there needs to be
an exact match with any of the allowed values. For numerical parameters, the points
with the smallest deviation are considered.
Note:
This function assumes that the dataframes contain only allowed values as
specified in the parameter objects. No further validation to assert this is
done.
Args:
left_df: The data that serves as lookup reference.
right_df: The data that is checked for matching rows in the left dataframe.
parameters: Parameter objects that identify the relevant column names and how
matching is performed.
Returns:
The index of the matching rows in ``left_df``.
Raises:
ValueError: If either ``left_df`` or ``right_df`` does not contain columns for
            each entry in ``parameters``.
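    Example:
        A minimal sketch; the ``NumericalDiscreteParameter`` constructor shown here
        is an assumption and may differ between versions:
        >>> import pandas as pd
        >>> from baybe.parameters import NumericalDiscreteParameter
        >>> param = NumericalDiscreteParameter(name="x", values=[1.0, 2.0, 3.0])
        >>> left = pd.DataFrame({"x": [1.0, 2.0, 3.0]})
        >>> right = pd.DataFrame({"x": [1.1, 2.9]})
        >>> fuzzy_row_match(left, right, [param]).tolist()
        [0, 2]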
"""
    # Separate column types
cat_cols = {p.name for p in parameters if (not p.is_numerical and p.is_discrete)}
num_cols = {p.name for p in parameters if (p.is_numerical and p.is_discrete)}
non_discrete_cols = {p.name for p in parameters if not p.is_discrete}
# Assert that all parameters appear in the given dataframes
if diff := (cat_cols | num_cols).difference(left_df.columns):
raise ValueError(
f"For fuzzy row matching, all discrete parameters need to have a "
f"corresponding column in the left dataframe. Parameters not found: {diff})"
)
if diff := (cat_cols | num_cols).difference(right_df.columns):
raise ValueError(
f"For fuzzy row matching, all discrete parameters need to have a "
f"corresponding column in the right dataframe. Parameters not found: "
f"{diff})"
)
provided_cols = {p.name for p in parameters}
allowed_cols = cat_cols | num_cols | non_discrete_cols
assert allowed_cols == provided_cols, (
f"There are parameter types that would be silently ignored: "
f"{provided_cols.difference(allowed_cols)}"
)
# Initialize the match matrix. We will later filter it down using other
# matrices (representing the matches for individual parameters) via logical 'and'.
match_matrix = pd.DataFrame(
True, index=right_df.index, columns=left_df.index, dtype=bool
)
# Match categorical parameters
for col in cat_cols:
# Per categorical parameter, this identifies matches between all elements of
# left and right and stores them in a matrix.
match_matrix &= right_df[col].values[:, None] == left_df[col].values[None, :]
# Match numerical parameters
for col in num_cols:
# Per numerical parameter, this identifies the rows with the smallest absolute
# difference and records them in a matrix.
abs_diff = np.abs(right_df[col].values[:, None] - left_df[col].values[None, :])
min_diff = abs_diff.min(axis=1, keepdims=True)
match_matrix &= abs_diff == min_diff
# Find the matching indices. If a right row is not matched to any of the rows in
# left, idxmax would return the first index of left_df. Hence, we remember these
# cases and drop them explicitly.
matched_indices = pd.Index(match_matrix.idxmax(axis=1).values)
mask_no_match = ~match_matrix.any(axis=1)
matched_indices = matched_indices[~mask_no_match]
mask_multiple_matches = match_matrix.sum(axis=1) > 1
if multiple_match_indices := right_df.index[mask_multiple_matches].tolist():
w = SearchSpaceMatchWarning(
f"Some input rows have multiple matches with the search space. "
f"Matching only first occurrence for these rows. Indices with multiple "
f"matches: {multiple_match_indices}",
right_df.loc[multiple_match_indices],
)
warnings.warn(w)
return matched_indices
def pretty_print_df(
df: pd.DataFrame,
max_rows: int | None = 6,
max_columns: int | None = 4,
max_colwidth: int | None = 16,
precision: int = 3,
) -> str:
"""Convert a dataframe into a pretty/readable format.
This function returns a customized str representation of the dataframe.
In case the dataframe is empty, it returns a corresponding statement.
Args:
df: The dataframe to be printed.
max_rows: Maximum number of rows to display.
max_columns: Maximum number of columns to display.
max_colwidth: Maximum width of an individual column.
precision: Number of digits to which numbers should be rounded.
Returns:
The values to be printed as a str table.
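    Example:
        A minimal sketch; values appear rounded to the requested precision in the
        returned string:
        >>> import pandas as pd
        >>> df = pd.DataFrame({"A": [1.23456, 2.34567]})
        >>> "1.23" in pretty_print_df(df, precision=2)
        True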
"""
# Get custom str representation via pandas option_context
with pd.option_context(
"display.max_rows",
max_rows,
"display.max_columns",
max_columns,
"display.max_colwidth",
max_colwidth,
"display.precision",
precision,
"expand_frame_repr",
False,
):
# Pandas does not truncate the names of columns with long names, which makes
# computational representations barely readable in some of the examples. Hence,
# we truncate them manually. For details, see
        # https://stackoverflow.com/questions/64976267/pandas-truncate-column-names
str_df = df.rename(
columns=lambda x: x[:max_colwidth],
)
str_df = str(str_df)
return str_df
def filter_df(
df: pd.DataFrame, /, to_keep: pd.DataFrame, complement: bool = False
) -> pd.DataFrame:
"""Filter a dataframe based on a second dataframe defining filtering conditions.
Filtering is done via a join (see ``complement`` argument for details) between the
input dataframe and the filter dataframe.
Args:
df: The dataframe to be filtered.
to_keep: The dataframe defining the filtering conditions. By default
(see ``complement`` argument), it defines the rows to be kept in the sense
of an inner join.
complement: If ``False``, the filter dataframe determines the rows to be kept
(i.e. selection via inner join). If ``True``, the filtering mechanism is
inverted so that the complement set of rows is kept (i.e. selection
via anti-join).
Returns:
A new dataframe containing the result of the filtering process.
Examples:
>>> df = pd.DataFrame(
... [[0, "a"], [0, "b"], [1, "a"], [1, "b"]],
... columns=["num", "cat"]
... )
>>> df
num cat
0 0 a
1 0 b
2 1 a
3 1 b
>>> filter_df(df, pd.DataFrame([0], columns=["num"]), complement=False)
num cat
0 0 a
1 0 b
>>> filter_df(df, pd.DataFrame([0], columns=["num"]), complement=True)
num cat
2 1 a
3 1 b
>>> filter_df(df, pd.DataFrame(), complement=True)
num cat
0 0 a
1 0 b
2 1 a
3 1 b
>>> filter_df(df, pd.DataFrame(), complement=False)
Empty DataFrame
Columns: [num, cat]
Index: []
"""
# Handle special case of empty filter
if to_keep.empty:
return df if complement else pd.DataFrame(columns=df.columns)
# Remember original index name
index_name = df.index.name
# Identify rows to be dropped
out = pd.merge(
df.reset_index(names="_df_index"), to_keep, how="left", indicator=True
).set_index("_df_index")
to_drop = out["_merge"] == ("both" if complement else "left_only")
# Drop the points
out.drop(index=out[to_drop].index, inplace=True)
out.drop("_merge", axis=1, inplace=True)
# Restore original index name
out.index.name = index_name
return out
def arrays_to_dataframes(
input_labels: Sequence[str],
output_labels: Sequence[str],
/,
use_torch: bool = False,
) -> Callable[
[Callable[[_ArrayLike], _ArrayLike]], Callable[[pd.DataFrame], pd.DataFrame]
]:
"""Make a decorator for labeling the input/output columns of array-based callables.
Useful for creating parameter-to-target lookups from array-based logic.
The decorator transforms a callable designed to work with unlabelled arrays such
that it can operate with dataframes instead. The original callable is expected to
accept and return two-dimensional arrays. When decorated, the callable accepts and
returns dataframes whose columns are mapped to the corresponding arrays based on the
specified label sequences.
Args:
input_labels: The sequence of labels for the input columns.
output_labels: The sequence of labels for the output columns.
use_torch: Flag indicating if the callable is to be called with a numpy array
or with a torch tensor.
Returns:
The decorator for the given input and output labels.
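    Example:
        A minimal sketch using a numpy-based callable:
        >>> import numpy as np
        >>> import pandas as pd
        >>> @arrays_to_dataframes(["x"], ["y"])
        ... def square(arr: np.ndarray) -> np.ndarray:
        ...     return arr ** 2
        >>> square(pd.DataFrame({"x": [2.0, 3.0]}))["y"].tolist()
        [4.0, 9.0]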
"""
def decorator(
fn: Callable[[_ArrayLike], _ArrayLike], /
) -> Callable[[pd.DataFrame], pd.DataFrame]:
"""Turn an array-based callable into a dataframe-based callable."""
@functools.wraps(fn)
def wrapper(df: pd.DataFrame, /) -> pd.DataFrame:
"""Translate to/from an array-based callable using dataframes."""
array_in = df[list(input_labels)].to_numpy()
if use_torch:
import torch
with torch.no_grad():
array_out = fn(torch.from_numpy(array_in)).numpy()
else:
array_out = fn(array_in)
return pd.DataFrame(array_out, columns=list(output_labels), index=df.index)
return wrapper
return decorator
class _ValidatedDataFrame(pd.DataFrame):
"""Wrapper indicating the underlying experimental data was already validated."""
def handle_missing_values(
data: pd.DataFrame, columns: Collection[str], drop: bool = False
) -> pd.DataFrame:
"""Handle missing inputs by dropping corresponding rows or raising an error.
Args:
data: Data to be checked.
columns: The column names to check.
drop: Whether to drop the corresponding rows instead of raising an error.
Raises:
        ValueError: If any row contains NaN in the specified columns. Only relevant
if ``drop=False``.
Returns:
A new dataframe with the rows containing NaN dropped.
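    Example:
        With ``drop=True``, rows containing NaN in the given columns are removed:
        >>> import pandas as pd
        >>> df = pd.DataFrame({"x": [1.0, 2.0], "y": [3.0, float("nan")]})
        >>> handle_missing_values(df, ["y"], drop=True)["x"].tolist()
        [1.0]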
"""
mask = data[columns].isna().any(axis=1)
if (not drop) and mask.any():
raise ValueError(
f"Incomplete measurements identified by NaN were found in the input, "
f"but are not supported. Bad input in the rows with these "
f"indices: {data.index[mask].tolist()}"
)
return data.loc[~mask]