Source code for CADETProcess.processModel.process

from collections import defaultdict
from warnings import warn
from dataclasses import dataclass

from addict import Dict
import numpy as np
from scipy import integrate
from scipy import interpolate

from CADETProcess import CADETProcessError
from CADETProcess.dataStructure import cached_property_if_locked

from CADETProcess.dynamicEvents import EventHandler
from CADETProcess.dynamicEvents import Section, TimeLine

from .flowSheet import FlowSheet
from .unitOperation import Inlet, SourceMixin, Outlet


[docs] class Process(EventHandler): """Class for defining the dynamic changes of a flow sheet. Attributes ---------- name : str Name of the process object to be simulated. system_state : np.ndarray State of the process object system_state_derivate : ndarray Derivative of the state See Also -------- EventHandler CADETProcess.processModel.FlowSheet CADETProcess.simulation.Solver """ _initial_states = ['system_state', 'system_state_derivative'] def __init__(self, flow_sheet, name, *args, **kwargs): self.flow_sheet = flow_sheet self.name = name self.system_state = None self.system_state_derivative = None self._parameter_sensitivities = [] self._meta_information = Dict() super().__init__(*args, **kwargs) @property def n_comp(self): return self.flow_sheet.n_comp @property def meta_information(self): return self._meta_information @property def component_system(self): return self.flow_sheet.component_system @property def flow_sheet(self): """FlowSheet: flow sheet of the process model. Raises ------ TypeError: If flow_sheet is not an instance of FlowSheet. """ return self._flow_sheet @flow_sheet.setter def flow_sheet(self, flow_sheet): if not isinstance(flow_sheet, FlowSheet): raise TypeError('Expected FlowSheet') self._flow_sheet = flow_sheet @cached_property_if_locked def m_feed(self): """ndarray: Mass of feed components entering the system in one cycle. !!! Account for dynamic flow rates and concentrations! """ flow_rate_timelines = self.flow_rate_timelines feed_all = np.zeros((self.n_comp,)) for feed in self.flow_sheet.feed_inlets: feed_flow_rate_time_line = flow_rate_timelines[feed.name].total_out feed_signal_param = f'flow_sheet.{feed.name}.c' if feed_signal_param in self.parameter_timelines: tl = self.parameter_timelines[feed_signal_param] feed_signal_time_line = tl else: feed_signal_time_line = TimeLine() feed_section = Section( 0, self.cycle_time, feed.c, is_polynomial=True ) feed_signal_time_line.add_section(feed_section) m_i = [ integrate.quad( lambda t: feed_flow_rate_time_line.value(t) * feed_signal_time_line.value(t)[comp], 0, self.cycle_time, points=self.event_times )[0] for comp in range(self.n_comp) ] feed_all += np.array(m_i) return feed_all @cached_property_if_locked def V_eluent(self): """float: Volume of the eluent entering the system in one cycle.""" flow_rate_timelines = self.flow_rate_timelines V_all = 0 for eluent in self.flow_sheet.eluent_inlets: eluent_time_line = flow_rate_timelines[eluent.name]['total_out'] V_eluent = eluent_time_line.integral().squeeze() V_all += V_eluent return float(V_all) @cached_property_if_locked def V_solid(self): """float: Volume of all solid phase material used in flow sheet.""" return sum( [unit.volume_solid for unit in self.flow_sheet.units_with_binding] ) @cached_property_if_locked def flow_rate_timelines(self): """dict: TimeLine of flow_rate for all unit_operations.""" flow_rate_timelines = { unit.name: { 'total_in': TimeLine(), 'origins': defaultdict(TimeLine), 'total_out': TimeLine(), 'destinations': defaultdict(TimeLine) } for unit in self.flow_sheet.units } # Create dummy section state for Processes without events if len(self.section_states) == 0: it = [(None, {})] else: it = self.section_states.items() for i, (time, state) in enumerate(it): start = self.section_times[i] end = self.section_times[i+1] flow_rates = self.flow_sheet.get_flow_rates(state) for unit, flow_rate in flow_rates.items(): unit_flow_rates = flow_rate_timelines[unit] # If inlet, also use outlet for total_in if isinstance(self.flow_sheet[unit], Inlet): section = Section( start, end, flow_rate.total_out, is_polynomial=True ) else: section = Section( start, end, flow_rate.total_in, is_polynomial=True ) unit_flow_rates['total_in'].add_section(section) for orig, flow_rate_orig in flow_rate.origins.items(): section = Section( start, end, flow_rate_orig, is_polynomial=True ) unit_flow_rates['origins'][orig].add_section(section) # If outlet, also use inlet for total_out if isinstance(self.flow_sheet[unit], Outlet): section = Section( start, end, flow_rate.total_in, is_polynomial=True ) else: section = Section( start, end, flow_rate.total_out, is_polynomial=True ) unit_flow_rates['total_out'].add_section(section) for dest, flow_rate_dest in flow_rate.destinations.items(): section = Section( start, end, flow_rate_dest, is_polynomial=True ) unit_flow_rates['destinations'][dest].add_section(section) return Dict(flow_rate_timelines) @cached_property_if_locked def flow_rate_section_states(self): """dict: Flow rates for all units for every section time.""" section_states = { time: { unit.name: { 'total_in': [], 'origins': defaultdict(dict), 'total_out': [], 'destinations': defaultdict(dict), } for unit in self.flow_sheet.units } for time in self.section_times[0:-1] } for sec_time in self.section_times[0:-1]: for unit, unit_flow_rates in self.flow_rate_timelines.items(): if isinstance(self.flow_sheet[unit], Inlet): section_states[sec_time][unit]['total_in'] \ = unit_flow_rates['total_out'].coefficients(sec_time) else: section_states[sec_time][unit]['total_in'] \ = unit_flow_rates['total_in'].coefficients(sec_time) for orig, tl in unit_flow_rates.origins.items(): section_states[sec_time][unit]['origins'][orig] \ = tl.coefficients(sec_time) if isinstance(self.flow_sheet[unit], Outlet): section_states[sec_time][unit]['total_out'] \ = unit_flow_rates['total_in'].coefficients(sec_time) else: section_states[sec_time][unit]['total_out'] \ = unit_flow_rates['total_out'].coefficients(sec_time) for dest, tl in unit_flow_rates.destinations.items(): section_states[sec_time][unit]['destinations'][dest] \ = tl.coefficients(sec_time) return Dict(section_states) @property def n_sensitivities(self): """int: Number of parameter sensitivities.""" return len(self.parameter_sensitivities) @property def parameter_sensitivities(self): """list: Parameter sensitivites.""" return self._parameter_sensitivities @property def parameter_sensitivity_names(self): """list: Parameter sensitivity names.""" return [sens.name for sens in self.parameter_sensitivities]
[docs] def add_parameter_sensitivity( self, parameter_paths, name=None, components=None, polynomial_coefficients=None, reaction_indices=None, bound_state_indices=None, section_indices=None, abstols=None, factors=None): """Add parameter sensitivty to Process. Parameters ---------- parameter_paths : str or list of str The path to the parameter(s). name : str, optional The name of the parameter sensitivity. If not provided, the name of the first parameter will be used. components : str or list of str, optional The component(s) to which the parameter(s) belong. Must only be provided if parameter is specific to a certain compoment. polynomial_coefficients: str or list of str, optional The polynomial coefficients(s) to which the parameter(s) belong. Must only be provided if parameter is specific to a certain coefficient. reaction_indices : int or list of int, optional The index(es) of the reaction(s) in the associated model(s), if applicable. Must only be provided if parameter is specific to a certain reaction. bound_state_indices : int or list of int, optional The index(es) of the bound state(s) in the associated model(s), if applicable. Must only be provided if parameter is specific to a certain bound state. section_indices : int or list of int, optional The index(es) of the section(s) in the associated model(s), if applicable. If not provided, Must only be provided if parameter is specific to a certain section. abstols : float or list of float, optional The absolute tolerances for each parameter. If not provided, a default tolerance will be used. factors : float or list of float, optional The factors for each parameter. If not provided, a default factor of 1 will be used. Raises ------ CADETProcessError Number of indices do not match for: - components - polynomial_coefficients - reaction - bound_state - sections - tolerances - factors Component is not found. Unit is not found. Parameter is not found. Name is not provided (if number of parameters larger than 1). If sensitivity name already exists. Notes ----- This functionality is still work in progress. .. todo:: - [ ] Check if compoment/reaction/polynomial index are required. - [ ] Specify time instead of section index; """ if not isinstance(parameter_paths, list): parameter_paths = [parameter_paths] n_params = len(parameter_paths) if name is None: if n_params > 1: raise CADETProcessError( "Must provide sensitivity name if n_params > 1." ) else: name = parameter_paths[0] if name in self.parameter_sensitivity_names: raise CADETProcessError( "Parameter sensitivity with same name already exists." ) if components is None: components = n_params * [None] if not isinstance(components, list): components = [components] if len(components) != n_params: raise CADETProcessError("Number of component indices does not match.") if components is None: components = n_params * [None] if not isinstance(polynomial_coefficients, list): polynomial_coefficients = [polynomial_coefficients] if len(polynomial_coefficients) != n_params: raise CADETProcessError("Number of coefficient indices does not match.") if reaction_indices is None: reaction_indices = n_params * [None] if not isinstance(reaction_indices, list): reaction_indices = [reaction_indices] if len(reaction_indices) != n_params: raise CADETProcessError("Number of reaction indices does not match.") if bound_state_indices is None: bound_state_indices = n_params * [None] if not isinstance(bound_state_indices, list): bound_state_indices = [bound_state_indices] if len(bound_state_indices) != n_params: raise CADETProcessError("Number of bound_state indices does not match.") if section_indices is None: section_indices = n_params * [None] if not isinstance(section_indices, list): section_indices = [section_indices] if len(section_indices) != n_params: raise CADETProcessError("Number of section indices does not match.") if abstols is None: abstols = n_params * [None] if not isinstance(abstols, list): abstols = [abstols] if len(abstols) != n_params: raise CADETProcessError("Number of abstol entries does not match.") if factors is None: factors = n_params * [1] if not isinstance(factors, list): factors = [factors] if len(factors) != n_params: raise CADETProcessError("Number of factor entries does not match.") units = [] associated_models = [] parameters = [] for param, comp, coeff, reac, state, section, tol, fac in zip( parameter_paths, components, polynomial_coefficients, reaction_indices, bound_state_indices, section_indices, abstols, factors): param_parts = param.split('.') unit = param_parts[0] parameter = param_parts[-1] if parameter == 'flow_rate': raise CADETProcessError( 'Flow rate is currently not supported for sensitivities.' ) parameters.append(parameter) associated_model = None if len(param_parts) == 3: associated_model = param_parts[1] if comp is not None and comp not in self.component_system.species: raise CADETProcessError(f'Unknown component {comp}.') unit = self.flow_sheet[unit] if unit not in self.flow_sheet.units: raise CADETProcessError('Not a valid unit') units.append(unit) if coeff is not None and parameter not in unit.polynomial_parameters: raise CADETProcessError('Not a polynomial parameter.') if parameter in unit.polynomial_parameters and coeff is None: raise CADETProcessError('Polynomial coefficient must be provided.') if associated_model is None: if parameter not in unit.parameters: raise CADETProcessError('Not a valid parameter.') else: associated_model = getattr(unit, associated_model) if state is not None \ and state > associated_model.n_binding_sites: raise ValueError('Binding site index exceed number of binding sites.') if reac is not None \ and reac > associated_model.n_reactions: raise ValueError('Reaction index exceed number of reactions.') if parameter not in associated_model.parameters: raise CADETProcessError('Not a valid parameter') associated_models.append(associated_model) sens = ParameterSensitivity( name, units, parameters, associated_models, components, polynomial_coefficients, reaction_indices, bound_state_indices, section_indices, abstols, factors ) self._parameter_sensitivities.append(sens)
@property def system_state(self): return self._system_state @system_state.setter def system_state(self, system_state): self._system_state = system_state @property def system_state_derivative(self): return self._system_state_derivative @system_state_derivative.setter def system_state_derivative(self, system_state_derivative): self._system_state_derivative = system_state_derivative @property def parameters(self): parameters = super().parameters parameters['flow_sheet'] = self.flow_sheet.parameters return parameters @parameters.setter def parameters(self, parameters): try: self.flow_sheet.parameters = parameters.pop('flow_sheet') except KeyError: pass super(Process, self.__class__).parameters.fset(self, parameters) @property def section_dependent_parameters(self): parameters = Dict() parameters.flow_sheet = self.flow_sheet.section_dependent_parameters return parameters @property def polynomial_parameters(self): parameters = super().polynomial_parameters parameters.flow_sheet = self.flow_sheet.polynomial_parameters return parameters @property def sized_parameters(self): parameters = super().sized_parameters parameters.flow_sheet = self.flow_sheet.sized_parameters return parameters @property def initial_state(self): initial_state = { state: getattr(self, state) for state in self._initial_states } initial_state['flow_sheet'] = self.flow_sheet.initial_state return initial_state @initial_state.setter def initial_state(self, initial_state): try: self.flow_sheet.initial_state = initial_state.pop('flow_sheet') except KeyError: pass for state_name, state_value in initial_state.items(): if state_name not in self._initial_state: raise CADETProcessError('Not an valid state') setattr(self, state_name, state_value) @property def config(self): return Dict({ 'parameters': self.parameters, 'initial_state': self.initial_state }) @config.setter def config(self, config): self.parameters = config['parameters'] self.initial_state = config['initial_state']
[docs] def add_concentration_profile(self, unit, time, c, components=None, s=1e-6): """Add concentration profile to Process. Parameters ---------- unit : str The name of the inlet unit operation. time : np.ndarray An array containing the time values of the concentration profile. c : np.ndarray An array containing the concentration profile with shape (len(time), n_comp), where n_comp is the number of components specified with the `components` argument. components : list, optional. Component species for which the concentration profile shall be added. If `None`, the profile is expected to have shape (len(time), n_comp). If `-1`, the same (1D) profile is added to all components. The default is `None`. s : float, optional A smoothing factor used to generate the spline representation of the concentration profile. The default is 1e-6. Raises ------ TypeError If the specified `unit` is not an Inlet unit operation. ValueError If the time values in `time` exceed the cycle time of the Process or if `c` has an invalid shape. CADETProcessError If the number of components in `c` does not match the number of components in the Process. """ if isinstance(unit, str): unit = self.flow_sheet[unit] if unit not in self.flow_sheet.inlets: raise TypeError('Expected Inlet') if max(time) > self.cycle_time: raise ValueError('Inlet profile exceeds cycle time') if components == -1: # Assume same profile for all components. if c.ndim > 1: raise ValueError('Expected single concentration profile') c = np.column_stack([c]*self.n_comp) components = self.component_system.species elif components is None and c.shape[1] != self.n_comp: # Else, c must be given for all components. raise CADETProcessError('Number of components does not match') if not isinstance(components, list): components = [components] indices = [self.component_system.species_indices[comp] for comp in components] if len(indices) == 1 and c.ndim == 1: c = np.array(c, ndmin=2).T for i, comp in enumerate(indices): tck = interpolate.splrep(time, c[:, i], s=s) ppoly = interpolate.PPoly.from_spline(tck) for i, (t, sec) in enumerate(zip(ppoly.x, ppoly.c.T)): if i < 3: continue elif i > len(ppoly.x) - 5: continue evt = self.add_event( f'{unit}_inlet_{comp}_{i-3}', f'flow_sheet.{unit}.c', np.flip(sec), t, comp )
[docs] def add_flow_rate_profile(self, unit, time, flow_rate, s=1e-6): """Add flow rate profile to a SourceMixin unit operation. Parameters ---------- unit : str The name of the SourceMixin unit operation. time : np.ndarray An array containing the time values of the flow rate profile. flow_rate : np.ndarray An array containing the flow rate profile. s : float, optional A smoothing factor used to generate the spline representation of the flow rate profile. The default is 1e-6. Raises ------ TypeError If the specified `unit` is not a SourceMixin unit operation. ValueError If the time values in `time` exceed the cycle time of the Process. """ if isinstance(unit, str): unit = self.flow_sheet[unit] if unit not in self.flow_sheet.inlets + self.flow_sheet.cstrs: raise TypeError('Expected SourceMixin.') if max(time) > self.cycle_time: raise ValueError('Inlet profile exceeds cycle time') tck = interpolate.splrep(time, flow_rate, s=s) ppoly = interpolate.PPoly.from_spline(tck) for i, (t, sec) in enumerate(zip(ppoly.x, ppoly.c.T)): if i < 3: continue elif i > len(ppoly.x) - 5: continue evt = self.add_event( f'{unit}_flow_rate_{i-3}', f'flow_sheet.{unit}.flow_rate', np.flip(sec), t )
[docs] def check_config(self): """Validate that process config is setup correctly. Returns ------- check : Bool True if process is setup correctly. False otherwise. """ flag = super().check_config() missing_parameters = self.flow_sheet.missing_parameters if len(missing_parameters) > 0: for param in missing_parameters: if f'flow_sheet.{param}' not in self.event_parameters: warn(f"Missing parameter {param}.") flag = False if not self.flow_sheet.check_connections(): flag = False if self.cycle_time is None: warn('Cycle time is not set') flag = False if not self.check_cstr_volume(): flag = False return flag
[docs] def check_cstr_volume(self): """Check if CSTRs run empty. Returns ------- flag : bool False if any of the CSTRs run empty. True otherwise. """ flag = True for cstr in self.flow_sheet.cstrs: if cstr.flow_rate is None: continue V_0 = cstr.V V_in = self.flow_rate_timelines[cstr.name].total_in.integral() V_out = self.flow_rate_timelines[cstr.name].total_out.integral() if V_0 + V_in - V_out < 0: flag = False warn(f'CSTR {cstr.name} runs empty during process.') return flag
def __str__(self): return self.name
@dataclass class ParameterSensitivity(): name: str units: list parameters: list associated_models: list = None components: list = None polynomial_coefficients: list = None reaction_indices: list = None bound_state_indices: list = None section_indices: list = None abstols: list = None factors: list = None