Source code for CADETProcess.simulator.simulator

from abc import abstractmethod
import numpy as np

from CADETProcess import CADETProcessError
from CADETProcess.log import get_logger, log_time, log_results, log_exceptions
from CADETProcess.dataStructure import Structure
from CADETProcess.dataStructure import Bool, UnsignedFloat, UnsignedInteger
from CADETProcess.processModel import Process
from CADETProcess.stationarity import StationarityEvaluator, RelativeArea, NRMSE


[docs] class SimulatorBase(Structure): """Base class for Solver APIs. Holds the configuration of the individual solvers and provides an interface for calling the run method. The class converts the process configuration into the API's configuration format and converts the results back to the CADETProcess format. Attributes ---------- time_resolution : float Time interval for user solution times. The default value is 1 second. resolution_cutoff : float Solution times closer to section times than the cutoff value are removed to avoid IDAS errors. The default value is 1e-3 seconds. n_cycles : int Number of cycles to be simulated. The default is 1. evaluate_stationarity : bool If True, simulate until stationarity is reached. The default is False n_cycles_min : int Minimum number of cycles to be simulated if evaluate_stationarity is True. The default is 5. n_cycles_max : int Maximum number of cycles to be simulated if evaluate_stationarity is True. The default is 100. raise_exception_on_max_cycles : bool Raise an exception when the maximum number of cycles is exceeded. The default is False See Also -------- CADETProcess.processModel.Process CADETProcess.stationarity.StationarityEvaluator """ time_resolution = UnsignedFloat(default=1) resolution_cutoff = UnsignedFloat(default=1e-3) n_cycles = UnsignedInteger(default=1) evaluate_stationarity = Bool(default=False) n_cycles_min = UnsignedInteger(default=5) n_cycles_max = UnsignedInteger(default=100) raise_exception_on_max_cycles = Bool(default=False) def __init__(self, stationarity_evaluator=None): self.logger = get_logger('Simulation') if stationarity_evaluator is None: self.stationarity_evaluator = StationarityEvaluator() self.stationarity_evaluator.add_criterion(RelativeArea()) self.stationarity_evaluator.add_criterion(NRMSE()) else: self.stationarity_evaluator = stationarity_evaluator self.evaluate_stationarity = True @property def sig_fig(self): """int: Number of significant figures based on resolution_cutoff.""" return int(-np.log10(self.resolution_cutoff))
[docs] def get_solution_time(self, process, cycle=1): """Get the time vector for one cycle of a process. Parameters ---------- process : Process The process to simulate. cycle : int, optional The cycle number, by default 1. Returns ------- np.array Time vector for one cycle. See Also -------- CADETProcess.processModel.Process.section_times get_solution_time_complete """ solution_times = np.arange(( cycle-1)*process.cycle_time, cycle*process.cycle_time, self.time_resolution ) section_times = self.get_section_times(process) solution_times = np.append(solution_times, section_times) solution_times = np.round(solution_times, self.sig_fig) solution_times = np.sort(solution_times) solution_times = np.unique(solution_times) diff = np.where(np.diff(solution_times) < self.resolution_cutoff)[0] indices = [] for d in diff: if solution_times[d] in process.section_times: indices.append(d+1) else: indices.append(d) solution_times = np.delete(solution_times, indices) return solution_times
[docs] def get_solution_time_complete(self, process): """Get the time vector for multiple cycles of a process. Parameters ---------- process : Process The process to simulate. Returns ------- np.array Time vector for multiple cycles of a process. See Also -------- n_cycles get_section_times_complete get_solution_time """ time = self.get_solution_time(process) solution_times = np.array([]) for i in range(self.n_cycles): solution_times = np.append( solution_times, (i)*time[-1] + time ) solution_times = np.round(solution_times, self.sig_fig) return solution_times.tolist()
[docs] def get_section_times(self, process): """Get the section times for a single cycle of a process. Parameters ---------- process : Process The process to simulate. Returns ------- list Section times for a single cycle of a process. See Also -------- get_section_times_complete get_solution_time_complete CADETProcess.processModel.Process.section_times """ section_times = np.array(process.section_times) section_times = np.round(section_times, self.sig_fig) return section_times.tolist()
[docs] def get_section_times_complete(self, process): """Get the section times for multiple cycles of a process. Parameters ---------- process : Process The process to simulate. Returns ------- list Section times for multiple cycles of a process. See Also -------- get_section_times n_cycles get_solution_time_complete CADETProcess.processModel.Process.section_times """ sections = np.array(self.get_section_times(process)) cycle_time = sections[-1] section_times_complete = [] for cycle in range(self.n_cycles): section_cycle = cycle * cycle_time + sections[0:-1] section_times_complete += section_cycle.tolist() section_times_complete.append(self.n_cycles * sections[-1]) section_times_complete = np.round(section_times_complete, self.sig_fig) return section_times_complete.tolist()
[docs] def simulate(self, process, previous_results=None, **kwargs): """Simulate the process. Depending on the state of evaluate_stationarity, the process is simulated until the termination criterion is reached. Parameters ---------- process : Process The process to be simulated. previous_results : SimulationResults, optional Results of the previous simulation run for initial conditions. **kwargs Additional keyword arguments. Returns ------- results : SimulationResults Results of the final cycle of the simulation. Raises ------ TypeError If the process is not an instance of Process. CADETProcessError If the process is not configured correctly. See Also -------- simulate_n_cycles simulate_to_stationarity run """ if not isinstance(process, Process): raise TypeError('Expected Process') process.lock = True if not process.check_config(): raise CADETProcessError("Process is not configured correctly.") if not self.evaluate_stationarity: results = self.simulate_n_cycles( process, self.n_cycles, previous_results, **kwargs ) else: results = self.simulate_to_stationarity( process, previous_results, **kwargs ) process.lock = False return results
[docs] @log_time('Simulation') @log_results('Simulation') @log_exceptions('Simulation') def simulate_n_cycles( self, process, n_cyc, previous_results=None, **kwargs): """Simulate the process for a given number of cycles. Parameters ---------- process : Process The process to be simulated. n_cyc : int The number of cycles to simulate. previous_results : SimulationResults, optional Results of the previous simulation run. **kwargs Additional keyword arguments. Returns ------- results : SimulationResults Results of the final cycle of the simulation. Raises ------ TypeError If the process is not an instance of Process. See Also -------- simulate simulate_to_stationarity StationarityEvaluator run """ n_cyc_orig = self.n_cycles self.n_cycles = n_cyc if not isinstance(process, Process): raise TypeError('Expected Process') if previous_results is not None: self.set_state_from_results(process, previous_results) return self.run(process, **kwargs) self.n_cycles = n_cyc_orig
[docs] @log_time('Simulation') @log_results('Simulation') @log_exceptions('Simulation') def simulate_to_stationarity( self, process, results=None, **kwargs): """Simulate the process until stationarity is reached. Parameters ---------- process : Process The process to be simulated. results : SimulationResults, optional Results of the previous simulation run. **kwargs Additional keyword arguments. Returns ------- results : SimulationResults Results of the final cycle of the simulation. Raises ------ TypeError If the process is not an instance of Process. CADETProcessError If the simulation doesn't terminate successfully and `raise_exception_on_max_cycles` is True. See Also -------- simulate run StationarityEvaluator """ if not isinstance(process, Process): raise TypeError('Expected Process') n_cyc_orig = self.n_cycles self.n_cycles = self.n_cycles_min if results is not None: n_cyc = results.n_cycles else: n_cyc = 0 while True: n_cyc += self.n_cycles_min if results is not None: self.set_state_from_results(process, results) new_results = self.run(process, **kwargs) if results is None: results = new_results else: results.update(new_results) if n_cyc == 1: continue stationarity = self.stationarity_evaluator.assert_stationarity( results ) if stationarity: break if n_cyc >= self.n_cycles_max: msg = "Exceeded maximum number of cycles." if self.raise_exception_on_max_cycles: raise CADETProcessError(msg) else: self.logger.warning(msg) break self.n_cycles = n_cyc_orig return results
[docs] def set_state_from_results(self, process, results): """Set the process state from the simulation results. Parameters ---------- process : Process The process to set the state for. results : SimulationResults The simulation results containing the state information. Returns ------- Process The process with the updated state. """ process.system_state = results.system_state['state'] process.system_state_derivative = \ results.system_state['state_derivative'] return process
[docs] @abstractmethod def run(process, **kwargs): """Abstract method for running a simulation. Parameters ---------- process : Process The process to be simulated. **kwargs Additional keyword arguments. Returns ------- SimulationResults The simulation results including process and solver configuration. Raises ------ TypeError If the process is not an instance of Process. CADETProcessError If the simulation doesn't terminate successfully. """ return
evaluate = simulate __call__ = simulate @property def stationarity_evaluator(self): """StationarityEvaluator: The stationarity evaluator.""" return self._stationarity_evaluator @stationarity_evaluator.setter def stationarity_evaluator(self, stationarity_evaluator): """Set the stationarity evaluator. Parameters ---------- stationarity_evaluator : StationarityEvaluator The stationarity evaluator to set. Raises ------ CADETProcessError If the stationarity evaluator is not an instance of StationarityEvaluator. """ if not isinstance(stationarity_evaluator, StationarityEvaluator): raise CADETProcessError('Expected StationarityEvaluator') self._stationarity_evaluator = stationarity_evaluator