"""
=======================================
Solution (:mod:`CADETProcess.solution`)
=======================================
.. currentmodule:: CADETProcess.solution
Module to store and plot solution of simulation.
.. autosummary::
:toctree: generated/
SolutionBase
SolutionIO
SolutionBulk
SolutionParticle
SolutionSolid
SolutionVolume
Method to slice a Solution:
.. autosummary::
:toctree: generated/
slice_solution
"""
import copy
import numpy as np
import matplotlib.pyplot as plt
from scipy.interpolate import PchipInterpolator
from scipy import integrate
from CADETProcess.dataStructure import Structure
from CADETProcess.dataStructure import (
String, UnsignedInteger, UnsignedFloat, Vector, SizedNdArray
)
from CADETProcess.processModel import ComponentSystem
from CADETProcess.dynamicEvents import TimeLine
from CADETProcess import plotting
from CADETProcess import CADETProcessError
from CADETProcess import smoothing
from CADETProcess import transform
__all__ = [
'SolutionIO',
'SolutionBulk',
'SolutionParticle',
'SolutionSolid',
'SolutionVolume',
'slice_solution',
]
[docs]
class SolutionBase(Structure):
"""Base class for solutions of component systems.
This class represents a solution of a component system at different time points.
It provides several properties that allow access to information about the solution,
such as the number of components, the total concentration, and the local purity.
Attributes
----------
name : str
Name of the solution.
time : np.ndarray
Array of time points.
solution : np.ndarray
Array of solution values.
c_min : float
Minimum concentration threshold, below which concentrations are considered zero.
dimensions : list of str
Names of the dimensions in the solution.
Notes
-----
This class is not meant to be used directly, but to be subclassed by specific
solution types.
"""
name = String()
time = Vector()
solution = SizedNdArray(size='solution_shape')
c_min = UnsignedFloat(default=1e-6)
dimensions = ['time']
def __init__(self, name, component_system, time, solution):
self.name = name
self.component_system_original = component_system
self.time_original = time
self.solution_original = solution
self.reset()
[docs]
def reset(self):
"""Reset component system, time, and solution arrays to their original values."""
self.component_system = self.component_system_original
self.time = self.time_original
self.solution = self.solution_original
[docs]
def update(self):
"""Update the solution."""
pass
@property
def component_system(self):
"""ComponentSystem: ComponentSystem of the Solution object."""
return self._component_system
@component_system.setter
def component_system(self, component_system):
if not isinstance(component_system, ComponentSystem):
raise TypeError('Expected ComponentSystem')
self._component_system = component_system
@property
def n_comp(self):
"""int: Number of components."""
return self.component_system.n_comp
@property
def cycle_time(self):
"""float: Cycle time."""
return self.time[-1]
@property
def nt(self):
"""int: Number of time steps."""
return len(self.time)
@property
def component_coordinates(self):
"""np.ndarray: Indices of the components."""
return np.arange(self.n_comp)
@property
def coordinates(self):
"""np.ndarray: Coordinates of the Solution."""
coordinates = {}
for c in self.dimensions:
v = getattr(self, c)
if v is None:
continue
coordinates[c] = v
return coordinates
@property
def solution_shape(self):
"""tuple: (Expected) shape of the solution."""
return tuple(len(c) for c in self.coordinates.values())
@property
def total_concentration_components(self):
"""np.ndarray: Total concentration of components (sum of species)."""
component_concentration = np.zeros(
self.solution.shape[0:-1] + (self.component_system.n_components,)
)
counter = 0
for index, comp in enumerate(self.component_system):
comp_indices = slice(counter, counter + comp.n_species)
c_comp = np.sum(self.solution[..., comp_indices], axis=-1)
component_concentration[..., index] = c_comp
counter += comp.n_species
return component_concentration
@property
def total_concentration(self):
"""np.ndarray: Total concentration (sum) of all components."""
return np.sum(self.solution, axis=-1, keepdims=True)
@property
def local_purity_components(self):
"""np.ndarray: Local purity of each component."""
solution = self.total_concentration_components
c_total = self.total_concentration
c_total[c_total < self.c_min] = np.nan
purity = np.zeros(solution.shape)
with np.errstate(divide='ignore', invalid='ignore'):
purity = solution/c_total
purity = np.nan_to_num(purity)
return purity
@property
def local_purity_species(self):
"""np.ndarray: Local purity of components."""
solution = self.solution
c_total = self.total_concentration
c_total[c_total < self.c_min] = np.nan
purity = np.zeros(solution.shape)
with np.errstate(divide='ignore', invalid='ignore'):
purity = solution/c_total
purity = np.nan_to_num(purity)
return purity
def __str__(self):
return self.name
[docs]
class SolutionIO(SolutionBase):
"""Solution representing streams at the inlet or outlet of a ``UnitOperation``.
Notes
-----
The `flow_rate` attribute is implemented as TimeLine to improve interpolation of
signals with discontinuous flow.
"""
dimensions = SolutionBase.dimensions + ['component_coordinates']
def __init__(self, name, component_system, time, solution, flow_rate):
super().__init__(name, component_system, time, solution)
if not isinstance(flow_rate, TimeLine):
flow_rate = TimeLine.from_profile(time, flow_rate)
self.flow_rate = flow_rate
self.reset()
@property
def derivative(self):
"""SolutionIO: Derivative of this solution."""
derivative = copy.deepcopy(self)
derivative.reset()
derivative_fun = derivative.solution_interpolated.derivative
derivative.solution_original = derivative_fun(derivative.time)
derivative.reset()
return derivative
@property
def antiderivative(self):
"""SolutionIO: Antiderivative of this solution."""
antiderivative = copy.deepcopy(self)
antiderivative.reset()
antiderivative_fun = antiderivative.solution_interpolated.antiderivative
antiderivative.solution_original = antiderivative_fun(antiderivative.time)
antiderivative.reset()
return antiderivative
[docs]
def reset(self):
super().reset()
self.is_resampled = False
self.is_normalized = False
self.is_smoothed = False
self.s = None
self.crit_fs = None
self.crit_fs_der = None
self.is_smoothed = False
self.update()
self.update_transform()
[docs]
def update(self):
self._solution_interpolated = None
self._dm_dt_interpolated = None
@property
def solution_interpolated(self):
if self._solution_interpolated is None:
self._solution_interpolated = \
InterpolatedSignal(self.time, self.solution)
return self._solution_interpolated
@property
def dm_dt_interpolated(self):
if self._dm_dt_interpolated is None:
self.resample()
dm_dt = self.solution * self.flow_rate.value(self.time)
self._dm_dt_interpolated = InterpolatedSignal(self.time, dm_dt)
return self._dm_dt_interpolated
[docs]
def normalize(self):
"""Normalize the solution using the transformation function."""
if self.is_normalized:
return
self.solution = self.transform.transform(self.solution)
self.update()
self.is_normalized = True
[docs]
def denormalize(self):
"""Denormalize the solution using the transformation function."""
if not self.is_normalized:
return
self.solution = self.transform.untransform(self.solution)
self.update()
self.is_normalized = False
[docs]
def resample(self, start=None, end=None, nt=5001):
"""Resample solution to nt time points.
Parameters
----------
nt : int, optional
Number of points to resample. The default is 5001.
"""
if self.is_resampled:
return
if start is None:
start = self.time[0]
if end is None:
end = self.time[-1]
solution_interpolated = self.solution_interpolated
self.time = np.linspace(start, end, nt)
self.solution = solution_interpolated(self.time)
self.update()
self.is_resampled = True
[docs]
def smooth_data(self, s=None, crit_fs=None, crit_fs_der=None):
"""Smooth data.
Parameters
----------
s : float or list, optional
DESCRIPTION. The default is 0.
crit_fs : float or list
DESCRIPTION.
crit_fs_der : float or list
DESCRIPTION.
"""
if self.is_smoothed:
return
if not self.is_resampled:
self.resample()
if not self.is_normalized:
normalized = True
self.normalize()
else:
normalized = False
if None in (s, crit_fs, crit_fs_der):
s_ = []
crit_fs_ = []
crit_fs_der_ = []
for i in range(self.n_comp):
s_i, crit_fs_i, crit_fs_der_i = smoothing.find_smoothing_factors(
self.time, self.solution[..., i], rmse_target=1e-3
)
s_.append(s_i)
crit_fs_.append(crit_fs_i)
crit_fs_der_.append(crit_fs_der_i)
if s is None:
s = s_
if crit_fs is None:
crit_fs = crit_fs_
if crit_fs_der is None:
crit_fs_der = crit_fs_der_
if np.isscalar(s):
s = self.n_comp * [s]
elif len(s) == 1:
s = self.n_comp * s
self.s = s
if np.isscalar(crit_fs):
crit_fs = self.n_comp * [crit_fs]
elif len(crit_fs) == 1:
crit_fs = self.n_comp * crit_fs
self.crit_fs = crit_fs
if np.isscalar(crit_fs_der):
crit_fs_der = self.n_comp * [crit_fs_der]
elif len(crit_fs_der) == 1:
crit_fs_der = self.n_comp * crit_fs_der
self.crit_fs_der = crit_fs_der
solution = np.zeros((self.solution.shape))
for i, (s, crit_fs, crit_fs_der) in enumerate(zip(s, crit_fs, crit_fs_der)):
smooth, smooth_der = smoothing.full_smooth(
self.time, self.solution[..., i],
crit_fs, s, crit_fs_der
)
solution[..., i] = smooth
self.solution = solution
if normalized:
self.denormalize()
self.update()
self.is_smoothed = True
[docs]
def integral(self, start=None, end=None):
"""Peak area in a fraction interval.
Parameters
----------
start : float
Start time of the fraction
end: float
End time of the fraction
Returns
-------
Area : np.ndarray
Mass of all components in the fraction
"""
if end is None:
end = self.cycle_time
return self.solution_interpolated.integral(start, end)
[docs]
def fraction_mass(self, start=None, end=None):
"""Component mass in a fraction interval
Parameters
----------
start : float
Start time of the fraction
end: float
End time of the fraction
Returns
-------
fraction_mass : np.ndarray
Mass of all components in the fraction
"""
if end is None:
end = self.cycle_time
def dm_dt(t, flow_rate, solution):
dm_dt = flow_rate.value(t)*solution(t)
return dm_dt
points = None
if len(self.flow_rate.section_times) > 2:
points = self.flow_rate.section_times[1:-1]
mass = integrate.quad_vec(
dm_dt,
start, end,
epsabs=1e-6,
epsrel=1e-8,
args=(self.flow_rate, self.solution_interpolated),
points=points,
)[0]
return mass
[docs]
def fraction_volume(self, start=None, end=None):
"""Volume of a fraction interval
Parameters
----------
start : float
Start time of the fraction
end: float
End time of the fraction
Returns
-------
fraction_volume : np.ndarray
Volume of the fraction
"""
if end is None:
end = self.cycle_time
return float(self.flow_rate.integral(start, end).squeeze())
[docs]
@plotting.create_and_save_figure
def plot(
self,
start=None,
end=None,
components=None,
layout=None,
y_max=None,
ax=None,
*args,
**kwargs,
):
"""Plot the entire time_signal for each component.
Parameters
----------
start : float, optional
Start time for plotting. The default is 0.
end : float, optional
End time for plotting.
components : list, optional.
List of components to be plotted. If None, all components are plotted.
layout : plotting.Layout
Plot layout options.
y_max : float, optional
Maximum value of y axis.
If None, value is automatically deferred from solution.
ax : Axes
Axes to plot on.
Returns
-------
ax : Axes
Axes object with concentration profile.
See Also
--------
_plot_solution_1D
slice_solution
plotlib
plot_purity
"""
solution = slice_solution(
self,
components=components,
use_total_concentration=False,
use_total_concentration_components=False,
coordinates={'time': [start, end]}
)
x = solution.time / 60
if layout is None:
layout = plotting.Layout()
layout.x_label = '$time~/~min$'
layout.y_label = '$c~/~mM$'
if start is not None:
start /= 60
if end is not None:
end /= 60
layout.x_lim = (start, end)
if y_max is not None:
layout.y_lim = (None, y_max)
ax = _plot_solution_1D(ax, x, solution, layout, *args, **kwargs)
return ax
[docs]
@plotting.create_and_save_figure
def plot_purity(
self,
start=None,
end=None,
components=None,
layout=None,
y_max=None,
plot_components_purity=True,
plot_species_purity=False,
alpha=1, hide_labels=False,
show_legend=True,
ax=None):
"""Plot local purity for each component of the concentration profile.
Parameters
----------
start : float, optional
Start time for plotting. The default is 0.
end : float, optional
End time for plotting.
components : list, optional.
List of components to be plotted. If None, all components are plotted.
Note that if components are excluded, they will also not be considered in
the calculation of the purity.
layout : plotting.Layout
Plot layout options.
y_max : float, optional
Maximum value of y axis.
If None, value is automatically deferred from solution.
plot_components_purity : bool, optional
If True, plot purity of total component concentration. The default is True.
plot_species_purity : bool, optional
If True, plot purity of individual species. The default is False.
alpha : float, optional
Opacity of line.
hide_labels : bool, optional
If True, hide labels. The default is False.
show_legend : bool, optional
If True, show legend. The default is True.
ax : Axes
Axes to plot on.
Returns
-------
ax : Axes
Axes with plot of purity over time.
Raises
------
CADETProcessError
If solution has less than 2 components.
See Also
--------
slice_solution
plotlib
plot
"""
solution = slice_solution(
self,
components=components,
use_total_concentration=False,
use_total_concentration_components=False,
coordinates={'time': [start, end]}
)
if solution.n_comp < 2:
raise CADETProcessError(
"Purity undefined for systems with less than 2 components."
)
x = solution.time / 60
if layout is None:
layout = plotting.Layout()
layout.x_label = r'$time~/~min$'
layout.y_label = r'$Purity ~/~\%$'
if start is not None:
start /= 60
if end is not None:
end /= 60
layout.x_lim = (start, end)
if y_max is not None:
layout.y_lim = (None, y_max)
local_purity_components = solution.local_purity_components * 100
local_purity_species = solution.local_purity_species * 100
colors = iter(plt.rcParams["axes.prop_cycle"].by_key()["color"])
species_index = 0
for i, comp in enumerate(solution.component_system.components):
color = next(colors)
if hide_labels:
label = None
else:
label = comp.name
if plot_components_purity:
y = local_purity_components[..., i]
ax.plot(
x, y,
label=label,
color=color,
alpha=alpha
)
if plot_species_purity:
if comp.n_species == 1:
species_index += 1
continue
for s, species in enumerate(comp.species):
label = s
y = local_purity_species[..., species_index]
ax.plot(
x, y, '--',
label=label,
color=color,
alpha=alpha
)
species_index += 1
plotting.set_layout(
ax,
layout,
show_legend,
)
return ax
[docs]
class SolutionBulk(SolutionBase):
"""Solution in the bulk phase of the ``UnitOperation``.
Dimensions: NCOL * NRAD * NCOMP
"""
dimensions = SolutionBase.dimensions + [
'axial_coordinates',
'radial_coordinates',
'component_coordinates',
]
def __init__(
self,
name,
component_system,
time, solution,
axial_coordinates=None, radial_coordinates=None
):
self.name = name
self.component_system_original = component_system
self.time_original = time
self.axial_coordinates = axial_coordinates
# Account for dimension reduction in case of only one cell (e.g. LRMP)
if radial_coordinates is not None and len(radial_coordinates) == 1:
radial_coordinates = None
self.radial_coordinates = radial_coordinates
self.solution_original = solution
self.reset()
@property
def ncol(self):
"""int: Number of axial discretization points."""
if self.axial_coordinates is None:
return
return len(self.axial_coordinates)
@property
def nrad(self):
"""int: Number of radial discretization points."""
if self.radial_coordinates is None:
return
return len(self.radial_coordinates)
[docs]
@plotting.create_and_save_figure
def plot(
self,
start=None,
end=None,
components=None,
layout=None,
y_max=None,
ax=None,
*args,
**kwargs,
):
"""Plot the entire time_signal for each component.
Parameters
----------
start : float, optional
Start time for plotting. The default is 0.
end : float, optional
End time for plotting.
components : list, optional.
List of components to be plotted. If None, all components are plotted.
layout : plotting.Layout
Plot layout options.
y_max : float, optional
Maximum value of y axis.
If None, value is automatically deferred from solution.
ax : Axes
Axes to plot on.
Returns
-------
ax : Axes
Axes object with concentration profile.
Raises
------
CADETProcessError
If solution is not 1D.
See Also
--------
_plot_solution_1D
slice_solution
plotlib
plot_purity
"""
if not (self.ncol is None and self.nrad is None):
raise CADETProcessError(
"Solution has more than single dimension. "
"Please use `plot_at_time` or `plot_at_position`."
)
solution = slice_solution(
self,
components=components,
use_total_concentration=False,
use_total_concentration_components=False,
coordinates={'time': [start, end]}
)
x = solution.time / 60
if layout is None:
layout = plotting.Layout()
layout.x_label = '$time~/~min$'
layout.y_label = '$c~/~mM$'
if start is not None:
start /= 60
if end is not None:
end /= 60
layout.x_lim = (start, end)
if y_max is not None:
layout.y_lim = (None, y_max)
ax = _plot_solution_1D(ax, x, solution, layout, *args, **kwargs)
return ax
[docs]
@plotting.create_and_save_figure
def plot_at_time(
self,
t,
components=None,
layout=None,
ax=None,
*args,
**kwargs,
):
"""Plot bulk solution over space at given time.
Parameters
----------
t : float
Time for plotting
If t == -1, the final solution is plotted.
components : list, optional.
List of components to be plotted. If None, all components are plotted.
layout : plotting.Layout
Plot layout options.
ax : Axes
Axes to plot on.
Returns
-------
ax : Axes
Axes object with concentration profile.
See Also
--------
_plot_solution_1D
slice_solution
plot_at_position
plotlib
"""
solution = slice_solution(
self,
components=components,
use_total_concentration=False,
use_total_concentration_components=False,
coordinates={'time': [t, t]},
)
x = self.axial_coordinates
if layout is None:
layout = plotting.Layout()
layout.x_label = '$z~/~m$'
layout.y_label = '$c~/~mM$'
ax = _plot_solution_1D(ax, x, solution, layout, *args, **kwargs)
plotting.add_text(ax, f'time = {t:.2f} s')
return ax
[docs]
@plotting.create_and_save_figure
def plot_at_position(
self,
z,
components=None,
layout=None,
ax=None,
*args,
**kwargs,
):
"""Plot bulk solution over time at given position.
Parameters
----------
z : float
Position for plotting.
components : list, optional.
List of components to be plotted. If None, all components are plotted.
layout : plotting.Layout
Plot layout options.
ax : Axes
Axes to plot on.
Returns
-------
ax : Axes
Axes object with concentration profile.
See Also
--------
_plot_solution_1D
slice_solution
plot_at_position
plotlib
"""
solution = slice_solution(
self,
components=components,
use_total_concentration=False,
use_total_concentration_components=False,
coordinates={'axial_coordinates': [z, z]},
)
x = self.time / 60
if layout is None:
layout = plotting.Layout()
layout.x_label = '$time~/~min$'
layout.y_label = '$c~/~mM$'
ax = _plot_solution_1D(ax, x, solution, layout, *args, **kwargs)
plotting.add_text(ax, f'z = {z:.2f} m')
return ax
[docs]
class SolutionParticle(SolutionBase):
"""Solution in the particle liquid phase of the ``UnitOperation``.
Dimensions: NCOL * NRAD * sum_{j}^{NPARTYPE}{NCOMP * NPAR,j}
"""
dimensions = SolutionBase.dimensions + [
'axial_coordinates',
'radial_coordinates',
'particle_coordinates',
'component_coordinates',
]
def __init__(
self,
name,
component_system,
time, solution,
axial_coordinates=None,
radial_coordinates=None,
particle_coordinates=None
):
self.axial_coordinates = axial_coordinates
# Account for dimension reduction in case of only one cell (e.g. LRMP)
if radial_coordinates is not None and len(radial_coordinates) == 1:
radial_coordinates = None
self.radial_coordinates = radial_coordinates
# Account for dimension reduction in case of only one cell (e.g. CSTR)
if particle_coordinates is not None and len(particle_coordinates) == 1:
particle_coordinates = None
self.particle_coordinates = particle_coordinates
super().__init__(name, component_system, time, solution)
@property
def ncol(self):
"""int: Number of axial discretization points."""
if self.axial_coordinates is None:
return
return len(self.axial_coordinates)
@property
def nrad(self):
"""int: Number of radial discretization points."""
if self.radial_coordinates is None:
return
return len(self.radial_coordinates)
@property
def npar(self):
if self.particle_coordinates is None:
return
return len(self.particle_coordinates)
def _plot_1D(
self,
t,
components=None,
layout=None,
ax=None,
*args,
**kwargs,
):
"""Plot bulk solution over space at given time.
Parameters
----------
t : float
Time for plotting
If t == -1, the final solution is plotted.
components : list, optional.
List of components to be plotted. If None, all components are plotted.
layout : plotting.Layout
Plot layout options.
ax : Axes
Axes to plot on.
Returns
-------
ax : Axes
Axes object with concentration profile.
See Also
--------
_plot_solution_1D
_plot_2D
slice_solution
plotlib
"""
solution = slice_solution(
self,
components=components,
use_total_concentration=False,
use_total_concentration_components=False,
coordinates={'time': [t, t]},
)
x = self.axial_coordinates
if layout is None:
layout = plotting.Layout()
layout.x_label = '$z~/~m$'
layout.y_label = '$c~/~mM$'
ax = _plot_solution_1D(ax, x, solution, layout, *args, **kwargs)
plotting.add_text(ax, f'time = {t:.2f} s')
return ax
def _plot_2D(self, t, comp, vmax, ax=None):
x = self.axial_coordinates
y = self.particle_coordinates
if not self.time[0] <= t <= self.time[-1]:
raise ValueError("Time exceeds bounds.")
i = np.where(t <= self.time)[0][0]
v = self.solution[i, :, :, comp].transpose()
if vmax is None:
vmax = v.max()
try:
mesh = ax.get_children()[0]
mesh.set_array(v.flatten())
except:
mesh = ax.pcolormesh(x, y, v, shading='gouraud', vmin=0, vmax=vmax)
plotting.add_text(ax, f'time = {t:.2f} s')
layout = plotting.Layout()
layout.x_label = '$z~/~m$'
layout.y_label = '$r~/~m$'
layout.title = f'Solid phase concentration, comp={comp}'
plotting.set_layout(ax, layout)
plt.colorbar(mesh)
return ax
[docs]
@plotting.create_and_save_figure
def plot_at_time(self, t, comp=0, vmax=None, ax=None):
"""Plot particle liquid solution over space at given time.
Parameters
----------
t : float
time for plotting
comp : int
component index
ax : Axes
Axes to plot on.
See Also
--------
CADETProcess.plotting
"""
if self.npar is None:
ax = self._plot_1D(ax, t, vmax)
else:
ax = self._plot_2D(ax, t, comp, vmax)
return ax
[docs]
class SolutionSolid(SolutionBase):
"""Solution in the particle solid phase of the ``UnitOperation``.
Dimensions: NCOL * NRAD * sum_{j}^{NPARTYPE}{NBOUND,j * NPAR,j}
"""
n_bound = UnsignedInteger()
dimensions = SolutionBase.dimensions + [
'axial_coordinates',
'radial_coordinates',
'particle_coordinates',
'component_coordinates',
]
def __init__(
self,
name,
component_system, bound_states,
time, solution,
axial_coordinates=None,
radial_coordinates=None,
particle_coordinates=None):
self.bound_states = bound_states
self.axial_coordinates = axial_coordinates
# Account for dimension reduction in case of only one cell (e.g. LRMP)
if radial_coordinates is not None and len(radial_coordinates) == 1:
radial_coordinates = None
self.radial_coordinates = radial_coordinates
# Account for dimension reduction in case of only one cell (e.g. CSTR)
if particle_coordinates is not None and len(particle_coordinates) == 1:
particle_coordinates = None
self.particle_coordinates = particle_coordinates
super().__init__(name, component_system, time, solution)
@property
def n_comp(self):
"""int: Number of components."""
return sum(self.bound_states)
@property
def ncol(self):
"""int: Number of axial discretization points."""
if self.axial_coordinates is None:
return None
else:
return len(self.axial_coordinates)
@property
def nrad(self):
"""int: Number of radial discretization points."""
if self.radial_coordinates is None:
return
return len(self.radial_coordinates)
@property
def npar(self):
if self.particle_coordinates is None:
return
return len(self.particle_coordinates)
[docs]
@plotting.create_and_save_figure
def plot(
self,
start=None,
end=None,
components=None,
layout=None,
y_max=None,
ax=None,
*args,
**kwargs,
):
"""Plot the entire time_signal for each component.
Parameters
----------
start : float, optional
Start time for plotting. The default is 0.
end : float, optional
End time for plotting.
components : list, optional.
List of components to be plotted. If None, all components are plotted.
layout : plotting.Layout
Plot layout options.
y_max : float, optional
Maximum value of y axis.
If None, value is automatically deferred from solution.
ax : Axes
Axes to plot on.
Returns
-------
ax : Axes
Axes object with concentration profile.
Raises CADETProcessError
If solution is not 1D.
See Also
--------
_plot_solution_1D
slice_solution
plotlib
plot_purity
"""
if not (self.ncol is None and self.nrad is None):
raise CADETProcessError(
"Solution has more than single dimension. "
"Please use `plot_at_time`."
)
solution = slice_solution(
self,
components=components,
use_total_concentration=False,
use_total_concentration_components=False,
coordinates={'time': [start, end]}
)
x = solution.time / 60
if layout is None:
layout = plotting.Layout()
layout.x_label = '$time~/~min$'
layout.y_label = '$c~/~mM$'
if start is not None:
start /= 60
if end is not None:
end /= 60
layout.x_lim = (start, end)
if y_max is not None:
layout.y_lim = (None, y_max)
ax = _plot_solution_1D(ax, x, solution, layout, *args, **kwargs)
return ax
def _plot_1D(
self,
t,
components=None,
layout=None,
ax=None,
*args,
**kwargs,
):
"""Plot bulk solution over space at given time.
Parameters
----------
t : float
Time for plotting
If t == -1, the final solution is plotted.
components : list, optional.
List of components to be plotted. If None, all components are plotted.
layout : plotting.Layout
Plot layout options.
ax : Axes
Axes to plot on.
Returns
-------
ax : Axes
Axes object with concentration profile.
See Also
--------
_plot_solution_1D
slice_solution
plot_at_position
plotlib
"""
solution = slice_solution(
self,
components=components,
use_total_concentration=False,
use_total_concentration_components=False,
coordinates={'time': [t, t]},
)
x = self.axial_coordinates
if layout is None:
layout = plotting.Layout()
layout.x_label = '$z~/~m$'
layout.y_label = '$c~/~mM$'
ax = _plot_solution_1D(ax, x, solution, layout, *args, **kwargs)
plotting.add_text(ax, f'time = {t:.2f} s')
return ax
def _plot_2D(self, t, comp, state, vmax, ax=None):
x = self.axial_coordinates
y = self.particle_coordinates
if not self.time[0] <= t <= self.time[-1]:
raise ValueError("Time exceeds bounds.")
t_i = np.where(t <= self.time)[0][0]
c_i = comp*self.n_bound + state
v = self.solution[t_i, :, :, c_i].transpose()
if vmax is None:
vmax = v.max()
mesh = ax.pcolormesh(x, y, v, shading='gouraud', vmin=0, vmax=vmax)
plotting.add_text(ax, f'time = {t:.2f} s')
layout = plotting.Layout()
layout.title = f'Solid phase concentration, comp={comp}, state={state}'
layout.x_label = '$z~/~m$'
layout.y_label = '$r~/~m$'
layout.labels = self.component_system.species[c_i]
plotting.set_layout(ax, layout)
return ax, mesh
[docs]
@plotting.create_and_save_figure
def plot_at_time(self, t, comp=0, state=0, vmax=None, ax=None):
"""Plot particle solid solution over space at given time.
Parameters
----------
t : float
time for plotting
comp : int, optional
component index
state : int, optional
bound state
vmax : float, optional
Maximum values for plotting.
ax : Axes
Axes to plot on.
See Also
--------
CADETProcess.plotting
"""
if self.npar is None:
ax = self._plot_1D(ax, t, vmax)
else:
ax, mesh = self._plot_2D(ax, t, comp, vmax)
plt.colorbar(mesh, ax)
return ax
[docs]
class SolutionVolume(SolutionBase):
"""Volume solution (of e.g. CSTR)."""
@property
def solution_shape(self):
"""tuple: (Expected) shape of the solution"""
return (self.nt, 1)
[docs]
@plotting.create_and_save_figure
def plot(self, start=None, end=None, ax=None, update_layout=True, **kwargs):
"""Plot the whole time_signal for each component.
Parameters
----------
start : float
start time for plotting
end : float
end time for plotting
ax : Axes
Axes to plot on.
See Also
--------
CADETProcess.plot
"""
x = self.time / 60
y = self.solution * 1000
y_min = np.min(y)
y_max = 1.1 * np.max(y)
layout = plotting.Layout()
layout.x_label = '$time~/~min$'
layout.y_label = '$V~/~L$'
if start is not None:
start /= 60
if end is not None:
end /= 60
layout.x_lim = (start, end)
layout.y_lim = (y_min, y_max)
ax.plot(x, y, **kwargs)
if update_layout:
plotting.set_layout(ax, layout)
return ax
def _plot_solution_1D(
ax,
x,
solution,
layout=None,
plot_species=False,
plot_components=True,
plot_total_concentration=False,
alpha=1, hide_labels=False, hide_species_labels=False,
secondary_axis=None, secondary_layout=None,
show_legend=True,
update_layout=True,
):
sol = solution.solution
c_total_comp = solution.total_concentration_components
if secondary_axis is not None:
ax_secondary = ax.twinx()
else:
ax_secondary = None
species_index = 0
y_min = 0
y_max = 0
y_min_sec = 0
y_max_sec = 0
colors = iter(plt.rcParams["axes.prop_cycle"].by_key()["color"])
for i, comp in enumerate(solution.component_system.components):
color = next(colors)
if hide_labels or not update_layout:
label = None
else:
label = comp.name
if secondary_axis is not None and comp.name in secondary_axis.components:
a = ax_secondary
else:
a = ax
if plot_components:
y_comp = c_total_comp[..., i]
if secondary_axis is not None and comp.name in secondary_axis.components:
if secondary_axis.transform is not None:
y_comp = secondary_axis.transform(y_comp)
y_min_sec = min(min(y_comp), y_min_sec)
y_max_sec = max(max(y_comp), y_max_sec)
else:
y_min = np.min((np.min(y_comp), y_min))
y_max = np.max((np.max(y_comp), y_max))
y_comp = np.squeeze(y_comp)
a.plot(
x, y_comp,
linestyle='-',
color=color,
alpha=alpha,
label=label,
)
if plot_species:
if comp.n_species == 1:
if plot_components:
species_index += 1
continue
linestyle_iter = iter(plotting.linestyle_cycler)
for s, species in enumerate(comp.label):
if hide_species_labels or not update_layout:
label = None
else:
label = species
if comp.n_species == 1 \
and not plot_total_concentration\
and not plot_components:
linestyle = '-'
else:
linestyle = next(linestyle_iter)['linestyle']
if secondary_axis is not None \
and secondary_axis.transform is not None \
and comp.name in secondary_axis.components:
y_spec = secondary_axis.transform(sol[..., species_index])
y_min_sec = min(min(y_spec), y_min_sec)
y_max_sec = max(max(y_spec), y_max_sec)
else:
y_spec = sol[..., species_index]
y_min = min(min(y_spec), y_min)
y_max = max(max(y_spec), y_max)
y_spec = np.squeeze(y_spec)
a.plot(
x, y_spec,
linestyle=linestyle,
label=label,
color=color,
alpha=alpha
)
species_index += 1
if plot_total_concentration:
if hide_labels or not update_layout:
label = None
else:
label = 'Total concentration'
y_total = solution.total_concentration
y_total = np.squeeze(y_total)
y_min = min(min(y_total), y_min)
y_max = max(max(y_total), y_max)
a.plot(
x, y_total, '-',
label=label,
color='k',
alpha=alpha
)
if layout.y_lim is None:
layout.y_lim = (y_min, 1.1*y_max)
if secondary_axis is not None and secondary_layout is None:
secondary_layout = plotting.Layout()
secondary_layout.y_label = secondary_axis.y_label
secondary_layout.y_lim = (y_min_sec, 1.1*y_max_sec)
if update_layout:
plotting.set_layout(
ax,
layout,
show_legend,
ax_secondary,
secondary_layout,
)
return ax
class InterpolatedSignal():
def __init__(self, time, signal):
if len(signal.shape) == 1:
signal = np.array(signal, ndmin=2).transpose()
self._solutions = [
PchipInterpolator(time, signal[:, comp])
for comp in range(signal.shape[1])
]
self._derivatives = [signal.derivative() for signal in self._solutions]
self._antiderivatives = [signal.antiderivative() for signal in self._solutions]
@property
def solutions(self):
return self._solutions
@property
def derivatives(self):
return self._derivatives
def derivative(self, time):
"""Derivatives of the solution spline(s) at given time points.
Parameters
----------
time : np.ndarray
The time points to evaluate the derivatives at.
Returns
-------
der : ndarray
Derivatives of the solution spline(s) at given time.
"""
der = np.empty((len(time), len(self.solutions)))
for comp, der_i in enumerate(self.derivatives):
der[:, comp] = der_i(time)
return der
@property
def antiderivatives(self):
return self._antiderivatives
def antiderivative(self, time):
""" Return all antiderivative of the spline(s) at given time.
x : np.ndarray
The time points to evaluate the antiderivatives at.
Returns
-------
anti : ndarray
Antiderivatives of the solution spline(s) at given time.
"""
anti = np.empty((len(time), len(self.solutions)))
for comp, anti_i in enumerate(self.antiderivatives):
anti[:, comp] = anti_i(time)
return anti
def integral(self, start=None, end=None):
"""Definite integral between start and end.
Parameters
----------
start : float
Lower integration bound.
end : end
Upper integration bound.
Returns
-------
integral : np.ndarray
Definite integral of the solution spline(s) between limits.
"""
return np.array([
self._solutions[comp].integral(start, end)
for comp in range(len(self._solutions))
])
def __call__(self, t):
return np.array([
self._solutions[comp](t) for comp in range(len(self._solutions))
]).transpose()
[docs]
def slice_solution(
solution_original,
components=None,
use_total_concentration=False,
use_total_concentration_components=False,
coordinates=None):
"""Slice a `Solution` object along specified dimensions, components or both.
Parameters
----------
solution_original : Solution
The `Solution` object to slice.
components : str or list of str, optional
The names of the components to keep in the sliced `Solution`.
If `None`, all components are kept. Defaults to `None`.
use_total_concentration : bool, optional
If `True`, only the total concentration data is kept in the sliced `Solution`.
Defaults to `False`.
use_total_concentration_components : bool, optional
If `True`, the total concentration data is kept for each individual species
of each component in the sliced `Solution`. Defaults to `False`.
coordinates : dict, optional
A dictionary mapping dimensions to slice coordinates. Each dimension in the
`Solution` object is represented by a key in the dictionary, and the corresponding
value is a tuple of two or three elements specifying the start, stop and step
coordinates of the slice along that dimension. If a value is `None`, the corresponding
coordinate is not sliced. Defaults to `None`.
Returns
-------
Solution
A new `Solution` object representing the sliced data.
Raises
------
ValueError
If any of the slice coordinates exceeds the bounds of its corresponding dimension.
CADETProcessError
If any of the specified components or dimensions does not exist in the original `Solution`.
"""
solution = copy.deepcopy(solution_original)
slices = ()
if coordinates is not None:
coordinates = copy.deepcopy(coordinates)
for i, (dim, coord) in enumerate(solution.coordinates.items()):
if dim == 'component_coordinates':
continue
if dim in coordinates:
sl = list(coordinates.pop(dim))
if sl[0] is None:
sl[0] = coord[0]
if sl[1] is None:
sl[1] = coord[-1]
if not coord[0] <= sl[0] <= sl[1] <= coord[-1]:
raise ValueError(f"{dim} coordinates exceed bounds.")
start_index = np.where(coord >= sl[0])[0][0]
if sl[1] is not None:
end_index = np.where(coord >= sl[1])[0][0] + 1
else:
end_index = None
sl = slice(start_index, end_index)
slices += (sl,)
setattr(solution, dim, coord[sl])
else:
slices += (slice(None),)
if len(coordinates) > 0:
raise CADETProcessError(f"Unknown dimensions: {coordinates.keys()}")
solution.solution = solution_original.solution[slices]
# First calculate total_concentration of components, if required.
if use_total_concentration_components:
sol_total_concentration_comp = solution.total_concentration_components
for comp in solution.component_system:
if comp.n_species > 1:
comp._species = []
comp.add_species(comp.name)
solution.solution = sol_total_concentration_comp
# Then, slice components. Note that component index can only be used if total
# concentration of components has already been calculated and set as solution array.
if components is not None:
if not isinstance(components, list):
components = [components]
components = copy.deepcopy(components)
component_system = copy.deepcopy(solution.component_system)
component_indices = []
for i, (name, component) in enumerate(
solution.component_system.components_dict.items()):
if name not in components:
component_system.remove_component(component.name)
else:
name = str(name)
components.remove(name)
if use_total_concentration_components:
component_indices.append(i)
else:
component_indices += solution.component_system.indices[component.name]
if len(components) != 0:
raise CADETProcessError(f"Unknown components: {components}")
solution_components = solution.solution[..., component_indices]
solution.component_system = component_system
solution.solution = solution_components
# Only calculate total concentration after removing unwanted components.
if use_total_concentration:
solution_total_concentration = solution.total_concentration
solution.component_system = ComponentSystem(['total_concentration'])
solution.solution = solution_total_concentration
solution.update()
solution.update_transform()
return solution