Source code for CADETProcess.smoothing

"""
=========================================
Smoothing (:mod:`CADETProcess.smoothing`)
=========================================

.. currentmodule:: CADETProcess.smoothing

This module provides functionality for smoothing data.


.. autosummary::
    :toctree: generated/

    find_smoothing_factors
    full_smooth

"""

import multiprocessing

import numpy as np
import scipy.signal

from pymoo.core.problem import ElementwiseProblem
from pymoo.algorithms.soo.nonconvex.pattern import PatternSearch
from pymoo.optimize import minimize

butter_order = 3

__all__ = ['find_smoothing_factors', 'full_smooth']


class TargetProblem(ElementwiseProblem):
    def __init__(self, lb, ub, sse_target, func, values, fs):
        super().__init__(n_var=1, n_obj=1, n_constr=0, xl=lb, xu=ub)
        self.sse_target = sse_target
        self.func = func
        self.values = values
        self.fs = fs

    def _evaluate(self, crit_fs, out, *args, **kwargs):
        crit_fs = 10**crit_fs
        try:
            sos = self.func(crit_fs, self.fs)
            low_passed = scipy.signal.sosfiltfilt(sos, self.values)
            sse = np.sum((low_passed - self.values) ** 2)

            error = (sse - self.sse_target)**2
        except (ValueError, np.linalg.LinAlgError):
            error = np.inf
        out["F"] = error


class MaxDistance(ElementwiseProblem):
    def __init__(self, lb, ub, func, fs, values, x_min, y_min, p1, p2, factor):
        super().__init__(n_var=1, n_obj=1, n_constr=0, xl=lb, xu=ub)
        self.func = func
        self.fs = fs
        self.values = values
        self.x_min = x_min
        self.y_min = y_min
        self.p1 = p1
        self.p2 = p2
        self.factor = factor

    def _evaluate(self, crit_fs, out, *args, **kwargs):
        crit_fs = 10.0 ** crit_fs[0]
        try:
            sos = self.func(crit_fs, self.fs)
        except ValueError:
            out['F'] = 1e6
            return

        try:
            low_passed = scipy.signal.sosfiltfilt(sos, self.values)
        except np.linalg.LinAlgError:
            out['F'] = 1e6
            return

        sse = np.sum((low_passed - self.values) ** 2)

        pT = np.array([crit_fs - self.x_min, np.log(sse) - self.y_min]).T / self.factor

        d = np.cross(self.p2 - self.p1, self.p1 - pT) / np.linalg.norm(self.p2 - self.p1)
        out["F"] = -d


def get_p(x, y):
    x = np.array(x)
    y = np.array(y)

    sort_idx = np.argsort(x)

    x = x[sort_idx]
    y = y[sort_idx]

    y_min = y - min(y)
    x_min = x - min(x)

    p3 = np.array([x_min, y_min]).T
    factor = np.max(p3, 0)
    p3 = p3 / factor
    p1 = p3[0, :]
    p2 = p3[-1, :]

    return x, min(x), y, min(y), p1, p2, p3, factor


def signal_bessel(crit_fs, fs):
    return scipy.signal.bessel(
        butter_order, crit_fs, btype="lowpass", analog=False, fs=fs,
        output="sos", norm="delay"
    )


def signal_butter(crit_fs, fs):
    return scipy.signal.butter(
        butter_order, crit_fs, btype="lowpass", analog=False, fs=fs,
        output="sos"
    )


def refine_signal(func, times, values, x, y, fs, start):
    x, x_min, y, y_min, p1, p2, p3, factor = get_p(x, y)

    lb = np.log10(x[0])
    ub = np.log10(x[-1])

    problem = MaxDistance(
        lb, ub, func, fs, values, x_min, y_min, p1, p2, factor
    )

    algorithm = PatternSearch(n_sample_points=50, eps=1e-13)

    res = minimize(
        problem,
        algorithm,
        verbose=False,
        seed=1
    )

    crit_fs = 10**res.X[0]

    return crit_fs


def find_L(x, y):
    # find the largest value greater than 0
    # otherwise return none to just turn off butter filter
    x, x_min, y, y_min, p1, p2, p3, factor = get_p(x, y)

    d = np.cross(p2 - p1, p1 - p3) / np.linalg.norm(p2 - p1)

    max_idx = np.argmax(d)
    max_d = d[max_idx]
    l_x = x[max_idx]
    l_y = y[max_idx]

    if max_d <= 0:
        return None, None

    return l_x, l_y


