Source code for pypesto.hierarchical.relative.problem

"""Inner optimization problem in hierarchical optimization."""

import logging

import pandas as pd

from ...C import (
    MEASUREMENT_TYPE,
    PARAMETER_TYPE,
    SEMIQUANTITATIVE,
    InnerParameterType,
)
from ..base_problem import (
    AmiciInnerProblem,
    _get_timepoints_with_replicates,
    ix_matrices_from_arrays,
)
from .parameter import RelativeInnerParameter

try:
    import amici.sim.sundials as asd
    import petab.v1 as petab
    from petab.v1.C import (
        ESTIMATE,
        LOWER_BOUND,
        NOISE_PARAMETERS,
        OBSERVABLE_ID,
        OBSERVABLE_PARAMETERS,
        PARAMETER_ID,
        PARAMETER_SCALE,
        TIME,
        UPPER_BOUND,
    )
except ImportError:
    pass

logger = logging.getLogger(__name__)


[docs] class RelativeInnerProblem(AmiciInnerProblem): r"""Inner optimization problem for relative data with scaling/offset. Attributes ---------- xs: Mapping of (inner) parameter ID to ``InnerParameters``. data: Measurement data. One matrix (`num_timepoints` x `num_observables`) per simulation condition. Missing observations as NaN. edatas: AMICI ``ExpData``\s for each simulation condition. """
[docs] def __init__(self, **kwargs): super().__init__(**kwargs)
[docs] @staticmethod def from_petab_amici( petab_problem: "petab.Problem", amici_model: "asd.Model", edatas: list["asd.ExpData"], ) -> "RelativeInnerProblem": """Create an InnerProblem from a PEtab problem and AMICI objects.""" return inner_problem_from_petab_problem( petab_problem, amici_model, edatas )
[docs] def get_relative_observable_ids(self) -> list[str]: """Get IDs of all unique relative observables with scaling and/or offset.""" return list( { observable_id for x in self.xs.values() if x.inner_parameter_type in [ InnerParameterType.SCALING, InnerParameterType.OFFSET, ] for observable_id in x.observable_ids } )
[docs] def get_observable_indices_for_xs( self, inner_parameter_type: str ) -> list[int]: """Get unique list of ``RelativeParameter.observable_indices`` values.""" return list( { obs_idx for x in self.xs.values() if x.inner_parameter_type == inner_parameter_type for obs_idx in x.observable_indices } )
[docs] def get_xs_for_obs_idx(self, obs_idx: int) -> list[RelativeInnerParameter]: r"""Get ``RelativeParameter``\s that belong to the observable with index `obs_idx`.""" return [x for x in self.xs.values() if obs_idx in x.observable_indices]
def inner_problem_from_petab_problem( petab_problem: "petab.Problem", amici_model: "asd.Model", edatas: list["asd.ExpData"], ) -> RelativeInnerProblem: """ Create inner problem from PEtab problem. Hierarchical optimization is a pypesto-specific PEtab extension. """ # inner parameters inner_parameters = inner_parameters_from_parameter_df( petab_problem.parameter_df, petab_problem.measurement_df ) x_ids = [x.inner_parameter_id for x in inner_parameters] # used indices for all measurement specific parameters ixs = ixs_for_measurement_specific_parameters( petab_problem, amici_model, x_ids ) # transform experimental data data = [asd.ExpDataView(edata)["measurements"] for edata in edatas] # matrixify ix_matrices = ix_matrices_from_arrays(ixs, data) # assign matrices, observable indices and ids to inner parameters for par in inner_parameters: par.ixs = ix_matrices[par.inner_parameter_id] par.observable_indices = [ meas_indices[2] for meas_indices in ixs[par.inner_parameter_id] ] par.observable_ids = [ amici_model.get_observable_ids()[obs_idx] for obs_idx in par.observable_indices ] par_group_types = { tuple(obs_pars.split(";")): ( petab_problem.parameter_df.loc[obs_par, PARAMETER_TYPE] for obs_par in obs_pars.split(";") ) for (obs_id, obs_pars), _ in petab_problem.measurement_df.groupby( [petab.OBSERVABLE_ID, petab.OBSERVABLE_PARAMETERS], dropna=True ) if ";" in obs_pars # prefilter for at least 2 observable parameters } coupled_pars = { group for group, types in par_group_types.items() if ( (InnerParameterType.SCALING in types) and (InnerParameterType.OFFSET in types) ) } # Check each group is of length 2 for group in coupled_pars: if len(group) != 2: raise ValueError( f"Expected exactly 2 parameters in group {group}: a scaling " f"and an offset parameter." ) id_to_par = {par.inner_parameter_id: par for par in inner_parameters} # assign coupling for par in inner_parameters: if par.inner_parameter_type not in [ InnerParameterType.SCALING, InnerParameterType.OFFSET, ]: continue for group in coupled_pars: if par.inner_parameter_id in group: coupled_parameter_id = group[ group.index(par.inner_parameter_id) - 1 ] par.coupled = id_to_par[coupled_parameter_id] break return RelativeInnerProblem(xs=inner_parameters, data=data, edatas=edatas) def inner_parameters_from_parameter_df( par_df: pd.DataFrame, meas_df: pd.DataFrame, ) -> list[RelativeInnerParameter]: """ Create list of inner free parameters from PEtab parameter table. Inner parameters are those that have a non-empty `parameterType` in the PEtab problem. """ # create list of hierarchical parameters par_df = par_df.reset_index() for col in (PARAMETER_TYPE,): if col not in par_df: par_df[col] = None parameters = [] for _, row in par_df.iterrows(): if not row[ESTIMATE]: continue if petab.is_empty(row[PARAMETER_TYPE]): continue # If a sigma parameter belongs to a semiquantitative # observable, it is not a relative inner parameter. if row[PARAMETER_TYPE] == InnerParameterType.SIGMA: if MEASUREMENT_TYPE in meas_df.columns: par_id = row[PARAMETER_ID] corresponding_measurements = meas_df[ meas_df[NOISE_PARAMETERS] == par_id ] if any( corresponding_measurements[MEASUREMENT_TYPE] == SEMIQUANTITATIVE ): continue parameters.append( RelativeInnerParameter( inner_parameter_id=row[PARAMETER_ID], inner_parameter_type=row[PARAMETER_TYPE], scale=row[PARAMETER_SCALE], lb=row[LOWER_BOUND], ub=row[UPPER_BOUND], observable_ids=None, observable_indices=None, ) ) return parameters def ixs_for_measurement_specific_parameters( petab_problem: "petab.Problem", amici_model: "asd.Model", x_ids: list[str], ) -> dict[str, list[tuple[int, int, int]]]: """ Create mapping of parameters to measurements. Returns ------- A dictionary mapping parameter ID to a list of `(condition index, time index, observable index)` tuples in which this output parameter is used. For each condition, the time index refers to a sorted list of non-unique time points for which there are measurements. """ ixs_for_par = {} observable_ids = amici_model.get_observable_ids() simulation_conditions = ( petab_problem.get_simulation_conditions_from_measurement_df() ) for condition_ix, condition in simulation_conditions.iterrows(): # measurement table for current condition df_for_condition = petab.get_rows_for_condition( measurement_df=petab_problem.measurement_df, condition=condition ) # unique sorted list of timepoints timepoints = sorted(df_for_condition[TIME].unique().astype(float)) # non-unique sorted list of timepoints timepoints_w_reps = _get_timepoints_with_replicates( measurement_df=df_for_condition ) for time in timepoints: # subselect measurements for time `time` df_for_time = df_for_condition[df_for_condition[TIME] == time] time_ix_0 = timepoints_w_reps.index(time) # remember used time indices for each observable time_ix_for_obs_ix = {} # iterate over measurements for _, measurement in df_for_time.iterrows(): # extract observable index observable_ix = observable_ids.index( measurement[OBSERVABLE_ID] ) # as the time indices have to account for replicates, we need # to track which time indices have already been assigned for # the current observable if observable_ix in time_ix_for_obs_ix: # a replicate time_ix_for_obs_ix[observable_ix] += 1 else: # the first measurement for this `(observable, timepoint)` time_ix_for_obs_ix[observable_ix] = time_ix_0 time_w_reps_ix = time_ix_for_obs_ix[observable_ix] observable_overrides = petab.split_parameter_replacement_list( measurement.get(OBSERVABLE_PARAMETERS, None) ) noise_overrides = petab.split_parameter_replacement_list( measurement.get(NOISE_PARAMETERS, None) ) # try to insert if hierarchical parameter for override in observable_overrides + noise_overrides: if override in x_ids: ixs_for_par.setdefault(override, []).append( (condition_ix, time_w_reps_ix, observable_ix) ) return ixs_for_par