from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Literal, Optional
from warnings import warn
import numpy as np
from addict import Dict
from scipy import integrate, interpolate
from CADETProcess import CADETProcessError
from CADETProcess.dataStructure import cached_property_if_locked
from CADETProcess.dynamicEvents import EventHandler, Section, TimeLine
from .componentSystem import ComponentSystem
from .flowSheet import FlowSheet
from .unitOperation import Inlet, Outlet
__all__ = ["Process", "ProcessMeta"]
[docs]
class Process(EventHandler):
"""
Class for defining the dynamic changes of a flow sheet.
Attributes
----------
name : str
Name of the process object to be simulated.
system_state : np.ndarray
State of the process object
system_state_derivate : np.ndarray
Derivative of the state
See Also
--------
EventHandler
CADETProcess.processModel.FlowSheet
CADETProcess.simulation.Solver
"""
_initial_states = ["system_state", "system_state_derivative"]
def __init__(
self,
flow_sheet: FlowSheet,
name: str,
*args: Any,
**kwargs: Any,
) -> None:
"""Initialize Process."""
self.flow_sheet = flow_sheet
self.name = name
self.system_state = None
self.system_state_derivative = None
self._parameter_sensitivities = []
self._meta_information = Dict()
super().__init__(*args, **kwargs)
@property
def n_comp(self) -> int:
"""int: Number of components in the process."""
return self.flow_sheet.n_comp
@property
def meta_information(self) -> dict:
"""dict: Meta information of the process."""
# TODO: DO we still use this anywhere?
return self._meta_information
@property
def component_system(self) -> ComponentSystem:
"""ComponentSystem: Component system of the process."""
return self.flow_sheet.component_system
@property
def flow_sheet(self) -> FlowSheet:
"""FlowSheet: flow sheet of the process model.
Raises
------
TypeError:
If flow_sheet is not an instance of FlowSheet.
"""
return self._flow_sheet
@flow_sheet.setter
def flow_sheet(self, flow_sheet: FlowSheet) -> None:
if not isinstance(flow_sheet, FlowSheet):
raise TypeError("Expected FlowSheet")
self._flow_sheet = flow_sheet
@cached_property_if_locked
def m_feed(self) -> np.ndarray:
"""np.ndarray: Mass of feed components entering the system in one cycle."""
flow_rate_timelines = self.flow_rate_timelines
feed_all = np.zeros((self.n_comp,))
for feed in self.flow_sheet.feed_inlets:
feed_flow_rate_time_line = flow_rate_timelines[feed.name].total_out[None]
feed_signal_param = f"flow_sheet.{feed.name}.c"
if feed_signal_param in self.parameter_timelines:
tl = self.parameter_timelines[feed_signal_param]
feed_signal_time_line = tl
else:
feed_signal_time_line = TimeLine()
feed_section = Section(0, self.cycle_time, feed.c, is_polynomial=True)
feed_signal_time_line.add_section(feed_section)
m_i = np.zeros((self.n_comp, ))
for t0, t1 in zip(self.section_times[:-1], self.section_times[1:]):
# Precompute values on a fine grid for this segment
# move the right boundary infinitesimally to the left
t1m = np.nextafter(t1, t0)
t_eval = np.linspace(t0, t1m, 1001)
flow_vals = feed_flow_rate_time_line.value(t_eval)
c_vals = feed_signal_time_line.value(t_eval)
m_i += integrate.simpson(flow_vals * c_vals, t_eval, axis=0)
feed_all += m_i
return feed_all
@cached_property_if_locked
def V_eluent(self) -> float:
"""float: Volume of the eluent entering the system in one cycle."""
flow_rate_timelines = self.flow_rate_timelines
V_all = 0
for eluent in self.flow_sheet.eluent_inlets:
eluent_time_line = flow_rate_timelines[eluent.name]["total_out"][None]
V_eluent = eluent_time_line.integral().squeeze()
V_all += V_eluent
return float(V_all)
@cached_property_if_locked
def V_solid(self) -> float:
"""float: Volume of all solid phase material used in flow sheet."""
return sum([unit.volume_solid for unit in self.flow_sheet.units_with_binding])
@property
def process_meta(self) -> "ProcessMeta":
"""ProcessMeta: Process meta information."""
return ProcessMeta(
cycle_time=self.cycle_time,
V_eluent=self.V_eluent,
V_solid=self.V_solid,
m_feed=self.m_feed,
)
@cached_property_if_locked
def flow_rate_timelines(self) -> dict:
"""Return TimeLine of flow_rate for all unit_operations."""
flow_rate_timelines = {
unit.name: {
"total_in": defaultdict(TimeLine),
"origins": defaultdict(
lambda: defaultdict(lambda: defaultdict(TimeLine))
),
"total_out": defaultdict(TimeLine),
"destinations": defaultdict(
lambda: defaultdict(lambda: defaultdict(TimeLine))
),
}
for unit in self.flow_sheet.units
}
# Create dummy section state for Processes without events
if len(self.section_states) == 0:
it = [(None, {})]
else:
it = self.section_states.items()
for i, (time, state) in enumerate(it):
start = self.section_times[i]
end = self.section_times[i + 1]
flow_rates = self.flow_sheet.get_flow_rates(state)
for unit, flow_rate_dict in flow_rates.items():
unit_flow_rates = flow_rate_timelines[unit]
# If inlet, also use outlet for total_in
if isinstance(self.flow_sheet[unit], Inlet):
for port in flow_rate_dict["total_out"]:
section = Section(
start,
end,
flow_rate_dict.total_out[port],
is_polynomial=True,
)
unit_flow_rates["total_in"][port].add_section(section)
else:
for port in flow_rate_dict["total_in"]:
section = Section(
start,
end,
flow_rate_dict.total_in[port],
is_polynomial=True,
)
unit_flow_rates["total_in"][port].add_section(section)
for port in flow_rate_dict.origins:
for orig, origin_port_dict in flow_rate_dict.origins[port].items():
for orig_port, flow_rate_orig in origin_port_dict.items():
section = Section(
start, end, flow_rate_orig, is_polynomial=True
)
unit_flow_rates["origins"][port][orig][orig_port].add_section(section)
# If outlet, also use inlet for total_out
if isinstance(self.flow_sheet[unit], Outlet):
for port in flow_rate_dict["total_in"]:
section = Section(
start,
end,
flow_rate_dict.total_in[port],
is_polynomial=True,
)
unit_flow_rates["total_out"][port].add_section(section)
else:
for port in flow_rate_dict["total_out"]:
section = Section(
start,
end,
flow_rate_dict.total_out[port],
is_polynomial=True,
)
unit_flow_rates["total_out"][port].add_section(section)
for port in flow_rate_dict.destinations:
for dest, dest_port_dict in flow_rate_dict.destinations[port].items():
for dest_port, flow_rate_dest in dest_port_dict.items():
section = Section(
start,
end,
flow_rate_dest,
is_polynomial=True,
)
unit_flow_rates["destinations"][port][dest][dest_port].add_section(section)
return Dict(flow_rate_timelines)
@cached_property_if_locked
def flow_rate_section_states(self) -> dict:
"""Return flow rates for all units for every section time."""
section_states = {
time: {
unit.name: {
"total_in": defaultdict(list),
"origins": defaultdict(
lambda: defaultdict(lambda: defaultdict(list))
),
"total_out": defaultdict(list),
"destinations": defaultdict(
lambda: defaultdict(lambda: defaultdict(list))
),
}
for unit in self.flow_sheet.units
}
for time in self.section_times[0:-1]
}
for sec_time in self.section_times[0:-1]:
for unit, unit_flow_rates in self.flow_rate_timelines.items():
if isinstance(self.flow_sheet[unit], Inlet):
for port in unit_flow_rates["total_out"]:
section_states[sec_time][unit]["total_in"][port] = \
unit_flow_rates["total_out"][port].coefficients(sec_time)
else:
for port in unit_flow_rates["total_in"]:
section_states[sec_time][unit]["total_in"][port] = \
unit_flow_rates["total_in"][port].coefficients(sec_time)
for port, orig_dict in unit_flow_rates.origins.items():
for origin in orig_dict:
for origin_port, tl in orig_dict[origin].items():
section_states[sec_time][unit]["origins"][port][origin][origin_port] = \
tl.coefficients(sec_time) # noqa: E501
if isinstance(self.flow_sheet[unit], Outlet):
for port in unit_flow_rates["total_in"]:
section_states[sec_time][unit]["total_out"][port] = \
unit_flow_rates["total_in"][port].coefficients(sec_time)
else:
for port in unit_flow_rates["total_out"]:
section_states[sec_time][unit]["total_out"][port] = \
unit_flow_rates["total_out"][port].coefficients(sec_time)
for port, dest_dict in unit_flow_rates.destinations.items():
for dest in dest_dict:
for dest_port, tl in dest_dict[dest].items():
section_states[sec_time][unit]["destinations"][port][dest][dest_port] = \
tl.coefficients(sec_time) # noqa: E501
return Dict(section_states)
@property
def n_sensitivities(self) -> int:
"""int: Number of parameter sensitivities."""
return len(self.parameter_sensitivities)
@property
def parameter_sensitivities(self) -> list:
"""list: Parameter sensitivites."""
return self._parameter_sensitivities
@property
def parameter_sensitivity_names(self) -> list:
"""list: Parameter sensitivity names."""
return [sens.name for sens in self.parameter_sensitivities]
[docs]
def add_parameter_sensitivity(
self,
parameter_paths: str | list[str],
name: Optional[str] = None,
components: Optional[str | list[str]] = None,
polynomial_coefficients: Optional[str | list[str]] = None,
reaction_indices: Optional[int | list[int]] = None,
bound_state_indices: Optional[int | list[int]] = None,
section_indices: Optional[int | list[int]] = None,
abstols: Optional[float | list[float]] = None,
factors: Optional[int | list[int]] = None,
) -> None:
"""
Add parameter sensitivty to Process.
Parameters
----------
parameter_paths : str or list of str
The path to the parameter(s).
name : str, optional
The name of the parameter sensitivity.
If not provided, the name of the first parameter will be used.
components : str or list of str, optional
The component(s) to which the parameter(s) belong.
Must only be provided if parameter is specific to a certain compoment.
polynomial_coefficients: str or list of str, optional
The polynomial coefficients(s) to which the parameter(s) belong.
Must only be provided if parameter is specific to a certain coefficient.
reaction_indices : int or list of int, optional
The index(es) of the reaction(s) in the associated model(s), if applicable.
Must only be provided if parameter is specific to a certain reaction.
bound_state_indices : int or list of int, optional
The index(es) of the bound state(s) in the associated model(s), if applicable.
Must only be provided if parameter is specific to a certain bound state.
section_indices : int or list of int, optional
The index(es) of the section(s) in the associated model(s), if applicable.
Must only be provided if parameter is specific to a certain section.
abstols : float or list of float, optional
The absolute tolerances for each parameter.
If not provided, a default tolerance will be used.
factors : float or list of float, optional
The factors for each parameter.
If not provided, a default factor of 1 will be used.
Raises
------
CADETProcessError
Number of indices do not match for:
- components
- polynomial_coefficients
- reaction
- bound_state
- sections
- tolerances
- factors
Component is not found.
Unit is not found.
Parameter is not found.
Name is not provided (if number of parameters larger than 1).
If sensitivity name already exists.
Notes
-----
This functionality is still work in progress.
.. todo::
- [ ] Check if compoment/reaction/polynomial index are required.
- [ ] Specify time instead of section index;
"""
if not isinstance(parameter_paths, list):
parameter_paths = [parameter_paths]
n_params = len(parameter_paths)
if name is None:
if n_params > 1:
raise CADETProcessError(
"Must provide sensitivity name if n_params > 1."
)
else:
name = parameter_paths[0]
if name in self.parameter_sensitivity_names:
raise CADETProcessError(
"Parameter sensitivity with same name already exists."
)
if components is None:
components = n_params * [None]
if not isinstance(components, list):
components = [components]
if len(components) != n_params:
raise CADETProcessError("Number of component indices does not match.")
if components is None:
components = n_params * [None]
if not isinstance(polynomial_coefficients, list):
polynomial_coefficients = [polynomial_coefficients]
if len(polynomial_coefficients) != n_params:
raise CADETProcessError("Number of coefficient indices does not match.")
if reaction_indices is None:
reaction_indices = n_params * [None]
if not isinstance(reaction_indices, list):
reaction_indices = [reaction_indices]
if len(reaction_indices) != n_params:
raise CADETProcessError("Number of reaction indices does not match.")
if bound_state_indices is None:
bound_state_indices = n_params * [None]
if not isinstance(bound_state_indices, list):
bound_state_indices = [bound_state_indices]
if len(bound_state_indices) != n_params:
raise CADETProcessError("Number of bound_state indices does not match.")
if section_indices is None:
section_indices = n_params * [None]
if not isinstance(section_indices, list):
section_indices = [section_indices]
if len(section_indices) != n_params:
raise CADETProcessError("Number of section indices does not match.")
if abstols is None:
abstols = n_params * [None]
if not isinstance(abstols, list):
abstols = [abstols]
if len(abstols) != n_params:
raise CADETProcessError("Number of abstol entries does not match.")
if factors is None:
factors = n_params * [1]
if not isinstance(factors, list):
factors = [factors]
if len(factors) != n_params:
raise CADETProcessError("Number of factor entries does not match.")
units = []
associated_models = []
parameters = []
for param, comp, coeff, reac, state, section, tol, fac in zip(
parameter_paths,
components,
polynomial_coefficients,
reaction_indices,
bound_state_indices,
section_indices,
abstols,
factors,
):
param_parts = param.split(".")
unit = param_parts[0]
parameter = param_parts[-1]
if parameter == "flow_rate":
raise CADETProcessError(
"Flow rate is currently not supported for sensitivities."
)
parameters.append(parameter)
associated_model = None
if len(param_parts) == 3:
associated_model = param_parts[1]
if comp is not None and comp not in self.component_system.species:
raise CADETProcessError(f"Unknown component {comp}.")
unit = self.flow_sheet[unit]
if unit not in self.flow_sheet.units:
raise CADETProcessError("Not a valid unit")
units.append(unit)
if coeff is not None and parameter not in unit.polynomial_parameters:
raise CADETProcessError("Not a polynomial parameter.")
if parameter in unit.polynomial_parameters and coeff is None:
raise CADETProcessError("Polynomial coefficient must be provided.")
if associated_model is None:
if parameter not in unit.parameters:
raise CADETProcessError("Not a valid parameter.")
else:
associated_model = getattr(unit, associated_model)
if state is not None and state > associated_model.n_binding_sites:
raise ValueError("Binding site index exceed number of binding sites.")
if reac is not None and reac > associated_model.n_reactions:
raise ValueError("Reaction index exceed number of reactions.")
if parameter not in associated_model.parameters:
raise CADETProcessError("Not a valid parameter")
associated_models.append(associated_model)
sens = ParameterSensitivity(
name,
units,
parameters,
associated_models,
components,
polynomial_coefficients,
reaction_indices,
bound_state_indices,
section_indices,
abstols,
factors,
)
self._parameter_sensitivities.append(sens)
@property
def system_state(self) -> np.ndarray:
"""np.ndarray: State of the entire system."""
return self._system_state
@system_state.setter
def system_state(self, system_state: np.ndarray) -> None:
self._system_state = system_state
@property
def system_state_derivative(self) -> np.ndarray:
"""np.ndarray: State derivative of the entire system."""
return self._system_state_derivative
@system_state_derivative.setter
def system_state_derivative(self, system_state_derivative: np.ndarray) -> None:
self._system_state_derivative = system_state_derivative
@property
def parameters(self) -> dict:
"""dict: Parameters of the process."""
parameters = super().parameters
parameters["flow_sheet"] = self.flow_sheet.parameters
return parameters
@parameters.setter
def parameters(self, parameters: dict) -> None:
try:
self.flow_sheet.parameters = parameters.pop("flow_sheet")
except KeyError:
pass
super(Process, self.__class__).parameters.fset(self, parameters)
@property
def section_dependent_parameters(self) -> Dict:
"""dict: Section dependent parameters of the process."""
parameters = Dict()
parameters.flow_sheet = self.flow_sheet.section_dependent_parameters
return parameters
@property
def polynomial_parameters(self) -> Dict:
"""dict: Polynomial parameters of the process."""
parameters = super().polynomial_parameters
parameters.flow_sheet = self.flow_sheet.polynomial_parameters
return parameters
@property
def sized_parameters(self) -> Dict:
"""dict: Sized parameters of the process."""
parameters = super().sized_parameters
parameters.flow_sheet = self.flow_sheet.sized_parameters
return parameters
@property
def initial_state(self) -> dict:
"""dict: Initial state of the process."""
initial_state = {state: getattr(self, state) for state in self._initial_states}
initial_state["flow_sheet"] = self.flow_sheet.initial_state
return initial_state
@initial_state.setter
def initial_state(self, initial_state: dict) -> None:
try:
self.flow_sheet.initial_state = initial_state.pop("flow_sheet")
except KeyError:
pass
for state_name, state_value in initial_state.items():
if state_name not in self._initial_state:
raise CADETProcessError("Not an valid state")
setattr(self, state_name, state_value)
@property
def config(self) -> Dict:
"""dict[str, dict]: Parameters and initial state of the process."""
return Dict(
{"parameters": self.parameters, "initial_state": self.initial_state}
)
@config.setter
def config(self, config: dict) -> None:
self.parameters = config["parameters"]
self.initial_state = config["initial_state"]
[docs]
def add_concentration_profile(
self,
unit: str,
time: np.ndarray,
c: np.ndarray,
components: Optional[list[str]] = None,
s: float = 1e-6,
interpolation_method: Literal["cubic", "pchip", None] = "pchip",
) -> None:
"""
Add concentration profile to Process.
Parameters
----------
unit : str
The name of the inlet unit operation.
time : np.ndarray
1D array containing the time values of the concentration profile.
c : np.ndarray
2D array containing the concentration profile with shape
(len(time), n_comp), where n_comp is the number of components
specified in the `components` argument.
components : list[str] | None, optional
Component species for which the concentration profile shall be added.
If `None`, the profile is expected to have shape (len(time), n_comp).
If `-1`, the same (1D) profile is added to all components.
Default is `None`.
s : float, optional
Smoothing factor used to generate the spline representation of the
concentration profile. Default is `1e-6`.
interpolation_method : Literal["linear", "cubic", "pchip", None], optional
The interpolation method to use. Options:
- `"cubic"` : Cubic spline interpolation.
- `"pchip"` : Piecewise cubic Hermite interpolation (default).
- `None` : No interpolation, use raw time data.
Raises
------
TypeError
If the specified `unit` is not an Inlet unit operation.
ValueError
If the time values in `time` exceed the cycle time of the Process.
If `c` has an invalid shape.
If `interpolation_method` is unknown.
"""
if isinstance(unit, str):
unit = self.flow_sheet[unit]
if unit not in self.flow_sheet.inlets:
raise TypeError("Expected Inlet")
if max(time) > self.cycle_time:
raise ValueError("Inlet profile exceeds cycle time.")
# Handle components and concentration shape
if components is None:
if c.shape[1] != self.n_comp:
raise ValueError(
f"Expected shape ({len(time), self.n_comp}) for concentration array"
f". Got {c.shape}."
)
components = self.component_system.species
elif components == -1:
# Assume same profile for all components
if c.ndim > 1:
raise ValueError("Expected single concentration profile.")
c = np.column_stack([c] * self.n_comp)
components = self.component_system.species
if not isinstance(components, list):
components = [components]
indices = [self.component_system.species_indices[comp] for comp in components]
if len(indices) == 1 and c.ndim == 1:
c = np.array(c, ndmin=2).T
# Determine the interpolation method
if interpolation_method is None:
time_interp = time
else:
time_interp = np.linspace(0, max(time), max(1001, len(time)))
match interpolation_method:
case None:
pass
case "cubic":
interpolator = interpolate.CubicSpline(time, c)
case "pchip":
interpolator = interpolate.PchipInterpolator(time, c)
case _:
raise ValueError(
f"Unknown `interpolation_method`: {interpolation_method}."
)
c = interpolator(time_interp)
# Process each component's profile
for i, comp in enumerate(indices):
profile = c[:, i]
# Normalize the profile
min_val = np.min(profile)
max_val = np.max(profile)
range_val = max_val - min_val
if range_val == 0:
warn(
f"Component {comp} has no variation in concentration; "
"scaling will be skipped."
)
range_val = 1 # Avoid division by zero
normalized_profile = (profile - min_val) / range_val
# Fit the spline on the normalized profile
tck = interpolate.splrep(time_interp, normalized_profile, s=s)
ppoly = interpolate.PPoly.from_spline(tck)
# Add events with properly unscaled coefficients
for j, (t, sec) in enumerate(zip(ppoly.x, ppoly.c.T)):
if j < 3 or j > len(ppoly.x) - 5:
continue
# Scale coefficients and adjust the constant term
scaled_sec = sec * range_val
scaled_sec[-1] += min_val # Adjust the constant term for shifting
self.add_event(
f"{unit}_inlet_{comp}_{j - 3}",
f"flow_sheet.{unit}.c",
np.flip(scaled_sec),
t,
comp,
)
[docs]
def add_flow_rate_profile(
self,
unit: str,
time: np.ndarray,
flow_rate: np.ndarray,
s: float = 1e-6,
interpolation_method: Literal["cubic", "pchip", None] = "pchip",
) -> None:
"""
Add flow rate profile to a SourceMixin unit operation.
Parameters
----------
unit : str
The name of the SourceMixin unit operation.
time : np.ndarray
1D array containing the time values of the flow rate profile.
flow_rate : np.ndarray
1D array containing the flow rate values over time.
s : float, optional
Smoothing factor used to generate the spline representation of the flow rate profile.
Default is `1e-6`.
interpolation_method : Literal["linear", "cubic", "pchip", None], optional
The interpolation method to use. Options:
- `"cubic"` : Cubic spline interpolation.
- `"pchip"` : Piecewise cubic Hermite interpolation (default).
- `None` : No interpolation, use raw time data.
Raises
------
TypeError
If the specified `unit` is not a SourceMixin unit operation.
ValueError
If the time values in `time` exceed the cycle time of the Process.
"""
if isinstance(unit, str):
unit = self.flow_sheet[unit]
if unit not in self.flow_sheet.inlets + self.flow_sheet.cstrs:
raise TypeError("Expected SourceMixin.")
if max(time) > self.cycle_time:
raise ValueError("Inlet profile exceeds cycle time.")
# Compute min and max for scaling
min_val = np.min(flow_rate)
max_val = np.max(flow_rate)
range_val = max_val - min_val
if range_val == 0:
warn("Flow rate has no variation; scaling will be skipped.")
range_val = 1 # Avoid division by zero, effectively skipping scaling
# Normalize flow_rate to [0, 1]
normalized_flow_rate = (flow_rate - min_val) / range_val
# Determine interpolation method
if interpolation_method is None:
time_interp = time
interpolated_flow_rate = normalized_flow_rate
else:
time_interp = np.linspace(0, max(time), max(1001, len(time)))
match interpolation_method:
case "cubic":
interpolator = interpolate.CubicSpline(time, normalized_flow_rate)
case "pchip":
interpolator = interpolate.PchipInterpolator(
time, normalized_flow_rate
)
case _:
raise ValueError(
f"Unknown `interpolation_method`: {interpolation_method}."
)
interpolated_flow_rate = interpolator(time_interp)
# Fit the spline with the interpolated normalized flow rate
tck = interpolate.splrep(time_interp, interpolated_flow_rate, s=s)
ppoly = interpolate.PPoly.from_spline(tck)
# Add events with unscaled coefficients
for i, (t, sec) in enumerate(zip(ppoly.x, ppoly.c.T)):
if i < 3 or i > len(ppoly.x) - 5:
continue
# Unscale all coefficients
unscaled_sec = sec * range_val
unscaled_sec[-1] += min_val # Adjust the constant term for shifting
self.add_event(
f"{unit}_flow_rate_{i - 3}",
f"flow_sheet.{unit}.flow_rate",
np.flip(unscaled_sec),
t,
)
[docs]
def check_config(self) -> bool:
"""
Validate that process config is setup correctly.
Returns
-------
check : Bool
True if process is setup correctly. False otherwise.
"""
flag = super().check_config()
missing_parameters = self.flow_sheet.missing_parameters
if len(missing_parameters) > 0:
for param in missing_parameters:
if f"flow_sheet.{param}" not in self.event_parameters:
warn(f"Missing parameter {param}.")
flag = False
if not self.flow_sheet.check_connections():
flag = False
if self.cycle_time is None:
warn("Cycle time is not set")
flag = False
if not self.check_cstr_volume():
flag = False
return flag
[docs]
def check_cstr_volume(self) -> bool:
"""
Check if CSTRs run empty.
Returns
-------
flag : bool
False if any of the CSTRs run empty. True otherwise.
"""
flag = True
for cstr in self.flow_sheet.cstrs:
if cstr.flow_rate is None:
continue
V_0 = cstr.init_liquid_volume
unit_index = self.flow_sheet.get_unit_index(cstr)
for port in self.flow_sheet.units[unit_index].ports:
V_in = self.flow_rate_timelines[cstr.name].total_in[port].integral()
V_out = self.flow_rate_timelines[cstr.name].total_out[port].integral()
if V_0 + V_in - V_out < 0:
flag = False
warn(f"CSTR {cstr.name} runs empty on port {port} during process.")
return flag
def __str__(self) -> str:
"""str: String representation of the process."""
return self.name
@dataclass
class ParameterSensitivity:
"""Class for storing parameter sensitivity parameters."""
name: str
units: list
parameters: list
associated_models: list = None
components: list = None
polynomial_coefficients: list = None
reaction_indices: list = None
bound_state_indices: list = None
section_indices: list = None
abstols: list = None
factors: list = None
@dataclass
class ProcessMeta:
"""Class for storing process meta information."""
cycle_time: float
V_eluent: float
V_solid: float
m_feed: list[float]