from abc import abstractmethod
import copy
from functools import wraps
from warnings import warn
import numpy as np
from scipy.integrate import simpson
from scipy.special import expit
from CADETProcess import CADETProcessError
from CADETProcess.dataStructure import UnsignedInteger
from CADETProcess.solution import SolutionBase, slice_solution
from CADETProcess.metric import MetricBase
from .shape import pearson, pearson_offset
from .peaks import find_peaks, find_breakthroughs
__all__ = [
'DifferenceBase',
'calculate_sse', 'calculate_rmse',
'SSE', 'RMSE', 'NRMSE',
'Norm', 'L1', 'L2',
'AbsoluteArea', 'RelativeArea',
'Shape',
'PeakHeight', 'PeakPosition',
'BreakthroughHeight', 'BreakthroughPosition',
]
def squishify(*args, **kwargs):
warn(
'This function is deprecated, use sigmoid_distance.',
DeprecationWarning, stacklevel=2
)
return sigmoid_distance(*args, **kwargs)
def sigmoid_distance(measurement, target, normalization=1):
"""Calculate the distance between two values using a sigmoid function.
The distance is defined as the sigmoid transformation of the difference between the
measurement and target values, normalized by a user-specified factor.
Parameters
----------
measurement : float or numpy.ndarray
The measurement value(s).
target : float or numpy.ndarray
The target value(s).
normalization : float, optional
The factor to use for normalization. Default is 1.
Returns
-------
float or numpy.ndarray
The sigmoid distance between the measurement and target values.
Examples
--------
>>> sigmoid_distance(3, 5)
0.7310585786300049
>>> sigmoid_distance([1, 2, 3], [3, 2, 1])
array([0.73105858, 0.5 , 0.26894142])
"""
input = (measurement - target) / normalization
output = np.abs(2 * expit(input) - 1)
return output
[docs]
class DifferenceBase(MetricBase):
"""
Base class for difference metric evaluation between a reference and a solution.
Parameters
----------
reference : ReferenceIO
Reference used for calculating difference metric.
components : {str, list}, optional
Solution components to be considered.
If None, all components are considered. The default is None.
use_total_concentration : bool, optional
If True, use sum of all components. The default is False.
use_total_concentration_components : bool, optional
If True, sum concentration of species. The default is True.
start : float, optional
End time of solution slice to be considered. The default is None.
end : float, optional
End time of solution slice to be considered. The default is None.
transform : callable, optional
Function to transform solution. The default is None.
resample : bool, optional
If True, resample data. The default is True.
smooth : bool, optional
If True, smooth data. The default is False.
normalize : bool, optional
If True, normalize data. The default is False.
"""
def __init__(
self,
reference,
components=None,
use_total_concentration=False,
use_total_concentration_components=True,
start=None,
end=None,
transform=None,
resample=True,
smooth=False,
normalize=False):
"""Initialize an instance of DifferenceBase.
Parameters
----------
reference : ReferenceIO
Reference used for calculating difference metric.
components : {str, list}, optional
Solution components to be considered.
If None, all components are considered. The default is None.
use_total_concentration : bool, optional
If True, use sum of all components. The default is False.
use_total_concentration_components : bool, optional
If True, sum concentration of species. The default is True.
start : float, optional
End time of solution slice to be considerd. The default is None.
end : float, optional
End time of solution slice to be considerd. The default is None.
transform : callable, optional
Function to transform solution. The default is None.
resample : bool, optional
If True, resample data. The default is True.
smooth : bool, optional
If True, smooth data. The default is False.
normalize : bool, optional
If True, normalize data. The default is False.
"""
self.components = components
self.use_total_concentration = use_total_concentration
self.use_total_concentration_components = \
use_total_concentration_components
self.start = start
self.end = end
self.transform = transform
self.resample = resample
self.smooth = smooth
self.normalize = normalize
self.reference = reference
@property
def n_metrics(self):
"""int: Number of metrics."""
return self.reference.solution.shape[-1]
@property
def bad_metrics(self):
"""list: Worst case values for each metric."""
return self.n_metrics * [np.inf]
@property
def reference(self):
"""SolutionBase: The reference Solution, sliced and transformed."""
return self._reference_sliced_and_transformed
@reference.setter
def reference(self, reference):
if not isinstance(reference, SolutionBase):
raise TypeError("Expected SolutionBase")
self._reference = copy.deepcopy(reference)
if self.resample and not self._reference.is_resampled:
self._reference.resample()
if self.normalize and not self._reference.is_normalized:
self._reference.normalize()
if self.smooth and not self._reference.is_smoothed:
self._reference.smooth_data()
reference = slice_solution(
self._reference,
self.components,
self.use_total_concentration,
self.use_total_concentration_components,
coordinates={'time': (self.start, self.end)}
)
self._reference_sliced_and_transformed = reference
[docs]
def checks_dimensions(func):
"""Decorator to automatically check reference and solution dimensions."""
@wraps(func)
def wrapper(self, solution, *args, **kwargs):
if solution.solution.shape != self.reference.solution.shape:
raise CADETProcessError(
"Simulation shape does not match experiment"
)
value = func(self, solution, *args, **kwargs)
return value
return wrapper
[docs]
def slices_solution(func):
"""Decorator to automatically slice solution."""
@wraps(func)
def wrapper(self, solution, slice=True, *args, **kwargs):
if slice:
solution = slice_solution(
solution,
self.components,
self.use_total_concentration,
self.use_total_concentration_components,
coordinates={'time': (self.start, self.end)}
)
value = func(self, solution, *args, **kwargs)
return value
return wrapper
[docs]
def resamples_smoothes_and_normalizes_solution(func):
"""Decorator to automatically smooth and normalize solution."""
@wraps(func)
def wrapper(self, solution, *args, **kwargs):
solution = copy.deepcopy(solution)
solution.resample(
self._reference.time[0],
self._reference.time[-1],
len(self._reference.time),
)
if self.normalize and not solution.is_normalized:
solution.normalize()
if self.smooth and not solution.is_smoothed:
solution.smooth_data()
value = func(self, solution, *args, **kwargs)
return value
return wrapper
@abstractmethod
def _evaluate(self):
"""Abstract method to compute the difference metric.
Returns
-------
np.ndarray
The computed difference metric.
"""
return
[docs]
@resamples_smoothes_and_normalizes_solution
@slices_solution
@transforms_solution
@checks_dimensions
def evaluate(self, solution):
"""Compute the difference between the reference solution and the input solution.
Parameters
----------
solution : Solution
The solution to compare with the reference solution.
Returns
-------
np.ndarray
The difference between the two solutions.
"""
metric = self._evaluate(solution)
return metric
__call__ = evaluate
def calculate_sse(simulation, reference):
"""Calculate the sum of squared errors (SSE) between simulation and reference.
Parameters
----------
simulation : np.ndarray
Array of simulated values.
reference : np.ndarray
Array of reference values.
Returns
-------
np.ndarray
The SSE between simulation and reference.
"""
return np.sum((simulation - reference) ** 2, axis=0)
[docs]
class SSE(DifferenceBase):
"""Sum of squared errors (SSE) difference metric."""
def _evaluate(self, solution):
sse = calculate_sse(solution.solution, self.reference.solution)
return sse
def calculate_rmse(simulation, reference):
"""Calculate the root mean squared error (RMSE) between simulation and reference.
Parameters
----------
simulation : np.ndarray
Array of simulated values.
reference : np.ndarray
Array of reference values.
Returns
-------
np.ndarray
The RMSE between simulation and reference.
"""
return np.sqrt(np.mean((simulation - reference) ** 2, axis=0))
[docs]
class RMSE(DifferenceBase):
"""Root mean squared errors (RMSE) difference metric."""
def _evaluate(self, solution):
rmse = calculate_rmse(solution.solution, self.reference.solution)
return rmse
[docs]
class NRMSE(DifferenceBase):
"""Normalized root mean squared errors (RRMSE) difference metric."""
def _evaluate(self, solution):
rmse = calculate_rmse(solution.solution, self.reference.solution)
nrmse = rmse / np.max(self.reference.solution, axis=0)
return nrmse
[docs]
class Norm(DifferenceBase):
"""Norm difference metric.
Attributes
----------
order : int
The order of the norm.
"""
order = UnsignedInteger()
def _evaluate(self, solution):
norm = np.linalg.norm(
(solution.solution - self.reference.solution), ord=self.order
)
return norm
[docs]
class L1(Norm):
"""L1 norm difference metric."""
order = 1
[docs]
class L2(Norm):
"""L2 norm difference metric."""
order = 2
[docs]
class AbsoluteArea(DifferenceBase):
"""Absolute difference in area difference metric."""
def _evaluate(self, solution):
"""np.array: Absolute difference in area compared to reference.
Parameters
----------
solution : SolutionIO
Concentration profile of simulation.
"""
area_ref = simpson(
self.reference.solution, self.reference.time, axis=0
)
area_sol = simpson(solution.solution, solution.time, axis=0)
return abs(area_ref - area_sol)
[docs]
class RelativeArea(DifferenceBase):
"""Relative difference in area difference metric."""
def _evaluate(self, solution):
"""np.array: Relative difference in area compared to reference.
Parameters
----------
solution : SolutionIO
Concentration profile of simulation.
"""
area_ref = simpson(
self.reference.solution, x=self.reference.time, axis=0
)
area_new = simpson(solution.solution, x=solution.time, axis=0)
return abs(area_ref - area_new)/area_ref
[docs]
class Shape(DifferenceBase):
"""Shape similarity difference metric.
The similarity is calculated using the Pearson correlation between the reference
and solution profiles, as well as the time offset between their peak positions, and
the peak height of the solution profile. Additionally, if `use_derivative` is set to
True, the similarity is also calculated using the Pearson correlation of the derivative
profiles, and the minimum and maximum peak heights of the derivative profile.
Attributes
----------
n_metrics : int
Number of similarity metrics calculated by the class.
labels : list of str
List of labels for each similarity metric calculated by the class.
Raises
------
CADETProcessError
If `components` is not None and has more than one element.
Notes
-----
Currently, this class only works for single-component systems with one peak.
"""
@wraps(DifferenceBase.__init__)
def __init__(
self, *args,
use_derivative=True, normalize_metrics=True, normalization_factor=None,
include_height=True,
**kwargs):
"""Initialize Shape metric.
Parameters
----------
*args :
Positional arguments for DifferenceBase.
use_derivative : bool, optional
If True, use the derivative profiles to calculate similarity.
Default is True.
normalize_metrics : bool, optional
If True, normalize the similarity metrics to a range of [0, 1] using a
sigmoid function.
Default is True.
normalization_factor : float, optional
Normalization factor used by the sigmoid function.
Default is None, which sets it to 1/10 of the simulation time.
include_height : bool, optional
If True, also return a metric for the height difference.
If `use_derivative` is also true, also return height differences
for the derivatives.
**kwargs : dict
Keyword arguments passed to the base class constructor.
"""
super().__init__(*args, **kwargs)
if self.reference.n_comp > 1 and not self.use_total_concentration:
if self.components is not None and len(self.components) == 1:
pass
else:
raise CADETProcessError(
"Shape currently only supports single component."
)
self.include_height = include_height
if include_height:
warn("Peak height will be removed from the Shape difference metric.", DeprecationWarning)
self.peak_height = PeakHeight(
*args, normalize=False, normalize_metrics=normalize_metrics, **kwargs
)
self.use_derivative = use_derivative
if use_derivative:
self.reference_der = self.reference.derivative
self.reference_der.resample(
start=self._reference.time[0],
end=self._reference.time[-1],
nt=len(self._reference.time)
)
self.reference_der_sliced = slice_solution(
self.reference_der,
None,
self.use_total_concentration,
self.use_total_concentration_components,
coordinates={'time': (self.start, self.end)}
)
self.peak_der_min = PeakHeight(
self.reference_der, *args[1:], normalize=False,
find_minima=True, normalize_metrics=normalize_metrics,
**kwargs
)
self.peak_der_max = PeakHeight(
self.reference_der, *args[1:], normalize=False,
find_minima=False, normalize_metrics=normalize_metrics,
**kwargs
)
self.normalize_metrics = normalize_metrics
if normalization_factor is None:
normalization_factor = self.reference.time[-1]/10
self.normalization_factor = normalization_factor
@property
def n_metrics(self):
n_metrics = 2
if self.include_height:
n_metrics += 1
if self.use_derivative:
n_metrics += 1
if self.include_height:
n_metrics += 2
return n_metrics
@property
def labels(self):
labels = ['Pearson Correleation', 'Time offset']
if self.include_height:
labels += ["Peak Height"]
if self.use_derivative:
labels += ['Pearson Correlation Derivative']
if self.include_height:
labels += [
'Peak Minimum Derivative',
'Peak Maximum Derivative'
]
return labels
def _evaluate(self, solution):
"""np.array: Shape similarity using pearson correlation.
Parameters
----------
solution : SolutionIO
Concentration profile of simulation.
"""
corr, offset_original = pearson(
self.reference.time,
self.reference.solution_interpolated.solutions[0],
solution.solution_interpolated.solutions[0],
)
if self.include_height:
peak_height = self.peak_height(solution, slice=False)
if self.normalize_metrics:
offset = sigmoid_distance(
offset_original, target=0, normalization=self.normalization_factor
)
else:
offset = np.abs(offset_original)
if not self.use_derivative:
if self.include_height:
return np.array([corr, offset, peak_height[0]])
elif not self.include_height:
return np.array([corr, offset])
solution_der = solution.derivative
solution_der_sliced = self.slice_and_transform(solution_der)
corr_der = pearson_offset(
self.reference_der_sliced.time,
self.reference_der_sliced.solution_interpolated.solutions[0],
solution_der_sliced.solution_interpolated.solutions[0],
offset_original,
)
der_min = self.peak_der_min(solution_der_sliced, slice=False)
der_max = self.peak_der_max(solution_der_sliced, slice=False)
if self.include_height:
return np.array(
[
corr, offset, peak_height[0],
corr_der, der_min[0], der_max[0]
]
)
elif not self.include_height:
return np.array(
[
corr, offset,
corr_der,
]
)
[docs]
class PeakHeight(DifferenceBase):
"""Absolute difference in peak height difference metric.
Attributes
----------
find_minima : bool
Indicates whether the minima instead of maxima of the peaks are found.
reference_peaks : list of tuple
Contains the peaks found in the reference.
normalize_metrics : bool
Indicates whether normalization is applied to the peak height difference scores.
normalization_factor : list of list
Contains the normalization factors for each peak in each component.
"""
@wraps(DifferenceBase.__init__)
def __init__(
self, *args,
find_minima=False, normalize_metrics=True, normalization_factor=None,
**kwargs):
"""Initialize the PeakHeight object.
Parameters
----------
*args : tuple
Positional arguments passed to the base class constructor.
find_minima : bool, optional
If True, finds the minima instead of maxima of the peaks, by default False.
normalize_metrics : bool, optional
If True, applies normalization to the peak height difference scores.
The default is True.
normalization_factor : int or None, optional
If not None, sets the normalization factor to a constant value for all peaks.
If None, calculates the normalization factor based on the reference peaks
The default is None.
**kwargs : dict
Keyword arguments passed to the base class constructor.
"""
super().__init__(*args, **kwargs)
self.find_minima = find_minima
self.reference_peaks = find_peaks(
self.reference, find_minima=find_minima
)
self.normalize_metrics = normalize_metrics
if normalization_factor is None:
normalization_factor = [
[peak[1] for peak in self.reference_peaks[i]]
for i in range(self.reference.n_comp)
]
else:
normalization_factor = [
len(self.reference_peaks[i])*[normalization_factor]
for i in range(self.reference.n_comp)
]
self.normalization_factor = normalization_factor
@property
def n_metrics(self):
return sum([len(peak) for peak in self.reference_peaks])
def _evaluate(self, solution):
"""np.array: Difference in peak height (concentration).
Parameters
----------
solution : SolutionIO
Concentration profile of simulation.
"""
solution_peaks = find_peaks(solution, find_minima=self.find_minima)
if self.normalize_metrics:
score = [
sigmoid_distance(sol[1], ref[1], factor)
for i in range(self.reference.n_comp)
for ref, sol, factor in zip(
self.reference_peaks[i],
solution_peaks[i],
self.normalization_factor
)
]
else:
score = [
ref[1] - sol[1]
for i in range(self.reference.n_comp)
for ref, sol in zip(self.reference_peaks[i], solution_peaks[i])
]
return np.abs(score).flatten()
[docs]
class PeakPosition(DifferenceBase):
"""Absolute difference in peak peak position difference metric.
Attributes
----------
reference_peaks : list of tuple
Contains the peaks found in the reference.
normalize_metrics : bool
Indicates whether normalization is applied to the peak height difference scores.
normalization_factor : list of list
Contains the normalization factors for each peak in each component.
"""
@wraps(DifferenceBase.__init__)
def __init__(self, *args, normalize_metrics=True, normalization_factor=None, **kwargs):
"""Initialize PeakPosition object.
Parameters
----------
*args : tuple
Positional arguments to pass to the parent class constructor.
normalize_metrics : bool, optional
Whether to normalize the difference metrics, by default True.
normalization_factor : float or list of float, optional
Normalization factor(s) to use for the difference metric(s). If a single
float is given, it will be used for all the metrics. If a list of floats is
given, it must have one element per component in the reference, by default
None.
**kwargs : dict
Keyword arguments passed to the base class constructor.
"""
super().__init__(*args, **kwargs)
self.reference_peaks = find_peaks(self.reference)
self.normalize_metrics = normalize_metrics
if normalization_factor is None:
normalization_factor = [
[peak[0] for peak in self.reference_peaks[i]]
for i in range(self.reference.n_comp)
]
else:
normalization_factor = [
len(self.reference_peaks[i])*[normalization_factor]
for i in range(self.reference.n_comp)
]
# if normalization_factor is None:
# normalization_factor = self.reference.time[-1]/10
self.normalization_factor = normalization_factor
@property
def n_metrics(self):
return sum([len(peak) for peak in self.reference_peaks])
def _evaluate(self, solution):
"""np.array: Difference in peak position (time).
Parameters
----------
solution : SolutionIO
Concentration profile of simulation.
"""
solution_peaks = find_peaks(solution)
if self.normalize_metrics:
score = [
sigmoid_distance(sol[0], ref[0], factor)
for i in range(self.reference.n_comp)
for ref, sol, factor in zip(
self.reference_peaks[i],
solution_peaks[i],
self.normalization_factor[i]
)
]
else:
score = [
ref[0] - sol[0]
for i in range(self.reference.n_comp)
for ref, sol in zip(self.reference_peaks[i], solution_peaks[i])
]
return np.abs(score).flatten()
[docs]
class BreakthroughHeight(DifferenceBase):
"""Absolute difference in breakthrough curve height difference metric.
Attributes
----------
normalize_metrics : bool
Whether to normalize the difference scores based on a sigmoid function.
reference_bt : list of tuple
List of breakthrough curves in the reference solution.
"""
@wraps(DifferenceBase.__init__)
def __init__(self, *args, normalize_metrics=True, **kwargs):
"""Initialize BreakthroughHeight metric.
Parameters
----------
*args : tuple
Positional arguments to pass to the base class constructor.
normalize_metrics : bool, optional
Whether to normalize the difference scores based on a sigmoid function.
Default is True.
**kwargs : dict
Keyword arguments passed to the base class constructor.
"""
super().__init__(*args, **kwargs)
self.normalize_metrics = normalize_metrics
self.reference_bt = find_breakthroughs(self.reference)
@property
def n_metrics(self):
return len(self.reference_bt)
def _evaluate(self, solution):
"""np.array: Difference in breakthrough height (concentration).
Parameters
----------
solution : SolutionIO
Concentration profile of simulation.
"""
solution_bt = find_breakthroughs(solution)
if self.normalize_metrics:
score = [
sigmoid_distance(sol[1], ref[1])
for ref, sol in zip(self.reference_bt, solution_bt)
]
else:
score = [
ref[1] - sol[1]
for ref, sol in zip(self.solution_bt, solution_bt)
]
return np.abs(score)
[docs]
class BreakthroughPosition(DifferenceBase):
"""Absolute difference in breakthrough curve position difference metric."""
@wraps(DifferenceBase.__init__)
def __init__(self, *args, normalize_metrics=True, normalization_factor=None, **kwargs):
"""
Initialize the BreakthroughPosition object.
Parameters
----------
*args :
Positional arguments for DifferenceBase.
normalize_metrics : bool, optional
Whether to normalize the metrics. Default is True.
normalization_factor : float, optional
Factor to use for normalization.
If None, it is set to the maximum of the difference between the reference
breakthrough and the start time, and the difference between the end time and
the reference breakthrough.
**kwargs : dict
Keyword arguments passed to the base class constructor.
"""
super().__init__(*args, **kwargs)
self.reference_bt = find_breakthroughs(self.reference)
t_min = self.reference.time[0]
t_max = self.reference.time[-1]
self.normalize_metrics = normalize_metrics
if normalization_factor is None:
normalization_factor = max(
self.reference_bt[0] - t_min, t_max - self.reference_bt[0]
)
self.normalization_factor = normalization_factor
@property
def n_metrics(self):
return len(self.reference_bt)
def _evaluate(self, solution):
"""np.array: Difference in breakthrough position (time).
Parameters
----------
solution : SolutionIO
Concentration profile of simulation.
"""
solution_bt = find_breakthroughs(solution)
if self.normalize_metrics:
score = [
sigmoid_distance(sol[0], ref[0], self.normalization_factor)
for ref, sol in zip(self.reference_bt, solution_bt)
]
else:
score = [
ref[0] - sol[0]
for ref, sol in zip(self.solution_bt, solution_bt)
]
return np.abs(score)