Fit Binding Model Parameters#

This example demonstrates how to estimate SMA binding parameters based on multiple gradient elution chromatograms. It assumes familiarity with the CADETProcess process models, as introduced in the Fit Column Transport Parameters example.

import os

import numpy as np

from CADETProcess.processModel import ComponentSystem, FlowSheet, Process, Inlet, Outlet, LumpedRateModelWithPores
from CADETProcess.processModel import StericMassAction
from CADETProcess.reference import ReferenceIO
from CADETProcess.comparison import Comparator
from CADETProcess.simulator import Cadet
from CADETProcess.optimization import OptimizationProblem

Process creation#

To simplify the creation of multiple Process instances with the correct configurations, the process creation was wrapped into a function below:

def create_process(cv_length=30):
    # Component System
    component_system = ComponentSystem()
    component_system.add_component('Salt')
    component_system.add_component('A')

    Q_ml_min = 0.5  # ml/min
    Q_m3_s = Q_ml_min / (60 * 1e6)

    initial_salt_concentration = 50
    final_salt_concentration = 500

    # Unit Operations
    inlet = Inlet(component_system, name='inlet')
    inlet.flow_rate = Q_m3_s

    column = create_column_model(component_system, final_salt_concentration, initial_salt_concentration)

    column_volume = column.length * (column.diameter / 2) ** 2 * np.pi

    outlet = Outlet(component_system, name='outlet')

    flow_sheet = FlowSheet(component_system)

    flow_sheet.add_unit(inlet)
    flow_sheet.add_unit(column)
    flow_sheet.add_unit(outlet)

    flow_sheet.add_connection(inlet, column)
    flow_sheet.add_connection(column, outlet)

    # Process
    process = Process(flow_sheet, f'{cv_length}')

    load_duration = 9
    t_gradient_start = 90.0
    gradient_volume = cv_length * column_volume
    gradient_duration = gradient_volume / inlet.flow_rate[0]
    duration_post_gradient_wash = gradient_duration / 10 + 180
    process.cycle_time = t_gradient_start + gradient_duration + duration_post_gradient_wash
    t_gradient_end = t_gradient_start + gradient_duration

    c_load = np.array([initial_salt_concentration, 1.0])
    c_wash = np.array([initial_salt_concentration, 0.0])
    c_elute = np.array([final_salt_concentration, 0.0])
    gradient_slope = (c_elute - c_wash) / gradient_duration
    c_gradient_poly = np.array(list(zip(c_wash, gradient_slope)))
    c_post_gradient_wash = np.array([final_salt_concentration, 0.0])

    process.add_event('load', 'flow_sheet.inlet.c', c_load)
    process.add_event('wash', 'flow_sheet.inlet.c', c_wash, load_duration)
    process.add_event('grad_start', 'flow_sheet.inlet.c', c_gradient_poly, t_gradient_start)
    process.add_event('grad_end', 'flow_sheet.inlet.c', c_post_gradient_wash, t_gradient_end)

    return process


def create_column_model(component_system, final_salt_concentration, initial_salt_concentration):
    column = LumpedRateModelWithPores(component_system, name='column')
    column.length = 0.02  # m
    column.diameter = 0.008
    column.particle_radius = 1.7e-5
    column.axial_dispersion = 3.16e-7
    column.bed_porosity = 0.3
    column.particle_porosity = 0.5
    column.film_diffusion = [1e-4, 1e-4]
    binding_model = create_binding_model(component_system, final_salt_concentration)
    column.binding_model = binding_model
    column.c = [initial_salt_concentration, 0, ]
    column.cp = [initial_salt_concentration, 0]
    column.q = [binding_model.capacity, 0]
    return column


def create_binding_model(component_system, final_salt_concentration):
    binding_model = StericMassAction(component_system, name='SMA')
    binding_model.is_kinetic = True
    binding_model.characteristic_charge = [0.0, 10]
    binding_model.capacity = 400.0
    binding_model.reference_solid_phase_conc = binding_model.capacity
    binding_model.reference_liquid_phase_conc = final_salt_concentration
    binding_model.steric_factor = [0.0, 10]
    binding_model.adsorption_rate = [0.0, 10]
    binding_model.desorption_rate = [0.0, 1e3]
    return binding_model

Creating “experimental” data#

To run the parameter estimation algorithm, we need experimental data. This function generates in-silico based “experimental” data for us to use.

