Source code for CADETProcess.optimization.results

import csv
import os
from pathlib import Path
from typing import Literal
import warnings

from addict import Dict
import matplotlib.colors as colors
import matplotlib.cm as cmx
import matplotlib.pyplot as plt
cmap_feas = plt.get_cmap('winter_r')
cmap_infeas = plt.get_cmap('autumn_r')
import numpy as np

from cadet import H5
from CADETProcess import plotting
from CADETProcess.dataStructure import Structure
from CADETProcess.dataStructure import (
    Bool, Dictionary, NdArray, String, UnsignedInteger, UnsignedFloat
)

from CADETProcess import CADETProcessError
from CADETProcess.sysinfo import system_information
from CADETProcess.optimization import Individual, Population, ParetoFront


[docs] class OptimizationResults(Structure): """Optimization results. Attributes ---------- optimization_problem : OptimizationProblem Optimization problem. optimizer : OptimizerBase Optimizer used to optimize the OptimizationProblem. success : bool True if optimization was successfully terminated. False otherwise. exit_flag : int Information about the solver termination. exit_message : str Additional information about the solver status. time_elapsed : float Execution time of simulation. cpu_time : float CPU run time, taking into account the number of cores used for the optimiation. system_information : dict Information about the system on which the optimization was performed. x : list Values of optimization variables at optimum. f : np.ndarray Value of objective function at x. g : np.ndarray Values of constraint function at x population_last : Population Last population. pareto_front : ParetoFront Pareto optimal solutions. meta_front : ParetoFront Reduced pareto optimal solutions using meta scores and multi-criteria decision functions. """ success = Bool(default=False) exit_flag = UnsignedInteger() exit_message = String() time_elapsed = UnsignedFloat() cpu_time = UnsignedFloat() system_information = Dictionary() def __init__( self, optimization_problem, optimizer, similarity_tol: float = 0, ): self.optimization_problem = optimization_problem self.optimizer = optimizer self._optimizer_state = Dict() self._population_all = Population() self._populations = [] self._similarity_tol = similarity_tol self._pareto_fronts = [] if optimization_problem.n_multi_criteria_decision_functions > 0: self._meta_fronts = [] else: self._meta_fronts = None self.results_directory = None self.system_information = system_information @property def results_directory(self) -> Path: return self._results_directory @results_directory.setter def results_directory(self, results_directory: str | os.PathLike): if results_directory is not None: results_directory = Path(results_directory) self.plot_directory = Path(results_directory / 'figures') self.plot_directory.mkdir(exist_ok=True) else: self.plot_directory = None self._results_directory = results_directory @property def is_finished(self) -> bool: if self.exit_flag is None: return False else: return True @property def optimizer_state(self): return self._optimizer_state @property def populations(self) -> list[Population]: return self._populations @property def population_last(self) -> Population: return self.populations[-1] @property def population_all(self) -> Population: return self._population_all @property def pareto_fronts(self) -> list[ParetoFront]: return self._pareto_fronts @property def pareto_front(self) -> ParetoFront: return self._pareto_fronts[-1] @property def meta_fronts(self) -> list[Population]: if self._meta_fronts is None: return self.pareto_fronts else: return self._meta_fronts @property def meta_front(self) -> Population: if self._meta_fronts is None: return self.pareto_front else: return self._meta_fronts[-1]
[docs] def update(self, new: Individual | Population): """Update Results. Parameters ---------- new : Individual, Population New results Raises ------ CADETProcessError If new is not an instance of Individual or Population """ if isinstance(new, Individual): population = Population() population.add_individual(new) elif isinstance(new, Population): population = new else: raise CADETProcessError("Expected Population or Individual") self._populations.append(population) self.population_all.update(population)
[docs] def update_pareto(self, pareto_new: Population | None = None): """Update pareto front with new population. Parameters ---------- pareto_new : Population, optional New pareto front. If None, update existing front with latest population. """ pareto_front = ParetoFront( similarity_tol=self._similarity_tol ) if pareto_new is not None: pareto_front.update_population(pareto_new) else: if len(self.pareto_fronts) > 0: pareto_front.update_population(self.pareto_front) pareto_front.update_population(self.population_last) if self._similarity_tol is not None: pareto_front.remove_similar() self._pareto_fronts.append(pareto_front)
[docs] def update_meta(self, meta_front: Population): """Update meta front with new population. Parameters ---------- meta_front : Population New meta front. """ if self._similarity_tol is not None: meta_front.remove_similar() self._meta_fronts.append(meta_front)
@property def n_evals(self) -> int: """int: Number of evaluations.""" return sum([len(pop) for pop in self.populations]) @property def n_gen(self) -> int: """int: Number of generations.""" return len(self.populations) @property def x(self) -> np.ndarray: """np.array: Optimal points in untransformed space.""" return self.meta_front.x @property def x_transformed(self) -> np.ndarray: """np.array: Optimal points in transformed space.""" return self.meta_front.x_transformed @property def cv_bounds(self) -> np.ndarray: """np.array: Bound constraint violation of optimal points.""" return self.meta_front.cv_bounds @property def cv_lincon(self) -> np.ndarray: """np.array: Linear constraint violation of optimal points.""" return self.meta_front.cv_lincon @property def cv_lineqcon(self) -> np.ndarray: """np.array: Linear equality constraint violation of optimal points.""" return self.meta_front.cv_lineqcon @property def f(self) -> np.ndarray: """np.array: Objective function values of optimal points.""" return self.meta_front.f @property def g(self) -> np.ndarray: """np.array: Nonlinear constraint function values of optimal points.""" return self.meta_front.g @property def cv_nonlincon(self) -> np.ndarray: """np.array: Nonlinear constraint violation values of optimal points.""" return self.meta_front.cv_nonlincon @property def m(self) -> np.ndarray: """np.array: Meta scores of optimal points.""" return self.meta_front.m @property def n_evals_history(self) -> np.ndarray: """int: Number of evaluations per generation.""" n_evals = [len(pop) for pop in self.populations] return np.cumsum(n_evals) @property def f_best_history(self) -> np.ndarray: """np.array: Best objective values per generation.""" return np.array([pop.f_best for pop in self.meta_fronts]) @property def f_min_history(self) -> np.ndarray: """np.array: Minimum objective values per generation.""" return np.array([pop.f_min for pop in self.meta_fronts]) @property def f_max_history(self) -> np.ndarray: """np.array: Maximum objective values per generation.""" return np.array([pop.f_max for pop in self.meta_fronts]) @property def f_avg_history(self) -> np.ndarray: """np.array: Average objective values per generation.""" return np.array([pop.f_avg for pop in self.meta_fronts]) @property def g_best_history(self) -> np.ndarray: """np.array: Best nonlinear constraint per generation.""" return np.array([pop.g_best for pop in self.meta_fronts]) @property def g_min_history(self) -> np.ndarray: """np.array: Minimum nonlinear constraint values per generation.""" if self.optimization_problem.n_nonlinear_constraints == 0: return None else: return np.array([pop.g_min for pop in self.meta_fronts]) @property def g_max_history(self) -> np.ndarray: """np.array: Maximum nonlinear constraint values per generation.""" if self.optimization_problem.n_nonlinear_constraints == 0: return None else: return np.array([pop.g_max for pop in self.meta_fronts]) @property def g_avg_history(self) -> np.ndarray: """np.array: Average nonlinear constraint values per generation.""" if self.optimization_problem.n_nonlinear_constraints == 0: return None else: return np.array([pop.g_avg for pop in self.meta_fronts]) @property def cv_nonlincon_min_history(self) -> np.ndarray: """np.array: Minimum nonlinear constraint violation values per generation.""" if self.optimization_problem.n_nonlinear_constraints == 0: return None else: return np.array([pop.cv_nonlincon_min for pop in self.meta_fronts]) @property def cv_nonlincon_max_history(self) -> np.ndarray: """np.array: Maximum nonlinear constraint violation values per generation.""" if self.optimization_problem.n_nonlinear_constraints == 0: return None else: return np.array([pop.cv_nonlincon_max for pop in self.meta_fronts]) @property def cv_nonlincon_avg_history(self) -> np.ndarray: """np.array: Average nonlinear constraint violation values per generation.""" if self.optimization_problem.n_nonlinear_constraints == 0: return None else: return np.array([pop.cv_nonlincon_avg for pop in self.meta_fronts]) @property def m_best_history(self) -> np.ndarray: """np.array: Best meta scores per generation.""" return np.array([pop.m_best for pop in self.meta_fronts]) @property def m_min_history(self) -> np.ndarray: """np.array: Minimum meta scores per generation.""" if self.optimization_problem.n_meta_scores == 0: return None else: return np.array([pop.m_min for pop in self.meta_fronts]) @property def m_max_history(self) -> np.ndarray: """np.array: Maximum meta scores per generation.""" if self.optimization_problem.n_meta_scores == 0: return None else: return np.array([pop.m_max for pop in self.meta_fronts]) @property def m_avg_history(self) -> np.ndarray: """np.array: Average meta scores per generation.""" if self.optimization_problem.n_meta_scores == 0: return None else: return np.array([pop.m_avg for pop in self.meta_fronts])
[docs] def plot_figures(self, show=True): """Plot result figures. See Also -------- plot_convergence plot_objectives plot_corner plot_pareto """ if self.plot_directory is None: return with warnings.catch_warnings(): warnings.simplefilter("ignore") self.plot_convergence( 'objectives', show=show, plot_directory=self.plot_directory ) if self.optimization_problem.n_nonlinear_constraints > 0: self.plot_convergence( 'nonlinear_constraints', show=show, plot_directory=self.plot_directory ) if self.optimization_problem.n_meta_scores > 0: self.plot_convergence( 'meta_scores', show=show, plot_directory=self.plot_directory ) self.plot_objectives( show=show, plot_directory=self.plot_directory ) if self.optimization_problem.n_variables > 1 and len(self.x) > 1: self.plot_corner( show=show, plot_directory=self.plot_directory ) if self.optimization_problem.n_objectives > 1: self.plot_pareto( show=show, plot_directory=self.plot_directory, plot_evolution=True, plot_pareto=False, )
[docs] def plot_objectives( self, include_meta=True, plot_pareto=False, plot_infeasible=True, plot_individual=False, autoscale=True, show=True, plot_directory=None): """Plot objective function values for all optimization generations. Parameters ---------- include_meta : bool, optional If True, meta scores will be included in the plot. The default is True. plot_pareto : bool, optional If True, only plot Pareto front members of each generation are plotted. Else, all evaluated individuals are plotted. The default is False. plot_infeasible : bool, optional If True, plot infeasible points. The default is True. plot_individual : bool, optional If True, create separate figures for each objective. Otherwise, all objectives are plotted in one figure. The default is False. plot_infeasible : bool, optional If True, plot infeasible points. The default is False. autoscale : bool, optional If True, automatically adjust the scaling of the axes. The default is True. show : bool, optional If True, display the plot. The default is True. plot_directory : str, optional The directory where the plot should be saved. The default is None. See Also -------- CADETProcess.optimization.Population.plot_objectives """ axs = None figs = None _show = False _plot_directory = None cNorm = colors.Normalize(vmin=0, vmax=self.n_gen) scalarMap_feas = cmx.ScalarMappable(norm=cNorm, cmap=cmap_feas) scalarMap_infeas = cmx.ScalarMappable(norm=cNorm, cmap=cmap_infeas) if plot_pareto: populations = self.pareto_fronts population_last = self.pareto_front else: populations = self.populations population_last = self.population_last for i, gen in enumerate(populations): if gen is population_last: _plot_directory = plot_directory _show = show axs, figs = gen.plot_objectives( axs, figs, include_meta=include_meta, plot_infeasible=plot_infeasible, plot_individual=plot_individual, autoscale=autoscale, color_feas=scalarMap_feas.to_rgba(i), color_infeas=scalarMap_infeas.to_rgba(i), show=_show, plot_directory=_plot_directory )
[docs] def plot_pareto( self, show=True, plot_pareto=True, plot_evolution=False, plot_directory=None): """Plot Pareto fronts for each generation in the optimization. The Pareto front represents the optimal solutions that cannot be improved in one objective without sacrificing another. The method shows a pairwise Pareto plot, where each objective is plotted against every other objective in a scatter plot, allowing for a visualization of the trade-offs between the objectives. To highlight the progress, a colormap is used where later generations are plotted with darker blueish colors. Parameters ---------- show : bool, optional If True, display the plot. The default is True. plot_pareto : bool, optional If True, only Pareto front members of each generation are plotted. Else, all evaluated individuals are plotted. The default is True. plot_evolution : bool, optional If True, the Pareto front is plotted for each generation. Else, only final Pareto front is plotted. The default is False. plot_directory : str, optional The directory where the plot should be saved. The default is None. See Also -------- CADETProcess.optimization.Population.plot_pareto """ plot = None _show = False _plot_directory = None cNorm = colors.Normalize(vmin=0, vmax=self.n_gen) scalarMap_feas = cmx.ScalarMappable(norm=cNorm, cmap=cmap_feas) scalarMap_infeas = cmx.ScalarMappable(norm=cNorm, cmap=cmap_infeas) if plot_pareto: populations = self.pareto_fronts population_last = self.pareto_front else: populations = self.populations population_last = self.population_last if not plot_evolution: populations = [population_last] for i, gen in enumerate(populations): if gen is population_last: _plot_directory = plot_directory _show = show plot = gen.plot_pareto( plot, color_feas=scalarMap_feas.to_rgba(i), color_infeas=scalarMap_infeas.to_rgba(i), show=_show, plot_directory=_plot_directory )
[docs] def plot_corner(self, *args, **kwargs): """Create a corner plot of the independent variables. Parameters ---------- untransformed : bool, optional If True, use the untransformed independent variables. The default is True. show : bool, optional If True, display the plot. The default is True. plot_directory : str, optional The directory where the plot should be saved. The default is None. See Also -------- CADETProcess.results.plot_corner corner.corner """ try: self.population_all.plot_corner(*args, **kwargs) except AssertionError: pass
[docs] def setup_convergence_figure(self, target, plot_individual=False): if target == 'objectives': n = self.optimization_problem.n_objectives elif target == 'nonlinear_constraints': n = self.optimization_problem.n_nonlinear_constraints elif target == 'meta_scores': n = self.optimization_problem.n_meta_scores else: raise CADETProcessError("Unknown target.") if n == 0: return (None, None) fig_all, axs_all = plt.subplots( ncols=n, figsize=(n*6 + 2, 6), squeeze=False, ) axs_all = axs_all.reshape((-1,)) plt.close(fig_all) figs_ind = [] axs_ind = [] for i in range(n): fig, ax = plt.subplots() figs_ind.append(fig) axs_ind.append(ax) plt.close(fig) axs_ind = np.array(axs_ind).reshape(axs_all.shape) if plot_individual: return figs_ind, axs_ind else: return fig_all, axs_all
[docs] def plot_convergence( self, target='objectives', figs=None, axs=None, plot_individual=False, plot_avg=True, autoscale=True, show=True, plot_directory=None): """Plot the convergence of optimization metrics over evaluations. Parameters ---------- target : str, optional The target metrics to plot: 'objectives', 'nonlinear_constraints', or 'meta_scores'. The default is 'objectives'. figs : plt.Figure or list of plt.Figure, optional Figure(s) to plot the objectives on. axs : plt.Axes or list of plt.Axes, optional Axes to plot the objectives on. If None, new figures and axes will be created. plot_individual : bool, optional If True, create individual figure vor each metric. The default is False. autoscale : bool, optional If True, autoscale the y-axis. The default is True. show : bool, optional If True, show the plot. The default is True. plot_directory : str, optional A directory to save the plot, by default None. Returns ------- tuple Tuple with (lists of) figure and axes objects. """ if axs is None: figs, axs = self.setup_convergence_figure(target, plot_individual) if not isinstance(figs, list): figs = [figs] layout = plotting.Layout() layout.x_label = '$n_{Evaluations}$' if target == 'objectives': funcs = self.optimization_problem.objectives values_min = self.f_best_history values_avg = self.f_avg_history elif target == 'nonlinear_constraints': funcs = self.optimization_problem.nonlinear_constraints values_min = self.g_best_history values_avg = self.g_avg_history elif target == 'meta_scores': funcs = self.optimization_problem.meta_scores values_min = self.m_best_history values_avg = self.m_avg_history else: raise CADETProcessError("Unknown target.") if len(funcs) == 0: return counter = 0 for func in funcs: start = counter stop = counter+func.n_metrics v_func_min = values_min[:, start:stop] v_func_avg = values_avg[:, start:stop] for i_metric in range(func.n_metrics): v_line_min = v_func_min[:, i_metric] v_line_avg = v_func_avg[:, i_metric] ax = axs[counter + i_metric] lines = ax.get_lines() if len(lines) > 0: lines[0].set_xdata(self.n_evals_history) lines[0].set_ydata(v_line_min) if plot_avg and self.population_last.n_individuals > 1: lines[1].set_ydata(v_line_avg) else: if plot_avg and self.population_last.n_individuals > 1: label = 'best' else: label=None ax.plot( self.n_evals_history, v_line_min, '--', color='k', label=label ) if plot_avg and self.population_last.n_individuals > 1: ax.plot( self.n_evals_history, v_line_avg, '-', color='k', alpha=0.5, label='avg' ) layout.x_lim = (0, np.max(self.n_evals_history)+1) try: label = func.labels[i_metric] except AttributeError: label = f'{func}_{i_metric}' if plot_avg and self.population_last.n_individuals > 1: y_min = np.nanmin(v_line_min) y_max = np.nanmax(v_line_avg) else: y_min = np.nanmin(v_line_min) y_max = np.nanmax(v_line_min) layout.y_label = label if autoscale and y_min > 0: if y_max / y_min > 100.0: ax.set_yscale('log') layout.y_label = f"$log_{{10}}$({label})" try: plotting.set_layout(ax, layout) ax.relim() ax.autoscale_view() except ValueError: pass counter += func.n_metrics for fig in figs: fig.tight_layout() if not show: plt.close(fig) else: dummy = plt.figure(figsize=fig.get_size_inches()) new_manager = dummy.canvas.manager new_manager.canvas.figure = fig fig.set_canvas(new_manager.canvas) plt.show() if plot_directory is not None: plot_directory = Path(plot_directory) if plot_individual: for i, fig in enumerate(figs): figname = f'convergence_{target}_{i}' fig.savefig( f'{plot_directory / figname}.png' ) else: figname = f'convergence_{target}' figs[0].savefig( f'{plot_directory / figname}.png' )
[docs] def save_results(self, file_name: str): """ Save results to H5 file. Parameters ---------- file_name : str Results file name without file extension. """ if self.results_directory is not None: self._update_csv(self.population_last, 'results_all', mode='a') self._update_csv(self.population_last, 'results_last', mode='w') self._update_csv(self.pareto_front, 'results_pareto', mode='w') if self.optimization_problem.n_meta_scores > 0: self._update_csv(self.meta_front, 'results_meta', mode='w') results = H5() results.root = Dict(self.to_dict()) results.filename = self.results_directory / f'{file_name}.h5' results.save()
[docs] def to_dict(self) -> dict: """Convert Results to a dictionary. Returns ------- addict.Dict Results as a dictionary with populations stored as list of dictionaries. """ data = Dict() data.system_information = self.system_information data.optimizer_state = self.optimizer_state data.population_all_id = str(self.population_all.id) data.populations = {i: pop.to_dict() for i, pop in enumerate(self.populations)} data.pareto_fronts = { i: front.to_dict() for i, front in enumerate(self.pareto_fronts) } if self._meta_fronts is not None: data.meta_fronts = { i: front.to_dict() for i, front in enumerate(self.meta_fronts) } if self.time_elapsed is not None: data.time_elapsed = self.time_elapsed data.cpu_time = self.cpu_time return data
[docs] def update_from_dict(self, data: dict): """Update internal state from dictionary. Parameters ---------- data : dict Serialized data. """ self._optimizer_state = data['optimizer_state'] self._population_all = Population(id=data['population_all_id']) for pop_dict in data['populations'].values(): pop = Population.from_dict(pop_dict) self.update(pop) self._pareto_fronts = [ ParetoFront.from_dict(d) for d in data['pareto_fronts'].values() ] if self._meta_fronts is not None: self._meta_fronts = [ ParetoFront.from_dict(d) for d in data['meta_fronts'].values() ]
[docs] def setup_csv(self): """Create csv files for optimization results.""" self._setup_csv('results_all') self._setup_csv('results_last') self._setup_csv('results_pareto') if self.optimization_problem.n_meta_scores > 0: self._setup_csv('results_meta')
def _setup_csv(self, file_name: str): """ Create csv file for optimization results. Parameters ---------- file_name : str Results file name without file extension. """ header = [ "id", *self.optimization_problem.variable_names, *self.optimization_problem.objective_labels ] if self.optimization_problem.n_nonlinear_constraints > 0: header += [*self.optimization_problem.nonlinear_constraint_labels] if self.optimization_problem.n_meta_scores > 0: header += [*self.optimization_problem.meta_score_labels] with open( f'{self.results_directory / file_name}.csv', 'w' ) as csvfile: writer = csv.writer(csvfile, delimiter=",") writer.writerow(header) def _update_csv( self, population: Population, file_name: str, mode: Literal["w", "b"], ): """ Update csv file with latest population. Parameters ---------- population : Population latest Population. file_name : str Results file name without file extension. mode : {'a', 'w'} a: append to existing file. w: Create new csv. See Also -------- setup_csv """ if mode == 'w': self._setup_csv(file_name) mode = 'a' with open( f'{self.results_directory / file_name}.csv', mode ) as csvfile: writer = csv.writer(csvfile, delimiter=",") for ind in population: row = [ ind.id, *ind.x.tolist(), *ind.f.tolist() ] if ind.g is not None: row += ind.g.tolist() if ind.m is not None: row += ind.m.tolist() writer.writerow(row)