import os
from collections import defaultdict
from functools import wraps
from typing import Any, Callable, Optional
import numpy as np
from addict import Dict
from matplotlib.axes import Axes
from CADETProcess import CADETProcessError, plotting, settings
from CADETProcess.dataStructure import String
from CADETProcess.dynamicEvents import Event, EventHandler
from CADETProcess.fractionation.fractions import Fraction, FractionPool
from CADETProcess.performance import Performance
from CADETProcess.processModel import ComponentSystem, Process, ProcessMeta
from CADETProcess.simulationResults import SimulationResults
from CADETProcess.solution import SolutionIO, slice_solution
__all__ = ["Fractionator"]
[docs]
class Fractionator(EventHandler):
"""
Class for Chromatogram Fractionation.
This class is responsible for setting events for starting and ending fractionation,
handling multiple chromatograms, and calculating various performance metrics.
Attributes
----------
name : String
Name of the fractionator, defaulting to 'Fractionator'.
performance_keys : list
Keys for performance metrics including mass, concentration, purity, recovery,
productivity, and eluent consumption.
"""
name = String(default="Fractionator")
performance_keys: list[str] = [
"mass",
"concentration",
"purity",
"recovery",
"productivity",
"eluent_consumption",
]
def __init__(
self,
simulation_results: SimulationResults,
components: Optional[list[str]] = None,
use_total_concentration_components: bool = True,
*args: Any,
**kwargs: Any,
) -> None:
"""
Initialize the Fractionator.
Parameters
----------
simulation_results : SimulationResults
Simulation results containing chromatograms.
components : list, optional
List of components to be fractionated. Default is None.
use_total_concentration_components : bool, optional
Use total concentration components. Default is True.
*args
Variable length argument list.
**kwargs
Arbitrary keyword arguments.
"""
self.components: Optional[list[str]] = components
self.use_total_concentration_components: bool = use_total_concentration_components
self.simulation_results = simulation_results
super().__init__(*args, **kwargs)
@property
def simulation_results(self) -> SimulationResults:
"""SimulationResults: The simulation results containing the chromatograms."""
return self._simulation_results
@simulation_results.setter
def simulation_results(self, simulation_results: SimulationResults) -> None:
"""
Set the simulation results.
Parameters
----------
simulation_results : SimulationResults
Simulation results containing chromatograms.
Raises
------
TypeError
If simulation_results is not of type SimulationResults.
CADETProcessError
If the simulation results do not contain any chromatograms.
"""
if not isinstance(simulation_results, SimulationResults):
raise TypeError("Expected SimulationResults")
if len(simulation_results.chromatograms) == 0:
raise CADETProcessError("Simulation results do not contain chromatogram")
self._simulation_results = simulation_results
self._chromatograms = [
slice_solution(
chrom,
components=self.components,
use_total_concentration_components=self.use_total_concentration_components,
)
for chrom in simulation_results.chromatograms
]
m_feed = np.zeros((self.component_system.n_comp,))
counter = 0
for comp, indices in simulation_results.component_system.indices.items():
if comp in self.component_system.names:
m_feed_comp = simulation_results.process.m_feed[indices]
if self.use_total_concentration_components:
m_feed[counter] = np.sum(m_feed_comp)
counter += 1
else:
n_species = len(indices)
m_feed[counter : counter + n_species] = m_feed_comp
counter += n_species
self.m_feed: np.ndarray = m_feed
self._fractionation_states = Dict({chrom: [] for chrom in self.chromatograms})
for chrom in self.chromatograms:
self.set_fractionation_state(chrom, -1)
self._chromatogram_events = Dict({chrom: [] for chrom in self.chromatograms})
self.reset()
@property
def component_system(self) -> ComponentSystem:
"""ComponentSystem: The component system of the chromatograms."""
return self.chromatograms[0].component_system
"""Enable calling functions with chromatogram object or name."""
def _call_by_chrom_name(func: Callable) -> Callable:
@wraps(func)
def wrapper_call_by_chrom_name(
self: "Fractionator", chrom: str | SolutionIO, *args: Any, **kwargs: Any
) -> Any:
"""Enable calling functions with chromatogram object or name."""
if isinstance(chrom, str):
try:
chrom = self.chromatograms_dict[chrom]
except KeyError:
raise CADETProcessError("Not a valid unit")
return func(self, chrom, *args, **kwargs)
return wrapper_call_by_chrom_name
@property
def chromatograms(self) -> list[SolutionIO]:
"""
list[SolutionIO]: Chromatograms to be fractionized.
See Also
--------
SolutionIO
reset
cycle_time
"""
return self._chromatograms
@property
def chromatograms_dict(self) -> dict[str, SolutionIO]:
"""dict: Chromatogram names and objects."""
return {chrom.name: chrom for chrom in self.chromatograms}
@property
def chromatogram_names(self) -> list[str]:
"""list[str]: Names of chromatogram."""
return [chrom.name for chrom in self.chromatograms]
@property
def n_chromatograms(self) -> int:
"""int: Number of Chromatograms Fractionator."""
return len(self.chromatograms)
@property
def chromatogram_events(self) -> dict[SolutionIO, list[Event]]:
"""dict[SolutionIO, list[Event]]: Events sorted by chromatogram."""
chrom_events = {
chrom: sorted(events, key=lambda evt: evt.time)
for chrom, events in self._chromatogram_events.items()
}
return chrom_events
@property
def process(self) -> Process:
"""Process: The process from the simulation results."""
return self.simulation_results.process
@property
def process_meta(self) -> ProcessMeta:
"""ProcessMeta: Process meta information."""
return self.simulation_results.process_meta
@property
def n_comp(self) -> int:
"""int: Number of components to be fractionized."""
return self.chromatograms[0].n_comp
@property
def cycle_time(self) -> float:
"""
The cycle time of the Fractionator.
See Also
--------
productivity
"""
return self.time[-1]
@property
def time(self) -> np.ndarray:
"""np.ndarray: solution times of Chromatogram."""
return self.chromatograms[0].time
[docs]
@plotting.figure_utils
def plot_fraction_signal(
self,
chromatogram: Optional[SolutionIO | str] = None,
x_axis_in_minutes: bool = True,
ax: Optional[Axes] = None,
*args: Any,
**kwargs: Any,
) -> Axes:
"""
Plot the signal without the waste fractions.
Parameters
----------
chromatogram : Optional[SolutionIO | str]
Chromatogram to be plotted. If None, the first one is plotted.
ax : Axes, optional
Axes to plot on. If None, a new figure is created.
x_axis_in_minutes: bool, optional
Option to use x-aches (time) in minutes, default is set to True.
*args : Any
Optional Parameter passed down to plot function.
**kwargs : Any
Additional Parameter passed down to plot function.
Returns
-------
ax : Axes
Axes with plot of parameter state.
See Also
--------
CADETProcess.plot
plot_purity
"""
if chromatogram is None:
chromatogram = list(
self.performer_timelines["fractionation_states"].keys()
)[0]
if isinstance(chromatogram, str):
chromatogram = self.chromatograms_dict[chromatogram]
try:
start: float = kwargs["start"]
if x_axis_in_minutes:
start = start / 60
except KeyError:
start = 0
try:
end: float = kwargs["end"]
if x_axis_in_minutes:
end = end / 60
except KeyError:
end = np.max(chromatogram.time)
fig, ax = chromatogram.plot(
ax=ax,
x_axis_in_minutes=x_axis_in_minutes,
*args,
**kwargs,
)
primary_ax = fig.get_axes()[0]
y_max = primary_ax.get_ylim()[1]
self._fill_fraction_overlay(
ax,
chromatogram,
y_max,
x_axis_in_minutes,
start,
end,
)
return fig, ax
def _fill_fraction_overlay(
self,
ax: Axes,
chromatogram: SolutionIO,
y_max: float,
x_axis_in_minutes: bool,
start: float,
end: float,
) -> None:
"""Fill fraction overlay."""
try:
time_line = self.performer_timelines["fractionation_states"][chromatogram.name]
for sec in time_line.sections:
comp_index = int(np.where(sec.coeffs)[0].squeeze())
if comp_index == self.n_comp:
color_index = -1
text = "W"
else:
color_index = comp_index
text = self.component_system.names[comp_index]
sec_start = sec.start
sec_end = sec.end
if x_axis_in_minutes:
sec_start = sec_start / 60
sec_end = sec_end / 60
if sec_start != sec_end:
plotting.fill_between(
ax,
sec_start,
sec_end,
y_max,
color_index=color_index,
text=text,
)
if len(time_line.sections) == 0:
plotting.fill_between(
ax,
sec_start,
sec_end,
y_max,
color_index=-1,
text="W",
)
except KeyError:
plotting.fill_between(
ax,
start,
end,
y_max,
color_index=-1,
text="W",
)
@property
def fractionation_states(self) -> Dict:
"""dict: Fractionation state of Chromatograms.
Notes
-----
This is just a dummy variable to support interfacing with Events.
"""
return self._fractionation_states
[docs]
@_call_by_chrom_name
def set_fractionation_state(
self, chrom: SolutionIO, state: int | list[float]
) -> None:
"""
Set fractionation states of Chromatogram.
Parameters
----------
chrom : SolutionIO
Chromatogram object which is to be fractionated.
state : int or list of floats
New fractionation state of the Chromatogram.
Raises
------
CADETProcessError
If Chromatogram not in Fractionator
If state is integer and the state >= the n_comp.
If the length of the states is unequal the state_length.
If the sum of the states is not equal to 1.
Notes
-----
Waste is always the last fraction.
"""
if chrom not in self.chromatograms:
raise CADETProcessError("Chromatogram not in Fractionator")
state_length = self.n_comp + 1
if state_length == 0:
fractionation_state = []
if type(state) is int:
if state >= state_length:
raise CADETProcessError("Index exceeds fractionation states")
fractionation_state = [0] * state_length
fractionation_state[state] = 1
else:
if len(state) != state_length:
raise CADETProcessError(f"Expected length {state_length}.")
elif sum(state) != 1:
raise CADETProcessError("Sum of fractions must be 1")
fractionation_state = state
self._fractionation_states[chrom] = fractionation_state
@property
def n_fractions_per_pool(self) -> list[int]:
"""list: number of fractions per pool."""
return [pool.n_fractions for pool in self.fraction_pools]
@property
def fraction_pools(self) -> list[FractionPool]:
"""
List of the component and waste fraction pools.
For every event, the end time is determined and a Fraction object is
created which holds information about start and end time, as well as
the mass and the volume of the fraction. The fractions are pooled
depending on the event state.
Returns
-------
fraction_pools : list
List with fraction pools.
"""
if self._fraction_pools is None:
self._fraction_pools = [
FractionPool(self.n_comp) for _ in range(self.n_comp + 1)
]
for chrom_index, chrom in enumerate(self.chromatograms):
chrom_events = self.chromatogram_events[chrom]
for evt_index, evt in enumerate(chrom_events):
target = np.nonzero(evt.full_state)[0].squeeze()
frac_start = evt.time
if evt_index < len(chrom_events) - 1:
frac_end = chrom_events[evt_index + 1].time
fraction = self._create_fraction(
chrom_index, frac_start, frac_end
)
self.add_fraction(fraction, target)
else:
frac_end = self.cycle_time
fraction = self._create_fraction(
chrom_index, frac_start, frac_end
)
self.add_fraction(fraction, target)
frac_start = 0
frac_end = chrom_events[0].time
fraction = self._create_fraction(
chrom_index, frac_start, frac_end
)
self.add_fraction(fraction, target)
return self._fraction_pools
def _create_fraction(self, chrom_index: int, start: float, end: float) -> Fraction:
"""
Create Fraction object calculate mass.
Parameters
----------
chrom_index : int
index of the chromatogram
start : float
start time of the fraction
end : float
end time of the fraction
Returns
-------
fraction : Fraction
Chromatogram fraction
"""
fraction = self.chromatograms[chrom_index].create_fraction(start, end)
return fraction
[docs]
def add_fraction(self, fraction: Fraction, target: int) -> None:
"""
Add Fraction to the FractionPool of target component.
Notes
-----
Waste is always the last fraction
"""
if not isinstance(fraction, Fraction):
raise TypeError("Expected Fraction")
if target not in range(self.n_comp + 1):
raise CADETProcessError("Not a valid target")
self._fraction_pools[target].add_fraction(fraction)
@property
def mass(self) -> np.ndarray:
"""ndarray: Component mass in corresponding fraction pools."""
if self._mass is None:
self._mass = np.array([
pool.mass[comp]
for comp, pool in enumerate(self.fraction_pools[:-1])
])
return self._mass
@property
def total_mass(self) -> np.ndarray:
"""ndarray: Total mass of each component in all fraction pools."""
return np.sum([pool.mass for pool in self.fraction_pools], axis=0)
@property
def concentration(self) -> np.ndarray:
"""ndarray: Component concentration in corresponding fraction pool."""
return np.array([
pool.concentration[comp]
for comp, pool in enumerate(self.fraction_pools[:-1])
])
@property
def purity(self) -> np.ndarray:
"""ndarray: Component purity in corresponding fraction pool."""
return np.array([
pool.purity[comp]
for comp, pool in enumerate(self.fraction_pools[:-1])
])
@property
def recovery(self) -> np.ndarray:
"""ndarray: Component recovery yield in corresponding fraction pool."""
with np.errstate(divide="ignore", invalid="ignore"):
recovery = self.mass / self.m_feed
return np.nan_to_num(recovery)
@property
def mass_balance_difference(self) -> np.ndarray:
"""
ndarray: Difference in mass balance between m_feed and fraction pools.
The mass balance is calculated as the difference between the feed mass (m_feed)
and the mass in the fraction pools. It represents the discrepancy or change in
mass during the fractionation process.
Returns
-------
ndarray
Difference in mass balance between m_feed and fraction pools for each
component.
Notes
-----
Positive values indicate a surplus of mass in the fraction pools compared to
the feed, while negative values indicate a deficit. A value of zero indicates
a mass balance where the mass in the fraction pools is equal to the feed mass.
"""
return self.total_mass - self.m_feed
@property
def productivity(self) -> np.ndarray:
"""ndarray: Specific productivity in corresponding fraction pool."""
return self.mass / (self.process_meta.cycle_time * self.process_meta.V_solid)
@property
def eluent_consumption(self) -> np.ndarray:
"""
ndarray: Specific eluent consumption in corresponding fraction pool.
Notes
-----
This is the inverse of the regularly used specific eluent
consumption. It is preferred here in order to avoid numeric issues
if the collected mass is 0.
"""
return self.mass / self.process_meta.V_eluent
@property
def performance(self) -> Performance:
"""Performance: The performance metrics of the fractionation."""
self.reset()
return Performance(
self.mass,
self.concentration,
self.purity,
self.recovery,
self.productivity,
self.eluent_consumption,
self.mass_balance_difference,
self.component_system,
)
[docs]
def reset(self) -> None:
"""Reset the results when fractionation times are changed."""
self._fraction_pools = None
self._mass = None
[docs]
def add_fractionation_event(
self,
event_name: str,
target: str | int,
time: float,
chromatogram: Optional[SolutionIO | str] = None,
) -> None:
"""
Add a fractionation event.
Parameters
----------
event_name : str
The name of the event.
target : str | int
The indice or name of target component in Component System.
time : float
The time of the event.
chromatogram : Optional[SolutionIO | str]
The chromatogram associated with the event.
If None and there is only one chromatogram, it will be used.
Raises
------
CADETProcessError
If the chromatogram is not found.
"""
if chromatogram is None and self.n_chromatograms > 1:
raise CADETProcessError(
"Missing chromatogram for which the fractionation is added."
)
elif chromatogram is None and self.n_chromatograms == 1:
chromatogram = self.chromatograms[0]
if isinstance(chromatogram, str):
try:
chromatogram = self.chromatograms_dict[f"{chromatogram}"]
except KeyError:
raise CADETProcessError("Could not find chromatogram.")
if chromatogram not in self.chromatograms:
raise CADETProcessError("Could not find chromatogram.")
param_path = f"fractionation_states.{chromatogram.name}"
if isinstance(target, str):
target = self.component_system.names.index(target)
evt = self.add_event(event_name, param_path, target, time)
self._chromatogram_events[chromatogram].append(evt)
self.reset()
[docs]
def initial_values(self, purity_required: float | list[float] = 0.95) -> None:
"""
Create events from chromatogram with minimum purity.
Function creates fractions for areas in the chromatogram, where the
local purity profile is higher than the purity required.
Parameters
----------
purity_required : float or list of floats
Minimum purity required for the components in the fractionation
Raises
------
ValueError
If the size of the purity parameter does not match the number of components
"""
if isinstance(purity_required, float):
purity_required = [purity_required] * self.n_comp
elif len(purity_required) != self.n_comp:
raise ValueError(f"Expected purity array with size {self.n_comp}")
self._events = []
self._chromatogram_events = Dict({chrom: [] for chrom in self.chromatograms})
self.reset()
for chrom_index, chrom in enumerate(self.chromatograms):
purity_min = np.zeros(chrom.solution.shape)
purity_min[chrom.local_purity_components > purity_required] = 1
diff = np.vstack((
purity_min[0, :] - purity_min[-1, :],
np.diff(purity_min, axis=0)
))
for comp in range(self.n_comp):
if purity_required[comp] > 0:
on_indices = np.where(diff[:, comp] == 1)[0]
off_indices = np.where(diff[:, comp] == -1)[0] - 1
# Handle the case where the entire array is above the threshold
if (
len(on_indices) == 0
and len(off_indices) == 0
and purity_min[0, comp] == 1
):
on_indices = np.array([0])
off_indices = np.array([purity_min.shape[0] - 1])
# Ensure regions with a single value are ignored
valid_regions = [
(on, off)
for on, off in zip(on_indices, off_indices)
if off != on
]
for index, (on_evt, off_evt) in enumerate(valid_regions):
# Add start event
time = chrom.time[int(on_evt)]
event_name = f"chrom_{chrom_index}_comp_{comp}_start_{index}"
param_path = f"fractionation_states.{chrom.name}"
evt = self.add_event(event_name, param_path, comp, time)
self._chromatogram_events[chrom].append(evt)
# Add end event
time = chrom.time[int(off_evt)]
event_name = f"chrom_{chrom_index}_comp_{comp}_end_{index}"
param_path = f"fractionation_states.{chrom.name}"
evt = self.add_event(event_name, param_path, self.n_comp, time)
self._chromatogram_events[chrom].append(evt)
if not self.check_duplicate_events():
chrom_events = self._chromatogram_events.copy()
for chrom, events in chrom_events.items():
events_at_time = defaultdict(list)
for event in events:
events_at_time[event.time].append(event)
for events in events_at_time.values():
if len(events) == 1:
continue
for evt in events:
if evt.state == self.n_comp:
self.remove_event(evt.name)
self._chromatogram_events[chrom].remove(evt)
@property
def parameters(self) -> Dict:
"""dict: Parameters of the fractionator."""
parameters = super().parameters
parameters["fractionation_states"] = {
chrom.name: self.fractionation_states[chrom] for chrom in self.chromatograms
}
return Dict(parameters)
@parameters.setter
def parameters(self, parameters: Dict) -> None:
try:
fractionation_states = parameters.pop("fractionation_states")
for chrom, state in fractionation_states.items():
self.set_fractionation_state(chrom, state)
except KeyError:
pass
super(Fractionator, self.__class__).parameters.fset(self, parameters)
@property
def section_dependent_parameters(self) -> Dict:
"""dict: Section dependent parameters of the fractionator."""
return self.parameters
[docs]
def save(
self,
case_dir: str,
start: float = 0,
end: Optional[float] = None,
) -> None:
"""
Save chromatogram and purity plots to a specified directory.
Parameters
----------
case_dir : str
Directory name within the working directory to save plots.
start : float, optional
Start time for plotting purity, default is 0.
end : Optional[float]
End time for plotting purity. If None, includes all data.
"""
path = os.path.join(settings.working_directory, case_dir)
for index, chrom in enumerate(self.chromatograms):
chrom.plot(save_path=path + f"/chrom_{index}.png")
chrom.plot_purity(
start=start, end=end, save_path=path + "/chrom_purity.png"
)
for chrom in enumerate(self.chromatograms):
self.plot_fraction_signal(
chromatogram=chrom,
start=start,
end=end,
save_path=path + f"/fractionation_signal_{index}.png",
index=index,
)
def __str__(self) -> str:
"""str: String representation of the fractionator."""
return self.__class__.__name__