from copy import deepcopy
from functools import wraps
from typing import Any, Optional, NoReturn
import warnings
import numpy as np
import matplotlib.pyplot as plt
from CADETProcess import CADETProcessError
from CADETProcess import plotting
from CADETProcess.dataStructure import Structure
from CADETProcess.dataStructure import Integer, UnsignedInteger, UnsignedFloat
from CADETProcess.processModel import ComponentSystem
from CADETProcess.processModel import BindingBaseClass, Linear, Langmuir
from CADETProcess.processModel import UnitBaseClass, FlowSheet, Process
from CADETProcess.processModel import Inlet, TubularReactorBase, Cstr, Outlet
from CADETProcess.solution import SolutionBase
__all__ = [
'SerialZone',
'ParallelZone',
'CarouselBuilder',
'SMBBuilder',
'LinearSMBBuilder',
'LangmuirSMBBuilder',
]
class ZoneBaseClass(UnitBaseClass):
"""Base class for a multi-column zone with configurable columns and flow directions.
Attributes
----------
n_columns : UnsignedInteger
The number of columns in the zone.
flow_direction : Integer
The flow direction in the zone, where 1 indicates normal and -1 indicates
reverse flow.
valve_dead_volume : UnsignedFloat
The dead volume of the valves in the zone.
"""
n_columns = UnsignedInteger()
flow_direction = Integer(default=1)
valve_dead_volume = UnsignedFloat(default=1e-6)
def __init__(
self,
component_system: ComponentSystem,
name: str,
n_columns: int = 1,
flow_direction: int = 1,
initial_state: list = None,
*args,
**kwargs
) -> NoReturn:
"""
Initialize a ZoneBaseClass instance.
Parameters
----------
component_system : Any
The component system associated with this zone.
name : str
The name of the zone.
n_columns : int, optional
Number of columns in the zone.
flow_direction : int, optional
Flow direction in the zone.
initial_state : Any, optional
Initial state of the zone.
"""
self.n_columns = n_columns
self.flow_direction = flow_direction
self.initial_state = initial_state
self._inlet_unit = Cstr(component_system, f'{name}_inlet')
self._inlet_unit.V = self.valve_dead_volume
self._outlet_unit = Cstr(component_system, f'{name}_outlet')
self._outlet_unit.V = self.valve_dead_volume
super().__init__(component_system, name, *args, **kwargs)
@property
def initial_state(self) -> list[dict[str, list]]:
"""list: The initial state of the columns in the zone."""
return self._initial_state
@initial_state.setter
def initial_state(
self,
initial_state: list[dict[str, list]] | dict[str, list]
) -> NoReturn:
if initial_state is None:
self._initial_state = initial_state
return
if not isinstance(initial_state, list):
initial_state = self.n_columns * [initial_state]
if len(initial_state) != self.n_columns:
raise CADETProcessError(f"Expected size {self.n_columns}")
self._initial_state = initial_state
@property
def inlet_unit(self) -> Cstr:
"""Cstr: The inlet CSTR unit of the zone."""
return self._inlet_unit
@property
def outlet_unit(self) -> Cstr:
"""Cstr: The outlet CSTR unit of the zone."""
return self._outlet_unit
[docs]
class SerialZone(ZoneBaseClass):
"""Zone with columns connected in series."""
pass
[docs]
class ParallelZone(ZoneBaseClass):
"""Zone with columns connected in parallel."""
pass
[docs]
class CarouselBuilder(Structure):
"""
Configurator for multi-column processes.
Attributes
----------
component_system : Any
The system of components for which the carousel is configured.
name : str
Name of the carousel system.
switch_time : float
Column switch time.
"""
switch_time = UnsignedFloat()
def __init__(self, component_system: ComponentSystem, name: str) -> NoReturn:
"""
Initialize a CarouselBuilder instance.
Parameters
----------
component_system : Any
The system of components that will be used in the carousel.
name : str
The carousel name.
"""
self.component_system = component_system
self.name = name
self._flow_sheet = FlowSheet(component_system, name)
self._column = None
@property
def flow_sheet(self) -> FlowSheet:
"""FlowSheet: The flow sheet instance associated with the carousel."""
return self._flow_sheet
@property
def column(self) -> TubularReactorBase:
"""TubularReactorBase: The column template for all zones."""
return self._column
@column.setter
def column(self, column: TubularReactorBase) -> NoReturn:
if not isinstance(column, TubularReactorBase):
raise TypeError("Column must be an instance of TubularReactorBase.")
if self.component_system is not column.component_system:
raise CADETProcessError('Number of components does not match.')
self._column = column
@wraps(FlowSheet.add_unit)
def add_unit(self, *args, **kwargs):
"""Wrap FlowSheet.add_unit to add a unit to the flow sheet."""
self.flow_sheet.add_unit(*args, **kwargs)
@wraps(FlowSheet.add_connection)
def add_connection(self, *args, **kwargs):
"""Wrap FlowSheet.add_connection to add a connection between units."""
self.flow_sheet.add_connection(*args, **kwargs)
@wraps(FlowSheet.set_output_state)
def set_output_state(self, *args, **kwargs) -> NoReturn:
"""Wrap FlowSheet.set_output_state to set the output state of a unit."""
self.flow_sheet.set_output_state(*args, **kwargs)
@property
def zones(self) -> list[ZoneBaseClass]:
"""
Get all zones in the carousel system.
Returns
-------
list
A list of all zones in the carousel system.
"""
return [
unit for unit in self.flow_sheet.units if isinstance(unit, ZoneBaseClass)
]
@property
def zone_names(self) -> list[str]:
"""list: Zone names."""
return [zone.name for zone in self.zones]
@property
def zones_dict(self) -> dict[str, ZoneBaseClass]:
"""dict: Zone names and objects."""
return {zone.name: zone for zone in self.zones}
@property
def n_zones(self) -> int:
"""int: Number of zones in the Carousel System."""
return len(self.zones)
@property
def n_columns(self) -> int:
"""int: Number of columns in the Carousel System."""
return sum([zone.n_columns for zone in self.zones])
[docs]
def build_flow_sheet(self) -> FlowSheet:
"""
Assemble the flow sheet.
Returns
-------
FlowSheet
The assembled flow sheet.
"""
if self.column is None:
raise CADETProcessError("No column associated with Carousel.")
flow_sheet = FlowSheet(self.component_system, self.name)
self._add_units(flow_sheet)
self._add_inter_zone_connections(flow_sheet)
self._add_intra_zone_connections(flow_sheet)
return flow_sheet
def _add_units(self, flow_sheet: FlowSheet) -> NoReturn:
"""Add units to flow_sheet."""
col_index = 0
for unit in self.flow_sheet.units:
if not isinstance(unit, ZoneBaseClass):
is_feed_inlet = unit in self.flow_sheet.feed_inlets
is_eluent_inlet = unit in self.flow_sheet.eluent_inlets
is_output_outlet = unit in self.flow_sheet.product_outlets
flow_sheet.add_unit(
unit,
feed_inlet=is_feed_inlet,
eluent_inlet=is_eluent_inlet,
product_outlet=is_output_outlet,
)
else:
flow_sheet.add_unit(unit.inlet_unit)
flow_sheet.add_unit(unit.outlet_unit)
for i_col in range(unit.n_columns):
col = deepcopy(self.column)
col.component_system = self.component_system
col.name = f'column_{col_index}'
if unit.initial_state is not None:
col.initial_state = unit.initial_state[i_col]
flow_sheet.add_unit(col)
col_index += 1
def _add_inter_zone_connections(self, flow_sheet: FlowSheet) -> NoReturn:
"""Add connections between zones."""
for unit, connections in self.flow_sheet.connections.items():
if isinstance(unit, ZoneBaseClass):
origin = unit.outlet_unit
else:
origin = unit
for destination in connections.destinations:
if isinstance(destination, ZoneBaseClass):
destination = destination.inlet_unit
flow_sheet.add_connection(origin, destination)
for zone in self.zones:
output_state = self.flow_sheet.output_states[zone]
flow_sheet.set_output_state(zone.outlet_unit, output_state)
def _add_intra_zone_connections(self, flow_sheet: FlowSheet) -> NoReturn:
"""Add connections within zones."""
for zone in self.zones:
for col_index in range(self.n_columns):
col = flow_sheet[f'column_{col_index}']
flow_sheet.add_connection(zone.inlet_unit, col)
col = flow_sheet[f'column_{col_index}']
flow_sheet.add_connection(col, zone.outlet_unit)
for col_index in range(self.n_columns):
col_orig = flow_sheet[f'column_{col_index}']
if col_index < self.n_columns - 1:
col_dest = flow_sheet[f'column_{col_index + 1}']
else:
col_dest = flow_sheet[f'column_{0}']
flow_sheet.add_connection(col_orig, col_dest)
[docs]
def build_process(self) -> Process:
"""
Assemble the process object.
Returns
-------
Process
The assembled process object.
"""
flow_sheet = self.build_flow_sheet()
process = Process(flow_sheet, self.name)
self._add_events(process)
return process
@property
def cycle_time(self) -> float:
"""float: cycle time of the process."""
return self.n_columns * self.switch_time
def _add_events(self, process):
"""Add events to process."""
process.cycle_time = self.n_columns * self.switch_time
process.add_duration('switch_time', self.switch_time)
for carousel_state in range(self.n_columns):
position_counter = 0
for i_zone, zone in enumerate(self.zones):
col_indices = np.arange(zone.n_columns)
col_indices += position_counter
col_indices = self.column_indices_at_state(
col_indices, carousel_state
)
if isinstance(zone, SerialZone):
evt = process.add_event(
f'{zone.name}_{carousel_state}',
f'flow_sheet.output_states.{zone.inlet_unit}',
col_indices[0]
)
process.add_event_dependency(
evt.name, 'switch_time', [carousel_state]
)
for i, col in enumerate(col_indices):
if i < (zone.n_columns - 1):
evt = process.add_event(
f'column_{col}_{carousel_state}',
f'flow_sheet.output_states.column_{col}',
self.n_zones
)
else:
evt = process.add_event(
f'column_{col}_{carousel_state}',
f'flow_sheet.output_states.column_{col}',
i_zone
)
process.add_event_dependency(
evt.name, 'switch_time', [carousel_state]
)
elif isinstance(zone, ParallelZone):
output_state = self.n_columns * [0]
for col in col_indices:
output_state[col] = 1/zone.n_columns
evt = process.add_event(
f'{zone.name}_{carousel_state}',
f'flow_sheet.output_states.{zone.inlet_unit}',
output_state
)
process.add_event_dependency(
evt.name, 'switch_time', [carousel_state]
)
for col in col_indices:
evt = process.add_event(
f'column_{col}_{carousel_state}',
f'flow_sheet.output_states.column_{col}',
i_zone
)
process.add_event_dependency(
evt.name, 'switch_time', [carousel_state]
)
for i, col in enumerate(col_indices):
evt = process.add_event(
f'column_{col}_{carousel_state}_velocity',
f'flow_sheet.column_{col}.flow_direction',
zone.flow_direction
)
process.add_event_dependency(
evt.name, 'switch_time', [carousel_state]
)
position_counter += zone.n_columns
[docs]
def carousel_state(self, t) -> float:
"""int: Carousel state at given time.
Parameters
----------
t: float
Time
"""
return int(np.floor((t % self.cycle_time) / self.switch_time))
[docs]
def column_indices_at_state(
self,
carousel_positions: np.typing.NDArray[int],
carousel_state: int
) -> np.ndarray[int]:
"""Determine index of column unit at given carousel position and state.
Parameters
----------
carousel_positions: np.typing.NDArray[np.int_]
Carousel position indices (e.g. wash position, elute position).
carousel_state : int
Curent state of the carousel system.
Returns
-------
np.ndarray[int]
Indices of column units at given carousel positions and state.
"""
carousel_positions = np.array(carousel_positions, dtype=int)
return (carousel_positions + carousel_state) % self.n_columns
[docs]
def column_indices_at_time(
self,
t: float,
carousel_positions: np.typing.NDArray[int],
) -> int:
"""Determine index of column unit at given carousel position and time.
Parameters
----------
t: float
Time
carousel_position : int
Carousel position.
Returns
-------
np.ndarray[int]
Indices of column units at given carousel positions and time.
"""
carousel_positions = np.array(carousel_positions, dtype=int)
carousel_state = self.carousel_state(t)
column_indices = self.column_indices_at_state(
carousel_positions, carousel_state
)
return column_indices
class SMBBuilder(CarouselBuilder):
"""
Configurator for 4 Zone SMB systems.
Attributes
----------
binding_model_type : Type[BindingBaseClass]
Specifies the type of binding model used in the SMB process.
"""
binding_model_type = BindingBaseClass
def __init__(
self,
feed: Inlet,
eluent: Inlet,
column: TubularReactorBase,
name: str = 'SMB'
) -> NoReturn:
"""
Initialize an SMBBuilder instance.
Parameters
----------
feed : Inlet
The feed inlet to the SMB system.
eluent : Inlet
The eluent inlet to the SMB system.
column : TubularReactorBase
The column equipment in the SMB system.
name : str, optional
The name of the SMB setup, by default 'SMB'.
Raises
------
CADETProcessError
If component systems of feed, eluent, and column do not match.
TypeError
If feed, eluent, or column are not of the expected type.
"""
component_system = feed.component_system
if not (component_system is eluent.component_system is column.component_system):
raise CADETProcessError("ComponentSystems do not match.")
if not isinstance(feed, Inlet):
raise TypeError(f"Expected inlet object. Got {type(feed)}.")
if not isinstance(eluent, Inlet):
raise TypeError(f"Expected inlet object. Got {type(eluent)}.")
if not isinstance(column, TubularReactorBase):
raise TypeError(f"Expected Column object. Got {type(column)}.")
self._validate_binding_model(column.binding_model)
super().__init__(component_system, name)
raffinate = Outlet(component_system, name='raffinate')
extract = Outlet(component_system, name='extract')
zone_I = SerialZone(component_system, 'zone_I', 1)
zone_II = SerialZone(component_system, 'zone_II', 1)
zone_III = SerialZone(component_system, 'zone_III', 1)
zone_IV = SerialZone(component_system, 'zone_IV', 1)
# Carousel Builder
self.column = column
self.add_unit(feed, feed_inlet=True)
self.add_unit(eluent, eluent_inlet=True)
self.add_unit(raffinate, product_outlet=True)
self.add_unit(extract, product_outlet=True)
self.add_unit(zone_I)
self.add_unit(zone_II)
self.add_unit(zone_III)
self.add_unit(zone_IV)
self.add_connection(eluent, zone_I)
self.add_connection(zone_I, extract)
self.add_connection(zone_I, zone_II)
self.add_connection(zone_II, zone_III)
self.add_connection(feed, zone_III)
self.add_connection(zone_III, raffinate)
self.add_connection(zone_III, zone_IV)
self.add_connection(zone_IV, zone_I)
def _validate_binding_model(self, binding_model):
"""
Validate that the provided binding model matches the required type.
Parameters
----------
binding_model : BindingBaseClass
The binding model to be validated.
Raises
------
TypeError
If the binding model does not match the expected type.
"""
if not isinstance(binding_model, self.binding_model_type):
raise TypeError(
f'Invalid binding model. Expected {self.binding_model_type}.'
)
def _get_zone_flow_rates(self, m, switch_time):
m1, m2, m3, m4 = m
Vc = self.column.volume
et = self.column.total_porosity
Q_I = Vc*(m1*(1-et)+et)/switch_time # Flow rate Zone I
Q_II = Vc*(m2*(1-et)+et)/switch_time # Flow rate Zone II
Q_III = Vc*(m3*(1-et)+et)/switch_time # Flow rate Zone III
Q_IV = Vc*(m4*(1-et)+et)/switch_time # Flow rate Zone IV
return [Q_I, Q_II, Q_III, Q_IV]
def _get_unit_flow_rates(self, Q_zones):
Q_I, Q_II, Q_III, Q_IV = Q_zones
Q_feed = Q_III - Q_II
Q_eluent = Q_I - Q_IV
Q_raffinate = Q_III - Q_IV
Q_extract = Q_I - Q_II
return [Q_feed, Q_eluent, Q_raffinate, Q_extract]
def _get_split_ratios(self, Q_zones, Q_units):
Q_I, Q_II, Q_III, Q_IV = Q_zones
Q_feed, Q_eluent, Q_raffinate, Q_extract = Q_units
w_r = Q_raffinate / Q_III
w_e = Q_extract / Q_I
return w_r, w_e
def get_design_parameters(
self,
binding_model: BindingBaseClass,
c_feed: np.ndarray,
) -> Any:
"""
Retrieve design parameters based on the binding model.
Parameters
----------
binding_model : BindingBaseClass
The binding model used to calculate design parameters.
c_feed : np.ndarray
The feed concentration.
Returns
-------
Any
The design parameters computed from the binding model.
Raises
------
NotImplementedError
This method should be implemented by subclasses.
"""
raise NotImplementedError("Subclass must implement this method.")
def calculate_m_opt(self, *design_parameters: Any) -> list:
"""
Calculate the optimal zone flow rates based on provided design parameters.
Parameters
----------
design_parameters : Any
A variable number of parameters defining the design.
Returns
-------
list
The optimal zone flow rates based on the design parameters.
Raises
------
NotImplementedError
This method should be implemented by subclasses.
"""
raise NotImplementedError("Subclass must implement this method.")
def apply_safety_factor(
self,
m_opt: list,
*design_parameters: Any,
gamma: float | list[float]
) -> Any:
"""
Apply a safety factor to the optimal zone flow rates.
Parameters
----------
m_opt : Any
The optimal zone flow rates calculated without safety considerations.
design_parameters : Any
A variable number of parameters defining the design.
gamma : float | list[float]
The safety factor(s) to apply to the zone flow rates. If float is provided,
the same value is applied to all zone flow rates.
Returns
-------
list
The adjusted zone flow rates after applying the safety factor.
Raises
------
NotImplementedError
This method should be implemented by subclasses.
"""
raise NotImplementedError("Subclass must implement this method.")
def triangle_design(
self,
binding_model: Optional[BindingBaseClass] = None,
c_feed: Optional[np.ndarray] = None,
switch_time: Optional[float] = None,
gamma: float | list[float] = 1,
set_values: bool = True
) -> list[float]:
"""
Design the SMB process according to the triangle theory.
Parameters
----------
binding_model : BindingBaseClass, optional
The binding model to be used.
If None, uses the binding model from the column attribute.
c_feed : np.ndarray, optional
Feed concentration matrix.
If None, uses current feed concentrations.
switch_time : float, optional
Switching time for the SMB zones.
If None, uses the existing switch_time attribute.
gamma : float | list[float]
The safety factor(s) to apply to the zone flow rates. If float is provided,
the same value is applied to all zone flow rates.
The default is 1.
set_values : bool, default True
If True, sets the calculated values to the process model.
Returns
-------
list
A list containing the feed flow rate, eluent flow rate, raffinate and
extract split ratios.
Raises
------
Warning
If binding_model is provided and set_values is True, values cannot be set.
"""
if binding_model is None:
binding_model = self.column.binding_model
elif set_values is True:
warnings.warn("Cannot set values if binding_model is given.")
set_values = False
self._validate_binding_model(binding_model)
if c_feed is None:
c_feed = self.flow_sheet.feed.c[:, 0]
elif set_values is True:
self.flow_sheet.feed.c = c_feed
design_parameters = self.get_design_parameters(binding_model, c_feed)
m_opt = self.calculate_m_opt(*design_parameters)
m = self.apply_safety_factor(
m_opt,
*design_parameters,
gamma=gamma
)
if switch_time is None:
switch_time = self.switch_time
elif set_values is True:
self.switch_time = switch_time
Q_zones = self._get_zone_flow_rates(m, switch_time)
Q_units = self._get_unit_flow_rates(Q_zones)
Q_feed, Q_eluent, Q_raffinate, Q_extract = Q_units
w_r, w_e = self._get_split_ratios(Q_zones, Q_units)
if set_values:
self.flow_sheet.feed.flow_rate = Q_feed
self.flow_sheet.eluent.flow_rate = Q_eluent
self.set_output_state('zone_I', [w_e, 1-w_e])
self.set_output_state('zone_III', [w_r, 1-w_r])
return [Q_feed, Q_eluent, w_r, w_e]
def plot_triangle(
self,
binding_model: Optional[BindingBaseClass] = None,
c_feed: Optional[np.ndarray] = None,
gamma: float | list[float] = 1,
fig: Optional[plt.Figure] = None,
ax: Optional[plt.Axes] = None
) -> tuple[plt.Figure, plt.Axes]:
"""
Plot the triangle diagram for the SMB process with the operating point marked.
Parameters
----------
binding_model : BindingBaseClass, optional
The binding model to use for the plot.
If None, uses the column's binding model.
c_feed : np.ndarray, optional
The feed concentration array.
If None, uses the current feed concentration from the flow sheet.
gamma : float | list[float]
The safety factor(s) to apply to the zone flow rates. If float is provided,
the same value is applied to all zone flow rates.
fig : plt.Figure, optional
The matplotlib figure object. If None, a new figure will be created.
ax : plt.Axes, optional
The matplotlib axes object.
If None, a new axes will be created in the new or specified figure.
Returns
-------
tuple[plt.Figure, plt.Axes]
The figure and axes objects containing the triangle diagram.
Notes
-----
This method uses internal methods to fetch design parameters and calculate
optimal zone flow rates, which should be defined in a subclass.
"""
if binding_model is None:
binding_model = self.column.binding_model
self._validate_binding_model(binding_model)
if c_feed is None:
c_feed = self.flow_sheet.feed.c[:, 0]
# Operating Point
design_parameters = self.get_design_parameters(binding_model, c_feed)
m_opt = self.calculate_m_opt(*design_parameters)
m1, m2, m3, m4 = self.apply_safety_factor(
m_opt,
*design_parameters,
gamma=gamma
)
# Setup figure
if fig is None:
fig, ax = plt.subplots(figsize=(10, 10))
# Plot Triangle
self._plot_triangle(ax, *design_parameters)
ax.set_xlabel('$m_{II}$')
ax.set_ylabel('$m_{III}$')
# Operating point
ax.scatter(m2, m3, c='k', marker='x', zorder=3)
ax.annotate(
'operating point',
xy=(m2, m3),
xytext=(1.1*m2, 0.9*m3),
arrowprops=dict(facecolor='black', shrink=0.01),
)
return fig, ax
def _plot_triangle(self, ax, *design_parameters):
"""
Plot a theoretical triangle diagram for SMB processes.
This method needs to be implemented by subclasses to define the specific way the
triangle is visualized based on provided design parameters.
Parameters
----------
ax : matplotlib.axes.Axes
The matplotlib axes on which the triangle will be plotted.
design_parameters : Any
Variable-length argument list of design parameters that define the triangle.
Raises
------
NotImplementedError
This is a placeholder method and should be implemented in subclasses.
"""
raise NotImplementedError()
class LinearSMBBuilder(SMBBuilder):
"""
Configure a 4-zone SMB system with linear isotherms.
Note, this is currently only supported for 2-Component systems.
Attributes
----------
binding_model_type : type
Specifies that this builder uses the Linear binding model.
"""
binding_model_type = Linear
def _validate_binding_model(self, binding_model):
"""
Validate the binding model for compatibility with Langmuir SMB systems.
Parameters
----------
binding_model : BindingBaseClass
The binding model to be validated.
Raises
------
CADETProcessError
If the binding model does not contain exactly two components.
Warns
-----
RuntimeWarning
If the binding model is kinetic, which conflicts with the assumption of
instant equilibrium.
"""
super()._validate_binding_model(binding_model)
if binding_model.n_comp != 2:
raise CADETProcessError("This only works for 2-Component Systems.")
if binding_model.is_kinetic:
warnings.warn(
"Isotherm uses kinetic binding, "
"however, triangle theory assumes instant equilibrium."
)
def get_design_parameters(
self,
binding_model: BindingBaseClass,
c_feed: np.ndarray
) -> tuple[float, float]:
"""
Calculate Henry's constants (H) based on adsorption and desorption rates.
Parameters
----------
binding_model : BindingBaseClass
The binding model containing adsorption and desorption rate data.
c_feed : np.ndarray
The concentration feed.
Returns
-------
Tuple[float, float]
Henry's constants for the strongly and weakly adsorbing components.
"""
k_ads = np.array(binding_model.adsorption_rate)
k_des = np.array(binding_model.desorption_rate)
H = k_ads / k_des
if H[1] < H[0]:
HA, HB = H
else:
HB, HA = H
return HA, HB
def calculate_m_opt(
self,
HA: float,
HB: float
) -> list[float]:
"""
Calculate the optimal flow rates for SMB zones based on Henry's constants.
Parameters
----------
HA : float
Henry's constant for strongly binding component.
HB : float
Henry's constant for weakly binding component.
Returns
-------
list[float]
List of optimal flow rates for zones 1 to 4, respectively.
"""
m1 = HA
m2 = HB
m3 = HA
m4 = HB
return [m1, m2, m3, m4]
def apply_safety_factor(
self,
m_opt: list[float],
*design_parameters: Any,
gamma: float | list[float] = 1
) -> list[float]:
"""
Adjust the optimal flow rates by applying safety factors to each zone.
Parameters
----------
m_opt : list[float]
A list containing the optimal flow rates for zones 1 to 4.
design_parameters : Union[float, List[float]]
Additional design parameters that might influence safety factor application.
gamma : float | list[float]
Safety factors to apply. Can be a single float applied to all zones, or a
list of floats for each zone. The default is 1.
Returns
-------
list[float]
Adjusted flow rates for zones 1 to 4 after applying the safety factors.
Raises
------
ValueError
If the product of gamma for zones 2 and 3 is not smaller than the ratio of
m3_opt to m2_opt, which is a critical operational constraint.
"""
m1_opt, m2_opt, m3_opt, m4_opt = m_opt
if np.isscalar(gamma):
gamma = 4 * [gamma]
gamma_1, gamma_2, gamma_3, gamma_4 = gamma
if gamma_2 * gamma_3 >= m3_opt / m2_opt:
raise ValueError("gamma_2 * gamma_3 must be smaller than HA / HB ")
m1 = gamma_1 * m1_opt
m2 = gamma_2 * m2_opt
m3 = m3_opt / gamma_3
m4 = m4_opt / gamma_4
return [m1, m2, m3, m4]
def _plot_triangle(
self, ax,
HA: float,
HB: float,
) -> NoReturn:
"""
Plot SMB triangle for linear isotherm.
Notable points:
- a: [HA, HA]
- b: [HB, HB]
- w_opt: [HB, HA]
Parameters
----------
ax : matplotlib.axes.Axes
Axes to plot on.
HA : float
Henry coefficient of strongly binding component.
HB : float
Henry coefficient of strongly binding component.
"""
# Bounds
lb = HB - 0.3 * (HA - HB)
ub = HA + 0.3 * (HA - HB)
ax.set_xlim(lb, ub)
ax.set_ylim(lb, ub)
# Diagonal
ax.plot((lb, ub), (lb, ub), 'k')
# Henry coefficients
for h in [HB, HA]:
ax.hlines(h, 0, h, 'k', 'dashed')
ax.vlines(h, h, ub, 'k', 'dashed')
# Triangle
ax.hlines(HA, HB, HA, 'k')
ax.vlines(HB, HB, HA, 'k')
# Label regions
ax.text(
(HB + (HA - HB) / 2), (0.95 * ub),
'Pure extract',
ha='center', va='center'
)
ax.text(
(1.05 * lb), (HB + (HA - HB) / 2),
'Pure raffinate',
ha='center', va='center',
rotation='vertical',
)
class LangmuirSMBBuilder(SMBBuilder):
"""
Configure a 4-zone SMB system with Langmuir isotherms.
Note, this is currently only supported for 2-Component systems.
Attributes
----------
binding_model_type : type
Specifies that this builder uses the Linear binding model.
"""
binding_model_type = Langmuir
def _validate_binding_model(self, binding_model: BindingBaseClass) -> NoReturn:
"""
Validate the binding model for compatibility with Langmuir SMB systems.
Parameters
----------
binding_model : BindingBaseClass
The binding model to be validated.
Raises
------
CADETProcessError
If the binding model does not contain exactly two components.
Warns
-----
RuntimeWarning
If the binding model is kinetic, which conflicts with the assumption of
instant equilibrium.
"""
super()._validate_binding_model(binding_model)
if binding_model.n_comp != 2:
raise CADETProcessError("This only works for 2-Component Systems.")
if binding_model.is_kinetic:
warnings.warn(
"Isotherm uses kinetic binding, "
"however, triangle theory assumes instant equilibrium."
)
def get_design_parameters(
self,
binding_model: BindingBaseClass,
c_feed: np.ndarray
) -> tuple[float, float]:
"""
Calculate the optimal flow rates for SMB zones based on the provided parameters.
Parameters
----------
binding_model : BindingBaseClass
The binding model parameters.
c_feed : np.ndarray
The concentration feed matrix.
Returns
-------
tuple
A tuple containing Henry's constants HA, HB, binding coefficients bA, bB,
feed concentrations cFA, cFB, and quadratic nulls wG, wF.
"""
k_ads = np.array(binding_model.adsorption_rate)
k_des = np.array(binding_model.desorption_rate)
k_eq = k_ads / k_des
q_sat = np.array(binding_model.capacity)
H = [k_eq * q_s for k_eq, q_s in zip(k_eq, q_sat)]
if H[1] < H[0]:
HA, HB = H
bA, bB = k_eq
cFA, cFB = c_feed
else:
HB, HA = H
bB, bA = k_eq
cFB, cFA = c_feed
a = -(HA * (1 + bB * cFB) + HB * (1 + bA * cFA)) / (1 + bB * cFB + bA * cFA)
b = HA * HB / (1 + bB * cFB + bA * cFA)
wG = -a / 2 + np.sqrt((-a / 2)**2 - b)
wF = -a / 2 - np.sqrt((-a / 2)**2 - b)
return HA, HB, bA, bB, cFA, cFB, wG, wF
def calculate_m_opt(
self,
HA: float,
HB: float,
bA: float,
bB: float,
cFA: float,
cFB: float,
wG: float,
wF: float
) -> list[float]:
"""
Calculate optimal zone flow rates based on Langmuir isotherm parameters.
Parameters
----------
HA : float
Henry's constant for strongly binding component.
HB : float
Henry's constant for weakly binding component.
bA : float
Binding coefficient for strongly binding component.
bB : float
Binding coefficient for weakly binding component.
cFA : float
Feed concentration for strongly binding component.
cFB : float
Feed concentration for weakly binding component.
wG : float
First quadratic null.
wF : float
Second quadratic null.
Returns
-------
list[float]
List of optimal flow rates for zones 1 to 4.
"""
m1 = HA
m2 = HB / HA * wG
m3 = wG * (wF * (HA - HB) + HB * (HB - wF)) / (HB * (HA - wF))
m4 = 1 / 2 * (HB + m3 + bB * cFB * (m3 - m2) - np.sqrt((HB + m3 + bB * cFB * (m3 - m2))**2 - 4 * HB * m3))
return [m1, m2, m3, m4]
def apply_safety_factor(
self,
m_opt: list[float],
HA: float,
HB: float,
bA: float,
bB: float,
cFA: float,
cFB: float,
wG: float,
wF: float,
gamma: float | list[float] = 1
) -> list[float]:
"""
Apply a safety factor to the optimal zone flow rates.
Parameters
----------
m_opt : list[float]
List of optimal flow rates for zones 1 to 4.
HA : float
Henry's constant for strongly binding component.
HB : float
Henry's constant for weakly binding component.
bA : float
Binding coefficient for strongly binding component.
bB : float
Binding coefficient for weakly binding component.
cFA : float
Feed concentration for strongly binding component.
cFB : float
Feed concentration for weakly binding component.
wG : float
First quadratic null.
wF : float
Second quadratic null.
gamma : float | list[float]
The safety factor(s) to apply to the zone flow rates. If float is provided,
the same value is applied to all zone flow rates.
Returns
-------
list[float]
Adjusted flow rates for zones 1 to 4 after applying the safety factors.
"""
m1_opt, m2_opt, m3_opt, m4_opt = m_opt
if np.isscalar(gamma):
W_opt = np.array([m2_opt, m3_opt])
B = np.array([HB, HB])
R = [
wG ** 2 / HA,
wG * (wF * (HA - wG) * (HA - HB) + HB * wG * (HA - wF)) / (HA * HB * (HA - wF))
]
# Calculating vectors WB and WA
WB = B - W_opt
WR = R - W_opt
# Normalizing vectors
norm_WB = WB / np.linalg.norm(WB)
norm_WR = WR / np.linalg.norm(WR)
# Calculating the bisector WB / WR
bisector = norm_WB + norm_WR
norm_dir_vec_bisector = bisector / np.linalg.norm(bisector)
# Calculating the new point W' using the safety factor gamma
W = W_opt + (gamma - 1) * norm_dir_vec_bisector
m1 = gamma * m1_opt
m2 = W[0]
m3 = W[1]
m4 = m4_opt / gamma
else:
gamma_1, gamma_2, gamma_3, gamma_4 = gamma
m1 = gamma_1 * m1_opt
m2 = gamma_2 * m2_opt
m3 = m3_opt / gamma_3
m4 = m4_opt / gamma_4
return [m1, m2, m3, m4]
def _plot_triangle(self, ax, HA, HB, bA, bB, cFA, cFB, wG, wF):
"""
Plot SMB triangle for Langmuir isotherm.
Parameters
----------
ax : matplotlib.axes.Axes
Axes to plot on.
HA : float
Henry's constant for strongly binding component.
HB : float
Henry's constant for weakly binding component.
bA : float
Binding coefficient for strongly binding component.
bB : float
Binding coefficient for weakly binding component.
cFA : float
Feed concentration for strongly binding component.
cFB : float
Feed concentration for weakly binding component.
wG : float
First quadratic null.
wF : float
Second quadratic null.
"""
m1, m2, m3, m4 = self.calculate_m_opt(HA, HB, bA, bB, cFA, cFB, wG, wF)
W = [m2, m3]
R = [
wG ** 2 / HA,
wG * (wF * (HA - wG) * (HA - HB) + HB * wG * (HA - wF)) / (HA * HB * (HA - wF))
]
# Bounds
lb = W[0] - 0.3 * (HA - W[0])
ub = HA + 0.3 * (HA - W[0])
ax.set_xlim(lb, ub)
ax.set_ylim(lb, ub)
# Diagonal
ax.plot((lb, ub), (lb, ub), 'k')
# Plot [W -> R]
m2WR = np.linspace(W[0], R[0], 50)
m3WR = 1 / (bA * cFA * wG) * (wG * (HA - wG) - (HA - wG * (1 + bA * cFA)) * m2WR)
ax.plot(m2WR, m3WR, 'k-')
# plot [W -> HB]
m2WHB = np.linspace(W[0], HB, 10)
m3WHB = 1 / (bA * cFA * HB) * (HB * (HA - HB) - (HA - HB * (1 + bA * cFA)) * m2WHB)
ax.plot(m2WHB, m3WHB, 'k-')
# plot [R -> HA]
m2RHA = np.linspace(R[0], HA, 10)
m3RHA = m2RHA + (np.sqrt(HA) - np.sqrt(m2RHA)) ** 2 / (bA * cFA)
ax.plot(m2RHA, m3RHA, 'k-')
# TODO: Equations that plot regions of pure extract / raffinate not clear yet.
class CarouselSolutionBulk(SolutionBase):
"""Solution at unit inlet or outlet.
N_COLUMNS * NCOL * NRAD
"""
_coordinates = ['axial_coordinates', 'radial_coordinates']
def __init__(self, builder, simulation_results):
self.builder = builder
self.simulation_results = simulation_results
@property
def component_system(self):
return self.builder.component_system
@property
def solution(self):
return self.simulation_results.solution
@property
def axial_coordinates(self):
return self.simulation_results.solution.column_0.bulk.axial_coordinates
@property
def radial_coordinates(self):
radial_coordinates = \
self.simulation_results.solution.column_0.bulk.radial_coordinates
if radial_coordinates is not None and len(radial_coordinates) == 1:
radial_coordinates = None
return radial_coordinates
@property
def time(self):
return self.simulation_results.solution.column_0.bulk.time
def plot_at_time(
self, t, overlay=None, y_min=None, y_max=None,
ax=None, lines=None):
"""Plot bulk solution over space at given time.
Parameters
----------
t : float
time for plotting
ax : Axes
Axes to plot on.
See Also
--------
CADETProcess.plotting
"""
n_cols = self.builder.n_columns
if ax is None:
fig, axs = plt.subplots(
ncols=n_cols,
figsize=(n_cols*4, 6),
gridspec_kw=dict(wspace=0.0, hspace=0.0),
sharey='row'
)
else:
axs = ax
t_i = np.where(t <= self.time)[0][0]
x = self.axial_coordinates
y_min_data = 0
y_max_data = 0
zone_counter = 0
column_counter = 0
if lines is None:
_lines = []
else:
_lines = None
for position, ax in enumerate(axs):
col_index = self.builder.column_indices_at_time(t, position)
y = self.solution[f'column_{col_index}'].bulk.solution[t_i, :]
y_min_data = min(y_min_data, min(0, np.min(y)))
y_max_data = max(y_max_data, 1.1*np.max(y))
if lines is not None:
for comp in range(self.n_comp):
lines[position][comp].set_ydata(y[..., comp])
else:
l = ax.plot(x, y)
_lines.append(l)
zone = self.builder.zones[zone_counter]
if zone.n_columns > 1:
ax.set_title(f'{zone.name}, position {column_counter}')
else:
ax.set_title(f'{zone.name}')
if column_counter < (zone.n_columns - 1):
column_counter += 1
else:
zone_counter += 1
column_counter = 0
plotting.add_text(ax, f'time = {t:.2f} s')
if y_min is None:
y_min = y_min_data
if y_min is None:
y_min = y_max_data
for position, ax in enumerate(axs):
ax.set_ylim((y_min, y_max))
if _lines is None:
_lines = lines
return axs, _lines