Source code for CADETProcess.optimization.axAdapater

import warnings
from typing import Any, Dict, Optional, Union

import numpy as np
import numpy.typing as npt
import pandas as pd
from ax import (
    Arm,
    ComparisonOp,
    Data,
    Experiment,
    Metric,
    Models,
    MultiObjective,
    MultiObjectiveOptimizationConfig,
    Objective,
    OptimizationConfig,
    OutcomeConstraint,
    ParameterConstraint,
    ParameterType,
    RangeParameter,
    Runner,
    SearchSpace,
)
from ax.core.base_trial import BaseTrial
from ax.core.metric import MetricFetchE, MetricFetchResult
from ax.global_stopping.strategies.improvement import ImprovementGlobalStoppingStrategy
from ax.models.torch.botorch_defaults import get_qLogNEI
from ax.models.torch.botorch_modular.surrogate import Surrogate
from ax.service.utils.report_utils import exp_to_df
from ax.utils.common.result import Err, Ok
from botorch.acquisition.analytic import LogExpectedImprovement
from botorch.models.gp_regression import SingleTaskGP
from botorch.utils.sampling import manual_seed

from CADETProcess import CADETProcessError
from CADETProcess.dataStructure import Float, Typed, UnsignedInteger
from CADETProcess.optimization import OptimizerBase
from CADETProcess.optimization.optimizationProblem import OptimizationProblem
from CADETProcess.optimization.parallelizationBackend import (
    ParallelizationBackendBase,
    SequentialBackend,
)

__all__ = [
    "GPEI",
    "BotorchModular",
    "NEHVI",
    "qNParEGO",
]


class CADETProcessMetric(Metric):
    def __init__(
        self,
        name: str,
        lower_is_better: Union[bool, None] = None,
        properties: Union[Dict[str, Any], None] = None,
    ) -> None:
        super().__init__(name, lower_is_better, properties)

    def fetch_trial_data(
        self,
        trial: BaseTrial,
        **kwargs: Any,
    ) -> MetricFetchResult:
        try:
            trial_results = trial.run_metadata
            records = []
            for arm_name, arm in trial.arms_by_name.items():
                results_dict = {
                    "trial_index": trial.index,
                    "arm_name": arm_name,
                    "metric_name": self.name,
                }

                # this looks up the value of the objective function
                # generated in the runner
                arm_results = trial_results["arms"][arm]
                results_dict.update({"mean": arm_results[self.name]})
                results_dict.update({"sem": 0.0})

                records.append(results_dict)

            return Ok(Data(df=pd.DataFrame.from_records(records)))
        except Exception as e:
            return Err(
                MetricFetchE(message=f"Failed to fetch {self.name}", exception=e)
            )


class CADETProcessRunner(Runner):
    def __init__(
        self,
        optimization_problem: OptimizationProblem,
        parallelization_backend: ParallelizationBackendBase,
    ) -> None:
        self.optimization_problem = optimization_problem
        self.parallelization_backend = parallelization_backend

    @property
    def staging_required(self) -> bool:
        return False

    def run(self, trial: BaseTrial) -> Dict[str, Any]:
        # Get X from arms.
        X = []
        for arm in trial.arms:
            x = np.array(list(arm.parameters.values()))
            X.append(x)

        X = np.row_stack(X)

        # adjust the number of cores to the number of batch trials
        # See: https://github.com/fau-advanced-separations/CADET-Process/issues/53

        # Calculate objectives
        # Explore if adding a small amount of noise to the result helps BO
        objective_labels = self.optimization_problem.objective_labels
        obj_fun = self.optimization_problem.evaluate_objectives

        F = obj_fun(
            X,
            untransform=True,
            get_dependent_values=True,
            ensure_minimization=True,
            parallelization_backend=self.parallelization_backend,
        )

        # Calculate nonlinear constraints
        # Explore if adding a small amount of noise to the result helps BO
        if self.optimization_problem.n_nonlinear_constraints > 0:
            nonlincon_cv_fun = (
                self.optimization_problem.evaluate_nonlinear_constraints_violation
            )
            nonlincon_labels = self.optimization_problem.nonlinear_constraint_labels

            CV = nonlincon_cv_fun(
                X,
                untransform=True,
                get_dependent_values=True,
                parallelization_backend=self.parallelization_backend,
            )

        else:
            CV = None
            nonlincon_labels = None

        # Update trial information with results.
        trial_metadata = self.get_metadata(
            trial, F, objective_labels, CV, nonlincon_labels
        )

        return trial_metadata

    @staticmethod
    def get_metadata(
        trial: BaseTrial,
        F: np.ndarray,
        objective_labels: list[str],
        CV: np.ndarray,
        nonlincon_labels: list[str],
    ) -> dict:
        trial_metadata = {"name": str(trial.index)}
        trial_metadata.update({"arms": {}})

        for i, arm in enumerate(trial.arms):
            f_dict = {
                f"{metric}_axidx_{i_obj}": f_metric[i]
                for i_obj, (metric, f_metric) in enumerate(zip(objective_labels, F.T))
            }
            cv_dict = {}
            if CV is not None:
                cv_dict = {
                    f"{metric}_axidx_{i_constr}": cv_metric[i]
                    for i_constr, (metric, cv_metric) in enumerate(
                        zip(nonlincon_labels, CV.T)
                    )
                }
            trial_metadata["arms"].update({arm: {**f_dict, **cv_dict}})

        return trial_metadata