def create_in_silico_experimental_data():
    def save_csv(results, directory=None, filename=None, units=None, noise_percentage=5):
        if not os.path.exists(directory):
            os.makedirs(directory)

        if units is None:
            units = results.solution.keys()

        for unit in units:
            solution = results.solution[unit]["outlet"].solution
            solution_without_salt = solution[:, 1:]
            solution_without_salt = (solution_without_salt + (np.random.random(solution_without_salt.shape) - 0.5)
                                     * noise_percentage / 100 * solution_without_salt.max(axis=0)
                                     )
            solution[:, 1:] = solution_without_salt
            solution_times = results.solution[unit]["outlet"].time

            full_solution = np.concatenate([np.atleast_2d(solution_times), solution.T]).T

            if filename is None:
                filename = unit + '_output.csv'

            file_path = os.path.join(directory, filename)
            with open(file_path, "w") as file_handle:
                np.savetxt(file_handle, full_solution, delimiter=",")

    simulator = Cadet()
    case_dir = "experimental_data"
    os.makedirs(case_dir, exist_ok=True)

    for cv in [5, 30, 120]:
        process = create_process(cv_length=cv)
        simulation_results = simulator.simulate(process)
        save_csv(simulation_results, directory=case_dir, filename=f"{cv}.csv", units=["outlet"])

Below are two convenience functions, written to simplifly loading references and creating comparators.

def load_reference(file_name, component_index=2):
    data = np.loadtxt(file_name, delimiter=',')

    time_experiment = data[:, 0]
    c_experiment = data[:, component_index]

    reference = ReferenceIO(
        f'Peak {file_name}',
        time_experiment, c_experiment,
        component_system=ComponentSystem(["A"])
    )
    return reference


def create_comparator(reference):
    comparator = Comparator(name=f"Comp {reference.name}")
    comparator.add_reference(reference)
    comparator.add_difference_metric(
        'PeakPosition', reference,
        'outlet.outlet', components=["A"]
    )
    comparator.add_difference_metric(
        'PeakHeight', reference,
        'outlet.outlet', components=["A"]
    )
    return comparator
if __name__ == '__main__':
    create_in_silico_experimental_data()

    optimization_problem = OptimizationProblem('SMA_binding_parameters')

    simulator = Cadet()
    optimization_problem.add_evaluator(simulator)

    comparators = dict()

    for cv in [5, 30, 120]:
        process = create_process(cv)

        reference = load_reference(f"experimental_data/{cv}.csv")

        optimization_problem.add_evaluation_object(process)
        comparator = create_comparator(reference)

        # Here I stored the comparators for easy access from within the callback function.
        # Note, that the dictionary keys need to be identical to the process names
        # as defined in create_process() for this to work.
        comparators[str(cv)] = comparator

        optimization_problem.add_objective(
            comparator,
            name=f"Objective {comparator.name}",
            evaluation_objects=[process],  # limit this comparator to be applied to only this one process
            n_objectives=comparator.n_metrics,
            requires=[simulator]
        )

    optimization_problem.add_variable(
        name='adsorption_rate',
        parameter_path='flow_sheet.column.binding_model.adsorption_rate',
        lb=1e-3, ub=1e3,
        transform='auto',
        indices=[1]  # modify only the protein (component index 1) parameter
    )

    optimization_problem.add_variable(
        name='desorption_rate',
        parameter_path='flow_sheet.column.binding_model.desorption_rate',
        lb=1e-3, ub=1e3,
        transform='auto',
        indices=[1]  # modify only the protein (component index 1) parameter
    )

    optimization_problem.add_variable(
        name='equilibrium_constant',
        evaluation_objects=None,
        lb=1e-4, ub=1e3,
        transform='auto',
        indices=[1]  # modify only the protein (component index 1) parameter
    )

    optimization_problem.add_variable(
        name='kinetic_constant',
        evaluation_objects=None,
        lb=1e-4, ub=1e3,
        transform='auto',
        indices=[1]  # modify only the protein (component index 1) parameter
    )

    optimization_problem.add_variable(
        name='characteristic_charge',
        parameter_path='flow_sheet.column.binding_model.characteristic_charge',
        lb=1, ub=50,
        transform='auto',
        indices=[1]  # modify only the protein (component index 1) parameter
    )

    optimization_problem.add_variable_dependency(
        dependent_variable="desorption_rate",
        independent_variables=["kinetic_constant", ],
        transform=lambda k_kin: 1 / k_kin
    )

    optimization_problem.add_variable_dependency(
        dependent_variable="adsorption_rate",
        independent_variables=["kinetic_constant", "equilibrium_constant"],
        transform=lambda k_kin, k_eq: k_eq / k_kin
    )


    def callback(simulation_results, individual, evaluation_object, callbacks_dir='./'):
        comparator = comparators[evaluation_object.name]
        comparator.plot_comparison(
            simulation_results,
            file_name=f'{callbacks_dir}/{individual.id}_{evaluation_object}_comparison.png',
            show=False
        )


    optimization_problem.add_callback(callback, requires=[simulator])

    print(optimization_problem.variable_names)
    x0 = [1, 1, 1e-2, 1e-3, 10]
    ind = optimization_problem.create_individual(x0)
    optimization_problem.evaluate_callbacks(ind)