def find_signal(func, times, values, sse_target):
    filters = []
    sse = []

    fs = 1.0 / (times[1] - times[0])

    ub = fs / 2.0

    for i in np.logspace(-8, np.log10(ub), 50):
        try:
            sos = func(i, fs)
            low_passed = scipy.signal.sosfiltfilt(sos, values)

            filters.append(i)
            sse.append(np.sum((low_passed - values) ** 2))
        except (ValueError, np.linalg.LinAlgError):
            continue

    crit_fs_max = find_max_signal(
        func, times, values, sse_target, filters, sse
    )

    L_x, L_y = find_L(filters, np.log(sse))

    if L_x is not None:
        L_x = refine_signal(
            func, times, values, filters, np.log(sse), fs, L_x
        )

    if L_x is not None and crit_fs_max < L_x:
        L_x = crit_fs_max

    return L_x


def find_max_signal(func, times, values, sse_target, filters, sse):
    fs = 1.0 / (times[1] - times[0])

    filters = np.log10(filters)
    problem = TargetProblem(
        filters[0], filters[-1], sse_target, func, values, fs
    )

    algorithm = PatternSearch(n_sample_points=50, eps=1e-13)

    res = minimize(
        problem,
        algorithm,
        verbose=False,
        seed=1
    )

    crit_fs = 10**res.X[0]

    return crit_fs


def smoothing_filter_signal(func, times, values, crit_fs):
    if crit_fs is None:
        return values
    fs = 1.0 / (times[1] - times[0])
    sos = func(crit_fs, fs)
    low_passed = scipy.signal.sosfiltfilt(sos, values)
    return low_passed


[docs] def find_smoothing_factors(times, values, rmse_target=1e-4): sse_target = (rmse_target**2.0)*len(values) try: crit_fs = find_signal(signal_bessel, times, values, sse_target) except np.linalg.LinAlgError: crit_fs = None if crit_fs is None: multiprocessing.get_logger().info( "butter filter disabled, no viable L point found" ) values_filter = smoothing_filter_signal( signal_bessel, times, values, crit_fs ) s = sse_target spline, factor = create_spline(times, values, crit_fs, s) # run a quick butter pass to remove high frequency noise in the derivative # (needed for some experimental data) values_filter = spline.derivative()(times) / factor factor = 1.0/np.max(values_filter) values_filter = values_filter * factor crit_fs_der = find_signal(signal_bessel, times, values_filter, sse_target) return s, crit_fs, crit_fs_der
def create_spline(times, values, crit_fs, s): factor = 1.0 / np.max(values) values = values * factor values_filter = smoothing_filter_signal( signal_bessel, times, values, crit_fs ) return ( scipy.interpolate.UnivariateSpline( times, values_filter, s=s, k=5, ext=3 ), factor, ) def smooth_data(times, values, crit_fs, s): spline, factor = create_spline(times, values, crit_fs, s) return spline(times) / factor def smooth_data_derivative( times, values, crit_fs, s, crit_fs_der, smooth=True): spline, factor = create_spline(times, values, crit_fs, s) if smooth: values_filter_der = spline.derivative()(times) / factor factor_der = 1.0/np.max(values_filter_der) values_filter_der = values_filter_der * factor_der values_filter_der = butter( times, values_filter_der, crit_fs_der ) values_filter_der = values_filter_der / factor_der spline_der = scipy.interpolate.InterpolatedUnivariateSpline( times, values_filter_der, k=5, ext=3 ) values_filter_der = spline_der(times) else: values_filter_der = spline.derivative()(times) / factor return values_filter_der
[docs] def full_smooth(times, values, crit_fs, s, crit_fs_der, smooth=True): # return smooth data derivative of data spline, factor = create_spline(times, values, crit_fs, s) values_filter = spline(times) / factor # run a quick butter pass to remove high frequency noise in the derivative # (needed for some experimental data) if smooth: values_filter_der = spline.derivative()(times) / factor factor_der = 1.0/np.max(values_filter_der) values_filter_der = values_filter_der * factor_der values_filter_der = butter( times, values_filter_der, crit_fs_der ) values_filter_der = values_filter_der / factor_der spline_der = scipy.interpolate.InterpolatedUnivariateSpline( times, values_filter_der, k=5, ext=3 ) values_filter_der = spline_der(times) else: values_filter_der = spline.derivative()(times) / factor return values_filter, values_filter_der
def butter(times, values, crit_fs_der): values_filter = \ smoothing_filter_signal(signal_bessel, times, values, crit_fs_der) return values_filter