Source code for pypesto.sample.evidence

"""Various methods for estimating the log evidence of a model."""

import logging
from collections.abc import Callable

import numpy as np
from scipy import stats
from scipy.integrate import simpson, trapezoid
from scipy.optimize import minimize_scalar
from scipy.special import logsumexp

from ..C import SIMPSON, STEPPINGSTONE, TRAPEZOID
from ..objective import (
    AggregatedObjective,
    NegLogParameterPriors,
    NegLogPriors,
)
from ..problem import Problem
from ..result import Result
from .diagnostics import geweke_test

logger = logging.getLogger(__name__)


[docs] def laplace_approximation_log_evidence( problem: Problem, x: np.ndarray ) -> float: """ Compute the log evidence using the Laplace approximation. The objective in your `problem` must be a negative log posterior, and support Hessian computation. Parameters ---------- problem: The problem to compute the log evidence for. x: The maximum a posteriori estimate at which to compute the log evidence. Returns ------- log_evidence: float """ hessian = problem.objective( problem.get_reduced_vector(x), sensi_orders=(2,) ) _, log_det = np.linalg.slogdet(hessian) log_prop_posterior = -problem.objective(problem.get_reduced_vector(x)) log_evidence = ( 0.5 * np.log(2 * np.pi) * len(problem.x_free_indices) - 0.5 * log_det + log_prop_posterior ) return log_evidence
[docs] def harmonic_mean_log_evidence( result: Result, prior_samples: np.ndarray | None = None, neg_log_likelihood_fun: Callable | None = None, ) -> float: """ Compute the log evidence using the harmonic mean estimator. Stabilized harmonic mean estimator is used if prior samples are provided. Newton and Raftery (1994): https://doi.org/10.1111/j.2517-6161.1994.tb01956.x Parameters ---------- result: prior_samples: Samples from the prior distribution. If samples from the prior are provided, the stabilized harmonic mean is computed (recommended). Then, the likelihood function must be provided as well. neg_log_likelihood_fun: callable Function to evaluate the negative log likelihood. Necessary if prior_samples is not `None`. Returns ------- log_evidence """ if result.sample_result is None: raise ValueError("No samples available. Run sampling first.") # compute negative log likelihood from traces burn_in = geweke_test(result) trace_neglogpost = result.sample_result.trace_neglogpost[0, burn_in:] trace_neglogprior = result.sample_result.trace_neglogprior[0, burn_in:] neg_log_likelihoods_posterior = trace_neglogpost - trace_neglogprior if prior_samples is None: # compute harmonic mean from samples return -logsumexp(neg_log_likelihoods_posterior) + np.log( neg_log_likelihoods_posterior.size ) # compute stabilized harmonic mean if prior_samples is not None and neg_log_likelihood_fun is None: raise ValueError( "you need to provide a likelihood function to evaluate prior samples" ) # compute delta (ratio of prior to posterior samples) n_samples_prior = len(prior_samples) n_samples_posterior = len(trace_neglogpost) delta = n_samples_prior / (n_samples_prior + n_samples_posterior) neg_log_likelihoods_prior = np.array( [neg_log_likelihood_fun(x) for x in prior_samples] ) log_likelihoods_stack = -np.concatenate( [neg_log_likelihoods_prior, neg_log_likelihoods_posterior] ) def _log_evidence_objective(log_p: float): # Helper function to compute the log evidence with stabilized harmonic mean log_w_i = logsumexp( np.stack( ( log_p * np.ones_like(log_likelihoods_stack), log_likelihoods_stack, ), axis=1, ), b=np.array([delta, 1 - delta]), axis=1, ) res, sign = logsumexp( [ log_p, logsumexp(log_likelihoods_stack - log_w_i) - logsumexp(-log_w_i), ], b=[1, -1], return_sign=True, ) return sign * res sol = minimize_scalar(_log_evidence_objective) return sol.x
[docs] def parallel_tempering_log_evidence( result: Result, method: str = "trapezoid", use_all_chains: bool = True, ) -> float | None: """Perform thermodynamic integration or steppingstone sampling to estimate the log evidence. Thermodynamic integration is performed by integrating the mean log likelihood over the temperatures. Errors might come from the samples itself or the numerical integration. Steppingstone sampling is a form of importance sampling that uses the maximum likelihood of each temperature. It does not require an integration, but can be biased for a small number of temperatures. See (Annis et al., 2019), https://doi.org/10.1016/j.jmp.2019.01.005, for more details. This should be used with a beta decay temperature schedule and not with the adaptive version of parallel tempering sampling as the temperature schedule is not optimal for thermodynamic integration. Parameters ---------- result: Result object containing the samples. method: Integration method, either 'trapezoid' or 'simpson' to perform thermodynamic integration (uses scipy for integration) or 'steppingstone' to perform steppingstone sampling. use_all_chains: If True, calculate burn-in for each chain and use the maximal burn-in for all chains for the integration. This will fail if not all chains have converged yet. Otherwise, use only the converged chains for the integration (might increase the integration error). """ # compute burn in for all chains but the last one (prior only) burn_ins = np.zeros(len(result.sample_result.betas), dtype=int) for i_chain in range(len(result.sample_result.betas)): burn_ins[i_chain] = geweke_test(result, chain_number=i_chain) max_burn_in = int(np.max(burn_ins)) if max_burn_in >= result.sample_result.trace_x.shape[1]: logger.warning( f"At least {np.sum(burn_ins >= result.sample_result.trace_x.shape[1])} chains seem to not have " f"converged yet. You may want to use a larger number of samples." ) if use_all_chains: raise ValueError( "Not all chains have converged yet. You may want to use a larger number of samples, " "or try ´use_all_chains=False´, which might increase the integration error." ) if use_all_chains: # estimate mean of log likelihood for each beta trace_loglike = ( result.sample_result.trace_neglogprior[::-1, max_burn_in:] - result.sample_result.trace_neglogpost[::-1, max_burn_in:] ) mean_loglike_per_beta = np.mean(trace_loglike, axis=1) temps = result.sample_result.betas[::-1] else: # estimate mean of log likelihood for each beta if chain has converged mean_loglike_per_beta = [] trace_loglike = [] temps = [] for i_chain in reversed(range(len(result.sample_result.betas))): if burn_ins[i_chain] < result.sample_result.trace_x.shape[1]: # save temperature-chain as it is converged temps.append(result.sample_result.betas[i_chain]) # calculate mean log likelihood for each beta trace_loglike_i = ( result.sample_result.trace_neglogprior[ i_chain, burn_ins[i_chain] : ] - result.sample_result.trace_neglogpost[ i_chain, burn_ins[i_chain] : ] ) trace_loglike.append(trace_loglike_i) mean_loglike_per_beta.append(np.mean(trace_loglike_i)) if method == TRAPEZOID: log_evidence = trapezoid( # integrate from low to high temperature y=mean_loglike_per_beta, x=temps, ) elif method == SIMPSON: log_evidence = simpson( # integrate from low to high temperature y=mean_loglike_per_beta, x=temps, ) elif method == STEPPINGSTONE: log_evidence = steppingstone(temps=temps, trace_loglike=trace_loglike) else: raise ValueError( f"Unknown method {method}. Choose 'trapezoid', 'simpson' for thermodynamic integration or ", "'steppingstone' for steppingstone sampling.", ) return log_evidence
def steppingstone(temps: np.ndarray, trace_loglike: np.ndarray) -> float: """Perform steppingstone sampling to estimate the log evidence. Implementation based on Annis et al. (2019): https://doi.org/10.1016/j.jmp.2019.01.005. Parameters ---------- temps: Temperature values. trace_loglike: Log likelihood values for each temperature. """ from scipy.special import logsumexp ss_log_evidences = np.zeros(len(temps) - 1) for t_i in range(1, len(temps)): # we use the maximum likelihood times the temperature difference to stabilize the logsumexp # original formulation uses only the maximum likelihood, this is equivalent ss_log_evidences[t_i - 1] = logsumexp( trace_loglike[t_i - 1] * (temps[t_i] - temps[t_i - 1]) ) - np.log(trace_loglike[t_i - 1].size) log_evidence = np.sum(ss_log_evidences) return log_evidence
[docs] def bridge_sampling_log_evidence( result: Result, n_posterior_samples_init: int | None = None, initial_guess_log_evidence: float | None = None, max_iter: int = 1000, tol: float = 1e-6, ) -> float: """ Compute the log evidence using bridge sampling. Based on "A Tutorial on Bridge Sampling" by Gronau et al. (2017): https://doi.org/10.1016/j.jmp.2017.09.005. Using the optimal bridge function by Meng and Wong (1996) which minimises the relative mean-squared error. Proposal function is calibrated using posterior samples, which are not used for the final bridge estimate (as this may result in an underestimation of the marginal likelihood, see Overstall and Forster (2010)). Parameters ---------- result: The pyPESTO result object with filled sample result. n_posterior_samples_init: Number of samples used to calibrate the proposal function. By default, half of the posterior samples are used. initial_guess_log_evidence: Initial guess for the log evidence. By default, the Laplace approximation is used to compute the initial guess. max_iter: Maximum number of iterations. Default is 1000. tol: Tolerance for convergence. Default is 1e-6. Returns ------- log_evidence """ if result.sample_result is None: raise ValueError("No samples available. Run sampling first.") if not isinstance(result.problem.objective, AggregatedObjective): raise ValueError("Objective must be an AggregatedObjective.") # use Laplace approximation to get initial guess for p(y) if initial_guess_log_evidence is None: initial_guess_log_evidence = laplace_approximation_log_evidence( problem=result.problem, x=result.optimize_result.x[0] ) # extract posterior samples burn_in = geweke_test(result) posterior_samples = result.sample_result.trace_x[0, burn_in:] # build proposal function from posterior samples if n_posterior_samples_init is None: n_posterior_samples_init = int(posterior_samples.shape[0] * 0.5) # randomly select samples for calibration calibration_index = np.random.choice( np.arange(posterior_samples.shape[0]), n_posterior_samples_init, replace=False, ) samples_calibration = posterior_samples[calibration_index] # remove calibration samples from posterior samples posterior_samples = posterior_samples[ [ j for j in range(posterior_samples.shape[0]) if j not in calibration_index ] ] # generate proposal samples and define proposal function n_proposal_samples = posterior_samples.shape[0] posterior_mean = np.mean(samples_calibration, axis=0) posterior_cov = np.cov(samples_calibration.T) # if covariance matrix is not positive definite (numerically), use diagonal covariance matrix only try: # proposal density function log_proposal_fun = stats.multivariate_normal( mean=posterior_mean, cov=posterior_cov ).logpdf except np.linalg.LinAlgError: posterior_cov = np.diag(np.diag(posterior_cov)) log_proposal_fun = stats.multivariate_normal( mean=posterior_mean, cov=posterior_cov ).logpdf # generate proposal samples if posterior_cov.size == 1: # univariate case proposal_samples = np.random.normal( loc=posterior_mean, scale=np.sqrt(posterior_cov), size=n_proposal_samples, ) proposal_samples = proposal_samples.reshape(-1, 1) else: # multivariate case proposal_samples = np.random.multivariate_normal( mean=posterior_mean, cov=posterior_cov, size=n_proposal_samples ) # Compute the weights for the bridge sampling estimate log_s1 = np.log( posterior_samples.shape[0] / (posterior_samples.shape[0] + n_proposal_samples) ) log_s2 = np.log( n_proposal_samples / (posterior_samples.shape[0] + n_proposal_samples) ) # Start with the initial guess for p(y) log_p_y = initial_guess_log_evidence # Compute the log-likelihood, log-prior, and log-proposal for the posterior and proposal samples # assumes that the objective function is the negative log-likelihood + negative log-prior # get index of prior in the objective function likelihood_fun_indices = [] for i, obj in enumerate(result.problem.objective._objectives): if not isinstance(obj, NegLogParameterPriors) and not isinstance( obj, NegLogPriors ): likelihood_fun_indices.append(i) def log_likelihood_fun(x_array): return np.array( [ np.sum( [ -obj( result.problem.get_full_vector( x=x, x_fixed_vals=result.problem.x_fixed_vals ) ) for obj_i, obj in enumerate( result.problem.objective._objectives ) if obj_i in likelihood_fun_indices ] ) for x in x_array ] ) def log_prior_fun(x_array): return np.array( [ np.sum( [ -obj( result.problem.get_full_vector( x=x, x_fixed_vals=result.problem.x_fixed_vals ) ) for obj_i, obj in enumerate( result.problem.objective._objectives ) if obj_i not in likelihood_fun_indices ] ) for x in x_array ] ) log_likelihood_posterior = log_likelihood_fun(posterior_samples) log_prior_posterior = log_prior_fun(posterior_samples) log_proposal_posterior = log_proposal_fun(posterior_samples) log_likelihood_proposal = log_likelihood_fun(proposal_samples) log_prior_proposal = log_prior_fun(proposal_samples) log_proposal_proposal = log_proposal_fun(proposal_samples) log_h_posterior_1 = log_s1 + log_likelihood_posterior + log_prior_posterior log_h_proposal_1 = log_s1 + log_likelihood_proposal + log_prior_proposal for i in range(max_iter): # Compute h(θ) for posterior samples log_h_posterior_2 = log_s2 + log_p_y + log_proposal_posterior log_h_posterior = logsumexp([log_h_posterior_1, log_h_posterior_2]) # Compute h(θ) for proposal samples log_h_proposal_2 = log_s2 + log_p_y + log_proposal_proposal log_h_proposal = logsumexp([log_h_proposal_1, log_h_proposal_2]) # Calculate the numerator and denominator for the bridge sampling estimate temp = log_likelihood_proposal + log_prior_proposal + log_h_proposal log_numerator = logsumexp(temp) - np.log( temp.size ) # compute mean in log space temp = log_proposal_posterior + log_h_posterior log_denominator = logsumexp(temp) - np.log( temp.size ) # compute mean in log space # Update p(y) log_p_y_new = log_numerator - log_denominator # Check for convergence if abs(log_p_y_new - log_p_y) < tol: break log_p_y = log_p_y_new if i == max_iter - 1: logger.warning( "Bridge sampling did not converge in the given number of iterations." ) return log_p_y