Source code for pypesto.sample.sample

import logging
from collections.abc import Callable
from time import process_time

import numpy as np

from ..problem import Problem
from ..result import Result
from ..startpoint import PriorStartpoints
from ..store import autosave
from .adaptive_metropolis import AdaptiveMetropolisSampler
from .sampler import Sampler
from .util import bound_n_samples_from_env

logger = logging.getLogger(__name__)


[docs] def sample( problem: Problem, n_samples: int | None, sampler: Sampler = None, x0: np.ndarray | list[np.ndarray] = None, result: Result = None, warm_start: float = 1.0, filename: str | Callable | None = None, overwrite: bool = False, ) -> Result: """ Call to do parameter sampling. Parameters ---------- problem: The problem to be solved. If None is provided, a :class:`pypesto.AdaptiveMetropolisSampler` is used. n_samples: Number of samples to generate. `None` can be used if the sampler does not use `n_samples`. sampler: The sampler to perform the actual sampling. x0: Initial parameter for the Markov chain. If None, the best parameter found in optimization is used. Note that some samplers require an initial parameter, some may ignore it. x0 can also be a list, to have separate starting points for parallel tempering chains. warm_start: Whether to warm start from previous optimization results stored in `result` or to sample from the prior. The value is the convex combination between the two, with 1.0 meaning only warm start and 0.0 meaning only sample from prior. If `x0` is provided, `warm_start` is ignored. result: A result to write to. If None provided, one is created from the problem. filename: Name of the hdf5 file, where the result will be saved. Default is None, which deactivates automatic saving. If set to "Auto" it will automatically generate a file named `year_month_day_profiling_result.hdf5`. Optionally a method, see docs for `pypesto.store.auto.autosave`. overwrite: Whether to overwrite `result/sampling` in the autosave file if it already exists. Returns ------- result: A result with filled in sample_options part. """ # prepare result object if result is None: result = Result(problem) # number of samples if n_samples is not None: n_samples = bound_n_samples_from_env(n_samples) if warm_start > 1.0 or warm_start < 0.0: raise ValueError("warm_start must be in [0, 1].") # try to find initial parameters if x0 is None: if warm_start == 0: logger.info("Sampling initial points from prior.") get_start_params = PriorStartpoints(check_fval=True) x0 = get_start_params.sample( n_starts=1, lb=problem.lb, ub=problem.ub, priors=problem.x_priors, )[0] elif result.optimize_result is not None: result.optimize_result.sort() if len(result.optimize_result.list) > 0: x0 = problem.get_reduced_vector( result.optimize_result.list[0]["x"] ) if warm_start < 1.0 and x0 is not None: logger.info( f"Initializing sampling with a warm start from optimization " f"and prior sampling with weight: {warm_start}." ) get_start_params = PriorStartpoints(check_fval=True) x0_prior = get_start_params.sample( n_starts=1, lb=problem.lb, ub=problem.ub, priors=problem.x_priors, )[0] x0 = warm_start * x0 + (1 - warm_start) * x0_prior else: logger.info( "No initial point provided and no optimization result found. Set warm_start to 0 " "if you need initial points for sampling." ) # set sampler if sampler is None: sampler = AdaptiveMetropolisSampler() # initialize sampler to problem sampler.initialize(problem=problem, x0=x0) # perform the sampling and track time t_start = process_time() sampler.sample(n_samples=n_samples) t_elapsed = process_time() - t_start logger.info("Elapsed time: " + str(t_elapsed)) # extract results sampler_result = sampler.get_samples() # record time sampler_result.time = t_elapsed # record results result.sample_result = sampler_result autosave( filename=filename, result=result, store_type="sample", overwrite=overwrite, ) return result