Source code for CADETProcess.solution

"""
=======================================
Solution (:mod:`CADETProcess.solution`)
=======================================

.. currentmodule:: CADETProcess.solution

Module to store and plot solution of simulation.

.. autosummary::
   :toctree: generated/

   SolutionBase
   SolutionIO
   SolutionBulk
   SolutionParticle
   SolutionSolid
   SolutionVolume


Method to slice a Solution:

.. autosummary::
   :toctree: generated/

   slice_solution
   slice_solution_front


"""  # noqa

from __future__ import annotations

import copy
from typing import Any, Optional

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
from scipy import integrate
from scipy.interpolate import PchipInterpolator

from CADETProcess import CADETProcessError, plotting, smoothing, transform
from CADETProcess.dataStructure import (
    SizedNdArray,
    String,
    Structure,
    Typed,
    UnsignedFloat,
    Vector,
)
from CADETProcess.dynamicEvents import TimeLine
from CADETProcess.processModel import ComponentSystem

__all__ = [
    "SolutionIO",
    "SolutionBulk",
    "SolutionParticle",
    "SolutionSolid",
    "SolutionVolume",
    "slice_solution",
    "slice_solution_front",
]


[docs] class SolutionBase(Structure): """ Base class for solutions of component systems. This class represents a solution of a component system at different time points. It provides several properties that allow access to information about the solution, such as the number of components, the total concentration, and the local purity. Attributes ---------- name : str Name of the solution. time : np.ndarray Array of time points. solution : np.ndarray Array of solution values. c_min : float Minimum concentration threshold, below which concentrations are considered zero. dimensions : list of str Names of the dimensions in the solution. Notes ----- This class is not meant to be used directly, but to be subclassed by specific solution types. """ name = String() time = Vector() component_system = Typed(ty=ComponentSystem) solution = SizedNdArray(size="solution_shape") c_min = UnsignedFloat(default=1e-6) dimensions = ["time"] def __init__( self, name: str, component_system: ComponentSystem, time: npt.ArrayLike, solution: npt.ArrayLike, ) -> None: """ Initialize the SolutionBase. Parameters ---------- name : str The name of the solution. component_system : ComponentSystem The component system associated with the solution. time : npt.ArrayLike An array-like structure representing time points. solution : npt.ArrayLike An array-like structure representing the solution data. """ self.name = name self.component_system = component_system self.time = time self.solution = solution self.update_solution()
[docs] def update_solution(self) -> None: """Update the solution.""" pass
[docs] def update_transform(self) -> None: """Update the transforms.""" pass
@property def n_comp(self) -> int: """int: Number of components.""" return self.component_system.n_comp @property def cycle_time(self) -> float: """float: Cycle time.""" return self.time[-1] @property def nt(self) -> int: """int: Number of time steps.""" return len(self.time) @property def component_coordinates(self) -> np.ndarray: """np.ndarray: Indices of the components.""" return np.arange(self.n_comp) @property def coordinates(self) -> dict[str, np.ndarray]: """dict[str, np.ndarray]: Coordinates of the Solution.""" coordinates = {} for c in self.dimensions: v = getattr(self, c) if v is None: continue coordinates[c] = v return coordinates @property def solution_shape(self) -> tuple[int]: """tuple[int]: (Expected) shape of the solution.""" return tuple(len(c) for c in self.coordinates.values()) @property def total_concentration_components(self) -> np.ndarray: """np.ndarray: Total concentration of components (sum of species).""" component_concentration = np.zeros( self.solution.shape[0:-1] + (self.component_system.n_components,) ) counter = 0 for index, comp in enumerate(self.component_system): comp_indices = slice(counter, counter + comp.n_species) c_comp = np.sum(self.solution[..., comp_indices], axis=-1) component_concentration[..., index] = c_comp counter += comp.n_species return component_concentration @property def total_concentration(self) -> np.ndarray: """np.ndarray: Total concentration (sum) of all components.""" return np.sum(self.solution, axis=-1, keepdims=True) @property def local_purity_components(self) -> np.ndarray: """np.ndarray: Local purity of each component.""" solution = self.total_concentration_components c_total = self.total_concentration c_total[c_total < self.c_min] = np.nan purity = np.zeros(solution.shape) with np.errstate(divide="ignore", invalid="ignore"): purity = solution / c_total purity = np.nan_to_num(purity) return purity @property def local_purity_species(self) -> np.ndarray: """np.ndarray: Local purity of components.""" solution = self.solution c_total = self.total_concentration c_total[c_total < self.c_min] = np.nan purity = np.zeros(solution.shape) with np.errstate(divide="ignore", invalid="ignore"): purity = solution / c_total purity = np.nan_to_num(purity) return purity def __str__(self) -> str: return self.name
[docs] class SolutionIO(SolutionBase): """ Solution representing streams at the inlet or outlet of a ``UnitOperation``. Notes ----- The `flow_rate` attribute is implemented as TimeLine to improve interpolation of signals with discontinuous flow. """ dimensions = SolutionBase.dimensions + ["component_coordinates"] def __init__( self, name: str, component_system: ComponentSystem, time: npt.ArrayLike, solution: npt.ArrayLike, flow_rate: TimeLine | npt.ArrayLike, ) -> None: """ Initialize the SolutionIO. Parameters ---------- name : str The name of the solution object. component_system : ComponentSystem The component system associated with the solution. time : npt.ArrayLike An array-like structure representing time points. solution : npt.ArrayLike An array-like structure representing the solution data. flow_rate : TimeLine | npt.ArrayLike The flow rate data, which can be a TimeLine object or an array-like structure. """ if np.isscalar(flow_rate): flow_rate = flow_rate * np.ones(time.shape) if not isinstance(flow_rate, TimeLine): flow_rate = TimeLine.from_profile(time, flow_rate) self.flow_rate = flow_rate super().__init__(name, component_system, time, solution)
[docs] def offset(self, offset: float) -> "SolutionIO": """Shift solution by an offset.""" offset_solution = copy.deepcopy(self) offset_solution.time += offset offset_solution.flow_rate = self.flow_rate.offset(offset) offset_solution.update_solution() return offset_solution
[docs] def update_solution(self) -> None: """Update solution method.""" self._solution_interpolated = None self._dm_dt_interpolated = None self.update_transform() self.flow_rate = self.flow_rate.slice(self.time[0], self.time[-1])
@property def derivative(self) -> "SolutionIO": """SolutionIO: Derivative of this solution.""" derivative = copy.deepcopy(self) derivative_fun = derivative.solution_interpolated.derivative derivative.solution = derivative_fun(derivative.time) derivative.update_solution() return derivative @property def antiderivative(self) -> "SolutionIO": """SolutionIO: Antiderivative of this solution.""" antiderivative = copy.deepcopy(self) antiderivative_fun = antiderivative.solution_interpolated.antiderivative antiderivative.solution = antiderivative_fun(antiderivative.time) antiderivative.update_solution() return antiderivative
[docs] def update_transform(self) -> None: """Update the Transformer.""" self.transform = transform.NormLinearTransformer( np.min(self.solution, axis=0), np.max(self.solution, axis=0), allow_extended_input=True, allow_extended_output=True, )
@property def solution_interpolated(self) -> InterpolatedSignal: """InterpolatedSignal: The interpolated signal.""" if self._solution_interpolated is None: self._solution_interpolated = InterpolatedSignal(self.time, self.solution) return self._solution_interpolated @property def dm_dt_interpolated(self) -> InterpolatedSignal: """Return the dm/dt interpolated signal.""" if self._dm_dt_interpolated is None: dm_dt = self.solution * self.flow_rate.value(self.time) self._dm_dt_interpolated = InterpolatedSignal(self.time, dm_dt) return self._dm_dt_interpolated
[docs] def normalize(self) -> SolutionIO: """SolutionIO: Normalize the solution using the transformation function.""" solution = copy.deepcopy(self) solution.solution = self.transform.transform(self.solution) solution.update_solution() return solution
[docs] def resample( self, start: Optional[float] = None, end: Optional[float] = None, nt: Optional[int] = 5001, ) -> "SolutionIO": """ Resample solution to nt time points. Parameters ---------- start : Optional[float] Start time of the fraction. If None, the first time point is used. The default is None. end: Optional[float] End time of the fraction. If None, the last time point is used. The default is None. nt : int, optional Number of points to resample. The default is 5001. Returns ------- SolutionIO The resampled solution object. """ solution = copy.deepcopy(self) if start is None: start = self.time[0] if end is None: end = self.time[-1] solution.time = np.linspace(start, end, nt) solution.solution = self.solution_interpolated(solution.time) solution.update_solution() return solution
[docs] def smooth_data( self, s: Optional[float | list[float]] = None, crit_fs: Optional[float | list[float]] = None, crit_fs_der: Optional[float | list[float]] = None, ) -> "SolutionIO": """ Smooth data. Parameters ---------- s : Optional[float | list[float]] DESCRIPTION. crit_fs : Optional[float | list[float]] DESCRIPTION. crit_fs_der : Optional[float | list[float]] DESCRIPTION. Returns ------- SolutionIO The smoothed solution object. """ solution = copy.deepcopy(self) if None in (s, crit_fs, crit_fs_der): s_ = [] crit_fs_ = [] crit_fs_der_ = [] for i in range(self.n_comp): s_i, crit_fs_i, crit_fs_der_i = smoothing.find_smoothing_factors( self.time, self.solution[..., i], rmse_target=1e-3 ) s_.append(s_i) crit_fs_.append(crit_fs_i) crit_fs_der_.append(crit_fs_der_i) if s is None: s = s_ if crit_fs is None: crit_fs = crit_fs_ if crit_fs_der is None: crit_fs_der = crit_fs_der_ if np.isscalar(s): s = self.n_comp * [s] elif len(s) == 1: s = self.n_comp * s if np.isscalar(crit_fs): crit_fs = self.n_comp * [crit_fs] elif len(crit_fs) == 1: crit_fs = self.n_comp * crit_fs if np.isscalar(crit_fs_der): crit_fs_der = self.n_comp * [crit_fs_der] elif len(crit_fs_der) == 1: crit_fs_der = self.n_comp * crit_fs_der solution = np.zeros((self.solution.shape)) for i, (s, crit_fs, crit_fs_der) in enumerate(zip(s, crit_fs, crit_fs_der)): smooth, smooth_der = smoothing.full_smooth( self.time, self.solution[..., i], crit_fs, s, crit_fs_der ) solution[..., i] = smooth solution.solution = solution solution.update_solution() return solution
[docs] def integral( self, start: Optional[float] = None, end: Optional[float] = None, ) -> np.ndarray: """ Peak area in a fraction interval. Parameters ---------- start : Optional[float] Start time of the fraction. If None, the first time point is used. The default is None. end: Optional[float] End time of the fraction. If None, the last time point is used. The default is None. Returns ------- np.ndarray Area of each component in the fraction. """ if start is None: start = self.time[0] if end is None: end = self.cycle_time return self.solution_interpolated.integral(start, end)
[docs] def create_fraction( self, start: Optional[float] = None, end: Optional[float] = None, ) -> Fraction: # noqa:F821 """ Create fraction in interval [start, end]. Parameters ---------- start : Optional[float] Start time of the fraction. If None, the first time point is used. The default is None. end: Optional[float] End time of the fraction. If None, the last time point is used. The default is None. Returns ------- Fraction a Fraction object in the interval [start, end] """ if start is None: start = self.time[0] if end is None: end = self.cycle_time from CADETProcess.fractionation import Fraction mass = self.fraction_mass(start, end) volume = self.fraction_volume(start, end) return Fraction(mass, volume, start, end)
[docs] def fraction_mass( self, start: Optional[float] = None, end: Optional[float] = None, ) -> np.ndarray: """ Component mass in a fraction interval. Parameters ---------- start : Optional[float] Start time of the fraction. If None, the first time point is used. The default is None. end: Optional[float] End time of the fraction. If None, the last time point is used. The default is None. Returns ------- np.ndarray Mass of all components in the fraction. """ if start is None: start = self.time[0] if end is None: end = self.cycle_time all_times = np.sort(np.unique([ start, *[t for t in self.flow_rate.section_times if start <= t <= end], end, ])) mass = np.zeros((self.n_comp, )) for t0, t1 in zip(all_times[:-1], all_times[1:]): # Precompute values on a fine grid for this segment # move the right boundary infinitesimally to the left t1m = np.nextafter(t1, t0) t_eval = np.linspace(t0, t1m, 1001) flow_vals = self.flow_rate.value(t_eval) sol_vals = self.solution_interpolated(t_eval) mass += integrate.simpson(flow_vals * sol_vals, t_eval, axis=0) return mass
[docs] def fraction_volume( self, start: Optional[float] = None, end: Optional[float] = None, ) -> float: """ Volume of a fraction interval. Parameters ---------- start : Optional[float] Start time of the fraction. If None, the first time point is used. The default is None. end: Optional[float] End time of the fraction. If None, the last time point is used. The default is None. Returns ------- float Volume of the fraction """ if start is None: start = self.time[0] if end is None: end = self.cycle_time return float(self.flow_rate.integral(start, end).squeeze())
[docs] def plot( self, start: float | None = None, end: float | None = None, components: list[str] | None = None, x_axis_in_minutes: bool = True, *args: Any, **kwargs: Any, ) -> tuple[plt.Figure, plt.Axes | tuple[plt.Axes]]: """ Plot the entire time_signal for each component. Parameters ---------- start : float | None, optional, default=None Start time for plotting in seconds. If None, the first data point is used. end : float | None, optional, default=None End time for plotting in seconds. If None, the last data point is used. components : list[str] | None, optional, default=None List of components to be plotted. If None, all components are plotted. x_axis_in_minutes : bool, optional, default=True If True, the x-axis is displayed in minutes instead of seconds. *args : Any Optional arguments passed to `_plot_solution_1D`. **kwargs : Any Optional keyword arguments passed to `_plot_solution_1D`. Returns ------- tuple[plt.Figure, plt.Axes | tuple[plt.Axes]] The Matplotlib Figure and Axes objects. See Also -------- _plot_solution_1D : Low-level function for 1D plotting. slice_solution : Slice the solution for plotting. plot_purity : Plot purity-related data. CADETProcess.plotting: Plotting library utilities. """ solution = slice_solution( self, components=components, use_total_concentration=False, use_total_concentration_components=False, coordinates={"time": [start, end]}, ) time = solution.time time_unit = r"\text{s}" if x_axis_in_minutes: time = time / 60 time_unit = r"\text{min}" if start is not None: start = start / 60 if end is not None: end = end / 60 xlabel = rf"$Time~/~{time_unit}$" xlim = (start, end) ylabel = r"$c~/~\text{mM}$" return _plot_solution_1D( time, solution, *args, xlabel=xlabel, xlim=xlim, ylabel=ylabel, **kwargs, )
[docs] @plotting.figure_utils def plot_purity( self, start: float | None = None, end: float | None = None, components: list[str] | None = None, x_axis_in_minutes: bool = True, plot_components_purity: bool = True, plot_species_purity: bool = False, alpha: float = 1.0, xlabel: str | None = None, ylim: tuple[float, float] | None = None, ylabel: str | None = None, hide_labels: bool = False, hide_species_labels: bool = False, show_legend: bool = True, update_layout: bool = True, ax: plt.Axes | tuple[plt.Axes] | None = None, setup_figure_kwargs: dict | None = None, ) -> tuple[plt.Figure, plt.Axes]: """ Plot local purity for each component of the concentration profile. Parameters ---------- start : float | None, optional, default=None Start time for plotting in seconds. If None, the first data point is used. end : float | None, optional, default=None End time for plotting in seconds. If None, the last data point is used. components : list[str] | None, optional, default=None List of components to be plotted. If None, all components are plotted. x_axis_in_minutes : bool, optional, default=True If True, the x-axis is displayed in minutes instead of seconds. plot_components_purity : bool, optional If True, plot purity of total component concentration. Default is True. plot_species_purity : bool, optional If True, plot purity of individual species. Default is False. alpha : float, default=1.0 Opacity of plotted lines. ylim : tuple[float, float] | None, default=None Y-axis limits. hide_labels : bool, default=False If True, hide labels in the legend. hide_species_labels : bool, default=False If True, hide species labels in the legend. secondary_axes : list[SecondaryAxis] | None, default=None List of `SecondaryAxis` objects for secondary axes. show_legend : bool, default=True If True, display the legend. update_layout : bool, optional, default=True If True, update layout. ax : plt.Axes | tuple[plt.Axes] | None, default=None Optional Matplotlib Axes. If not provided, a new figure is created. setup_figure_kwargs : dict | None, default=None Additional options to setup the figure. Returns ------- tuple[plt.Figure, plt.Axes] The Matplotlib Figure and Axes objects. Raises ------ CADETProcessError If the solution has fewer than 2 components. """ solution = slice_solution( self, components=components, use_total_concentration=False, use_total_concentration_components=False, coordinates={"time": [start, end]}, ) if solution.n_comp < 2: raise CADETProcessError( "Purity is undefined for systems with fewer than 2 components." ) x = solution.time time_unit = r"\text{s}" if x_axis_in_minutes: x = x / 60 time_unit = r"\text{min}" if start is not None: start = start / 60 if end is not None: end = end / 60 xlabel = rf"$Time~/~{time_unit}$" ylabel = r"$Purity~/~\text{\%}$" if ax is None: fig, ax = plotting.setup_figure(**setup_figure_kwargs) else: fig = ax.get_figure() local_purity_components = solution.local_purity_components * 100 local_purity_species = solution.local_purity_species * 100 colors = iter(plt.rcParams["axes.prop_cycle"].by_key()["color"]) species_index = 0 for i, comp in enumerate(solution.component_system.components): color = next(colors) label = None if not update_layout or hide_labels: label = comp.name if plot_components_purity: y = local_purity_components[..., i] ax.plot(x, y, label=label, color=color, alpha=alpha) if plot_species_purity: if comp.n_species == 1: species_index += 1 continue for s, species in enumerate(comp.species): label = None if hide_labels else f"{comp.name} ({species.name})" y = local_purity_species[..., species_index] ax.plot(x, y, "--", label=label, color=color, alpha=alpha) species_index += 1 # Finalize layout if update_layout: ax.set_xlabel(xlabel) ax.set_xlim((start, end)) ax.set_ylabel(ylabel) if ylim is not None: ax.set_ylim(ylim) if show_legend: handles, labels = plotting.get_all_twin_handles_labels(ax) if labels: ax.legend(handles, labels) return fig, ax
[docs] class SolutionBulk(SolutionBase): """ Solution in the bulk phase of the ``UnitOperation``. Dimensions: NCOL * NRAD * NCOMP """ dimensions = SolutionBase.dimensions + [ "axial_coordinates", "radial_coordinates", "component_coordinates", ] def __init__( self, name: str, component_system: ComponentSystem, time: npt.ArrayLike, solution: npt.ArrayLike, axial_coordinates: Optional[npt.ArrayLike] = None, radial_coordinates: Optional[npt.ArrayLike] = None, ) -> None: """ Initialize the SolutionBulk. Parameters ---------- name : str The name of the solution object. component_system : ComponentSystem The component system associated with the solution. time : npt.ArrayLike An array-like structure representing time points. solution : npt.ArrayLike An array-like structure representing the solution data. axial_coordinates : Optional[npt.ArrayLike], An array-like structure representing axial coordinates. If None, it is assumed that the model has a singleton axial dimension. radial_coordinates : Optional[npt.ArrayLike], An array-like structure representing radial coordinates. If None, it is assumed that the model has a singleton bulk dimension. """ self.axial_coordinates = axial_coordinates self.radial_coordinates = radial_coordinates super().__init__(name, component_system, time, solution) @property def ncol(self) -> int: """int: Number of axial discretization points.""" if self.axial_coordinates is None: return return len(self.axial_coordinates) @property def nrad(self) -> int: """int: Number of radial discretization points.""" if self.radial_coordinates is None: return return len(self.radial_coordinates)
[docs] def plot( self, start: float | None = None, end: float | None = None, components: list[str] | None = None, x_axis_in_minutes: bool = True, *args: Any, **kwargs: Any, ) -> tuple[plt.Figure, plt.Axes | tuple[plt.Axes]]: """ Plot bulk solution for each component. Parameters ---------- start : float | None, optional, default=None Start time for plotting in seconds. If None, the first data point is used. end : float | None, optional, default=None End time for plotting in seconds. If None, the last data point is used. components : list[str] | None, optional, default=None List of components to be plotted. If None, all components are plotted. x_axis_in_minutes : bool, optional, default=True If True, the x-axis is displayed in minutes instead of seconds. *args : Any Optional arguments passed to `_plot_solution_1D`. **kwargs : Any Optional keyword arguments passed to `_plot_solution_1D`. Returns ------- tuple[plt.Figure, plt.Axes | tuple[plt.Axes]] The Matplotlib Figure and Axes objects. Raises ------ CADETProcessError If solution is not 1D. See Also -------- _plot_solution_1D : Low-level function for 1D plotting. slice_solution : Slice the solution for plotting. CADETProcess.plotting: Plotting library utilities. """ if not (self.ncol is None and self.nrad is None): raise CADETProcessError( "Solution has more than single dimension. " "Please use `plot_at_time` or `plot_at_position`." ) solution = slice_solution( self, components=components, use_total_concentration=False, use_total_concentration_components=False, coordinates={"time": [start, end]}, ) time = solution.time time_unit = r"\text{s}" if x_axis_in_minutes: time = time / 60 time_unit = r"\text{min}" if start is not None: start = start / 60 if end is not None: end = end / 60 xlabel = rf"$Time~/~{time_unit}$" xlim = (start, end) ylabel = r"$c~/~\text{mM}$" return _plot_solution_1D( time, solution, *args, xlabel=xlabel, xlim=xlim, ylabel=ylabel, **kwargs, )
[docs] def plot_at_time( self, t: float, components: Optional[list[str]] = None, *args: Any, **kwargs: Any, ) -> tuple[plt.Figure, plt.Axes | tuple[plt.Axes]]: """ Plot bulk solution over space at given time. Parameters ---------- t : float Time for plotting. If t == -1, the final solution is plotted. components : Optional[list[str]] List of components to be plotted. If None, all components are plotted. *args : Any Optional arguments passed to `_plot_solution_1D`. **kwargs : Any Optional keyword arguments passed to `_plot_solution_1D`. Returns ------- tuple[plt.Figure, plt.Axes | tuple[plt.Axes]] The Matplotlib Figure and Axes objects. See Also -------- _plot_solution_1D _plot_2D slice_solution CADETProcess.plotting """ solution = slice_solution( self, components=components, use_total_concentration=False, use_total_concentration_components=False, coordinates={"time": [t, t]}, ) x = self.axial_coordinates xlabel = r"$z~/~m$" ylabel = r"$c~/~\text{mM}$" fig, axs = _plot_solution_1D( x, solution, *args, xlabel=xlabel, ylabel=ylabel, **kwargs, ) ax = axs if not isinstance(axs, tuple) else axs[0] plotting.add_text(ax, f"time = {t:.2f} s") return fig, axs
[docs] def plot_at_position( self, z: float, start: Optional[float] = None, end: Optional[float] = None, components: Optional[list[str]] = None, x_axis_in_minutes: bool = True, *args: Any, **kwargs: Any, ) -> tuple[plt.Figure, plt.Axes | tuple[plt.Axes]]: """ Plot bulk solution over space at given time. Parameters ---------- z : float Position for plotting. start : float | None, optional, default=None Start time for plotting in seconds. If None, the first data point is used. end : float | None, optional, default=None End time for plotting in seconds. If None, the last data point is used. components : list[str] | None, optional, default=None List of components to be plotted. If None, all components are plotted. x_axis_in_minutes : bool, optional, default=True If True, the x-axis will be plotted using minutes. *args : Any Optional arguments passed to `_plot_solution_1D`. **kwargs : Any Optional keyword arguments passed to `_plot_solution_1D`. Returns ------- tuple[plt.Figure, plt.Axes | tuple[plt.Axes]] The Matplotlib Figure and Axes objects. See Also -------- _plot_solution_1D _plot_2D slice_solution CADETProcess.plotting """ solution = slice_solution( self, components=components, use_total_concentration=False, use_total_concentration_components=False, coordinates={"axial_coordinates": [z, z]}, ) time = solution.time time_unit = r"\text{s}" if x_axis_in_minutes: time = time / 60 time_unit = r"\text{min}" if start is not None: start = start / 60 if end is not None: end = end / 60 xlabel = rf"$Time~/~{time_unit}$" xlim = (start, end) ylabel = r"$c~/~\text{mM}$" fig, axs = _plot_solution_1D( time, solution, *args, xlabel=xlabel, xlim=xlim, ylabel=ylabel, **kwargs, ) ax = axs if not isinstance(axs, tuple) else axs[0] plotting.add_text(ax, f"z = {z:.2f} m") return fig, axs
[docs] class SolutionParticle(SolutionBase): """ Solution in the particle liquid phase of the ``UnitOperation``. Dimensions: NCOL * NRAD * sum_{j}^{NPARTYPE}{NCOMP * NPAR,j} """ dimensions = SolutionBase.dimensions + [ "axial_coordinates", "radial_coordinates", "particle_coordinates", "component_coordinates", ] def __init__( self, name: str, component_system: ComponentSystem, time: npt.ArrayLike, solution: npt.ArrayLike, axial_coordinates: Optional[npt.ArrayLike] = None, radial_coordinates: Optional[npt.ArrayLike] = None, particle_coordinates: Optional[npt.ArrayLike] = None, ) -> None: """ Initialize the SolutionParticle. Parameters ---------- name : str The name of the solution object. component_system : ComponentSystem The component system associated with the solution. time : npt.ArrayLike An array-like structure representing time points. solution : npt.ArrayLike An array-like structure representing the solution data. axial_coordinates : Optional[npt.ArrayLike], An array-like structure representing axial coordinates. If None, it is assumed that the model has a singleton axial dimension. radial_coordinates : Optional[npt.ArrayLike], An array-like structure representing radial coordinates. If None, it is assumed that the model has a singleton bulk dimension. particle_coordinates : Optional[npt.ArrayLike], An array-like structure representing particle coordinates. If None, it is assumed that the model has a singleton particle dimension. """ self.axial_coordinates = axial_coordinates self.radial_coordinates = radial_coordinates self.particle_coordinates = particle_coordinates super().__init__(name, component_system, time, solution) @property def ncol(self) -> int: """int: Number of axial discretization points.""" if self.axial_coordinates is None: return return len(self.axial_coordinates) @property def nrad(self) -> int: """int: Number of radial discretization points.""" if self.radial_coordinates is None: return return len(self.radial_coordinates) @property def npar(self) -> int: """int: Number of particle discretization points.""" if self.particle_coordinates is None: return return len(self.particle_coordinates)
[docs] def plot( self, start: float | None = None, end: float | None = None, components: list[str] | None = None, x_axis_in_minutes: bool = True, *args: Any, **kwargs: Any, ) -> tuple[plt.Figure, plt.Axes | tuple[plt.Axes]]: """ Plot the entire particle liquid phase solution for each component. Parameters ---------- start : float | None, optional, default=None Start time for plotting in seconds. If None, the first data point is used. end : float | None, optional, default=None End time for plotting in seconds. If None, the last data point is used. components : list[str] | None, optional, default=None List of components to be plotted. If None, all components are plotted. x_axis_in_minutes : bool, optional, default=True If True, the x-axis is displayed in minutes instead of seconds. *args : Any Optional arguments passed to `_plot_solution_1D`. **kwargs : Any Optional keyword arguments passed to `_plot_solution_1D`. Returns ------- tuple[plt.Figure, plt.Axes | tuple[plt.Axes]] The Matplotlib Figure and Axes objects. Raises ------ CADETProcessError If solution is not 1D. See Also -------- _plot_solution_1D : Low-level function for 1D plotting. slice_solution : Slice the solution for plotting. CADETProcess.plotting : Plotting library utilities. """ if not (self.ncol is None and self.nrad is None and self.npar is None): raise CADETProcessError( "Solution has more than single dimension. " "Please use `plot_at_time`." ) solution = slice_solution( self, components=components, use_total_concentration=False, use_total_concentration_components=False, coordinates={"time": [start, end]}, ) time = solution.time time_unit = r"\text{s}" if x_axis_in_minutes: time = time / 60 time_unit = r"\text{min}" if start is not None: start = start / 60 if end is not None: end = end / 60 xlabel = rf"$Time~/~{time_unit}$" xlim = (start, end) ylabel = r"$c~/~\text{mM}$" return _plot_solution_1D( time, solution, *args, xlabel=xlabel, xlim=xlim, ylabel=ylabel, **kwargs, )
def _plot_1D( self, t: float, components: Optional[list[str]] = None, *args: Any, **kwargs: Any, ) -> tuple[plt.Figure, plt.Axes | tuple[plt.Axes]]: """ Plot particle solution over space at given time. Parameters ---------- t : float Time for plotting. If t == -1, the final solution is plotted. components : list[str] | None, optional, default=None List of components to be plotted. If None, all components are plotted. ax : Optional[plt.Axes] plt.Axes to plot on. *args : Any Optional arguments passed to `_plot_solution_1D`. **kwargs : Any Optional keyword arguments passed to `_plot_solution_1D`. Returns ------- tuple[plt.Figure, plt.Axes | tuple[plt.Axes]] The Matplotlib Figure and Axes objects. See Also -------- _plot_solution_1D _plot_2D slice_solution CADETProcess.plotting """ solution = slice_solution( self, components=components, use_total_concentration=False, use_total_concentration_components=False, coordinates={"time": [t, t]}, ) x = self.axial_coordinates xlabel = r"$z~/~m$" ylabel = r"$c~/~\text{mM}$" fig, axs = _plot_solution_1D( x, solution, *args, xlabel=xlabel, ylabel=ylabel, **kwargs, ) ax = axs if not isinstance(axs, tuple) else axs[0] plotting.add_text(ax, f"time = {t:.2f} s") return fig, axs def _plot_2D( self, t: float, comp: int = 0, vmax: Optional[float] = None, fig: plt.Figure | None = None, ax: plt.Axes | None = None, setup_figure_kwargs: dict | None = None, ) -> tuple[plt.Figure, plt.Axes]: x = self.axial_coordinates y = self.particle_coordinates if not self.time[0] <= t <= self.time[-1]: raise ValueError("Time exceeds bounds.") i = np.where(t <= self.time)[0][0] v = self.solution[i, :, :, comp].transpose() if vmax is None: vmax = v.max() if ax is None: fig, ax = plotting.setup_figure(**setup_figure_kwargs) mesh = ax.pcolormesh(x, y, v, shading="gouraud", vmin=0, vmax=vmax) else: mesh = ax.get_children()[0] mesh.set_array(v.flatten()) ax.set_title(f"Solid phase concentration, comp={comp}") ax.set_xlabel("$z~/~\text{m}$") ax.set_ylabel("$r~/~\text{m}$") plt.colorbar(mesh) plotting.add_text(ax, f"time = {t:.2f} s") return fig, ax
[docs] @plotting.figure_utils def plot_at_time( self, t: float, comp: Optional[int] = None, vmax: Optional[float] = None, *args: Any, **kwargs: Any, ) -> tuple[plt.Figure, plt.Axes | tuple[plt.Axes]]: """ Plot particle liquid solution for a given component over space at given time. Parameters ---------- t : float Solution time at with to plot. comp : Optional[int] Component index. vmax: Optional[float] Maximum data value to scale color map. *args : Any Optional arguments passed to the local plot methods. **kwargs : Any Optional keyword arguments passed to the local plot methods. Returns ------- tuple[plt.Figure, plt.Axes | tuple[plt.Axes]] The Matplotlib Figure and Axes objects. See Also -------- CADETProcess.plotting """ if self.npar is None: if vmax is not None: kwargs["ylim"] = (None, vmax) fig, ax = self._plot_1D(t, comp, *args, **kwargs) else: if comp is None and self.n_comp > 1: raise ValueError("Must specify component index.") fig, ax = self._plot_2D(t, comp, vmax, *args, **kwargs) return fig, ax
[docs] class SolutionSolid(SolutionBase): """ Solution in the particle solid phase of the ``UnitOperation``. Dimensions: NCOL * NRAD * sum_{j}^{NPARTYPE}{NBOUND,j * NPAR,j} """ dimensions = SolutionBase.dimensions + [ "axial_coordinates", "radial_coordinates", "particle_coordinates", "component_coordinates", ] def __init__( self, name: str, component_system: ComponentSystem, bound_states: list[int], time: npt.ArrayLike, solution: npt.ArrayLike, axial_coordinates: Optional[npt.ArrayLike] = None, radial_coordinates: Optional[npt.ArrayLike] = None, particle_coordinates: Optional[npt.ArrayLike] = None, ) -> None: """ Initialize the SolutionSolid. Parameters ---------- name : str The name of the solution object. component_system : ComponentSystem The component system associated with the solution. bound_states: list[int] The number of bound states per component. time : npt.ArrayLike An array-like structure representing time points. solution : npt.ArrayLike An array-like structure representing the solution data. axial_coordinates : Optional[npt.ArrayLike], An array-like structure representing axial coordinates. If None, it is assumed that the model has a singleton axial dimension. radial_coordinates : Optional[npt.ArrayLike], An array-like structure representing radial coordinates. If None, it is assumed that the model has a singleton bulk dimension. particle_coordinates : Optional[npt.ArrayLike], An array-like structure representing particle coordinates. If None, it is assumed that the model has a singleton particle dimension. """ self.bound_states = bound_states self.axial_coordinates = axial_coordinates self.radial_coordinates = radial_coordinates self.particle_coordinates = particle_coordinates super().__init__(name, component_system, time, solution) @property def n_bound(self) -> int: """int: Number of bound states.""" return sum(self.bound_states) @property def n_comp(self) -> int: """int: Number of bound states.""" return self.n_bound @property def ncol(self) -> int: """int: Number of axial discretization points.""" if self.axial_coordinates is None: return None else: return len(self.axial_coordinates) @property def nrad(self) -> int: """int: Number of radial discretization points.""" if self.radial_coordinates is None: return return len(self.radial_coordinates) @property def npar(self) -> int: """int: Number of particle discretization points.""" if self.particle_coordinates is None: return return len(self.particle_coordinates)
[docs] def plot( self, start: float | None = None, end: float | None = None, components: list[str] | None = None, x_axis_in_minutes: bool = True, *args: Any, **kwargs: Any, ) -> tuple[plt.Figure, plt.Axes | tuple[plt.Axes]]: """ Plot the entire particle solid phase solution for each component. Parameters ---------- start : float | None, optional, default=None Start time for plotting in seconds. If None, the first data point is used. end : float | None, optional, default=None End time for plotting in seconds. If None, the last data point is used. components : list[str] | None, optional, default=None List of components to be plotted. If None, all components are plotted. x_axis_in_minutes : bool, optional, default=True If True, the x-axis is displayed in minutes instead of seconds. *args : Any Optional arguments passed to `_plot_solution_1D`. **kwargs : Any Optional keyword arguments passed to `_plot_solution_1D`. Returns ------- tuple[plt.Figure, plt.Axes | tuple[plt.Axes]] The Matplotlib Figure and Axes objects. Raises ------ CADETProcessError If solution is not 1D. See Also -------- _plot_solution_1D : Low-level function for 1D plotting. slice_solution : Slice the solution for plotting. CADETProcess.plotting : Plotting library utilities. """ if not (self.ncol is None and self.nrad is None and self.npar is None): raise CADETProcessError( "Solution has more than single dimension. " "Please use `plot_at_time`." ) solution = slice_solution( self, components=components, use_total_concentration=False, use_total_concentration_components=False, coordinates={"time": [start, end]}, ) time = solution.time time_unit = r"\text{s}" if x_axis_in_minutes: time = time / 60 time_unit = r"\text{min}" if start is not None: start = start / 60 if end is not None: end = end / 60 xlabel = rf"$Time~/~{time_unit}$" xlim = (start, end) ylabel = r"$c~/~\text{mM}$" return _plot_solution_1D( time, solution, *args, xlabel=xlabel, xlim=xlim, ylabel=ylabel, **kwargs, )
def _plot_1D( self, t: float, components: Optional[list[str]] = None, *args: Any, **kwargs: Any, ) -> tuple[plt.Figure, plt.Axes | tuple[plt.Axes]]: """ Plot solid solution over space at given time. Parameters ---------- t : float Time for plotting. If t == -1, the final solution is plotted. components : Optional[list[str]] List of components to be plotted. If None, all components are plotted. ax : Optional[plt.Axes] plt.Axes to plot on. *args : Any Optional arguments passed to `_plot_solution_1D`. **kwargs : Any Optional keyword arguments passed to `_plot_solution_1D`. Returns ------- tuple[plt.Figure, plt.Axes | tuple[plt.Axes]] The Matplotlib Figure and Axes objects. See Also -------- _plot_solution_1D _plot_2D slice_solution CADETProcess.plotting """ solution = slice_solution( self, components=components, use_total_concentration=False, use_total_concentration_components=False, coordinates={"time": [t, t]}, ) x = self.axial_coordinates xlabel = r"$z~/~m$" ylabel = r"$c~/~\text{mM}$" fig, axs = _plot_solution_1D( x, solution, *args, xlabel=xlabel, ylabel=ylabel, **kwargs, ) ax = axs if not isinstance(axs, tuple) else axs[0] plotting.add_text(ax, f"time = {t:.2f} s") return fig, axs def _plot_2D( self, t: float, comp: int = 0, bound_state: int = 0, vmax: Optional[float] = None, fig: plt.Figure | None = None, ax: plt.Axes | None = None, setup_figure_kwargs: dict | None = None, ) -> tuple[plt.Figure, plt.Axes]: x = self.axial_coordinates y = self.particle_coordinates if not self.time[0] <= t <= self.time[-1]: raise ValueError("Time exceeds bounds.") t_i = np.where(t <= self.time)[0][0] c_i = int(np.sum(self.bound_states[0:comp]) + bound_state) v = self.solution[t_i, :, :, c_i].transpose() if vmax is None: vmax = v.max() if ax is None: fig, ax = plotting.setup_figure(**setup_figure_kwargs) mesh = ax.pcolormesh(x, y, v, shading="gouraud", vmin=0, vmax=vmax) else: mesh = ax.get_children()[0] mesh.set_array(v.flatten()) ax.set_title( f"Solid phase concentration, comp={comp}, bound_state={bound_state}" ) ax.set_xlabel("$z~/~\text{m}$") ax.set_ylabel("$r~/~\text{m}$") plt.colorbar(mesh) plotting.add_text(ax, f"time = {t:.2f} s") return fig, ax
[docs] @plotting.figure_utils def plot_at_time( self, t: float, comp: Optional[int] = None, bound_state: Optional[int] = 0, vmax: Optional[float] = None, *args: Any, **kwargs: Any, ) -> tuple[plt.Figure, plt.Axes | tuple[plt.Axes]]: """ Plot particle solid solution for a given bound state over space at given time. Parameters ---------- t : float Solution time at with to plot. comp : Optional[int] Component index. bound_state : Optional[int], default=0 Bound state index. vmax: Optional[float] Maximum data value to scale color map. *args : Any Optional arguments passed to the local plot methods. **kwargs : Any Optional keyword arguments passed to the local plot methods. Returns ------- tuple[plt.Figure, plt.Axes | tuple[plt.Axes]] The Matplotlib Figure and Axes objects. See Also -------- CADETProcess.plotting """ if self.npar is None: if vmax is not None: kwargs["ylim"] = (None, vmax) fig, ax = self._plot_1D(t, comp, *args, **kwargs) else: if comp is None and self.n_comp > 1: raise ValueError("Must specify component index.") if comp is None: comp = 0 if bound_state is None and self.bound_states[comp] > 1: raise ValueError("Must specify bound state index.") fig, ax = self._plot_2D(t, comp, bound_state, vmax, *args, **kwargs) return fig, ax
[docs] class SolutionVolume(SolutionBase): """Volume solution (of e.g. CSTR).""" @property def solution_shape(self) -> tuple[int, ...]: """tuple: (Expected) shape of the solution.""" return (self.nt,)
[docs] @plotting.figure_utils def plot( self, start: Optional[float] = None, end: Optional[float] = None, x_axis_in_minutes: Optional[bool] = True, y_axis_in_liters: Optional[bool] = True, ylim: tuple[float, float] | None = None, update_layout: bool = True, ax: plt.Axes | tuple[plt.Axes] | None = None, setup_figure_kwargs: dict | None = None, ) -> tuple[plt.Figure, plt.Axes]: """ Plot the volume over time. Parameters ---------- start : float | None, optional, default=None Start time for plotting in seconds. If None, the first data point is used. end : float | None, optional, default=None End time for plotting in seconds. If None, the last data point is used. x_axis_in_minutes : bool, optional, default=True If True, the x-axis will be plotted using minutes. y_axis_in_liters : bool, optional, default=True If True, the y-axis is displayed in Liters instead of m³. ylim : tuple[float, float] | None, default=None Y-axis limits. update_layout : bool, optional, default=True If True, update layout. ax : plt.Axes | tuple[plt.Axes] | None, default=None Optional Matplotlib Axes. If not provided, a new figure is created. setup_figure_kwargs : dict | None, default=None Additional options to setup the figure. Returns ------- tuple[plt.Figure, plt.Axes | tuple[plt.Axes]] The Matplotlib Figure and Axes objects. See Also -------- CADETProcess.plotting """ time = self.time time_unit = r"\text{s}" if x_axis_in_minutes: time = time / 60 time_unit = r"\text{min}" if start is not None: start = start / 60 if end is not None: end = end / 60 xlabel = rf"$Time~/~{time_unit}$" xlim = (start, end) y = self.solution volume_unit = r"\text{m}^3" if y_axis_in_liters: y = y * 1000 volume_unit = r"\text{L}" ylabel = rf"$Volume$~/~{volume_unit}$" if ax is None: fig, ax = plotting.setup_figure(**setup_figure_kwargs) else: fig = ax.get_figure() if update_layout: ax.set_xlabel(xlabel) ax.set_xlim(xlim) ax.set_ylabel(ylabel) if ylim is not None: ax.set_ylim(ylim) return fig, ax
@plotting.figure_utils def _plot_solution_1D( x: np.ndarray, solution: SolutionBase, plot_species: bool = False, plot_components: bool = True, plot_total_concentration: bool = False, alpha: float = 1.0, xlim: tuple[float, float] | None = None, xlabel: str | None = None, ylim: tuple[float, float] | None = None, ylabel: str | None = None, hide_labels: bool = False, hide_species_labels: bool = False, secondary_axes: list[plotting.SecondaryAxis] | None = None, show_legend: bool = True, update_layout: bool = True, ax: plt.Axes | tuple[plt.Axes] | None = None, setup_figure_kwargs: dict | None = None, ) -> tuple[plt.Figure, plt.Axes | tuple[plt.Axes]]: """Plot 1D solution data with optional secondary axis. Parameters ---------- x : np.ndarray x-values for plotting. solution : SolutionBase Solution data to plot. plot_species : bool, default=False If True, plot individual species. plot_components : bool, default=True If True, plot total concentration of components. plot_total_concentration : bool, default=False If True, plot total concentration. alpha : float, default=1.0 Opacity of plotted lines. xlim : tuple[float, float] | None, default=None X-axis limits. xlabel : str | None, default=None X-axis label. ylim : tuple[float, float] | None, default=None Y-axis limits. ylabel : str | None, default=None Y-axis label. hide_labels : bool, default=False If True, hide labels in the legend. hide_species_labels : bool, default=False If True, hide species labels in the legend. secondary_axes : list[SecondaryAxis] | None, default=None List of `SecondaryAxis` objects for secondary axes. show_legend : bool, default=True If True, display the legend. update_layout : bool, optional, default=True If True, update layout. ax : plt.Axes | tuple[plt.Axes] | None, default=None Optional Matplotlib Axes. If not provided, a new figure is created. setup_figure_kwargs : dict | None, default=None Additional options to setup the figure. Returns ------- tuple[plt.Figure, plt.Axes | tuple[plt.Axes]] The Matplotlib Figure and Axes objects. """ if ax is None: # Create a new figure and primary axis fig, primary_ax = plotting.setup_figure(**setup_figure_kwargs) # Initialize list for secondary axes sec_axes = [] # Ensure secondary_axes is iterable if secondary_axes is not None: if not isinstance(secondary_axes, list): secondary_axes = [secondary_axes] # Create twin axes for each secondary axis for i, sec_axis in enumerate(secondary_axes): a = primary_ax.twinx() sec_axes.append(a) else: # Use provided axes if isinstance(ax, tuple): primary_ax = ax[0] sec_axes = list(ax[1:]) else: primary_ax = ax sec_axes = [] fig = primary_ax.get_figure() # Plot data sol = solution.solution c_total_comp = solution.total_concentration_components colors = iter(plt.rcParams["axes.prop_cycle"].by_key()["color"]) species_index = 0 # Initialize species index for i, comp in enumerate(solution.component_system.components): color = next(colors) label = None if update_layout and not hide_labels: label = comp.name # Determine which axis to plot on current_axis = primary_ax for sec_axis in secondary_axes or []: if comp.name in sec_axis.components: current_axis = sec_axes[secondary_axes.index(sec_axis)] # Plot components if plot_components: y_comp = c_total_comp[..., i] for sec_axis in secondary_axes or []: if comp.name in sec_axis.components and sec_axis.transform is not None: y_comp = sec_axis.transform(y_comp) current_axis.plot( x, np.squeeze(y_comp), linestyle="-", color=color, alpha=alpha, label=label, ) # Plot species if plot_species: if comp.n_species == 1 and plot_components: species_index += 1 continue linestyle_iter = iter(plotting.linestyle_cycler) for s, species in enumerate(comp.label): label = None if update_layout and not hide_labels: label = species y_spec = sol[..., species_index] for sec_axis in secondary_axes or []: if comp.name in sec_axis.components and sec_axis.transform is not None: y_spec = sec_axis.transform(y_spec) current_axis.plot( x, np.squeeze(y_spec), linestyle=next(linestyle_iter)["linestyle"], color=color, alpha=alpha, label=label, ) species_index += 1 # Plot total concentration (if requested) if plot_total_concentration: y_total = solution.total_concentration label = None if update_layout and not hide_labels: label = "Total concentration" primary_ax.plot( x, np.squeeze(y_total), "-", color="k", alpha=alpha, label=label, ) # Finalize layout if update_layout: if xlabel is not None: primary_ax.set_xlabel(xlabel) if xlim is not None: primary_ax.set_xlim(xlim) if ylabel is not None: primary_ax.set_ylabel(ylabel) if ylim is not None: primary_ax.set_ylim(ylim) # Update secondary axis Layout for sec_axis in secondary_axes or []: if sec_axis.ylabel is not None: a.set_ylabel(sec_axis.ylabel) if sec_axis.ylim is not None: a.set_ylim(sec_axis.ylim) plotting.offset_secondary_yaxes(primary_ax) if show_legend: handles, labels = plotting.get_all_twin_handles_labels(primary_ax) if labels: primary_ax.legend(handles, labels) if not sec_axes: return fig, primary_ax return fig, (primary_ax, *sec_axes) class InterpolatedSignal: def __init__(self, time: np.ndarray, signal: npt.ArrayLike) -> None: if len(signal.shape) == 1: signal = np.array(signal, ndmin=2).transpose() self._solutions = [ PchipInterpolator(time, signal[:, comp]) for comp in range(signal.shape[1]) ] self._derivatives = [signal.derivative() for signal in self._solutions] self._antiderivatives = [signal.antiderivative() for signal in self._solutions] @property def solutions(self) -> list[PchipInterpolator]: """list[PchipInterpolator]: Interpolators for each component.""" return self._solutions @property def derivatives(self) -> list[PchipInterpolator]: """list[PchipInterpolator]: Derivative interpolators for each component.""" return self._derivatives def derivative(self, time: np.ndarray) -> np.ndarray: """ Calculate derivatives of the solution spline(s) at given time points. Parameters ---------- time : np.ndarray The time points to evaluate the derivatives at. Returns ------- np.ndarray Derivatives of the solution spline(s) at given time. """ der = np.empty((len(time), len(self.solutions))) for comp, der_i in enumerate(self.derivatives): der[:, comp] = der_i(time) return der @property def antiderivatives(self) -> list[PchipInterpolator]: """list[PchipInterpolator]: Antiderivative interpolators for each component.""" return self._antiderivatives def antiderivative(self, time: np.ndarray) -> np.ndarray: """ Antiderivative of the solution of the spline(s) at given time. Parameters ---------- time : np.ndarray The time points to evaluate the antiderivatives at. Returns ------- np.ndarray Antiderivatives of the solution spline(s) at given time. """ anti = np.empty((len(time), len(self.solutions))) for comp, anti_i in enumerate(self.antiderivatives): anti[:, comp] = anti_i(time) return anti def integral( self, start: Optional[float] = None, end: Optional[float] = None, ) -> np.ndarray: """ Definite integral between start and end. Parameters ---------- start : Optional[float] Lower integration bound. end: Optional[float] Upper integration bound. Returns ------- np.ndarray Definite integral of the solution spline(s) between limits. """ return np.array([comp.integrate(start, end) for comp in self._solutions]) def __call__(self, t: float) -> np.ndarray: return np.array([comp(t) for comp in self._solutions]).transpose()
[docs] def slice_solution( solution_original: SolutionBase, components: Optional[str | list[str]] = None, use_total_concentration: Optional[bool] = False, use_total_concentration_components: Optional[bool] = False, coordinates: Optional[dict[str, Optional[tuple[Optional[int], Optional[int]]]]] = None, ) -> SolutionBase: """ Slice a `Solution` object along specified dimensions, components or both. Parameters ---------- solution_original : SolutionBase The `Solution` object to slice. components : Optional[str | list[str]] The names of the components to keep in the sliced `Solution`. If `None`, all components are kept. The default is None. use_total_concentration : Optional[bool], default=False If `True`, only the total concentration data is kept in the sliced `Solution`. use_total_concentration_components : Optional[bool], default=False If `True`, the total concentration data is kept for each individual species of each component in the sliced `Solution`. coordinates : Optional[dict[str, Optional[tuple[Optional[int], Optional[int]]]]] A dictionary mapping dimensions to slice coordinates. Each dimension in the solution object is represented by a key in the dictionary, and the corresponding value is a tuple of two elements specifying the start and end coordinates of the slice along that dimension. If a value is `None`, the corresponding coordinate is not sliced. Returns ------- SolutionBase A new solution object representing the sliced data. Raises ------ ValueError If any of the slice coordinates exceeds the bounds of its corresponding dimension. CADETProcessError If any of the specified components or dimensions does not exist in the original solution. """ solution = copy.deepcopy(solution_original) slices = () if coordinates is not None: coordinates = copy.deepcopy(coordinates) for i, (dim, coord) in enumerate(solution.coordinates.items()): if dim == "component_coordinates": continue if dim in coordinates: sl = list(coordinates.pop(dim)) # Update slice bounds if they are None if sl[0] is None: sl[0] = coord[0] if sl[1] is None: sl[1] = coord[-1] # Check bounds if not (coord[0] <= sl[0] <= sl[1] <= coord[-1]): raise ValueError(f"{dim} coordinates exceed bounds.") # Calculate start and end indices using searchsorted start_index = np.searchsorted(coord, sl[0], side="right") - 1 end_index = np.searchsorted(coord, sl[1], side="left") + 1 # Ensure only a single entry is returned if start and end elements are the same if sl[0] == sl[1]: end_index = start_index + 1 # Create slice and update solution sl = slice(start_index, end_index) slices += (sl,) setattr(solution, dim, coord[sl]) else: slices += (slice(None),) if len(coordinates) > 0: raise CADETProcessError(f"Unknown dimensions: {coordinates.keys()}") solution.solution = solution_original.solution[slices] # First calculate total_concentration of components, if required. if use_total_concentration_components: sol_total_concentration_comp = solution.total_concentration_components for comp in solution.component_system: if comp.n_species > 1: comp._species = [] comp.add_species(comp.name) solution.solution = sol_total_concentration_comp # Then, slice components. Note that component index can only be used if total # concentration of components has already been calculated and set as solution array. if components is not None: if not isinstance(components, list): components = [components] components = copy.deepcopy(components) component_system = copy.deepcopy(solution.component_system) component_indices = [] for i, (name, component) in enumerate( solution.component_system.components_dict.items() ): if name not in components: component_system.remove_component(component.name) else: name = str(name) components.remove(name) if use_total_concentration_components: component_indices.append(i) else: component_indices += solution.component_system.indices[component.name] if len(components) != 0: raise CADETProcessError(f"Unknown components: {components}") solution_components = solution.solution[..., component_indices] solution.component_system = component_system solution.solution = solution_components # Only calculate total concentration after removing unwanted components. if use_total_concentration: solution_total_concentration = solution.total_concentration solution.component_system = ComponentSystem(["total_concentration"]) solution.solution = solution_total_concentration solution.update_solution() return solution
[docs] def slice_solution_front( solution_original: SolutionIO, min_percent: Optional[float] = 0.02, max_percent: Optional[float] = 0.98, use_max_slope: Optional[bool] = False, return_indices: Optional[bool] = False, ) -> SolutionIO | tuple[SolutionIO, int, int]: """ Slice the front of a given solution. Parameters ---------- solution_original : SolutionIO The `Solution` object to slice. min_percent : Optional[float] Minimum percentage of the peak height to consider. Default is 0.02. max_percent : Optional[float] Maximum percentage of the peak height to consider. Default is 0.98. use_max_slope : Optional[bool] If True, cut use the maximum slope as end. Default if False. return_indices : Optional[bool] If True, also return the start end end indices of the front. Default if False. Returns ------- SolutionIO or tuple[SolutionIO, int, int] The sliced `SolutionIO` object. If `return_indices` is True, also returns the start and end indices of the slice. """ solution = copy.deepcopy(solution_original) # Initialize new sequence with zeros solution.solution[:] = 0 # Determine whether to use derivative array solution_array = solution_original.solution if use_max_slope: solution_array = solution_original.derivative.solution # Determine the max value and its index max_value = np.max(solution_array) max_index = np.argmax(solution_array) # Determine the min and max percent values select_min = min_percent * max_value select_max = max_percent * max_value # Find the indices for slicing using logical indexing idx_min = np.where(solution_array[:max_index] <= select_min)[0][-1] idx_max = np.where(solution_array[:max_index] >= select_max)[0][0] # Slice the sequence solution.solution[idx_min : idx_max + 1] = \ solution_original.solution[idx_min : idx_max + 1] solution.update_solution() if not return_indices: return solution return solution, idx_min, idx_max