class AxInterface(OptimizerBase):
    """Wrapper around Ax's bayesian optimization API."""

    supports_bounds = True
    supports_multi_objective = False
    supports_linear_constraints = True
    supports_linear_equality_constraints = False
    supports_nonlinear_constraints = True

    early_stopping_improvement_window = UnsignedInteger(default=1000)
    early_stopping_improvement_bar = Float(default=1e-10)
    n_init_evals = UnsignedInteger(default=10)
    n_max_evals = UnsignedInteger(default=100)
    seed = UnsignedInteger(default=12345)

    _specific_options = [
        "n_init_evals",
        "n_max_evals",
        "seed",
        "early_stopping_improvement_window",
        "early_stopping_improvement_bar",
    ]

    @staticmethod
    def _setup_parameters(optimizationProblem: OptimizationProblem) -> list:
        parameters = []
        for var in optimizationProblem.independent_variables:
            lb, ub = var.transformed_bounds
            param = RangeParameter(
                name=var.name,
                parameter_type=ParameterType.FLOAT,
                lower=lb,
                upper=ub,
                log_scale=False,
                is_fidelity=False,
            )
            parameters.append(param)

        return parameters

    @staticmethod
    def _setup_linear_constraints(optimizationProblem: OptimizationProblem) -> list:
        A_transformed = optimizationProblem.A_independent_transformed
        b_transformed = optimizationProblem.b_transformed
        indep_vars = optimizationProblem.independent_variables
        parameter_constraints = []
        for a_t, b_t in zip(A_transformed, b_transformed):
            constr = ParameterConstraint(
                constraint_dict={var.name: a for var, a in zip(indep_vars, a_t)},
                bound=b_t,
            )
            parameter_constraints.append(constr)

        return parameter_constraints

    @classmethod
    def _setup_searchspace(
        cls, optimizationProblem: OptimizationProblem
    ) -> SearchSpace:
        return SearchSpace(
            parameters=cls._setup_parameters(optimizationProblem),
            parameter_constraints=cls._setup_linear_constraints(optimizationProblem),
        )

    def _setup_objectives(self) -> list:
        """Parse objective functions from optimization problem."""
        objective_names = self.optimization_problem.objective_labels

        objectives = []
        for i, obj_name in enumerate(objective_names):
            ax_metric = CADETProcessMetric(
                name=f"{obj_name}_axidx_{i}",
                lower_is_better=True,
            )

            obj = Objective(metric=ax_metric, minimize=True)
            objectives.append(obj)

        return objectives

    def _setup_outcome_constraints(self) -> list:
        """Parse nonliear constraint functions from optimization problem."""
        nonlincon_names = self.optimization_problem.nonlinear_constraint_labels

        outcome_constraints = []
        for i_constr, name in enumerate(nonlincon_names):
            ax_metric = CADETProcessMetric(name=f"{name}_axidx_{i_constr}")

            nonlincon = OutcomeConstraint(
                metric=ax_metric,
                op=ComparisonOp.LEQ,
                bound=0.0,
                relative=False,
            )
            outcome_constraints.append(nonlincon)

        return outcome_constraints

    def _create_manual_data(
        self, trial: BaseTrial, F: npt.ArrayLike, G: Optional[npt.ArrayLike] = None
    ) -> dict:
        objective_labels = self.optimization_problem.objective_labels
        nonlincon_labels = self.optimization_problem.nonlinear_constraint_labels
        return CADETProcessRunner.get_metadata(
            trial, F, objective_labels, G, nonlincon_labels
        )

    def _create_manual_trial(self, X: npt.ArrayLike) -> None:
        """Create trial from pre-evaluated data."""
        variables = self.optimization_problem.independent_variable_names

        for i, x in enumerate(X):
            trial = self.ax_experiment.new_trial()
            trial_data = {
                "input": {var: x_i for var, x_i in zip(variables, x)},
            }

            arm_name = f"{trial.index}_{0}"
            trial.add_arm(Arm(parameters=trial_data["input"], name=arm_name))
            trial.run()
            trial.mark_completed()
            self._post_processing(trial)

            # When returning to batch trials, the Arms can be initialized here
            # and then collectively returned. See commit history

    def _post_processing(self, trial: BaseTrial) -> None:
        """
        Run post processing.

        Ax holds the data of the model in a dataframe an experiment consists of trials
        which consist of arms in a sequential experiment, each trial only has one arm.

        Arms are evaluated. These hold the parameters.
        """
        op = self.optimization_problem

        # get the trial level data as a dataframe
        trial_data = self.ax_experiment.fetch_trials_data([trial.index])
        data = trial_data.df

        # DONE: Update for multi-processing. If n_cores > 1: len(arms) > 1 (oder @Flo?)
        X = np.array([list(arm.parameters.values()) for arm in trial.arms])
        objective_labels = [
            f"{obj_name}_axidx_{i}" for i, obj_name in enumerate(op.objective_labels)
        ]

        n_ind = len(X)

        # Get objective values
        F_data = data[data["metric_name"].isin(objective_labels)]
        assert np.all(
            F_data["metric_name"].values
            == np.repeat(objective_labels, len(X)).astype(object)
        )
        F = F_data["mean"].values.reshape((op.n_objectives, n_ind)).T

        # Get nonlinear constraint values
        if op.n_nonlinear_constraints > 0:
            nonlincon_labels = [
                f"{name}_axidx_{i}"
                for i, name in enumerate(op.nonlinear_constraint_labels)
            ]
            G_data = data[data["metric_name"].isin(nonlincon_labels)]
            assert np.all(
                G_data["metric_name"].values.tolist()
                == np.repeat(nonlincon_labels, len(X))
            )
            G = G_data["mean"].values.reshape((op.n_nonlinear_constraints, n_ind)).T

            nonlincon_cv_fun = op.evaluate_nonlinear_constraints_violation
            CV = nonlincon_cv_fun(X, untransform=True, get_dependent_values=True)
        else:
            G = None
            CV = None

        # Ideally, the current optimum w.r.t. single and multi objective can be
        # obtained at this point and passed to run_post_processing.
        # with X_opt_transformed. Implementation is pending.
        # See: https://github.com/fau-advanced-separations/CADET-Process/issues/53

        self.run_post_processing(
            X_transformed=X,
            F_minimized=F,
            G=G,
            CV_nonlincon=CV,
            current_generation=self.ax_experiment.num_trials,
            X_opt_transformed=None,
        )

    def _setup_model(self) -> None:
        """
        Initialize a pre-instantiated `Model` class.

        The class specifies the surrogate model (e.g. Gaussian Process) and acquisition function,
        which are used in the bayesian optimization algorithm.
        """
        raise NotImplementedError

        """Initialize an optimization configuration for Ax."""

    def _setup_optimization_config(
        self,
        objectives: list[Objective],
        outcome_constraints: OutcomeConstraint,
    ) -> None:
        raise NotImplementedError

    def _run(
        self, optimization_problem: OptimizationProblem, x0: npt.ArrayLike
    ) -> None:
        search_space = self._setup_searchspace(self.optimization_problem)
        objectives = self._setup_objectives()
        outcome_constraints = self._setup_outcome_constraints()
        optimization_config = self._setup_optimization_config(
            objectives=objectives, outcome_constraints=outcome_constraints
        )

        runner = CADETProcessRunner(
            optimization_problem=self.optimization_problem,
            parallelization_backend=SequentialBackend(),
        )

        self.global_stopping_strategy = ImprovementGlobalStoppingStrategy(
            min_trials=self.n_init_evals + self.early_stopping_improvement_window,
            window_size=self.early_stopping_improvement_window,
            improvement_bar=self.early_stopping_improvement_bar,
            inactive_when_pending_trials=True,
        )

        self.ax_experiment = Experiment(
            search_space=search_space,
            name=self.optimization_problem.name,
            optimization_config=optimization_config,
            runner=runner,
        )

        # Internal storage for tracking data
        self._data = self.ax_experiment.fetch_data()

        # Restore previous results from checkpoint
        if len(self.results.populations) > 0:
            for pop in self.results.populations:
                X, F, G = pop.x, pop.f, pop.g
                trial = self._create_manual_trial(X)
                trial.mark_running(no_runner_required=True)

                trial_data = self._create_manual_data(trial, F, G)
                trial.run_metadata.update(trial_data)
                trial.mark_completed()

        else:
            if x0 is not None:
                x0_init = np.array(x0, ndmin=2)

                if len(x0_init) < self.n_init_evals:
                    warnings.warn(
                        "Initial population smaller than popsize. "
                        "Creating missing entries."
                    )
                    n_remaining = self.n_init_evals - len(x0_init)
                    x0_remaining = optimization_problem.create_initial_values(
                        n_remaining, seed=self.seed, include_dependent_variables=False
                    )
                    x0_init = np.vstack((x0_init, x0_remaining))
                elif len(x0_init) > self.n_init_evals:
                    warnings.warn(
                        "Initial population larger than popsize. Omitting overhead."
                    )
                    x0_init = x0_init[0 : self.n_init_evals]

            else:
                # Create initial samples if they are not provided
                x0_init = self.optimization_problem.create_initial_values(
                    n_samples=self.n_init_evals,
                    include_dependent_variables=False,
                    seed=self.seed + 5641,
                )

            x0_init_transformed = np.array(optimization_problem.transform(x0_init))
            self._create_manual_trial(x0_init_transformed)
            print(exp_to_df(self.ax_experiment))

        n_iter = self.results.n_gen
        n_evals = self.results.n_evals

        global_stopping_message = None

        if n_evals >= self.n_max_evals:
            raise CADETProcessError(
                f"Initial number of evaluations exceeds `n_max_evals` "
                f"({self.n_max_evals})."
            )

        with manual_seed(seed=self.seed):
            while not (n_evals >= self.n_max_evals or n_iter >= self.n_max_iter):
                # Reinitialize GP+EI model at each step with updated data.
                modelbridge = self.train_model()

                print(f"Running optimization trial {n_evals + 1}/{self.n_max_evals}...")

                # samples can be accessed here by sample_generator.arms:
                sample_generator = modelbridge.gen(n=1)

                # A staging phase can be implemented here if needed.
                # See: https://github.com/fau-advanced-separations/CADET-Process/issues/53

                # The strategy itself will check if enough trials have already been
                # completed.
                (
                    stop_optimization,
                    global_stopping_message,
                ) = self.global_stopping_strategy.should_stop_optimization(
                    experiment=self.ax_experiment
                )

                if stop_optimization:
                    print(global_stopping_message)
                    break

                trial = self.ax_experiment.new_trial(generator_run=sample_generator)
                trial.run()

                trial.mark_completed()
                self._post_processing(trial)

                n_iter += 1
                n_evals += len(trial.arms)

        print(exp_to_df(self.ax_experiment))

        self.results.success = True
        self.results.exit_flag = 0
        self.results.exit_message = global_stopping_message