['adsorption_rate', 'desorption_rate', 'equilibrium_constant', 'kinetic_constant', 'characteristic_charge']
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[6], line 99
     97 x0 = [1, 1, 1e-2, 1e-3, 10]
     98 ind = optimization_problem.create_individual(x0)
---> 99 optimization_problem.evaluate_callbacks(ind)

File ~/checkouts/readthedocs.org/user_builds/cadet-process/conda/latest/lib/python3.11/site-packages/CADETProcess/optimization/optimizationProblem.py:1635, in OptimizationProblem.evaluate_callbacks(self, ind, current_iteration, force)
   1632 callback._current_iteration = current_iteration
   1634 try:
-> 1635     self._evaluate(ind.x_transformed, callback, force, untransform=True)
   1636 except CADETProcessError:
   1637     self.logger.warning(
   1638         f'Evaluation of {callback} failed at {ind.x}.'
   1639     )

File ~/checkouts/readthedocs.org/user_builds/cadet-process/conda/latest/lib/python3.11/site-packages/CADETProcess/optimization/optimizationProblem.py:136, in OptimizationProblem.untransforms.<locals>.wrapper(self, x, untransform, *args, **kwargs)
    133 if untransform:
    134     x = self.untransform(x)
--> 136 return func(self, x, *args, **kwargs)

File ~/checkouts/readthedocs.org/user_builds/cadet-process/conda/latest/lib/python3.11/site-packages/CADETProcess/optimization/optimizationProblem.py:782, in OptimizationProblem._evaluate(self, x, func, force)
    779 else:
    780     requires = [func]
--> 782 self.set_variables(x)
    783 evaluation_objects = func.evaluation_objects
    785 if len(evaluation_objects) == 0:

File ~/checkouts/readthedocs.org/user_builds/cadet-process/conda/latest/lib/python3.11/site-packages/CADETProcess/optimization/optimizationProblem.py:136, in OptimizationProblem.untransforms.<locals>.wrapper(self, x, untransform, *args, **kwargs)
    133 if untransform:
    134     x = self.untransform(x)
--> 136 return func(self, x, *args, **kwargs)

File ~/checkouts/readthedocs.org/user_builds/cadet-process/conda/latest/lib/python3.11/site-packages/CADETProcess/optimization/optimizationProblem.py:662, in OptimizationProblem.set_variables(self, x, evaluation_objects)
    659 values = self.get_dependent_values(x)
    661 for variable, value in zip(self.variables, values):
--> 662     variable.set_value(value)

File ~/checkouts/readthedocs.org/user_builds/cadet-process/conda/latest/lib/python3.11/site-packages/CADETProcess/optimization/optimizationProblem.py:3603, in OptimizationVariable.set_value(self, value)
   3601     raise ValueError("Exceeds lower bound")
   3602 if value > self.ub:
-> 3603     raise ValueError("Exceeds upper bound")
   3605 if self.evaluation_objects is None:
   3606     return

ValueError: Exceeds upper bound

Note

For performance reasons, the optimization is currently not run when building the documentation. In future, we will try to sideload pre-computed results to also discuss them here.

# if __name__ == '__main__':
#     from CADETProcess.optimization import U_NSGA3
#
#     optimizer = U_NSGA3()
#     optimizer.n_cores = 4
#     optimizer.pop_size = 50
#     optimizer.n_max_gen = 5
#
#     optimization_results = optimizer.optimize(
#         optimization_problem,
#         use_checkpoint=False
#     )