Source code for CADETProcess.optimization.optimizer

from abc import abstractmethod
import os
from pathlib import Path
import shutil
import time
import warnings

from cadet import H5
import numpy as np
import matplotlib.pyplot as plt
import numpy as np

from CADETProcess import settings
from CADETProcess import log
from CADETProcess import CADETProcessError
from CADETProcess.dataStructure import Structure
from CADETProcess.dataStructure import (
    Typed, UnsignedInteger, RangedInteger, UnsignedFloat
)

from CADETProcess.optimization import OptimizationProblem
from CADETProcess.optimization import Individual, Population, ParetoFront
from CADETProcess.optimization import ParallelizationBackendBase, Joblib
from CADETProcess.optimization import OptimizationResults

__all__ = ['OptimizerBase']


[docs] class OptimizerBase(Structure): """BaseClass for optimization solver APIs. Holds the configuration of the individual solvers and gives an interface for calling the run method. The class has to convert the OptimizationProblem configuration to the APIs configuration format and convert the results back to the CADET-Process format. Attributes ---------- is_population_based : bool True, if the optimizer evaluates entire populations at every step. supports_multi_objective : bool True, if the optimizer supports multi-objective optimization. supports_linear_constraints : bool True, if the optimizer supports linear constraints. supports_linear_equality_constraints : bool True, if the optimizer supports linear equality constraints. supports_nonlinear_constraints : bool True, if the optimizer supports nonlinear constraints. supports_bounds : bool True, if the optimizer supports bound constraints ignore_linear_constraints_config: bool True, if the optimizer can handle transforms and dependent variables in linear constraints. progress_frequency : int Number of generations after which the optimizer reports progress. The default is 1. cv_tol : float Tolerance for constraint violation. The default is 1e-6. similarity_tol : UnsignedFloat, optional Tolerance for individuals to be considered similar. Similar items are removed from the Pareto front to limit its size. The default is None, indicating that all individuals should be kept. n_max_evals : int, optional Maximum number of function evaluations. n_max_iter : int, optional Maximum number of iterations (e.g. generations). parallelization_backend : ParallelizationBackendBase, optional Class used to handle parallelized (and also sequential) evaluation of eval_fun functions for each individual in a given population. The default parallelization backend is 'Joblib', which provides parallel execution using multiple cores. n_cores : int, optional Proxy to the number of cores used by the parallelization backend. """ is_population_based = False supports_single_objective = True supports_multi_objective = False supports_linear_constraints = False supports_linear_equality_constraints = False supports_nonlinear_constraints = False supports_bounds = False ignore_linear_constraints_config = False progress_frequency = RangedInteger(lb=1, default=1) x_tol = UnsignedFloat() f_tol = UnsignedFloat() cv_tol = UnsignedFloat(default=0) n_max_iter = UnsignedInteger(default=100000) n_max_evals = UnsignedInteger(default=100000) similarity_tol = UnsignedFloat() parallelization_backend = Typed(ty=ParallelizationBackendBase) _general_options = [ 'progress_frequency', 'x_tol', 'f_tol', 'cv_tol', 'similarity_tol', 'n_max_iter', 'n_max_evals', ] def __init__(self, *args, **kwargs): self.parallelization_backend = Joblib() super().__init__(*args, **kwargs)
[docs] def optimize( self, optimization_problem, x0=None, save_results=True, results_directory=None, use_checkpoint=False, overwrite_results_directory=False, exist_ok=True, log_level="INFO", reinit_cache=True, delete_cache=True, *args, **kwargs): """Solve OptimizationProblem. Parameters ---------- optimization_problem : OptimizationProblem OptimizationProblem to be solved. x0 : list, optional Initial values. If None, valid points are generated. save_results : bool, optional If True, save results. The default is True. results_directory : str, optional Results directory. If None, working directory is used. Only has an effect, if save_results == True. use_checkpoint : bool, optional If True, try continuing fom checkpoint. The default is True. Only has an effect, if save_results == True. overwrite_results_directory : bool, optional If True, overwrite existing results directory. The default is False. exist_ok : bool, optional If False, Exception is raised when results_directory is not empty. The default is True. log_level : str, optional log level. The default is "INFO". reinit_cache : bool, optional If True, delete ResultsCache after finishing. The default is True. *args : TYPE Additional arguments for Optimizer. **kwargs : TYPE Additional keyword arguments for Optimizer. Raises ------ TypeError If optimization_problem is not an instance of OptimizationProblem. CADETProcessError If Optimizer is not suited for OptimizationProblem (e.g. multi-objective). Returns ------- results : OptimizationResults Results of the Optimization. See Also -------- OptimizationProblem OptimizationResults CADETProcess.optimization.ResultsCache """ self._current_cache_entries = [] self.logger = log.get_logger(str(self), level=log_level) # Check OptimizationProblem if not isinstance(optimization_problem, OptimizationProblem): raise TypeError('Expected OptimizationProblem') if not self.check_optimization_problem(optimization_problem): raise CADETProcessError('Cannot solve OptimizationProblem.') self.optimization_problem = optimization_problem # Setup OptimizationResults self.results = OptimizationResults( optimization_problem=optimization_problem, optimizer=self, similarity_tol=self.similarity_tol, cv_tol=self.cv_tol, ) if save_results: if results_directory is None: results_directory = settings.working_directory / f"results_{optimization_problem.name}" results_directory = Path(results_directory) if overwrite_results_directory and results_directory.exists(): shutil.rmtree(results_directory) try: results_directory.mkdir( exist_ok=exist_ok, parents=True ) except FileExistsError: raise CADETProcessError( "Results directory already exists. " "To continue using same directory, 'exist_ok=True'. " "To overwrite, set 'overwrite_results_directory=True. " ) self.results.results_directory = results_directory checkpoint_path = os.path.join(results_directory, 'checkpoint.h5') if use_checkpoint and os.path.isfile(checkpoint_path): self.logger.info("Continue optimization from checkpoint.") data = H5() data.filename = checkpoint_path data.load() self.results.update_from_dict(data) else: self.results.setup_csv() # Setup Callbacks if save_results and optimization_problem.n_callbacks > 0: callbacks_dir = results_directory / "callbacks" callbacks_dir.mkdir(exist_ok=True) if optimization_problem.n_callbacks > 1: for callback in optimization_problem.callbacks: callback_dir = callbacks_dir / str(callback) callback_dir.mkdir(exist_ok=True) else: callbacks_dir = None self.callbacks_dir = callbacks_dir if reinit_cache: self.optimization_problem.setup_cache() if x0 is not None: flag, x0 = self.check_x0(optimization_problem, x0) if not flag: raise ValueError("x0 contains invalid entries.") log.log_time('Optimization', self.logger.level)(self.run) log.log_results('Optimization', self.logger.level)(self.run) log.log_exceptions('Optimization', self.logger.level)(self.run) backend = plt.get_backend() plt.switch_backend('agg') start = time.time() self.run(self.optimization_problem, x0, *args, **kwargs) time_elapsed = time.time() - start self.results.time_elapsed = time_elapsed self.results.cpu_time = self.n_cores * time_elapsed self.run_final_processing() if delete_cache: optimization_problem.delete_cache(reinit=True) self._current_cache_entries = [] plt.switch_backend(backend) if not self.results.success: raise CADETProcessError( f"Optimizaton failed with message: {self.results.exit_message}" ) return self.results
[docs] @abstractmethod def run(optimization_problem, x0=None, *args, **kwargs): """Abstract Method for solving an optimization problem. Parameters ---------- optimization_problem : OptimizationProblem Optimization problem to be solved. x0 : list, optional Initial population of independent variables in untransformed space. Returns ------- results : OptimizationResults Optimization results including OptimizationProblem and Optimizer configuration. Raises ------ CADETProcessError If solver doesn't terminate successfully """ return
[docs] def check_optimization_problem(self, optimization_problem): """ Check if problem is configured correctly and supported by the optimizer. Parameters ---------- optimization_problem: OptimizationProblem An optimization problem to check. Returns ------- flag : bool True if the optimization problem is supported and configured correctly, False otherwise. """ flag = True if not optimization_problem.check_config( ignore_linear_constraints=self.ignore_linear_constraints_config): # Warnings are raised internally flag = False if optimization_problem.n_objectives == 1 and not self.supports_single_objective: warnings.warn( "Optimizer does not support single-objective problems" ) flag = False if optimization_problem.n_objectives > 1 and not self.supports_multi_objective: warnings.warn( "Optimizer does not support multi-objective problems" ) flag = False if ( not np.all(np.isinf(optimization_problem.lower_bounds_independent_transformed)) and not np.all(np.isinf(optimization_problem.upper_bounds_independent_transformed)) ) and not self.supports_bounds: warnings.warn( "Optimizer does not support bounds" ) flag = False if optimization_problem.n_linear_constraints > 0 \ and not self.supports_linear_constraints: warnings.warn( "Optimizer does not support problems with linear constraints." ) flag = False if optimization_problem.n_linear_equality_constraints > 0 \ and not self.supports_linear_equality_constraints: warnings.warn( "Optimizer does not support problems with linear equality constraints." ) flag = False if optimization_problem.n_nonlinear_constraints > 0 \ and not self.supports_nonlinear_constraints: warnings.warn( "Optimizer does not support problems with nonlinear constraints." ) flag = False return flag
[docs] def check_x0(self, optimization_problem, x0): """Check the initial guess x0 for an optimization problem. Parameters ---------- optimization_problem : OptimizationProblem The optimization problem instance to which x0 is related. x0 : array_like The initial guess for the optimization problem. It can be a single individual or a population. Returns ------- tuple A tuple containing a boolean flag indicating if x0 is valid, and the potentially modified x0. """ flag = True shape = np.array(x0).shape is_x0_1d = len(shape) == 1 x0 = np.array(x0, ndmin=2) n_dependent_variables = optimization_problem.n_dependent_variables n_independent_variables = optimization_problem.n_independent_variables n_variables = n_dependent_variables + n_independent_variables if x0.shape[1] != n_variables and x0.shape[1] != n_independent_variables: warnings.warn( f"x0 for optimization problem is expected to be of length " f"{n_independent_variables} or" f"{n_variables}. Got {x0.shape[1]}" ) flag = False if n_dependent_variables > 0 and x0.shape[1] == n_variables: x0 = [optimization_problem.get_independent_values(ind) for ind in x0] warnings.warn( "x0 contains dependent values. Will recompute dependencies for consistency." ) x0 = np.array(x0) for x in x0: if not optimization_problem.check_individual(x, get_dependent_values=True): flag = False break if is_x0_1d: x0 = x0[0] x0 = x0.tolist() return flag, x0
def _create_population(self, X_transformed, F, F_min, G, CV): """Create new population from current generation for post procesing.""" X_transformed = np.array(X_transformed, ndmin=2) F = np.array(F, ndmin=2) F_min = np.array(F_min, ndmin=2) G = np.array(G, ndmin=2) CV = np.array(CV, ndmin=2) if self.optimization_problem.n_meta_scores > 0: M_min = self.optimization_problem.evaluate_meta_scores_population( X_transformed, untransform=True, ensure_minimization=True, parallelization_backend=self.parallelization_backend, ) M = self.optimization_problem.transform_maximization(M_min, scores='meta_scores') else: M_min = len(X_transformed)*[None] M = len(X_transformed)*[None] if self.optimization_problem.n_nonlinear_constraints == 0: G = len(X_transformed)*[None] CV = len(X_transformed)*[None] population = Population() for x_transformed, f, f_min, g, cv, m, m_min in zip(X_transformed, F, F_min, G, CV, M, M_min): x = self.optimization_problem.get_dependent_values( x_transformed, untransform=True ) ind = Individual( x, f, g, m, x_transformed, f_min, cv, self.cv_tol, m_min, self.optimization_problem.independent_variable_names, self.optimization_problem.objective_labels, self.optimization_problem.nonlinear_constraint_labels, self.optimization_problem.meta_score_labels, self.optimization_problem.variable_names, ) population.add_individual(ind) return population def _create_pareto_front(self, X_opt_transformed): """Create new pareto front from current generation for post procesing.""" if X_opt_transformed is None: pareto_front = None else: pareto_front = Population() for x_opt_transformed in X_opt_transformed: x_opt = self.optimization_problem.get_dependent_values( x_opt_transformed, untransform=True ) ind = self.results.population_all[x_opt] pareto_front.add_individual(ind) return pareto_front def _create_meta_front(self): """Create new meta front from current generation for post procesing.""" if self.optimization_problem.n_multi_criteria_decision_functions == 0: meta_front = None else: pareto_front = self.results.pareto_front X_meta_front = \ self.optimization_problem.evaluate_multi_criteria_decision_functions( pareto_front ) meta_front = Population() for x in X_meta_front: meta_front.add_individual(pareto_front[x]) return meta_front def _evaluate_callbacks(self, current_generation, sub_dir=None): if sub_dir is not None: callbacks_dir = self.callbacks_dir / sub_dir callbacks_dir.mkdir(exist_ok=True, parents=True) else: callbacks_dir = self.callbacks_dir for callback in self.optimization_problem.callbacks: if self.optimization_problem.n_callbacks > 1: _callbacks_dir = callbacks_dir / str(callback) else: _callbacks_dir = callbacks_dir callback.cleanup(_callbacks_dir, current_generation) callback._callbacks_dir = _callbacks_dir self.optimization_problem.evaluate_callbacks_population( self.results.meta_front, current_generation, parallelization_backend=self.parallelization_backend, ) def _log_results(self, current_generation): self.logger.info( f'Finished Generation {current_generation}.' ) for ind in self.results.meta_front: message = f'x: {ind.x}, f: {ind.f}' if self.optimization_problem.n_nonlinear_constraints > 0: message += f', g: {ind.g}' if self.optimization_problem.n_meta_scores > 0: message += f', m: {ind.m}' self.logger.info(message)
[docs] def run_post_processing( self, X_transformed, F_minimized, G, CV, current_generation, X_opt_transformed=None ): """Run post-processing of generation. Notes ----- This method also works for optimizers that only perform a single evaluation per "generation". Parameters ---------- X_transformed : list Optimization variable values of generation in independent transformed space. F_minimized : list Objective function values of generation. This assumes that all objective function values are minimized. G : list Nonlinear constraint function values of generation. CV : list Nonlinear constraints violation of of generation. current_generation : int Current generation. X_opt_transformed : list, optional (Currently) best variable values in independent transformed space. If None, internal pareto front is used to determine best values. """ F = self.optimization_problem.transform_maximization(F_minimized, scores='objectives') population = self._create_population(X_transformed, F, F_minimized, G, CV) self.results.update(population) pareto_front = self._create_pareto_front(X_opt_transformed) self.results.update_pareto(pareto_front) meta_front = self._create_meta_front() if meta_front is not None: self.results.update_meta(meta_front) if current_generation % self.progress_frequency == 0: self.results.plot_figures(show=False) self._evaluate_callbacks(current_generation) self.results.save_results('checkpoint') # Remove new entries from cache that didn't make it to the meta front for x in population.x: x_key = x.tobytes() if x not in self.results.meta_front.x: self.optimization_problem.prune_cache(x_key) else: self._current_cache_entries.append(x_key) # Remove old meta front entries from cache that were replaced by better ones for x_key in self._current_cache_entries: x = np.frombuffer(x_key) if not np.all(np.isin(x, self.results.meta_front.x)): self.optimization_problem.prune_cache(x_key) self._current_cache_entries.remove(x_key) self._log_results(current_generation)
[docs] def run_final_processing(self): self.results.plot_figures(show=False) if self.optimization_problem.n_callbacks > 0: self._evaluate_callbacks(0, 'final') self.results.save_results('final')
@property def options(self): """dict: Optimizer options.""" return { opt: getattr(self, opt) for opt in (self._general_options + self._specific_options) } @property def specific_options(self): """dict: Optimizer spcific options.""" return { opt: getattr(self, opt) for opt in (self._specific_options) } @property def n_cores(self): """int: Proxy to the number of cores used by the parallelization backend. Note, this will always return the actual number of cores used, even if negative values are set. See Also -------- parallelization_backend """ return self.parallelization_backend._n_cores @n_cores.setter def n_cores(self, n_cores): self.parallelization_backend.n_cores = n_cores def __str__(self): return self.__class__.__name__