"""RoadRunner calculator for PEtab problems.
Handles all RoadRunner.simulate calls, calculates likelihoods and residuals.
"""
from __future__ import annotations
import numbers
from collections.abc import Sequence
import numpy as np
from ...C import (
FVAL,
MODE_FUN,
MODE_RES,
RES,
ROADRUNNER_LLH,
ROADRUNNER_SIMULATION,
TIME,
ModeType,
)
from .utils import (
ExpData,
SolverOptions,
simulation_to_measurement_df,
unscale_parameters,
)
try:
import petab.v1 as petab
from petab.v1.parameter_mapping import ParMappingDictQuadruple
except ImportError:
petab = None
try:
import roadrunner
except ImportError:
roadrunner = None
LLH_TYPES = {
"lin_normal": lambda measurement, simulation, sigma: (
-0.5
* (
np.log(2 * np.pi * (sigma**2))
+ ((measurement - simulation) / sigma) ** 2
)
),
"log_normal": lambda measurement, simulation, sigma: (
-0.5
* (
np.log(2 * np.pi * (sigma**2) * (measurement**2))
+ ((np.log(measurement) - np.log(simulation)) / sigma) ** 2
)
),
"log10_normal": lambda measurement, simulation, sigma: (
-0.5
* (
np.log(2 * np.pi * (sigma**2) * (measurement**2) * np.log(10) ** 2)
+ ((np.log10(measurement) - np.log10(simulation)) / sigma) ** 2
)
),
"lin_laplace": lambda measurement, simulation, sigma: (
-np.log(2 * sigma) - (np.abs(measurement - simulation) / sigma)
),
"log_laplace": lambda measurement, simulation, sigma: (
-np.log(2 * sigma * simulation)
- (np.abs(np.log(measurement) - np.log(simulation)) / sigma)
),
"log10_laplace": lambda measurement, simulation, sigma: (
-np.log(2 * sigma * simulation * np.log(10))
- (np.abs(np.log10(measurement) - np.log10(simulation)) / sigma)
),
}
[docs]
class RoadRunnerCalculator:
"""Class to handle RoadRunner simulation and obtain objective value."""
[docs]
def __call__(
self,
x_dct: dict, # TODO: sensi_order support
mode: ModeType,
roadrunner_instance: roadrunner.RoadRunner,
edatas: list[ExpData],
x_ids: Sequence[str],
parameter_mapping: list[ParMappingDictQuadruple],
petab_problem: petab.Problem,
solver_options: SolverOptions | None = None,
):
"""Perform the RoadRunner call and obtain objective function values.
Parameters
----------
x_dct:
Parameter dictionary.
mode:
Mode of the call.
roadrunner_instance:
RoadRunner instance.
edatas:
List of ExpData.
x_ids:
Sequence of parameter IDs.
parameter_mapping:
Parameter parameter_mapping.
petab_problem:
PEtab problem.
solver_options:
Solver options of the roadrunner instance Integrator. These will
modify the roadrunner instance inplace.
Returns
-------
Tuple of objective function values.
"""
# sanity check that edatas and conditions are consistent
if len(edatas) != len(parameter_mapping):
raise ValueError(
"Number of edatas and conditions are not consistent."
)
if solver_options is None:
solver_options = SolverOptions()
# apply solver options
solver_options.apply_to_roadrunner(roadrunner_instance)
simulation_results = {}
llh_tot = 0
for edata, mapping_per_condition in zip(
edatas, parameter_mapping, strict=True
):
sim_res, llh = self.simulate_per_condition(
x_dct, roadrunner_instance, edata, mapping_per_condition
)
simulation_results[edata.condition_id] = sim_res
llh_tot += llh
if mode == MODE_FUN:
return {
FVAL: -llh_tot,
ROADRUNNER_SIMULATION: simulation_results,
ROADRUNNER_LLH: llh_tot,
}
if mode == MODE_RES: # TODO: speed up by not using pandas
simulation_df = simulation_to_measurement_df(
simulation_results, petab_problem.measurement_df
)
res_df = petab.calculate_residuals(
petab_problem.measurement_df,
simulation_df,
petab_problem.observable_df,
petab_problem.parameter_df,
)
return {
RES: res_df,
ROADRUNNER_SIMULATION: simulation_results,
FVAL: -llh_tot,
}
[docs]
def simulate_per_condition(
self,
x_dct: dict,
roadrunner_instance: roadrunner.RoadRunner,
edata: ExpData,
parameter_mapping_per_condition: ParMappingDictQuadruple,
) -> tuple[np.ndarray, float]:
"""Simulate the model for a single condition.
Parameters
----------
x_dct:
Parameter dictionary.
roadrunner_instance:
RoadRunner instance.
edata:
ExpData of a single condition.
parameter_mapping_per_condition:
Parameter parameter_mapping for a single condition.
Returns
-------
Tuple of simulation results in form of a numpy array and the
negative log-likelihood.
"""
# get timepoints and outputs to simulate
timepoints = list(edata.timepoints)
if timepoints[0] != 0.0:
timepoints = [0.0] + timepoints
if len(timepoints) == 1:
timepoints = [0.0] + timepoints
observables_ids = edata.get_observable_ids()
# steady state stuff
steady_state_calculations = False
state_variables = roadrunner_instance.model.getFloatingSpeciesIds()
# some states might be hidden as parameters with rate rules
rate_rule_ids = roadrunner_instance.getRateRuleIds()
state_variables += [
rate_rule_id
for rate_rule_id in rate_rule_ids
if rate_rule_id not in state_variables
]
# obs_ss = [] # TODO: add them to return values with info
state_ss = []
# if the first and third parameter mappings are not empty, we need
# to pre-equlibrate the model
if (
parameter_mapping_per_condition[0]
and parameter_mapping_per_condition[2]
):
steady_state_calculations = True
roadrunner_instance.conservedMoietyAnalysis = True
self.fill_in_parameters(
x_dct,
roadrunner_instance,
parameter_mapping_per_condition,
preeq=True,
)
roadrunner_instance.setSteadyStateSolver("newton")
# allow simulation to reach steady state
roadrunner_instance.getSteadyStateSolver().setValue(
"allow_presimulation", True
)
roadrunner_instance.getSteadyStateSolver().setValue(
"presimulation_maximum_steps", 1000
)
roadrunner_instance.getSteadyStateSolver().setValue(
"presimulation_time", 1000
)
# steady state output = observables + state variables
steady_state_selections = observables_ids + state_variables
roadrunner_instance.steadyStateSelections = steady_state_selections
steady_state = roadrunner_instance.getSteadyStateValuesNamedArray()
# we split the steady state into observables and state variables
# obs_ss = steady_state[:, : len(observables_ids)].flatten()
state_ss = steady_state[:, len(observables_ids) :].flatten()
# turn off conserved moiety analysis
roadrunner_instance.conservedMoietyAnalysis = False
# reset the model
roadrunner_instance.reset()
# set parameters
par_map = self.fill_in_parameters(
x_dct, roadrunner_instance, parameter_mapping_per_condition
)
# if steady state calculations are required, set state variables
if steady_state_calculations:
roadrunner_instance.setValues(state_variables, state_ss)
# fill in overriden species
self.fill_in_parameters(
x_dct,
roadrunner_instance,
parameter_mapping_per_condition,
filling_mode="only_species",
)
sim_res = roadrunner_instance.simulate(
times=timepoints, selections=[TIME] + observables_ids
)
llhs = calculate_llh(sim_res, edata, par_map, roadrunner_instance)
# reset the model
roadrunner_instance.reset()
return sim_res, llhs
[docs]
def fill_in_parameters(
self,
problem_parameters: dict,
roadrunner_instance: roadrunner.RoadRunner | None = None,
parameter_mapping: ParMappingDictQuadruple | None = None,
preeq: bool = False,
filling_mode: str | None = None,
) -> dict:
"""Fill in parameters into the roadrunner instance.
Parameters
----------
roadrunner_instance:
RoadRunner instance to fill in parameters
problem_parameters:
Problem parameters as parameterId=>value dict. Only
parameters included here will be set. Remaining parameters will
be used as already set in `amici_model` and `edata`.
parameter_mapping:
Parameter mapping for current condition. Quadruple of dicts,
where the first dict contains the parameter mapping for pre-
equilibration, the second dict contains the parameter mapping
for the simulation, the third and fourth dict contain the scaling
factors for the pre-equilibration and simulation, respectively.
preeq:
Whether to fill in parameters for pre-equilibration.
filling_mode:
Which parameters to fill in. If None or "all",
all parameters are filled in.
Other options are "only_parameters" and "only_species".
Returns
-------
dict:
Mapping of parameter IDs to values.
"""
if filling_mode is None:
filling_mode = "all"
# check for valid filling modes
if filling_mode not in ["all", "only_parameters", "only_species"]:
raise ValueError(
"Invalid filling mode. Choose from 'all', "
"'only_parameters', 'only_species'."
)
mapping = parameter_mapping[1] # default: simulation condition mapping
scaling = parameter_mapping[3] # default: simulation condition scaling
if preeq:
mapping = parameter_mapping[0] # pre-equilibration mapping
scaling = parameter_mapping[2] # pre-equilibration scaling
# Parameter parameter_mapping may contain parameter_ids as values,
# these *must* be replaced
def _get_par(model_par, val):
"""Get parameter value from problem_parameters and mapping.
Replace parameter IDs in parameter_mapping dicts by values from
problem_parameters where necessary
"""
if isinstance(val, str):
try:
# estimated parameter
return problem_parameters[val]
except KeyError:
# condition table overrides must have been handled already,
# e.g. by the PEtab parameter parameter_mapping, but
# parameters from InitialAssignments may still be present.
if mapping[val] == model_par:
# prevent infinite recursion
raise
return _get_par(val, mapping[val])
if model_par in problem_parameters:
# user-provided
return problem_parameters[model_par]
# prevent nan-propagation in derivative
if np.isnan(val):
return 0.0
# constant value
return val
mapping_values = {
key: _get_par(key, val) for key, val in mapping.items()
}
# we assume the parameters to be given in the scale defined in the
# petab problem. Thus, they need to be unscaled.
mapping_values = unscale_parameters(mapping_values, scaling)
# seperate the parameters into ones that overwrite species and others
mapping_params = {}
mapping_species = {}
for key, value in mapping_values.items():
if key in roadrunner_instance.model.getFloatingSpeciesIds():
# values that originally were NaN are not set
if isinstance(mapping[key], str) or not np.isnan(mapping[key]):
mapping_species[key] = float(value)
else:
mapping_params[key] = value
if filling_mode == "only_parameters" or filling_mode == "all":
# set parameters.
roadrunner_instance.setValues(mapping_params)
# reset is necessary to apply the changes to initial assignments
roadrunner_instance.reset()
if filling_mode == "only_species" or filling_mode == "all":
# set species
roadrunner_instance.setValues(mapping_species)
return mapping_values
def calculate_llh(
simulations: np.ndarray,
edata: ExpData,
parameter_mapping: dict,
roadrunner_instance: roadrunner.RoadRunner,
) -> float:
"""Calculate the negative log-likelihood for a single condition.
Parameters
----------
simulations:
Simulations of condition.
edata:
ExpData of a single condition.
parameter_mapping:
Parameter mapping for the condition.
roadrunner_instance:
RoadRunner instance. Needed to retrieve complex formulae.
Returns
-------
float:
Negative log-likelihood.
"""
# if 0 is not in timepoints, remove the first row of the simulation
if 0.0 not in edata.timepoints:
simulations = simulations[1:, :]
if not np.array_equal(simulations[:, 0], edata.timepoints):
raise ValueError(
"Simulation and Measurement have different timepoints."
)
# check that simulation and condition have same dimensions and timepoints
if simulations.shape != edata.measurements.shape:
raise ValueError(
"Simulation and Measurement have different dimensions."
)
# we can now drop the timepoints
simulations = simulations[:, 1:]
measurements = edata.measurements[:, 1:]
def _fill_in_noise_formula(noise_formula):
"""Fill in the noise formula."""
if isinstance(noise_formula, numbers.Number):
return float(noise_formula)
# if it is not a number, it is assumed to be a string
if noise_formula in parameter_mapping.keys():
return parameter_mapping[noise_formula]
# if the string starts with "noiseFormula_" it is saved in the model
if noise_formula.startswith("noiseFormula_"):
return roadrunner_instance.getValue(noise_formula)
# replace noise formula with actual value from mapping
noise_formulae = np.array(
[_fill_in_noise_formula(formula) for formula in edata.noise_formulae]
)
# check that the rows of noise are the columns of the simulation
if noise_formulae.shape[0] != simulations.shape[1]:
raise ValueError("Noise and Simulation have different dimensions.")
# per observable, decide on the llh function based on the noise dist
llhs = np.array(
[
LLH_TYPES[noise_dist](
measurements[:, i], simulations[:, i], noise_formulae[i]
)
for i, noise_dist in enumerate(edata.noise_distributions)
]
).transpose()
# check whether all nan values in llhs coincide with nan measurements
if not np.all(np.isnan(llhs) == np.isnan(measurements)):
return np.nan
# sum over all observables, ignoring nans
llhs = np.nansum(llhs)
return llhs