class SingleObjectiveAxInterface(AxInterface):
    def _setup_optimization_config(
        self,
        objectives: list[Objective],
        outcome_constraints: OutcomeConstraint,
    ) -> OptimizationConfig:
        return OptimizationConfig(
            objective=objectives[0], outcome_constraints=outcome_constraints
        )


class MultiObjectiveAxInterface(AxInterface):
    supports_multi_objective = True

    def _setup_optimization_config(
        self,
        objectives: list[Objective],
        outcome_constraints: OutcomeConstraint,
    ) -> MultiObjectiveOptimizationConfig:
        return MultiObjectiveOptimizationConfig(
            objective=MultiObjective(objectives),
            outcome_constraints=outcome_constraints,
        )


[docs] class GPEI(SingleObjectiveAxInterface): """Gaussian Process with Expected Improvement for single objectives.""" def __repr__(self) -> str: """str: String representation of the optimization algorithm.""" return "GPEI"
[docs] def train_model(self) -> Models: """Train model.""" return Models.GPEI( experiment=self.ax_experiment, data=self.ax_experiment.fetch_data() )
[docs] class BotorchModular(SingleObjectiveAxInterface): """ Modular bayesian optimization algorithm. BotorchModular takes 2 optional arguments and uses the BOTORCH_MODULAR API of Ax to construct a Model which connects both components with the respective transforms necessary. Attributes ---------- acquisition_fn: type, optional AcquisitionFunction class. The default is LogExpectedImprovement. surrogate_model: type, optional Model class. The default is SingleTaskGP. """ acquisition_fn = Typed(ty=type, default=LogExpectedImprovement) surrogate_model = Typed(ty=type, default=SingleTaskGP) _specific_options = ["acquisition_fn", "surrogate_model"] def __repr__(self) -> str: """str: String representation of the optimization algorithm.""" afn = self.acquisition_fn.__name__ smn = self.surrogate_model.__name__ return f"BotorchModular({smn}+{afn})"
[docs] def train_model(self) -> NotImplementedError: """Train model.""" raise NotImplementedError( "This model is currently broken. Please use Only GPEI or NEHVI" ) return Models.BOTORCH_MODULAR( experiment=self.ax_experiment, surrogate=Surrogate(self.surrogate_model), botorch_acqf_class=self.acquisition_fn, data=self.ax_experiment.fetch_data(), )
[docs] class NEHVI(MultiObjectiveAxInterface): """Noisy expected hypervolume improvement multi-objective algorithm.""" supports_single_objective = False def __repr__(self) -> str: """str: String representation of the optimization algorithm.""" smn = "SingleTaskGP" afn = "NEHVI" return f"{smn}+{afn}"
[docs] def train_model(self) -> Models: """Train model.""" return Models.MOO( experiment=self.ax_experiment, data=self.ax_experiment.fetch_data() )
class qNParEGO(MultiObjectiveAxInterface): """ qNParEGO multi-objective algorithm. ParEGO transforms the MOO problem into a single objective problem by applying a randomly weighted augmented Chebyshev scalarization to the objectives, and maximizing the expected improvement of that scalarized quantity (Knowles, 2006). Recently, Daulton et al. (2020) used a multi-output Gaussian process and compositional Monte Carlo objective to extend ParEGO to the batch setting (qParEGO), which proved to be a strong baseline for MOBO. Additionally, the authors proposed a noisy variant (qNParEGO), but the empirical evaluation of qNParEGO was limited. [Daulton et al. 2021 "Parallel Bayesian Optimization of Multiple Noisy Objectives with Expected Hypervolume Improvement"] """ supports_single_objective = False def __repr__(self) -> str: """str: String representation of the algorithm.""" smn = "SingleTaskGP" afn = "qNParEGO" return f"{smn}+{afn}" def train_model(self) -> Models: """Train model.""" return Models.MOO( experiment=self.ax_experiment, data=self.ax_experiment.fetch_data(), acqf_constructor=get_qLogNEI, default_model_gen_options={ "acquisition_function_kwargs": { "chebyshev_scalarization": True, } }, )