"""
:py:mod:`core.py`
-------------------------------------
"""
from alabi import utility as ut
from alabi import visualization as vis
from alabi import gp_utils
from alabi import mcmc_utils
from alabi import cache_utils
from alabi import parallel_utils
import numpy as np
from functools import partial
import george
from george import kernels
import multiprocess as mp
import time
import os
import warnings
import tqdm
import pickle
import scipy.optimize as op
__all__ = ["SurrogateModel", "CachedSurrogateLikelihood"]
[docs]
class CachedSurrogateLikelihood:
"""
A picklable cached surrogate likelihood function.
This class creates a callable object that caches the GP computation
and can be pickled for use with multiprocessing.
"""
[docs]
def __init__(self, gp_iter, _y_cond, theta_scaler, y_scaler, ndim, return_var=False):
"""
Initialize the cached surrogate likelihood.
:param gp_iter: Pre-computed GP object
:param y_cond: Training target values
:param theta_scaler: Parameter scaler object
:param y_scaler: Target scaler object
:param ndim: Number of dimensions
"""
self.gp_iter = gp_iter
self._y_cond = _y_cond
self.theta_scaler = theta_scaler
self.y_scaler = y_scaler
self.ndim = ndim
self.return_var = return_var
def __call__(self, theta_xs):
"""
Evaluate the cached surrogate likelihood.
:param theta_xs: Point(s) to evaluate. Same format as surrogate_log_likelihood.
:param return_var: Whether to return variance as well.
:returns: Same format as surrogate_log_likelihood.
"""
# Convert input to numpy array and handle dimensionality
theta_xs = np.asarray(theta_xs)
original_shape_1d = False
# Handle 1D input (single point)
if theta_xs.ndim == 1:
theta_xs = theta_xs.reshape(1, -1)
original_shape_1d = True
elif theta_xs.ndim != 2:
raise ValueError(f"theta_xs must be 1D or 2D array, got {theta_xs.ndim}D")
# Apply scaling transformation
_theta_xs = self.theta_scaler.transform(theta_xs)
# Ensure proper shape for george GP
_theta_xs = np.atleast_2d(_theta_xs)
if _theta_xs.shape[0] == 1 and _theta_xs.shape[1] != self.ndim:
if _theta_xs.shape[1] == 1 and _theta_xs.shape[0] == self.ndim:
_theta_xs = _theta_xs.T
elif len(_theta_xs.flatten()) == self.ndim:
_theta_xs = _theta_xs.reshape(1, -1)
# Use the pre-computed GP (this is fast since gp.compute() was already called)
if self.return_var == False:
_ypred = self.gp_iter.predict(self._y_cond, _theta_xs, return_var=False, return_cov=False)
ypred = self.y_scaler.inverse_transform(_ypred.reshape(-1, 1)).flatten()
# Return single value if input was 1D, otherwise return array
if original_shape_1d:
return ypred[0]
else:
return ypred
else:
_ypred, _varpred = self.gp_iter.predict(self._y_cond, _theta_xs, return_var=True, return_cov=False)
ypred = self.y_scaler.inverse_transform(_ypred.reshape(-1, 1)).flatten()
# Variance transformation: variance scales as scale_factor² for linear transforms
# For FunctionTransformer (like nlog_scaler), we need to handle this carefully
if hasattr(self.y_scaler, 'scale_') and self.y_scaler.scale_ is not None:
# For StandardScaler or similar: var_unscaled = scale_factor² × var_scaled
var_scale_factor = self.y_scaler.scale_[0] ** 2
varpred = _varpred * var_scale_factor
else:
# For FunctionTransformer or other scalers, compute numerical derivative
# This is more accurate than using inverse_transform on variance
try:
# Use a small epsilon to estimate the derivative
eps = 1e-6
test_vals = np.array([[0.0], [eps]])
transformed = self.y_scaler.inverse_transform(test_vals)
scale_factor = (transformed[1] - transformed[0]) / eps
varpred = _varpred * (scale_factor ** 2)
except:
# Fallback: use absolute value of incorrect transform to ensure positivity
varpred = np.abs(self.y_scaler.inverse_transform(_varpred.reshape(-1, 1)).flatten())
# Return single values if input was 1D, otherwise return arrays
if original_shape_1d:
return ypred[0], varpred[0]
else:
return ypred, varpred
[docs]
class SurrogateModel(object):
"""
Gaussian Process surrogate model for Bayesian inference and optimization.
A SurrogateModel uses a Gaussian Process to create a fast approximation of expensive
likelihood functions, enabling efficient Bayesian inference, parameter estimation,
and active learning. The model supports various active learning algorithms and
scalers for handling different types of likelihood functions.
:param lnlike_fn: (*callable, required*)
Log-likelihood function that takes parameter array theta and returns scalar
log-likelihood value. For Bayesian inference, this is your model's log-likelihood.
Signature: lnlike_fn(theta) -> float
:param bounds: (*array-like, required*)
Prior bounds for each parameter. List/array of (min, max) tuples for each dimension.
Example: bounds = [(0, 1), (2, 3), (-1, 1)]
:param param_names: (*array-like, optional*)
Names/labels for each parameter. If None, defaults to θ₀, θ₁, etc.
Length must match number of dimensions in bounds.
:param cache: (*bool, optional, default=True*)
Whether to cache the trained model to disk for reuse
:param savedir: (*str, optional, default="results/"*)
Directory for saving results, plots, and cached models
:param model_name: (*str, optional, default="surrogate_model"*)
Name prefix for cached model files
:param verbose: (*bool, optional, default=True*)
Print progress information during training and inference
:param ncore: (*int, optional, default=cpu_count()*)
Number of CPU cores to use for parallel computation
:param ignore_warnings: (*bool, optional, default=True*)
Suppress sklearn and other package warnings
:param random_state: (*int, optional, default=None*)
Random seed for reproducible results
.. attribute:: gp
:type: george.GP
Trained Gaussian Process model
.. attribute:: bounds
:type: ndarray
Original parameter bounds (unscaled)
.. attribute:: _bounds
:type: ndarray
Scaled parameter bounds used for GP training
.. attribute:: _theta
:type: ndarray
Training parameter samples (scaled)
.. attribute:: _y
:type: ndarray
Training likelihood values (scaled)
.. attribute:: ntrain
:type: int
Number of initial training samples
.. attribute:: ndim
:type: int
Number of parameters/dimensions
.. attribute:: emcee_samples
:type: ndarray
MCMC samples from emcee (if run_emcee called)
.. attribute:: dynesty_samples
:type: ndarray
Nested sampling results from dynesty (if run_dynesty called)
**Examples**
Basic usage for Bayesian inference:
.. code-block:: python
def log_likelihood(theta):
# Your model likelihood function
return -0.5 * np.sum((theta - 2)**2)
bounds = [(0, 4), (0, 4)] # 2D parameter space
sm = SurrogateModel(log_likelihood, bounds)
sm.init_samples(ntrain=100) # Initial training data
sm.init_gp() # Initialize Gaussian Process
sm.active_train(niter=50) # Active learning
sm.run_dynesty() # Bayesian inference
For optimization problems:
.. code-block:: python
sm.active_train(algorithm="jones") # Use Jones algorithm for optimization
.. seealso::
:meth:`init_samples` : Initialize training data
:meth:`init_gp` : Initialize Gaussian Process
:meth:`active_train` : Perform active learning
:meth:`run_dynesty` : Run nested sampling with dynesty
:meth:`run_emcee` : Run MCMC sampling with emcee
:meth:`run_ultranest` : Run nested sampling with UltraNest
:meth:`run_pymultinest` : Run nested sampling with PyMultiNest
"""
[docs]
def __init__(self, lnlike_fn=None, bounds=None, param_names=None,
cache=True, savedir="results/", model_name="surrogate_model",
verbose=True, show_warnings=False, ncore=1, pool_method="forkserver", ignore_warnings=True,
random_state=None):
# Check all required inputs are specified
if lnlike_fn is None:
raise ValueError("Must supply lnlike_fn to train GP surrogate model.")
if bounds is None:
raise ValueError("Must supply prior bounds.")
# Set random seed for reproducibility
if random_state is None:
# Use a time-based seed to avoid clustering when called repeatedly
import time
random_state = int(time.time() * 1000000) % (2**32)
self.random_state = random_state
# Set function for training the GP, and initial training samples
# For bayesian inference problem this would be your log likelihood function
self.lnlike_fn = lnlike_fn
self.true_log_likelihood = lnlike_fn
# unscaled bounds for theta
self.bounds = np.array(bounds)
# define prior sampler with unscaled bounds
self.prior_sampler = partial(ut.prior_sampler, bounds=self.bounds, sampler="uniform", random_state=self.random_state)
# Determine dimensionality
self.ndim = len(self.bounds)
# Set parameter names
if param_names is not None:
if len(param_names) != len(bounds):
raise ValueError("Length of param_names must match length of bounds.")
self.param_names = param_names
self.labels = param_names
else:
self.param_names = [r"$\theta_%s$"%(i) for i in range(self.ndim)]
self.labels = ["theta_{i}" for i in range(self.ndim)]
# Cache surrogate model as pickle
self.cache = cache
# Directory to save results and plots; defaults to local dir
self.savedir = savedir
if not os.path.exists(self.savedir):
os.makedirs(self.savedir)
# Name of model cache
self.model_name = model_name
# Print progress statements
self.verbose = verbose
self.show_warnings = show_warnings
# Ignore warnings
if ignore_warnings:
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
# Number of cores alabi is allowed to use
self.pool_method = pool_method
if ncore <= 0:
self.ncore = 1
else:
self.ncore = min(ncore, mp.cpu_count())
# Check if MPI is active
self.mpi_is_active = parallel_utils.is_mpi_active()
if self.mpi_is_active:
from mpi4py.futures import MPIPoolExecutor
# false if emcee, dynesty, and ultranest have not been run for this object
self.emcee_run = False
self.dynesty_run = False
self.ultranest_run = False
def __getstate__(self):
state = self.__dict__.copy()
state["pool"] = None
# If self.gp is the problematic object, we need to handle it
if hasattr(self, 'gp') and hasattr(self.gp, '__dict__'):
# Try to find and remove unpickleable attributes from gp
gp_dict = self.gp.__dict__.copy()
cleaned_gp_dict = {}
for key, value in gp_dict.items():
try:
pickle.dumps(value)
cleaned_gp_dict[key] = value
except:
print(f"Removing unpickleable attribute from gp: {key}")
return state
def __setstate__(self, state):
self.__dict__.update(state)
def _get_pool(self, ncore=None):
if (ncore is None) or (ncore <= 1):
return None
if self.mpi_is_active:
pool = MPIPoolExecutor(max_workers=ncore)
return pool
else:
pool = mp.get_context(self.pool_method).Pool(processes=ncore)
return pool
def _close_pool(self, pool):
if pool is None:
pool = None
elif self.mpi_is_active:
pool.shutdown(wait=True)
pool = None
else:
pool.close()
pool.join()
pool = None
[docs]
def save(self):
"""
Pickle ``SurrogateModel`` object and write summary to a text file.
"""
file = os.path.join(self.savedir, self.model_name)
# pickle surrogate model object
print(f"Caching model to {file}...")
# Create a temporary file first, then rename (atomic operation)
temp_file = file + ".pkl.tmp"
try:
with open(temp_file, "wb") as f:
pickle.dump(self, f)
# Atomic rename to prevent corruption
os.rename(temp_file, file + ".pkl")
except Exception as e:
# Clean up temp file if something went wrong
if os.path.exists(temp_file):
os.remove(temp_file)
raise e
if hasattr(self, "gp"):
try:
cache_utils.write_report_gp(self, file)
except Exception as e:
print(f"Error writing GP report: {e}")
if self.emcee_run == True:
cache_utils.write_report_emcee(self, file)
if self.dynesty_run == True:
cache_utils.write_report_dynesty(self, file)
def _lnlike_fn(self, _theta):
"""
Internal function to evaluate the model function ``lnlike_fn`` at scaled theta.
This is used to avoid scaling the theta in the main function call.
"""
# Unscale theta
theta = self.theta_scaler.inverse_transform(_theta).flatten()
# Evaluate function
y = self.true_log_likelihood(theta)
# Scale y - ensure y is a numpy array
y = np.asarray(y).reshape(-1, 1)
_y = self.y_scaler.transform(y).flatten()
return _y
[docs]
def theta(self):
"""
Return unscaled training theta values
"""
return self.theta_scaler.inverse_transform(self._theta)
[docs]
def y(self):
"""
Return unscaled training y values
"""
return self.y_scaler.inverse_transform(self._y.reshape(-1, 1)).flatten()
[docs]
def refit_scalers(self, theta, y, theta_scaler=None, y_scaler=None):
"""
Refit the theta and y scalers using current training data.
Useful if training data has changed significantly.
"""
if theta_scaler is not None:
self.theta_scaler = theta_scaler
if y_scaler is not None:
self.y_scaler = y_scaler
self.theta_scaler.fit(self.bounds.T)
_theta = self.theta_scaler.transform(theta)
y_2d = y.reshape(-1, 1)
_y = self.y_scaler.fit_transform(y_2d).flatten()
if np.any(np.isnan(_theta)):
raise ValueError("Refitted theta_scaler produced NaN values!")
if np.any(np.isinf(_theta)):
raise ValueError("Refitted theta_scaler produced Inf values!")
if np.any(np.isnan(_y)):
raise ValueError("Refitted y_scaler produced NaN values!")
if np.any(np.isinf(_y)):
raise ValueError("Refitted y_scaler produced Inf values!")
return _theta, _y
[docs]
def init_train(self, nsample=None, sampler="lhs", fname="initial_training_sample.npz"):
"""
:param nsample: (*int, optional*)
Number of samples. Defaults to ``nsample = 50 * self.ndim``
:param sampler: (*str, optional*)
Sampling method. Defaults to ``'lhs'``.
See ``utility.prior_sampler`` for more details.
"""
if nsample is None:
nsample = 50 * self.ndim
# note: initial samples should be drawn uniformly in scaled space
# if theta_scaler is a non-linear transform, then samples in real space will be non-uniform
# _theta = self._prior_sampler(nsample=nsample, sampler=sampler, random_state=self.random_state)
# theta = self.theta_scaler.inverse_transform(_theta)
theta = self.prior_sampler(nsample=nsample, sampler=sampler, random_state=self.random_state)
# create pool for parallel evaluation of likelihood function
pool = self._get_pool(ncore=self.ncore)
# evaluate initial samples in parallel or sequential
if self.ncore > 1:
results = list(tqdm.tqdm(
pool.imap(self.true_log_likelihood, theta),
total=len(theta)
))
y = np.array(results).reshape(-1, 1)
else:
y = np.array([self.true_log_likelihood(tt) for tt in theta]).reshape(-1,1)
# close init_train pool
self._close_pool(pool)
# replace any nan or inf values
for ii in range(len(y)):
if np.isnan(y[ii]) or np.isinf(y[ii]):
ynan = True
while ynan == True:
# resample theta
new_theta = self.prior_sampler(nsample=1, sampler="uniform", random_state=self.random_state)
y[ii] = self.true_log_likelihood(new_theta).reshape(-1, 1)
theta[ii] = new_theta
if not (np.isnan(y[ii]) or np.isinf(y[ii])):
ynan = False
if self.cache:
np.savez(f"{self.savedir}/{fname}", theta=theta, y=y)
return theta, y
[docs]
def load_train(self, cache_file):
"""
Reload training samples from cache file and apply scalers.
:param cache_file: (*str, required*)
Name of cache file relative to savedir. Must be a .npz file containing 'theta' and 'y' arrays.
:returns: (*tuple*)
Scaled training samples (_theta, _y) after loading from cache.
"""
sims = np.load(cache_file)
theta = sims["theta"]
y = sims["y"]
if self.ndim != theta.shape[1]:
raise ValueError(f"Dimension of bounds (n={self.ndim}) does not \
match dimension of training theta (n={theta.shape[1]})")
return theta, y
[docs]
def init_samples(self, ntrain=100, ntest=0, sampler="uniform", train_file=None, test_file=None):
"""
Initialize training and test samples for the surrogate model.
Creates initial dataset by either loading cached samples or computing new ones
by evaluating the likelihood function at randomly sampled parameter values.
:param ntrain: (*int, optional, default=100*)
Number of training samples to generate. Used only if not loading cached samples.
:param ntest: (*int, optional, default=0*)
Number of test samples to generate. Currently unused.
:param sampler: (*str, optional, default="uniform"*)
Sampling method for generating parameter values. Options:
- "uniform": Uniform sampling within bounds (default)
- "sobol": Low-discrepancy Sobol sequence sampling
- "lhs": Latin hypercube sampling
:param train_file: (*str, optional, default="initial_training_sample.npz"*)
Filename for cached training samples relative to savedir.
Format: .npz file containing 'theta' and 'y' arrays.
:param test_file: (*str, optional, default="initial_test_sample.npz"*)
Filename for cached test samples relative to savedir. Currently unused.
"""
# Load or create training sample
if train_file is not None:
# check if file path exists
if os.path.exists(train_file):
cache_file = train_file
# if not, check if it exists in the savedir
elif os.path.exists(f"{self.savedir}/{train_file}"):
cache_file = f"{self.savedir}/{train_file}"
try:
theta, y = self.load_train(cache_file)
print(f"Loaded {len(theta)} train samples from {cache_file}.")
except Exception as e:
print(f"Unable to reload {cache_file} due to error: {e}. Computing new samples with {self.ncore} cores...")
theta, y = self.init_train(nsample=ntrain, sampler=sampler, fname=train_file)
else:
train_file = "initial_train_file_sample.npz"
theta, y = self.init_train(nsample=ntrain, sampler=sampler, fname=train_file)
# --------------------------------------------------------
# Load or create test sample
if ntest > 0:
if test_file is not None:
# check if file path exists
if os.path.exists(test_file):
cache_file = test_file
# if not, check if it exists in the savedir
elif os.path.exists(f"{self.savedir}/{test_file}"):
cache_file = f"{self.savedir}/{test_file}"
try:
theta_test, y_test = self.load_train(cache_file)
print(f"Loaded {len(theta_test)} test samples from {cache_file}.")
except Exception as e:
print(f"Unable to reload {cache_file} due to error: {e}. Computing new samples with {self.ncore} cores...")
theta_test, y_test = self.init_train(nsample=ntest, sampler=sampler, fname=test_file)
else:
test_file="initial_test_sample.npz"
theta_test, y_test = self.init_train(nsample=ntest, sampler=sampler, fname=test_file)
# Save test dataset
self.theta_test = theta_test
self.y_test = y_test
self.ntest = len(theta_test)
else:
self.theta_test = []
self.y_test = []
self.ntest = 0
# Save initial training sample
self.theta_train = theta
self.y_train = y
# record number of training samples
self.ninit_train = len(theta)
self.ntrain = self.ninit_train
self.nactive = 0
[docs]
def set_hyperparam_prior_bounds(self):
"""
Configure prior bounds for GP hyperparameters based on current training data.
By default ranges for parameters:
- mean: [mean(y) - std(y), mean(y) + std(y)]
- amplitude: [0.1, 10]
- white noise: [white_noise - 3, white_noise + 3]
"""
# Configure GP hyperparameter prior
# hp_bounds = self.gp.get_parameter_bounds()
# pnames = self.gp.get_parameter_names(include_frozen=False)
if self.uniform_scales == True:
pnames = self.param_names_optimized
else:
pnames = self.param_names_full
hp_bounds = [[None, None] for _ in pnames]
if self.fit_mean:
mean_y, std_y = np.mean(self._y), np.std(self._y)
mean_bounds = [mean_y - std_y, mean_y + std_y]
hp_bounds[pnames.index("mean:value")] = mean_bounds
if self.fit_amp:
# log_constant is stored in natural-log space, so bounds must also be in log space.
hp_bounds[pnames.index(f"{self.kernel_amp_key}:log_constant")] = self.ln_gp_amp_rng
if self.fit_white_noise:
wn_bounds = [self.white_noise - 3, self.white_noise + 3]
hp_bounds[pnames.index("white_noise:value")] = wn_bounds
if self.uniform_scales == True:
hp_bounds[pnames.index(f"{self.kernel_scale_key}:metric:log_M")] = self.ln_gp_scale_rng
else:
for ii in range(self.ndim):
hp_bounds[pnames.index(f"{self.kernel_scale_key}:metric:log_M_{ii}_{ii}")] = self.ln_gp_scale_rng
self.hp_bounds = np.array(hp_bounds)
self.gp_hyper_prior = partial(ut.lnprior_uniform, bounds=self.hp_bounds)
[docs]
def expand_hyperparameter_vector(self, optimized_params):
if optimized_params is None:
raise ValueError("optimized_params cannot be None")
if self.uniform_scales == False:
return optimized_params
full_params = np.ones(len(self.param_names_full))
if self.fit_mean:
full_params[self.param_names_full.index("mean:value")] = optimized_params[self.param_names_optimized.index("mean:value")]
if self.fit_amp:
full_params[self.param_names_full.index(f"{self.kernel_amp_key}:log_constant")] = optimized_params[self.param_names_optimized.index(f"{self.kernel_amp_key}:log_constant")]
if self.fit_white_noise:
full_params[self.param_names_full.index("white_noise:value")] = optimized_params[self.param_names_optimized.index("white_noise:value")]
# if self.uniform_scales == True: use same scale length for all dimensions
for ii in range(self.ndim):
full_params[self.param_names_full.index(f"{self.kernel_scale_key}:metric:log_M_{ii}_{ii}")] = optimized_params[self.param_names_optimized.index(f"{self.kernel_scale_key}:metric:log_M")]
return np.array(full_params)
[docs]
def set_hyperparameter_vector(self, tmp_gp, optimized_params):
if optimized_params is None:
raise ValueError("optimized_params cannot be None. Cannot set hyperparameters.")
if self.uniform_scales == True:
full_params = self.expand_hyperparameter_vector(optimized_params)
else:
full_params = optimized_params
tmp_gp.set_parameter_vector(full_params)
return tmp_gp
[docs]
def get_hyperparameter_dict(self, gp):
"""
Get current GP hyperparameters as a dictionary.
:returns: (*dict*)
Dictionary of current GP hyperparameters with names as keys and values as values.
"""
hp_dict = gp.get_parameter_dict()
if self.uniform_scales == True:
hp_dict[f"{self.kernel_scale_key}:metric:log_M"] = hp_dict.pop(f"{self.kernel_scale_key}:metric:log_M_0_0")
for ii in range(1, self.ndim):
del hp_dict[f"{self.kernel_scale_key}:metric:log_M_{ii}_{ii}"]
return hp_dict
[docs]
def get_hyperparameter_vector(self, gp):
hp_dict = self.get_hyperparameter_dict(gp)
hp_vector = np.fromiter(hp_dict.values(), dtype=float)
return hp_vector
[docs]
def init_gp(self,
kernel="ExpSquaredKernel",
fit_amp=True,
fit_mean=True,
fit_white_noise=True,
white_noise=-12,
gp_scale_rng=[-2,2],
gp_amp_rng=[-1,1],
uniform_scales=False,
overwrite=False,
theta_scaler=ut.no_scaler,
y_scaler=ut.no_scaler,
gp_opt_method="l-bfgs-b",
gp_nopt=3,
optimizer_kwargs={"maxiter": 100, "xatol": 1e-4, "fatol": 1e-3, "adaptive": True},
hyperopt_method="cv",
regularize=True,
amp_0=1.0,
mu_0=1.0,
sigma_0=2.0,
cv_folds=5,
cv_scoring="mse",
cv_n_candidates=100,
cv_stage2_candidates=50,
cv_stage2_width=0.5,
cv_stage3_candidates=25,
cv_stage3_width=0.25,
cv_weighted_factor=1.0,
multi_proc=True):
"""
Initialize the Gaussian Process surrogate model with specified kernel and hyperparameters.
This function sets up a Gaussian Process (GP) using the george library with the specified
kernel type and configuration. The GP is initialized with random scale lengths and then
fitted to the current training data.
:param kernel: (*str or george kernel object, optional*)
Kernel type for the Gaussian Process. Can be either a string specifying one of the
built-in kernels or a george kernel object. Default is "ExpSquaredKernel".
Built-in options:
- ``'ExpSquaredKernel'``: Squared exponential (RBF) kernel, smooth functions
- ``'Matern32Kernel'``: Matérn kernel with ν=3/2, moderately smooth functions
- ``'Matern52Kernel'``: Matérn kernel with ν=5/2, smooth functions
- ``'RationalQuadraticKernel'``: Rational quadratic kernel, scale mixture of RBF kernels
See https://george.readthedocs.io/en/latest/user/kernels/ for more details.
:param fit_amp: (*bool, optional*)
Whether to optimize the amplitude (overall scale) hyperparameter of the kernel.
If True, the GP will learn the optimal amplitude from data. Default is True.
:param fit_mean: (*bool, optional*)
Whether to optimize the mean function hyperparameter. If True, the GP will learn
a constant mean offset. If False, assumes zero mean. Default is True.
:param fit_white_noise: (*bool, optional*)
Whether to optimize the white noise (nugget) hyperparameter. If True, the GP will
learn the optimal noise level. If False, uses the fixed value from white_noise.
Default is True.
:param white_noise: (*float, optional*)
Log-scale white noise parameter. If fit_white_noise=False, this fixed value is used.
If fit_white_noise=True, this serves as the initial guess. Typical values are
between -15 (very low noise) and -5 (high noise). Default is -12.
:param gp_scale_rng: (*list of two floats, optional*)
Log-scale bounds for the characteristic length scale parameters of the kernel.
Format: [log_min_scale, log_max_scale]. These bounds apply to all input dimensions.
Default is [-2, 2], corresponding to scales between ~0.14 and ~7.4 in original units.
:param uniform_scales: (*bool, optional*)
If True, the same scale length will be used for all input dimensions. If False, each dimension
will have its own independent scale length. Default is False.
:param overwrite: (*bool, optional*)
If True, allows reinitializing the GP even if one already exists. If False and a GP
already exists, raises an AssertionError. Default is False.
:param theta_scaler: (*sklearn transformer, optional, default=no_scaler*)
Scaler for input parameters. Applied to theta values before GP training.
Common options: MinMaxScaler() (scale to [0,1]) or StandardScaler()
:param y_scaler: (*sklearn transformer, optional, default=no_scaler*)
Scaler for output values (log-likelihoods). Options include:
- no_scaler: No scaling (default)
- minmax_scaler: Scale to [0,1]
- nlog_scaler: Apply -log10(-y) transformation for negative log-likelihoods
- log_scaler: Apply log10(y) for positive values
:param gp_opt_method: (*str, optional*)
Optimization method for GP hyperparameter optimization. Passed to scipy.optimize.minimize.
Common options: 'l-bfgs-b', 'newton-cg', 'bfgs', 'cg'. Default is 'l-bfgs-b'.
:param gp_nopt: (*int, optional*)
Number of optimization restarts for GP hyperparameter optimization. Multiple restarts
help avoid local minima. Default is 3.
:param optimizer_kwargs: (*dict, optional*)
Additional keyword arguments passed to the scipy optimizer. Common options include
'maxiter' (maximum iterations) and convergence tolerances.
Default is {"maxiter": 50}.
:param hyperopt_method: (*str, optional, default='ml'*)
Method for optimizing GP hyperparameters:
- 'ml': Maximum marginal likelihood (fast, may overfit)
- 'cv': k-fold cross-validation (slower, prevents overfitting)
:param cv_folds: (*int, optional, default=5*)
Number of folds for cross-validation (only used if hyperopt_method='cv').
:param cv_scoring: (*str, optional, default='mse'*)
Scoring metric for cross-validation. Options: 'mse', 'mae', 'r2'.
:param cv_n_candidates: (*int, optional, default=20*)
Number of hyperparameter candidates to evaluate for CV.
:param cv_stage2_candidates: (*int, optional, default=20*)
Number of candidates for stage 2 grid search. Only used when cv_two_stage=True.
:param cv_stage2_width: (*float, optional, default=0.3*)
Width factor for stage 2 search around best parameters from stage 1.
Smaller values = tighter search. Only used when cv_two_stage=True.
:param cv_stage3_candidates: (*int, optional, default=None*)
Number of candidates for stage 3 ultra-fine search. If None, uses
max(cv_stage2_candidates // 2, 3). Only used when cv_three_stage=True.
:param cv_stage3_width: (*float, optional, default=0.2*)
Width factor for stage 3 search around best parameters from stage 2.
Should be smaller than cv_stage2_width for finer refinement.
Only used when cv_three_stage=True.
:raises AssertionError:
If a GP already exists and overwrite=False.
:raises ValueError:
If an invalid kernel name is provided.
:raises Exception:
If GP initialization fails after multiple attempts with different scale lengths.
.. note::
This function must be called after init_samples() since it requires training data
to initialize the GP. The function will automatically retry initialization with
different random scale lengths if the initial attempt fails.
.. code-block:: python
>>> # Basic initialization with default settings
>>> sm.init_gp()
>>> # Custom kernel with specific hyperparameter settings
>>> sm.init_gp(kernel="Matern52Kernel",
... fit_white_noise=False,
... white_noise=-10,
... gp_scale_rng=[-1, 1])
>>> # High-precision optimization
>>> sm.init_gp(gp_opt_method="l-bfgs-b",
... gp_nopt=5,
... optimizer_kwargs={"maxiter": 100, "ftol": 1e-9})
"""
if hasattr(self, 'gp') and (overwrite == False):
raise AssertionError(
"GP kernel already assigned. Use overwrite=True to re-assign the kernel.")
if not hasattr(self, "theta_train") or not hasattr(self, "y_train"):
raise AssertionError(
"Training data not found. Call init_samples() before init_gp().")
# optional hyperparameter choices
self.fit_amp = fit_amp
self.fit_mean = fit_mean
self.fit_white_noise = fit_white_noise
self.white_noise = white_noise
self.uniform_scales = uniform_scales
# GP hyperparameter optimization method
self.gp_opt_method = gp_opt_method
# GP hyperparameter number of opt restarts
self.gp_nopt = gp_nopt
# Save all kwargs needed for opt_gp function
self.opt_gp_kwargs = {"hyperopt_method": hyperopt_method,
"regularize": regularize,
"amp_0": amp_0,
"mu_0": mu_0,
"sigma_0": sigma_0,
"optimizer_kwargs": optimizer_kwargs,
"cv_folds": cv_folds,
"cv_scoring": cv_scoring,
"cv_n_candidates": cv_n_candidates,
"cv_stage2_candidates": cv_stage2_candidates,
"cv_stage2_width": cv_stage2_width,
"cv_stage3_candidates": cv_stage3_candidates,
"cv_stage3_width": cv_stage3_width,
"cv_weighted_factor": cv_weighted_factor,
"multi_proc": multi_proc,
}
# -------------------------------------------------------------------------
# define scaling functions
# Scale inputs between 0 and 1
self.theta_scaler = theta_scaler
self.theta_scaler.fit(self.bounds.T)
# Scale bounds to [0, 1] for training
self._bounds = self.theta_scaler.transform(self.bounds.T).T
self._prior_sampler = partial(ut.prior_sampler, bounds=self._bounds, sampler="uniform", random_state=self.random_state)
# Output scaling function
self.y_scaler = y_scaler
# Fit scalers
self._theta, self._y = self.refit_scalers(self.theta_train, self.y_train)
# Save scaled training data for GP fitting
if self.ntest > 0:
self._theta_test = self.theta_scaler.transform(self.theta_test)
self._y_test = self.y_scaler.transform(np.array(self.y_test).reshape(-1, 1)).flatten()
self._theta_train = self._theta
self._y_train = self._y
self.training_results = {"iteration" : [],
"gp_hyperparameters" : [],
"gp_hyperparameter_opt_iteration" : [],
"gp_hyperparam_opt_time" : [],
"training_mse" : [],
"test_mse" : [],
"training_scaled_mse" : [],
"test_scaled_mse" : [],
"gp_kl_divergence" : [],
"gp_train_time" : [],
"obj_fn_opt_time" : [],
"acquisition_optimizer_niter" : []
}
# -------------------------------------------------------------------------
# set the bounds for scale length parameters
self.log_gp_scale_rng = np.array(gp_scale_rng, dtype=float)
self.log_gp_amp_rng = np.array(gp_amp_rng, dtype=float)
# input bounds are in base 10 log space, but george stores log_constant in natural log space, so we need to convert the bounds
self.ln_gp_scale_rng = np.log(10**self.log_gp_scale_rng)
self.ln_gp_amp_rng = np.log(10**self.log_gp_amp_rng)
# metric_bounds expects log-scale bounds
log_metric_bounds = [(min(self.ln_gp_scale_rng), max(self.ln_gp_scale_rng)) for _ in range(self.ndim)]
metric_bounds = [(np.e**min(self.ln_gp_scale_rng), np.e**max(self.ln_gp_scale_rng)) for _ in range(self.ndim)]
valid_scales = False
max_attempts = 10 # Prevent infinite loops
attempt = 0
while valid_scales == False and attempt < max_attempts:
attempt += 1
# Generate initial scale length in linear scale (metric parameter expects linear scale)
# ln_gp_scale_rng is in log scale, so convert to linear scale for initial guess
log_initial_lscale = np.random.uniform(min(self.ln_gp_scale_rng), max(self.ln_gp_scale_rng), self.ndim)
initial_lscale = np.exp(log_initial_lscale)
# Note: metric is linear scale, but metric_bounds are log scale!
# https://github.com/dfm/george/issues/150
# Initialize GP kernel
try:
# Stationary kernels
if kernel == "ExpSquaredKernel":
# Guess initial metric, or scale length of the covariances (must be > 0)
self.kernel = kernels.ExpSquaredKernel(metric=initial_lscale, metric_bounds=log_metric_bounds, ndim=self.ndim)
self.kernel_name = kernel
print("Initialized GP with squared exponential kernel.")
elif kernel == "RationalQuadraticKernel":
self.kernel = kernels.RationalQuadraticKernel(log_alpha=1, metric=initial_lscale, metric_bounds=log_metric_bounds, ndim=self.ndim)
self.kernel_name = kernel
print("Initialized GP with rational quadratic kernel.")
elif kernel == "Matern32Kernel":
self.kernel = kernels.Matern32Kernel(metric=initial_lscale, metric_bounds=log_metric_bounds, ndim=self.ndim)
self.kernel_name = kernel
print("Initialized GP with Matérn-3/2 kernel.")
elif kernel == "Matern52Kernel":
self.kernel = kernels.Matern52Kernel(metric=initial_lscale, metric_bounds=log_metric_bounds, ndim=self.ndim)
self.kernel_name = kernel
print("Initialized GP with Matérn-5/2 kernel.")
else:
raise ValueError(f"Kernel '{kernel}' is not a valid option. Valid options: ExpSquaredKernel, Matern32Kernel, Matern52Kernel, RationalQuadraticKernel")
# create GP first time
self.gp = gp_utils.configure_gp(self._theta, self._y, self.kernel,
fit_amp=self.fit_amp,
fit_mean=self.fit_mean,
fit_white_noise=self.fit_white_noise,
white_noise=self.white_noise)
if self.gp is None:
print(f"Warning: configure_gp returned None.")
print(f"Data shape: theta={self._theta.shape}, y={self._y.shape}")
print(f"Scale lengths: {initial_lscale} (log: {log_initial_lscale})")
print(f"Scale bounds: {metric_bounds} (log: {log_metric_bounds})")
print(f"Retrying with new initial scale length...\n")
continue
else:
valid_scales = True
print(f"Successfully initialized GP on attempt {attempt}")
except Exception as e:
print(f"Exception during GP initialization on attempt {attempt}:")
print(f" Kernel: {kernel}")
print(f" Initial scales: {initial_lscale}")
print(f" Scale bounds: {log_metric_bounds}")
print(f" Data shape: theta={self._theta.shape}, y={self._y.shape}")
print(f" Exception: {type(e).__name__}: {e}")
print("Retrying with new initial scale length...\n")
continue
if not valid_scales:
raise RuntimeError(f"Failed to initialize GP after {max_attempts} attempts. "
f"Check your data, kernel choice, and scale bounds. "
f"Current settings: kernel={kernel}, log_gp_scale_rng={self.log_gp_scale_rng}")
self.param_names_full = self.gp.get_parameter_names(include_frozen=False)
self.param_names_optimized = []
self.kernel_scale_key = list(filter(lambda x: "metric:log_M" in x, self.param_names_full))[0].split(":metric:log_M")[0]
if fit_mean:
self.param_names_optimized.append("mean:value")
if fit_amp:
self.kernel_amp_key = list(filter(lambda x: "log_constant" in x, self.param_names_full))[0].split(":log_constant")[0]
self.param_names_optimized.append(f"{self.kernel_amp_key}:log_constant")
if fit_white_noise:
self.param_names_optimized.append("white_noise:value")
if self.uniform_scales:
self.param_names_optimized.append(f"{self.kernel_scale_key}:metric:log_M")
else:
for ii in range(self.ndim):
self.param_names_optimized.append(f"{self.kernel_scale_key}:metric:log_M_{ii}_{ii}")
# Infer lengthscale indices (used if regularization is enabled)
self.hp_length_indices = []
self.hp_other_indices = []
for ii, name in enumerate(self.param_names_full):
# Common patterns for lengthscale parameters in george kernels
if any(pattern in name.lower() for pattern in ["metric:log_m"]):
self.hp_length_indices.append(ii)
else:
self.hp_other_indices.append(ii)
if self.uniform_scales:
# Only one lengthscale parameter when uniform scales are used
self.hp_length_index = [self.param_names_optimized.index(f"{self.kernel_scale_key}:metric:log_M")]
# record initial hyperparameters
self.initial_gp_hyperparameters = self.get_hyperparameter_vector(self.gp)
# Optimize GP hyperparameters
self.gp, _ = self._opt_gp(**self.opt_gp_kwargs)
if hasattr(self, "_theta_test") & hasattr(self, "_y_test"):
_ytest = self.gp.predict(self._y, self._theta_test, return_cov=False, return_var=False)
ytest = self.y_scaler.inverse_transform(_ytest.reshape(-1, 1)).flatten()
ytest_true = self.y_scaler.inverse_transform(self._y_test.reshape(-1, 1)).flatten()
test_mse = np.mean((ytest_true - ytest)**2)
return test_mse
else:
return None
def _fit_gp(self, _theta=None, _y=None, hyperparameters=None):
"""
Fit Gaussian Process to training data with current hyperparameters.
:param _theta: (*array, optional*)
Scaled training parameter samples. If None, uses ``self._theta``.
:param _y: (*array, optional*)
Scaled training output values. If None, uses ``self._y``.
:returns: (*tuple*)
- gp: Fitted george.GP object
- timing: Time taken to fit the GP in seconds
"""
if _theta is None:
_theta = self._theta
if _y is None:
_y = self._y
t0 = time.time()
self.set_hyperparam_prior_bounds()
# Validate input data
if not np.all(np.isfinite(_theta)):
raise ValueError(f"_theta contains NaN or Inf values")
if not np.all(np.isfinite(_y)):
raise ValueError(f"_y contains NaN or Inf values: {_y[~np.isfinite(_y)]}")
y_median = np.median(_y)
y_var = np.var(_y)
if not np.isfinite(y_median):
raise ValueError(f"median(_y) is not finite: {y_median}")
if not np.isfinite(y_var) or y_var == 0:
raise ValueError(f"var(_y) is not finite or zero: {y_var}")
if self.fit_amp == True:
kernel = self.kernel * y_var
else:
kernel = self.kernel
gp = george.GP(kernel=kernel, fit_mean=self.fit_mean, mean=y_median,
white_noise=self.white_noise, fit_white_noise=self.fit_white_noise)
# Check if hyperparameters contain NaN or Inf
if hyperparameters is not None:
hyperparameters_array = np.atleast_1d(hyperparameters)
if not np.all(np.isfinite(hyperparameters_array)):
if self.show_warnings:
print(f"Warning: Hyperparameters contain NaN or Inf: {hyperparameters_array}")
print("Reoptimizing hyperparameters from scratch...")
gp, _ = self._opt_gp(**self.opt_gp_kwargs, _theta=_theta, _y=_y)
# Validate the reoptimized GP
reopt_params = gp.get_parameter_vector()
if not np.all(np.isfinite(reopt_params)):
raise ValueError(f"Reoptimized GP still has invalid parameters: {reopt_params}")
return gp, time.time() - t0
gp = self.set_hyperparameter_vector(gp, hyperparameters)
gp.compute(_theta)
return gp, time.time() - t0
def _opt_gp(self, hyperopt_method="ml", regularize=True, amp_0=1.0, mu_0=1.0, sigma_0=2.0,
optimizer_kwargs={"maxiter": 100, "xatol": 1e-4, "fatol": 1e-3, "adaptive": True},
cv_folds=5, cv_scoring="mse", cv_n_candidates=20, multi_proc=True,
cv_stage2_candidates=None, cv_stage2_width=0.5, cv_stage3_candidates=None, cv_stage3_width=0.2,
cv_weighted_mse_method="exponential", cv_weighted_factor=1.0, _theta=None, _y=None,
theta_scaler=None, y_scaler=None):
"""
Optimize GP hyperparameters using marginal likelihood or cross-validation.
:param method: (*str, optional, default="ml"*)
Optimization method to use:
- "ml": Maximum marginal likelihood (default, fast)
- "cv": k-fold cross-validation (slower but prevents overfitting)
:param cv_folds: (*int, optional, default=5*)
Number of folds for cross-validation (only used if method="cv")
:param cv_scoring: (*str, optional, default='mse'*)
Scoring metric for cross-validation. Options: 'mse', 'mae', 'r2'
:param cv_n_candidates: (*int, optional, default=20*)
Number of hyperparameter candidates to evaluate for CV
:param multi_proc: (*bool, optional, default=True*)
Whether to use multiprocessing for CV evaluation.
Only used when method="cv".
:param cv_two_stage: (*bool, optional, default=False*)
Whether to use two-stage CV optimization (explore-exploit strategy).
Only used when method="cv".
:param cv_stage2_candidates: (*int, optional, default=None*)
Number of candidates for stage 2 grid search. If None, uses
cv_n_candidates // 2.
:param cv_stage2_width: (*float, optional, default=0.5*)
Width factor for stage 2 search around best parameters.
Smaller values = tighter search around best from stage 1.
:param cv_three_stage: (*bool, optional, default=False*)
Whether to use three-stage CV optimization (explore-exploit-refine).
Requires cv_two_stage=True. Only used when method="cv".
:param cv_stage3_candidates: (*int, optional, default=None*)
Number of candidates for stage 3 ultra-fine search. If None, uses
max(cv_stage2_candidates // 2, 3). Only used when cv_three_stage=True.
:param cv_stage3_width: (*float, optional, default=0.2*)
Width factor for stage 3 search around stage 2 best parameters.
Should be smaller than cv_stage2_width for finer refinement.
:returns: (*tuple*)
- op_gp: Optimized george.GP object with updated hyperparameters
- timing: Time taken for optimization in seconds
"""
t0 = time.time()
# Use provided _theta and _y, or fall back to self._theta and self._y
if _theta is None:
_theta = self._theta
if _y is None:
_y = self._y
if hyperopt_method.lower() not in ["ml", "cv"]:
hyperopt_method = "ml"
print(f"Invalid method '{hyperopt_method}'. Must be 'ml' or 'cv'. Defaulting to 'ml'.")
if self.gp_opt_method in ["newton-cg", "l-bfgs-b"]:
use_gradient = True
self.set_hyperparam_prior_bounds()
if hyperopt_method.lower() == "cv":
# Cross-validation hyperparameter optimization
if self.verbose:
print(f"\nOptimizing GP hyperparameters using {cv_folds}-fold cross-validation...")
try:
candidates = ut.prior_sampler(bounds=self.hp_bounds, nsample=cv_n_candidates, sampler="lhs", random_state=self.random_state)
# Add current hyperparameters as a candidate if GP exists
if hasattr(self, "gp"):
candidates[0] = self.get_hyperparameter_vector(self.gp)
# Expand hyperparameters if using uniform scales
# CV function expects full parameter vectors that can be set directly on GP
if self.uniform_scales:
candidates_expanded = np.array([self.expand_hyperparameter_vector(c) for c in candidates])
else:
candidates_expanded = candidates
# suppress outputs if running parallel chains
if multi_proc:
verbose_cv = False # Suppress for parallel chains
else:
verbose_cv = True # Always show CV diagnostics
# Optimize using cross-validation
pool = self._get_pool(ncore=self.ncore) if multi_proc else None
op_gp = gp_utils.optimize_gp_kfold_cv(
self.gp, _theta, _y,
candidates_expanded,
self.y_scaler,
k_folds=cv_folds,
scoring=cv_scoring,
pool=pool,
stage2_candidates=cv_stage2_candidates,
stage2_width=cv_stage2_width,
stage3_candidates=cv_stage3_candidates,
stage3_width=cv_stage3_width,
weighted_mse_method=cv_weighted_mse_method,
weighted_mse_factor=cv_weighted_factor,
verbose=verbose_cv
)
self._close_pool(pool)
except Exception as e:
import traceback
import sys
exc_type, exc_value, exc_traceback = sys.exc_info()
tb_line = traceback.extract_tb(exc_traceback)[-1]
if self.show_warnings:
print(f"Warning: CV hyperparameter optimization failed: {str(e)}")
print(f"Error at line {tb_line.lineno} in {tb_line.filename}: {tb_line.line}")
print("Falling back to maximum likelihood optimization...")
# Fall back to ML optimization if CV fails
op_gp = None
hyperopt_method = "ml"
if hyperopt_method.lower() == "ml":
current_gp = self.gp
current_gp.compute(_theta)
# Define the objective function (negative log-likelihood in this case).
def nll(p_opt):
if self.uniform_scales:
p = self.expand_hyperparameter_vector(p_opt)
else:
p = p_opt
tmp_gp = self.set_hyperparameter_vector(current_gp, p)
ll = -tmp_gp.log_likelihood(_y, quiet=True)
if regularize:
reg = gp_utils.regularization_term(p, self.hp_length_indices, amp_0=amp_0, mu_0=mu_0, sigma_0=sigma_0)
ll += reg
return ll if np.isfinite(ll) else 1e25
# And the gradient of the objective function.
def grad_nll(p_opt):
if self.uniform_scales:
p = self.expand_hyperparameter_vector(p_opt)
else:
p = p_opt
tmp_gp = self.set_hyperparameter_vector(current_gp, p)
grad_lnlike = -tmp_gp.grad_log_likelihood(_y, quiet=True)
if self.uniform_scales:
gll = np.zeros(len(p_opt))
gll[self.hp_length_index] = np.mean(grad_lnlike[self.hp_length_indices])
gll[self.hp_other_indices] = grad_lnlike[self.hp_other_indices]
else:
gll = grad_lnlike
if regularize:
reg_grad = gp_utils.regularization_gradient(p, self.hp_length_indices, amp_0=amp_0, mu_0=mu_0, sigma_0=sigma_0)
if self.uniform_scales:
reg_grad_avg = np.mean(reg_grad[self.hp_length_indices])
gll[self.hp_length_index] += reg_grad_avg
else:
gll += reg_grad
return gll
if use_gradient:
jac = grad_nll
else:
jac = None
def _optimize_fn(x0):
return op.minimize(fun=nll, x0=x0, jac=jac, method=self.gp_opt_method, bounds=self.hp_bounds, options=optimizer_kwargs)
current_hp = self.get_hyperparameter_vector(current_gp)
if self.gp_nopt <= 1:
results = _optimize_fn(current_hp)
else:
p0 = ut.prior_sampler(bounds=self.hp_bounds, nsample=self.gp_nopt, sampler="lhs", random_state=self.random_state)
p0[0] = current_hp
if self.ncore <= 1:
# Sequential with progress bar
opt_results = [_optimize_fn(p) for p in tqdm.tqdm(p0, desc="Optimizing GP")]
else:
pool = self._get_pool(ncore=self.ncore)
# Parallel with progress bar using imap
opt_results = list(tqdm.tqdm(
pool.imap(_optimize_fn, p0),
total=len(p0),
desc=f"Optimizing GP ({self.ncore} cores)"
))
self._close_pool(pool)
def get_fun_value(res):
return res.fun
results = min(opt_results, key=get_fun_value)
op_gp = self.set_hyperparameter_vector(current_gp, results.x)
op_gp.compute(_theta)
if self.verbose:
nll_0 = nll(current_hp)
nll_fit = nll(results.x)
if regularize:
reg_0 = gp_utils.regularization_term(self.expand_hyperparameter_vector(current_hp), self.hp_length_indices, amp_0=amp_0, mu_0=mu_0, sigma_0=sigma_0)
reg_fit = gp_utils.regularization_term(self.expand_hyperparameter_vector(results.x), self.hp_length_indices, amp_0=amp_0, mu_0=mu_0, sigma_0=sigma_0)
print(f"Initial -logL: \t {nll_0:.4f} | \t Regularization: {reg_0:.4f} | \t Total: {nll_0 + reg_0:.4f}")
print(f"Final -logL: \t {nll_fit:.4f} | \t Regularization: {reg_fit:.4f} | \t Total: {nll_fit + reg_fit:.4f} ")
else:
print(f"Initial -logL: \t {nll_0:.4f} | \t Regularization: None")
print(f"Final -logL: \t {nll_fit:.4f} | \t Regularization: None ")
print(f"{results.nit} iterations | Success: {results.success} | Message: {results.message} \n")
# If op_gp is not set (CV failed or wasn't used), use current GP or initialize one
if 'op_gp' not in locals() or op_gp is None:
if hasattr(self, 'gp'):
op_gp = self.gp
else:
# Create a basic GP with default hyperparameters
if self.fit_amp:
kernel = self.kernel * np.var(_y)
else:
kernel = self.kernel
op_gp = george.GP(kernel=kernel, fit_mean=self.fit_mean, mean=np.median(_y),
white_noise=self.white_noise, fit_white_noise=self.fit_white_noise)
op_gp.compute(_theta)
tf = time.time()
timing = tf - t0
self.training_results["gp_hyperparam_opt_time"].append(timing)
return op_gp, timing
[docs]
def eval_gp_at_iteration(self, iter, return_var=False):
# gp_iter = self.gp
if (iter == 0) or (len(self.training_results["iteration"]) == 0):
_theta_cond = self._theta[:self.ninit_train]
_y_cond = self._y[:self.ninit_train]
try:
gp_iter = self.set_hyperparameter_vector(self.gp, self.training_results["gp_hyperparameters"][0])
except:
gp_iter = self.set_hyperparameter_vector(self.gp, self.initial_gp_hyperparameters)
# Handle iter=-1 case to use all data instead of excluding last element
elif (iter == -1) or (iter == len(self.training_results["iteration"])):
_theta_cond = self._theta
_y_cond = self._y
# Use the latest parameters (last in the list)
gp_iter = self.set_hyperparameter_vector(self.gp, self.training_results["gp_hyperparameters"][-1])
elif (iter > 0) and (iter < len(self.training_results["iteration"])):
_theta_cond = self._theta[:self.ninit_train + iter]
_y_cond = self._y[:self.ninit_train + iter]
last_hp = self.training_results["gp_hyperparameters"][iter]
gp_iter = self.set_hyperparameter_vector(self.gp, last_hp)
else:
raise ValueError(f"Iteration {iter} exceeds available training iterations ({self.training_results['iteration'][-1]}).")
gp_iter.compute(_theta_cond)
def gp_predict(x):
# Ensure x is properly shaped for george GP
x = np.atleast_2d(x)
if x.shape[0] == 1 and x.shape[1] != self.ndim:
# If we have a single point but wrong shape, transpose
if x.shape[1] == 1 and x.shape[0] == self.ndim:
x = x.T
elif len(x.flatten()) == self.ndim:
x = x.reshape(1, -1)
return gp_iter.predict(_y_cond, x, return_var=return_var, return_cov=False)
return gp_predict
[docs]
def surrogate_log_likelihood(self, theta_xs, iter=-1, return_var=False):
"""
Evaluate predictive mean of the GP at point(s) ``theta_xs``
This method is vectorized to handle both single parameter vectors and
arrays of parameter vectors efficiently.
:param theta_xs: Point(s) to evaluate GP mean at. Can be:
- 1D array of shape (ndim,) for single point
- 2D array of shape (npoints, ndim) for multiple points
:type theta_xs: *array-like*
:param iter: Iteration number of GP to use. If -1, uses most recent GP.
:type iter: *int, optional*
:param return_var: Whether to also return variance predictions.
:type return_var: *bool, optional*
:returns:
- **ypred** (*array*) -- GP mean(s) evaluated at ``theta_xs``. Shape matches input.
- **varpred** (*array, optional*) -- GP variance(s) if return_var=True.
:rtype: *array or tuple of arrays*
"""
# Convert input to numpy array and handle dimensionality
theta_xs = np.asarray(theta_xs)
original_shape_1d = False
# Handle 1D input (single point)
if theta_xs.ndim == 1:
theta_xs = theta_xs.reshape(1, -1)
original_shape_1d = True
elif theta_xs.ndim != 2:
raise ValueError(f"theta_xs must be 1D or 2D array, got {theta_xs.ndim}D")
# Apply scaling transformation
_theta_xs = self.theta_scaler.transform(theta_xs)
# Get GP at specified iteration
if hasattr(self, "training_results") and len(self.training_results["iteration"]) > 0:
gp_ii = self.eval_gp_at_iteration(iter, return_var=return_var)
else:
gp_ii = lambda x: self.gp.predict(self._y, x, return_var=return_var, return_cov=False)
# Apply the GP and handle return values
if return_var == False:
_ypred = gp_ii(_theta_xs)
ypred = self.y_scaler.inverse_transform(_ypred.reshape(-1, 1)).flatten()
# Return single value if input was 1D, otherwise return array
if original_shape_1d:
return ypred[0]
else:
return ypred
else:
_ypred, _varpred = gp_ii(_theta_xs)
ypred = self.y_scaler.inverse_transform(_ypred.reshape(-1, 1)).flatten()
varpred = self.y_scaler.inverse_transform(_varpred.reshape(-1, 1)).flatten()
# Return single values if input was 1D, otherwise return arrays
if original_shape_1d:
return ypred[0], varpred[0]
else:
return ypred, varpred
[docs]
def surrogate_likelihood(self, theta_xs):
"""
Evaluate predictive probability (not log-probability) of the GP at point(s) theta_xs
This method is vectorized to handle both single parameter vectors and
arrays of parameter vectors efficiently.
:param theta_xs: Point(s) to evaluate GP probability at. Can be:
- 1D array of shape (ndim,) for single point
- 2D array of shape (npoints, ndim) for multiple points
:type theta_xs: *array-like*
:returns: GP probability/probabilities evaluated at ``theta_xs``. Shape matches input.
:rtype: *float or array*
"""
# Get log-probability from GP (already vectorized)
log_prob = self.surrogate_log_likelihood(theta_xs)
# Convert to probability (works element-wise for arrays)
prob = np.exp(log_prob)
return prob
[docs]
def create_cached_surrogate_likelihood(self, iter=-1, return_var=False):
"""
Create a cached surrogate likelihood function that computes the GP once
and can be evaluated multiple times without recomputing the GP.
This is useful when you need to evaluate the surrogate likelihood at many
different points with the same GP configuration, as it avoids the expensive
GP computation (gp.compute()) on each call.
:param iter: Iteration number of GP to use. If -1, uses most recent GP.
:type iter: *int, optional*
:returns: A cached likelihood function that can be called with theta_xs
:rtype: *callable*
"""
# Determine training data and hyperparameters for the specified iteration
if hasattr(self, "training_results") and len(self.training_results["iteration"]) > 0:
# Handle iter=-1 case to use all data
if iter == -1:
_theta_cond = self._theta
_y_cond = self._y
hyperparams = self.training_results["gp_hyperparameters"][-1]
else:
_theta_cond = self._theta[:self.ninit_train + iter]
_y_cond = self._y[:self.ninit_train + iter]
hyperparams = self.training_results["gp_hyperparameters"][-1]
else:
_theta_cond = self._theta
_y_cond = self._y
hyperparams = self.gp.get_parameter_vector()
# Create a fresh GP instance with the same kernel
gp_iter = gp_utils.configure_gp(_theta_cond, _y_cond, self.kernel,
fit_amp=self.fit_amp,
fit_mean=self.fit_mean,
fit_white_noise=self.fit_white_noise,
white_noise=self.white_noise,
hyperparameters=hyperparams)
# Compute the GP and save so that it doesn't need to be recomputed later
try:
gp_iter.compute(_theta_cond)
except Exception as e:
print(f"create_cached_surrogate_likelihood: Error computing GP at iteration {iter}: {e}")
raise
# Return a picklable cached surrogate likelihood object
return CachedSurrogateLikelihood(gp_iter, _y_cond, self.theta_scaler,
self.y_scaler, self.ndim, return_var=return_var)
[docs]
def find_max_surrogate(self, nopt=10, method="l-bfgs-b"):
"""
Find the parameter values that maximize the GP surrogate model prediction.
Uses multi-start numerical optimization on the negative surrogate log
likelihood to locate the global maximum of the surrogate surface.
:param nopt: Number of optimization restarts. More restarts reduce the
chance of converging to a local maximum. Default is 10.
:type nopt: *int, optional*
:param method: Scipy optimization method. Default is "l-bfgs-b".
:type method: *str, optional*
:returns:
- **theta_max** (*ndarray of shape (ndim,)*) -- Parameter values at the surrogate maximum (unscaled).
- **y_max** (*float*) -- Surrogate log likelihood value at the maximum.
:rtype: *tuple*
"""
def neg_surrogate(_theta_scaled):
theta = self.theta_scaler.inverse_transform(_theta_scaled.reshape(1, -1))
return -float(self.surrogate_log_likelihood(theta.flatten()))
starting_points = self._prior_sampler(nsample=nopt)
best_theta = None
best_val = np.inf
for x0 in starting_points:
try:
res = op.minimize(neg_surrogate, x0, method=method,
bounds=self._bounds,
options={"maxiter": 200})
if res.success and res.fun < best_val:
best_val = res.fun
best_theta = res.x
except Exception:
continue
if best_theta is None:
# fall back to best training point if all restarts failed
idx = np.argmax(self._y)
best_theta = self._theta[idx]
theta_max = self.theta_scaler.inverse_transform(best_theta.reshape(1, -1)).flatten()
y_max = float(self.surrogate_log_likelihood(theta_max))
return theta_max, y_max
[docs]
def find_next_point(self, nopt=3, optimizer_kwargs={}):
"""
Find next set of ``(theta, y)`` training points by maximizing the
active learning utility function.
:param nopt: (*int, optional*)
Number of times to restart the objective function optimization.
Defaults to 1. Increase to avoid converging to local minima.
:param optimizer_kwargs: (*dict, optional*)
Additional keyword arguments passed to scipy optimizer. Default is {}.
"""
opt_timing_0 = time.time()
# Safe GP prediction wrapper: if the kernel matrix is non-PD (e.g. after a bad
# hyperparameter optimisation step), return zero mean and large variance so that
# the acquisition function degrades gracefully to exploration-only behaviour
# instead of raising an unhandled LinAlgError.
def predict_gp(_theta_xs):
try:
return self.gp.predict(self._y, _theta_xs, return_var=True)
except Exception:
n = _theta_xs.shape[0] if hasattr(_theta_xs, 'shape') else 1
return np.zeros(n), np.ones(n) * 1e10
# Create objective function with appropriate parameters for different algorithms
if self.algorithm == "jones":
# Jones (Expected Improvement) requires y_best parameter
y_best = np.max(self._y) # Current best observed value
obj_fn = partial(self.utility, predict_gp=predict_gp, bounds=self._bounds, y_best=y_best)
else:
# Other algorithms (BAPE, AGP, etc.) don't need y_best
obj_fn = partial(self.utility, predict_gp=predict_gp, bounds=self._bounds)
# Get gradient function if available
grad_obj_fn = None
if hasattr(self, 'grad_utility') and self.grad_utility is not None:
if self.algorithm == "jones":
grad_obj_fn = partial(self.grad_utility, gp=self.gp, bounds=self._bounds, y_best=y_best)
else:
grad_obj_fn = partial(self.grad_utility, gp=self.gp, bounds=self._bounds)
# Always use serial execution for acquisition function optimization
_thetaN, _ = ut.minimize_objective(obj_fn,
bounds=self._bounds,
nopt=nopt,
ps=self._prior_sampler,
method=self.obj_opt_method,
options=optimizer_kwargs,
grad_obj_fn=grad_obj_fn,
pool=None,
show_warnings=self.show_warnings)
opt_timing = time.time() - opt_timing_0
# self.training_results["acquisition_optimizer_niter"].append(opt_result.nit)
# Validate optimization result
if not np.all(np.isfinite(_thetaN)):
if self.show_warnings:
current_iter = self.training_results["iteration"][-1] if len(self.training_results["iteration"]) > 0 else 0
print(f"Warning: Acquisition function optimization failed at iteration {current_iter}. Falling back to random sampling.")
# Fall back to random sampling from prior
_thetaN = self._prior_sampler(nsample=1).flatten()
thetaN = self.theta_scaler.inverse_transform(_thetaN.reshape(1, -1))
yN = np.atleast_1d(self.true_log_likelihood(thetaN.flatten()))
# Validate new training point
if not np.any(np.isfinite(thetaN.flatten())):
print(f"New theta contains NaN or Inf: {thetaN}")
return None, None, opt_timing
if not np.any(np.isfinite(yN.flatten())):
print(f"New y value is NaN or Inf: {yN}. Check your likelihood function at theta={thetaN}")
return None, None, opt_timing
# add theta and y to training samples
theta_prop = np.append(self.theta(), thetaN, axis=0)
y_prop = np.append(self.y(), yN)
# refit scaling functions including new point
_theta_prop, _y_prop = self.refit_scalers(theta_prop, y_prop)
if _theta_prop.shape[0] != _y_prop.shape[0]:
return None, None, opt_timing
if not np.all(np.isfinite(_theta_prop)):
print("_theta_prop contains NaN or Inf after scaling. Check training data and scalers.")
return None, None, opt_timing
if not np.all(np.isfinite(_y_prop)):
print("_y_prop contains NaN or Inf after scaling. Check training data and scalers.")
return None, None, opt_timing
return _theta_prop, _y_prop, opt_timing
[docs]
def active_train(self, niter=100, algorithm="bape", gp_opt_freq=20, save_progress=False,
obj_opt_method="l-bfgs-b", nopt=5, optimizer_kwargs={}, use_grad_opt=True,
show_progress=True, allow_opt_multiproc=True, max_attempts=10):
"""
Perform active learning to iteratively improve the surrogate model.
Uses acquisition functions to intelligently select new training points that
will most improve the Gaussian Process model. Different algorithms balance
exploration (uncertainty reduction) vs exploitation (finding optima).
:param niter: (*int, optional, default=100*)
Number of active learning iterations. Each iteration adds one new training point.
:param algorithm: (*str, optional, default="bape"*)
Active learning algorithm. Options:
- "bape": Bayesian Active Parameter Estimation (exploration-focused)
- "jones": Jones algorithm (exploitation-focused, good for optimization)
- "agp": Augmented Gaussian Process (balanced)
- "alternate": Alternates between exploration and exploitation
:param gp_opt_freq: (*int, optional, default=20*)
Frequency of GP hyperparameter re-optimization. GP hyperparameters are
re-optimized every gp_opt_freq iterations. Lower values = more optimization.
:param save_progress: (*bool, optional, default=False*)
Whether to save training progress data for later analysis.
:param obj_opt_method: (*str, optional, default="nelder-mead"*)
Optimization method for acquisition function. Options:
- "l-bfgs-b": L-BFGS-B (good with gradients)
- "nelder-mead": Nelder-Mead simplex (gradient-free)
:param nopt: (*int, optional, default=1*)
Number of optimization restarts for acquisition function. Higher values
help avoid local minima but increase computation time.
:param use_grad_opt: (*bool, optional, default=True*)
Whether to use gradient information if available. Set False for
gradient-free optimization.
:param optimizer_kwargs: (*dict, optional, default={}*)
Additional keyword arguments passed to the optimizer.
:param show_progress: (*bool, optional, default=True*)
Whether to display progress bar during training.
.. note::
Active learning algorithms have different purposes:
- **BAPE**: Best for uncertainty quantification and space-filling
- **Jones**: Best for finding likelihood maxima/minima (optimization)
- **Alternate**: Good balance for both exploration and exploitation
- **AGP**: Another balanced approach
The method automatically handles GP re-training and hyperparameter optimization
based on the specified frequency. Training data is accumulated in _theta and _y
attributes.
.. code-block:: python
Basic active learning with BAPE:
>>> sm.active_train(niter=50, algorithm="bape")
Optimization-focused active learning:
>>> sm.active_train(niter=30, algorithm="jones", gp_opt_freq=10)
Balanced approach with frequent GP optimization:
>>> sm.active_train(niter=40, algorithm="alternate", gp_opt_freq=5)
"""
# Set algorithm
self.algorithm = str(algorithm).lower()
self.utility, self.grad_utility = ut.assign_utility(self.algorithm)
if use_grad_opt == False:
self.grad_utility = None
# GP hyperparameter optimization frequency
self.gp_opt_freq = gp_opt_freq
# Objective function optimization method
self.obj_opt_method = obj_opt_method
if len(self.training_results["iteration"]) == 0:
first_iter = 0
else:
first_iter = self.training_results["iteration"][-1]
if self.verbose:
print(f"Running {niter} active learning iterations using {self.algorithm}...")
# Create iterator with or without progress bar based on show_progress parameter
iterator = tqdm.tqdm(range(1, niter+1)) if show_progress else range(1, niter+1)
for ii in iterator:
attempts = 0
success = False
while not success and attempts < max_attempts:
# Initialize to None so they are always defined after the loop
_theta_prop, _y_prop = None, None
try:
# Find next training point! (always single-threaded)
_theta_prop, _y_prop, opt_timing = self.find_next_point(nopt=nopt, optimizer_kwargs=optimizer_kwargs)
except Exception as e:
if self.show_warnings:
print(f"Warning: find_next_point raised exception at iteration {ii}: {e}. Retrying.")
if _theta_prop is None or _y_prop is None:
attempts += 1
else:
try:
# Fit GP with new training point
self.gp, fit_gp_timing = self._fit_gp(_theta=_theta_prop, _y=_y_prop, hyperparameters=self.gp.get_parameter_vector())
success = True
except Exception as e:
if self.show_warnings:
print(f"Warning: GP fit failed at iteration {ii}: {e}. Retrying with new point.")
attempts += 1
if attempts >= max_attempts:
raise RuntimeError(f"Failed to find a valid training point after {max_attempts} attempts. \
Check your likelihood function and training data for issues or increase max_attempts.")
# If proposed (theta, y) did not cause fitting issues, save to surrogate model obj
self._theta = _theta_prop
self._y = _y_prop
# Optimize GP?
if (ii + first_iter) % self.gp_opt_freq == 0:
reopt_kwargs = self.opt_gp_kwargs.copy()
reopt_kwargs["multi_proc"] = allow_opt_multiproc
# Save current params in case optimization leads to non-PD kernel
prev_params = self.gp.get_parameter_vector()
try:
self.gp, _ = self._opt_gp(**reopt_kwargs)
# Validate GP is still usable after optimization
self.gp.predict(self._y, self._theta, return_cov=False)
except Exception as e:
if self.show_warnings:
print(f"Warning: GP re-optimization at iteration {ii + first_iter} produced invalid GP: {e}. Reverting hyperparameters.")
self.gp.set_parameter_vector(prev_params)
# record which iteration hyperparameters were optimized
self.training_results["gp_hyperparameter_opt_iteration"].append(ii + first_iter)
if (save_progress == True) and (ii != 0):
self.save()
self.plot(plots=["gp_error", "gp_hyperparam"])
if self.ndim == 2:
self.plot(plots=["gp_fit_2D"])
else:
self.plot(plots=["gp_train_scatter"])
# evaluate gp training error (scaled)
try:
_ypred = self.gp.predict(_y_prop, _theta_prop, return_cov=False, return_var=False)
ypred = self.y_scaler.inverse_transform(_ypred.reshape(-1, 1)).flatten()
training_mse = np.mean((self.y() - ypred)**2)
training_scaled_mse = training_mse / np.var(self.y())
# if hyperparameters were reoptimized, report train error
if ((ii + first_iter) % self.gp_opt_freq == 0) & self.verbose:
print("Train MSE:", training_mse)
except Exception as e:
if self.show_warnings:
print(f"Warning: Error evaluating GP training error at iteration {ii + first_iter}: {e}")
training_mse = np.nan
training_scaled_mse = np.nan
# evaluate gp test error (scaled)
if hasattr(self, '_theta_test') and hasattr(self, '_y_test'):
try:
_ytest = self.gp.predict(self._y, self._theta_test, return_cov=False, return_var=False)
ytest = self.y_scaler.inverse_transform(_ytest.reshape(-1, 1)).flatten()
ytest_true = self.y_scaler.inverse_transform(self._y_test.reshape(-1, 1)).flatten()
test_mse = np.mean((ytest_true - ytest)**2)
test_scaled_mse = test_mse / np.var(self.y())
# if hyperparameters were reoptimized, report test error
if ((ii + first_iter) % self.gp_opt_freq == 0) & self.verbose:
print("Test MSE:", test_mse)
except Exception as e:
if self.show_warnings:
print(f"Warning: Error evaluating GP test error at iteration {ii + first_iter}: {e}")
test_mse = np.nan
test_scaled_mse = np.nan
else:
test_mse = np.nan
test_scaled_mse = np.nan
# evaluate convergence criteria
gp_kl_divergence = np.nan
# save results to a dictionary
self.training_results["iteration"].append(ii + first_iter)
self.training_results["gp_hyperparameters"].append(self.gp.get_parameter_vector())
self.training_results["training_mse"].append(training_mse)
self.training_results["test_mse"].append(test_mse)
self.training_results["training_scaled_mse"].append(training_scaled_mse)
self.training_results["test_scaled_mse"].append(test_scaled_mse)
self.training_results["gp_kl_divergence"].append(gp_kl_divergence)
self.training_results["gp_train_time"].append(fit_gp_timing)
self.training_results["obj_fn_opt_time"].append(opt_timing)
# record total number of training samples
self.ntrain = len(self._theta)
# number of active training samples
self.nactive = self.ntrain - self.ninit_train
if self.cache:
self.save()
[docs]
def active_train_parallel(self, niter=100, nchains=4, algorithm="bape", gp_opt_freq=20,
obj_opt_method="nelder-mead", nopt=1,
use_grad_opt=True, optimizer_kwargs={},
show_progress=True):
"""
Run multiple active learning chains in parallel.
:param niter: (*int, optional*)
Number of iterations per chain. Default 100.
:param nchains: (*int, optional*)
Number of parallel chains to run. Default 4.
:param algorithm: (*str, optional*)
Active learning algorithm. Default "bape".
:param gp_opt_freq: (*int, optional*)
Frequency of GP hyperparameter optimization. Default 20.
:param obj_opt_method: (*str, optional*)
Optimization method for acquisition function. Default "nelder-mead".
:param nopt: (*int, optional*)
Number of restarts for acquisition optimization. Default 1.
:param use_grad_opt: (*bool, optional*)
Whether to use gradient-based optimization. Default True.
:param optimizer_kwargs: (*dict, optional*)
Additional optimizer kwargs. Default {}.
:param show_progress: (*bool, optional*)
Whether to display progress bar during parallel chain execution. Default is True.
Notes
-----
This function uses multiprocessing.Pool instead of threading, which can provide
better performance for CPU-intensive tasks and avoids GIL limitations. However:
- All model data must be pickleable (which it should be for SurrogateModel)
- Each process runs in separate memory space (higher memory usage)
- Process startup overhead is higher than threading
- Better isolation between chains (one chain failure won't affect others)
- Can achieve true parallelism on multi-core systems
The function automatically respects the ncore limit and won't create more processes
than specified in self.ncore.
"""
# Set algorithm attribute to avoid save issues
self.algorithm = str(algorithm).lower()
# Initialize training results if not present
if not hasattr(self, 'training_results') or not self.training_results:
self.training_results = {"iteration" : [],
"gp_hyperparameters" : [],
"training_mse" : [],
"test_mse" : [],
"training_scaled_mse" : [],
"test_scaled_mse" : [],
"gp_kl_divergence" : [],
"gp_train_time" : [],
"obj_fn_opt_time" : [],
"gp_hyperparameter_opt_iteration" : [],
"gp_hyperparam_opt_time" : [],
"acquisition_optimizer_niter": []}
if self.verbose:
print(f"\nRunning {nchains} parallel active learning chains for {niter} iterations each...")
print(f"Algorithm: {algorithm}, Method: {obj_opt_method}")
print(f"Using multiprocessing with max {min(nchains, self.ncore)} processes")
# Store original training data
original_ntrain = self.ntrain
# Track results from all chains
all_new_theta = []
all_new_y = []
chain_results = []
# Determine number of processes to use (respect ncore limit)
max_processes = min(nchains, self.ncore)
use_multiprocessing = (nchains > 1) and (self.ncore > 1)
if use_multiprocessing:
if self.verbose:
print(f"Running with {max_processes} processes (limited by ncore={self.ncore})")
try:
if use_multiprocessing:
# Prepare arguments for each chain
chain_args = []
for i in range(nchains):
# Create a pickleable state dictionary for each chain
chain_state = self._get_pickleable_state()
chain_state['chain_id'] = i
chain_state['savedir'] = f"{self.savedir}/chain_{i}"
args = (
chain_state,
niter,
algorithm,
gp_opt_freq,
obj_opt_method,
nopt,
use_grad_opt,
optimizer_kwargs,
)
chain_args.append(args)
pool = self._get_pool(ncore=max_processes)
if show_progress:
results = []
with tqdm.tqdm(total=nchains, desc="Running parallel chains (MP)") as pbar:
for result in pool.imap(_run_chain_worker_mp, chain_args):
results.append(result)
pbar.update(1)
else:
# Use map for simpler execution
results = pool.map(_run_chain_worker_mp, chain_args)
self._close_pool(pool)
# Process results
for i, result in enumerate(results):
if result is not None and len(result) == 3:
new_theta, new_y, training_results = result
all_new_theta.append(new_theta)
all_new_y.append(new_y)
chain_results.append(training_results)
else:
if self.verbose:
print(f"Chain {i} failed or returned invalid result")
all_new_theta.append(np.array([]).reshape(0, self.ndim))
all_new_y.append(np.array([]))
chain_results.append({})
else:
# Fallback to sequential execution
if self.verbose:
print("Running chains sequentially...")
if show_progress:
sequential_progress = tqdm.tqdm(total=nchains, desc="Running chains sequentially")
for i in range(nchains):
if self.verbose:
print(f"Running chain {i+1}/{nchains}...")
result = self._run_chain_worker(i, niter=niter, algorithm=algorithm,
gp_opt_freq=gp_opt_freq, obj_opt_method=obj_opt_method,
nopt=nopt, use_grad_opt=use_grad_opt,
optimizer_kwargs=optimizer_kwargs, show_progress=False,
allow_opt_multiproc=False)
if result is not None:
new_theta, new_y, training_results = result
all_new_theta.append(new_theta)
all_new_y.append(new_y)
chain_results.append(training_results)
else:
if self.verbose:
print(f"Chain {i} failed")
all_new_theta.append(np.array([]).reshape(0, self.ndim))
all_new_y.append(np.array([]))
chain_results.append({})
# Update sequential progress bar
if show_progress:
sequential_progress.update(1)
# Close sequential progress bar
if show_progress:
sequential_progress.close()
except Exception as e:
import traceback
import sys
exc_type, exc_value, exc_traceback = sys.exc_info()
tb_line = traceback.extract_tb(exc_traceback)[-1]
print(f"Multiprocessing execution failed ({e}) line {tb_line}.")
raise e
# Combine all new training samples
if self.verbose:
print("\nCombining training samples from all chains...")
self._combine_chain_results(all_new_theta, all_new_y, chain_results)
if self.verbose:
total_new_samples = sum(len(theta) for theta in all_new_theta)
print(f"Successfully combined {total_new_samples} new training samples from {nchains} chains")
print(f"Total training samples: {self.ntrain} (was {original_ntrain})")
# Final GP optimization with all combined data
if self.verbose:
print("\nPerforming final GP optimization with combined dataset...")
self.gp, _ = self._opt_gp(**self.opt_gp_kwargs)
if self.cache:
self.save()
[docs]
def lnprob(self, theta):
"""
Log probability function used for ``emcee``, which sums the prior with the surrogate model likelihood
.. math::
\\ln P(\\theta | x) \\propto \\ln P(x | \\theta) + \\ln P(\\theta)
where \\ln P(x | \\theta) is the surrogate likelihood function and \\ln P(\\theta) is the prior function.
:param theta: (*array, required*)
Array of model input parameters to evaluate model probability at.
"""
if (self.like_fn_name == "surrogate") and (not hasattr(self, 'gp')):
raise NameError("GP has not been trained")
if not hasattr(self, 'prior_fn'):
raise NameError("prior_fn has not been specified")
if not hasattr(self, 'like_fn'):
self.like_fn = self.surrogate_log_likelihood
theta = np.asarray(theta).reshape(1,-1)
lnp = self.like_fn(theta) + self.prior_fn(theta)
return lnp
[docs]
def find_map(self, theta0=None, prior_fn=None, method="nelder-mead", nRestarts=15, options=None):
raise NotImplementedError("Not implemented.")
[docs]
def run_emcee(self, like_fn=None, prior_fn=None, nwalkers=None, nsteps=int(5e4), sampler_kwargs={}, run_kwargs={},
opt_init=False, multi_proc=True, prior_fn_comment=None, burn=None, thin=None, samples_file=None, min_ess=int(1e4)):
"""
Sample the posterior using the emcee affine-invariant MCMC algorithm.
This method uses the emcee package to perform Markov Chain Monte Carlo (MCMC)
sampling on either the trained GP surrogate model or the true likelihood function.
The affine-invariant ensemble sampler is robust and works well for a wide variety
of posterior shapes without requiring manual tuning of step sizes.
:param like_fn: (*callable, str, or None, optional*)
Likelihood function to sample. Options:
- None (default): Uses the trained GP surrogate model (self.surrogate_log_likelihood)
- "surrogate", "gp": Uses the GP surrogate model explicitly
- "true": Uses the true likelihood function (self.true_log_likelihood)
- callable: Custom likelihood function with signature like_fn(theta)
Default is None.
:param prior_fn: (*callable or None, optional*)
Log-prior function with signature prior_fn(theta). Should return log-probability
density. If None, uses uniform prior with bounds from self.bounds.
Default is None.
:param nwalkers: (*int or None, optional*)
Number of MCMC walkers in the ensemble. Should be at least 2*ndim.
If None, defaults to 10*ndim. More walkers improve convergence but increase
computational cost. Default is None.
:param nsteps: (*int, optional*)
Number of MCMC steps per walker. Total number of likelihood evaluations
will be nwalkers * nsteps. Default is 50000.
:param sampler_kwargs: (*dict, optional*)
Additional keyword arguments passed to emcee.EnsembleSampler constructor.
Common options include:
- 'a': Stretch move scale parameter (default: 2.0)
- 'moves': Custom proposal moves
Default is {}.
:param run_kwargs: (*dict, optional*)
Additional keyword arguments passed to the run_mcmc() method.
Common options include:
- 'progress': Show progress bar (default: True)
- 'store': Store chain in memory (default: True)
Default is {}.
:param opt_init: (*bool, optional*)
Whether to initialize walkers near the maximum a posteriori (MAP) estimate.
If True, uses find_map() to locate starting point. If False, initializes
walkers randomly from the prior. Default is False.
:param multi_proc: (*bool, optional*)
Whether to use multiprocessing with self.ncore processes. Generally
recommended for expensive likelihood evaluations. Default is True.
:param prior_fn_comment: (*str or None, optional*)
Comment describing the prior function for logging purposes. If None
and prior_fn is provided, attempts to extract function name.
Default is None.
:param burn: (*int or None, optional*)
Number of burn-in samples to discard from each walker. If None,
automatically estimates burn-in using autocorrelation analysis.
Default is None.
:param thin: (*int or None, optional*)
Thinning factor - keep every thin-th sample to reduce autocorrelation.
If None, automatically estimates based on autocorrelation time.
Default is None.
:param samples_file: (*str or None, optional*)
If provided, saves the final samples to this file in NumPy .npz format.
Default is None.
:param min_ess: (*int, optional*)
Minimum effective sample size. If the number of final samples is less than
min_ess, will run additional sampling rounds and combine samples until the
total number of samples exceeds min_ess. Default is 0 (no minimum required).
Attributes Set
--------------
sampler : emcee.EnsembleSampler
The emcee sampler object containing full chain and metadata
emcee_samples : ndarray of shape (nsamples_final, ndim)
Final MCMC samples after burn-in and thinning
emcee_samples_full : ndarray of shape (nsteps, nwalkers, ndim)
Full MCMC chain before processing
emcee_samples_true : ndarray of shape (nsamples_final, ndim)
Final samples when using true likelihood (like_fn="true")
emcee_samples_gp : ndarray of shape (nsamples_final, ndim)
Final samples when using surrogate likelihood
emcee_run : bool
Flag indicating emcee has been successfully run
emcee_runtime : float
Wall-clock time taken for emcee sampling in seconds
nwalkers : int
Number of walkers used
nsteps : int
Number of steps per walker
burn : int
Burn-in length used for final samples
thin : int
Thinning factor used for final samples
iburn : int
Automatically estimated burn-in length
ithin : int
Automatically estimated thinning factor
acc_frac : float
Mean acceptance fraction across all walkers
autcorr_time : float
Mean autocorrelation time in steps
like_fn_name : str
Name of likelihood function used ("true", "surrogate", or "likelihood")
prior_fn_comment : str
Description of prior function used
.. code-block:: python
Sample surrogate model with default settings:
>>> sm.run_emcee()
Sample true likelihood with specific settings:
>>> sm.run_emcee(like_fn="true", nwalkers=100, nsteps=10000)
Use custom prior and optimize initialization:
>>> def log_prior(theta):
... # Custom Gaussian prior
... return -0.5 * np.sum((theta/2)**2)
>>> sm.run_emcee(prior_fn=log_prior, opt_init=True)
Run with manual burn-in and thinning:
>>> sm.run_emcee(nsteps=100000, burn=10000, thin=10)
References
----------
emcee documentation: https://emcee.readthedocs.io/
Foreman-Mackey et al. (2013): "emcee: The MCMC Hammer", PASP, 125, 306-312
"""
import emcee
if like_fn is None:
print("Initializing emcee with self.surrogate_log_likelihood surrogate model as likelihood.")
self.like_fn_name = "surrogate"
self.like_fn = self.surrogate_log_likelihood
else:
self.like_fn = like_fn
self.like_fn_name = "likelihood"
if prior_fn is None:
print(f"No prior_fn specified. Defaulting to uniform prior with bounds {self.bounds}")
self.prior_fn = partial(ut.lnprior_uniform, bounds=self.bounds)
# Comment for output log file
self.prior_fn_comment = f"Default uniform prior. \n"
self.prior_fn_comment += f"Prior function: ut.prior_fn_uniform\n"
self.prior_fn_comment += f"\twith bounds {self.bounds}"
else:
self.prior_fn = prior_fn
# Comment for output log file
if prior_fn_comment is None:
self.prior_fn_comment = f"User defined prior."
try:
self.prior_fn_comment += f"Prior function: {self.prior_fn.__name__}"
except:
self.prior_fn_comment += "Prior function: unrecorded"
else:
self.prior_fn_comment = prior_fn_comment
# number of walkers, and number of steps per walker
if nwalkers is None:
self.nwalkers = int(10 * self.ndim)
else:
self.nwalkers = int(nwalkers)
self.nsteps = int(nsteps)
# Optimize walker initialization?
if opt_init == True:
# start walkers near the estimated maximum
p0 = self.find_map(prior_fn=self.prior_fn)
else:
# start walkers at random points in the prior space
p0 = ut.prior_sampler(nsample=self.nwalkers, bounds=self.bounds, sampler="uniform", random_state=self.random_state)
# set up multiprocessing pool with MPI safety
emcee_pool = self._get_pool(ncore=self.ncore) if multi_proc and self.ncore > 1 else None
emcee_ncore = self.ncore if (multi_proc and self.ncore > 1) else 1
if self.verbose:
print(f"Running emcee with {self.nwalkers} walkers for {self.nsteps} steps on {emcee_ncore} cores...")
# Multi-run setup for minimum effective sample size
all_chains = []
all_run_times = []
accumulated_samples = 0
run_number = 1
while accumulated_samples < min_ess:
if min_ess > 0:
print(f"\nRun {run_number}: Need {max(0, min_ess - accumulated_samples)} more samples...")
print("="*50)
# Run the sampler!
emcee_t0 = time.time()
self.emcee_sampler = emcee.EnsembleSampler(self.nwalkers,
self.ndim,
self.lnprob,
pool=emcee_pool,
**sampler_kwargs)
self.emcee_sampler.run_mcmc(p0, self.nsteps, progress=True, **run_kwargs)
# close pool
self._close_pool(emcee_pool)
# record emcee runtime
current_runtime = time.time() - emcee_t0
all_run_times.append(current_runtime)
# burn, thin, and flatten samples for this run
current_iburn, current_ithin = mcmc_utils.estimate_burnin(self.emcee_sampler, verbose=self.verbose)
current_samples_full = self.emcee_sampler.get_chain()
current_burn = burn if burn is not None else current_iburn
current_thin = thin if thin is not None else current_ithin
current_samples = self.emcee_sampler.get_chain(discard=current_burn, thin=current_thin, flat=True)
all_chains.append(current_samples)
current_nsamples = current_samples.shape[0]
accumulated_samples += current_nsamples
if min_ess > 0:
print(f"Run {run_number} complete: {current_nsamples} samples")
print(f"Total accumulated samples: {accumulated_samples}")
# If min_ess requirement met, break
if accumulated_samples >= min_ess:
break
run_number += 1
# Reset initial positions for next run (start from end of previous run)
if run_number <= 10: # Limit to prevent infinite loops
# Use final positions from this run as starting positions for next run
p0 = self.emcee_sampler.get_last_sample().coords
else:
print(f"WARNING: Reached maximum of 10 runs, stopping with {accumulated_samples} samples")
break
# Combine all chains and compute overall statistics
if len(all_chains) > 1:
self.emcee_samples = np.vstack(all_chains)
if self.verbose:
print(f"\nCombined {len(all_chains)} runs into {self.emcee_samples.shape[0]} total samples")
else:
self.emcee_samples = all_chains[0]
# Use final run for chain statistics and samples_full
self.emcee_samples_full = current_samples_full
self.iburn = current_iburn
self.ithin = current_ithin
self.burn = current_burn
self.thin = current_thin
self.emcee_runtime = sum(all_run_times)
if self.like_fn_name == "true":
self.emcee_samples_true = self.emcee_samples
elif self.like_fn_name == "surrogate":
self.emcee_samples_gp = self.emcee_samples
# get acceptance fraction and autocorrelation time
self.acc_frac = np.mean(self.emcee_sampler.acceptance_fraction)
self.autcorr_time = np.mean(self.emcee_sampler.get_autocorr_time())
if self.verbose:
print(f"Total samples: {self.emcee_samples.shape[0]}")
print("Mean acceptance fraction: {0:.3f}".format(self.acc_frac))
print("Mean autocorrelation time: {0:.3f} steps".format(self.autcorr_time))
# record that emcee has been run
self.emcee_run = True
if self.cache:
try:
self.save()
except:
pass
if samples_file is not None:
fname = f"{self.savedir}/{samples_file}"
else:
if self.like_fn_name == "true":
fname = f"{self.savedir}/emcee_samples_final_{self.like_fn_name}.npz"
else:
if len(self.training_results["iteration"]) == 0:
current_iter = 0
else:
current_iter = self.training_results["iteration"][-1]
fname = f"{self.savedir}/emcee_samples_final_{self.like_fn_name}_iter_{current_iter}.npz"
print(f"Saving final emcee samples to {fname} ...")
np.savez(fname, samples=self.emcee_samples)
[docs]
def run_dynesty(self, like_fn=None, prior_transform=None, mode="dynamic", sampler_kwargs={}, run_kwargs={},
multi_proc=False, save_iter=None, prior_transform_comment=None, samples_file=None, min_ess=int(1e4)):
"""
Sample the posterior using the dynesty nested sampling algorithm.
This method uses the dynesty package to perform nested sampling on either the
trained GP surrogate model or the true likelihood function. Dynesty is particularly
effective for estimating the Bayesian evidence and exploring multi-modal posteriors.
:param like_fn: (*callable, str, or None, optional*)
Likelihood function to sample. Options:
- None (default): Uses the trained GP surrogate model (self.surrogate_log_likelihood)
- "surrogate", "gp": Uses the GP surrogate model explicitly
- "true": Uses the true likelihood function (self.true_log_likelihood)
- callable: Custom likelihood function with signature like_fn(theta)
Default is None.
:param prior_transform: (*callable or None, optional*)
Prior transformation function that maps from unit hypercube [0,1]^ndim
to the parameter space. Should have signature prior_transform(u) where
u is array of shape (ndim,) with values in [0,1]. If None, uses uniform
prior with bounds from self.bounds. Default is None.
:param mode: (*{"dynamic", "static"}, optional*)
Dynesty sampling mode. "dynamic" uses DynamicNestedSampler which adaptively
allocates live points, while "static" uses fixed number of live points.
Dynamic mode is generally more efficient. Default is "dynamic".
:param sampler_kwargs: (*dict, optional*)
Additional keyword arguments passed to the dynesty sampler constructor.
Common options include:
- 'nlive': Number of live points (default: 50*ndim)
- 'bound': Bounding method ('multi', 'single', 'none')
- 'sample': Sampling method ('auto', 'unif', 'rwalk', 'slice', 'rslice', 'hslice')
Default is {}.
:param run_kwargs: (*dict, optional*)
Additional keyword arguments passed to the run_nested() method.
Common options include:
- 'dlogz': Target evidence uncertainty (default: 0.5)
- 'maxiter': Maximum number of iterations (default: 50000)
- 'wt_kwargs': Weight function arguments (default: {'pfrac': 1.0})
- 'stop_kwargs': Stopping criterion arguments (default: {'pfrac': 1.0})
Default is {}.
:param multi_proc: (*bool, optional*)
Whether to use multiprocessing. If True, uses self.ncore processes.
Note that multiprocessing can sometimes be slower due to overhead.
Default is False.
:param save_iter: (*int or None, optional*)
If provided, saves the sampler state every save_iter iterations to allow
for checkpointing and resuming long runs. Saves to
'{savedir}/dynesty_sampler_{like_fn_name}.pkl'. Default is None.
:param prior_transform_comment: (*str or None, optional*)
Comment describing the prior transform for logging purposes. If None
and prior_transform is provided, attempts to extract function name.
Default is None.
:param samples_file: (*str or None, optional*)
If provided, saves the final samples to this file in NumPy .npz format.
Default is None.
:param min_ess: (*int, optional*)
Minimum effective sample size. If the number of final samples is less than
min_ess, will run additional sampling rounds and combine samples until the
total number of samples exceeds min_ess. Default is 0 (no minimum required).
Attributes Set
--------------
res : dynesty.results.Results
Complete dynesty results object containing samples, weights, evidence, etc.
dynesty_samples : ndarray of shape (nsamples, ndim)
Resampled posterior samples with equal weights
dynesty_samples_true : ndarray of shape (nsamples, ndim)
Posterior samples when using true likelihood (like_fn="true")
dynesty_samples_surrogate : ndarray of shape (nsamples, ndim)
Posterior samples when using surrogate likelihood
dynesty_run : bool
Flag indicating dynesty has been successfully run
dynesty_runtime : float
Wall-clock time taken for dynesty sampling in seconds
like_fn_name : str
Name of likelihood function used ("true", "surrogate", or "custom")
prior_transform_comment : str
Description of prior transform used
Notes
-----
Dynesty is particularly well-suited for:
- Computing Bayesian evidence for model comparison
- Exploring multi-modal posteriors
- Providing robust posterior sampling without tuning
The default settings prioritize posterior sampling over evidence estimation
by setting pfrac=1.0, which focuses computational effort on high-likelihood
regions rather than exploring the full prior volume.
Examples
--------
Sample surrogate model with default settings:
>>> sm.run_dynesty()
Sample true likelihood with more live points:
>>> sm.run_dynesty(like_fn="true", sampler_kwargs={'nlive': 1000})
Use custom prior with bounds [-5, 5] for each parameter:
>>> def my_prior(u):
... return 10*u - 5 # maps [0,1] to [-5,5]
>>> sm.run_dynesty(prior_transform=my_prior)
Run with checkpointing every 1000 iterations:
>>> sm.run_dynesty(save_iter=1000, run_kwargs={'maxiter': 50000})
References
----------
Dynesty documentation: https://dynesty.readthedocs.io/
Speagle (2020): "dynesty: a dynamic nested sampling package for estimating
Bayesian posteriors and evidences", MNRAS, 493, 3132-3158
"""
import dynesty
from dynesty import NestedSampler
from dynesty import DynamicNestedSampler
from dynesty import utils as dyfunc
# Determine likelihood function and name
if like_fn is None:
print("Initializing dynesty with self.surrogate_log_likelihood surrogate model as likelihood.")
self.like_fn_name = "surrogate"
self.like_fn = self.surrogate_log_likelihood
elif callable(like_fn):
# like_fn is a function/callable
if like_fn == self.surrogate_log_likelihood:
print("Initializing dynesty with self.surrogate_log_likelihood surrogate model as likelihood.")
self.like_fn_name = "surrogate"
self.like_fn = self.surrogate_log_likelihood
elif like_fn == self.true_log_likelihood:
print("Initializing dynesty with self.true_log_likelihood as likelihood.")
self.like_fn_name = "true"
self.like_fn = like_fn
else:
# Custom function provided
print("Initializing dynesty with user-provided likelihood function.")
self.like_fn_name = "custom"
self.like_fn = like_fn
elif isinstance(like_fn, str):
# like_fn is a string
like_fn_lower = like_fn.lower()
if like_fn_lower in ["surrogate", "gp", "surrogate_log_likelihood"]:
print("Initializing dynesty with self.surrogate_log_likelihood surrogate model as likelihood.")
self.like_fn_name = "surrogate"
self.like_fn = self.surrogate_log_likelihood
elif like_fn_lower in ["true", "true_log_likelihood"]:
print("Initializing dynesty with self.true_log_likelihood as likelihood.")
self.like_fn_name = "true"
self.like_fn = self.true_log_likelihood
else:
raise ValueError(f"Unknown string identifier for like_fn: '{like_fn}'. "
f"Valid options: 'surrogate', 'true', 'gp', 'surrogate_log_likelihood', 'true_log_likelihood'")
else:
raise TypeError(f"like_fn must be None, a string, or a callable function. "
f"Received type: {type(like_fn)}")
# set up prior transform
if prior_transform is None:
self.prior_transform = partial(ut.prior_transform_uniform, bounds=self.bounds)
# Comment for output log file
self.prior_transform_comment = f"Default uniform prior transform. \n"
self.prior_transform_comment += f"Prior function: ut.prior_transform_uniform\n"
self.prior_transform_comment += f"\twith bounds {self.bounds}"
else:
self.prior_transform = prior_transform
# Comment for output log file
if prior_transform_comment is None:
self.prior_transform_comment = f"User defined prior transform."
try:
self.prior_transform_comment += f"Prior function: {self.prior_transform.__name__}"
except:
self.prior_transform_comment += "Prior function: unrecorded"
else:
self.prior_transform_comment = prior_transform_comment
# start timing dynesty
dynesty_t0 = time.time()
# set up sampler kwargs
default_sampler_kwargs = {"bound": "multi",
"nlive": 50*self.ndim,
"sample": "auto"}
for key in default_sampler_kwargs:
if key not in sampler_kwargs:
sampler_kwargs[key] = default_sampler_kwargs[key]
# set up multiprocessing pool
# default to false. multiprocessing usually slower for some reason
dynesty_pool = self._get_pool(ncore=self.ncore) if multi_proc and self.ncore > 1 else None
sampler_kwargs["pool"] = dynesty_pool
sampler_kwargs["queue_size"] = self.ncore if (multi_proc and self.ncore > 1) else 1
dynesty_ncore = sampler_kwargs["queue_size"]
# initialize our nested sampler
if mode == "dynamic":
dsampler = DynamicNestedSampler(self.like_fn,
self.prior_transform,
self.ndim,
**sampler_kwargs)
print("Initialized dynesty DynamicNestedSampler.")
elif mode == "static":
dsampler = NestedSampler(self.like_fn,
self.prior_transform,
self.ndim,
**sampler_kwargs)
print("Initialized dynesty NestedSampler.")
else:
raise ValueError(f"mode {mode} is not a valid option. Choose 'dynamic' or 'static'.")
# set up run kwargs. default: 100% weight on posterior, 0% evidence
default_run_kwargs = {"wt_kwargs": {'pfrac': 1.0},
"stop_kwargs": {'pfrac': 1.0},
"maxiter": int(5e4),
"dlogz_init": 0.5}
for key in default_run_kwargs:
if key not in run_kwargs:
run_kwargs[key] = default_run_kwargs[key]
if self.verbose:
print(f"Running dynesty with {sampler_kwargs['nlive']} live points on {dynesty_ncore} cores...")
# Multi-run setup for minimum effective sample size
all_samples = []
all_weights = []
all_logz = []
all_run_times = []
accumulated_samples = 0
run_number = 1
while accumulated_samples < min_ess:
if min_ess > 0:
print(f"\nRun {run_number}: Need {max(0, min_ess - accumulated_samples)} more samples...")
print("="*50)
# Set timing for this run
run_start_time = dynesty_t0 if run_number == 1 else time.time()
# Pickle sampler?
if save_iter is not None:
run_sampler = True
last_iter = 0
while run_sampler == True:
dsampler.run_nested(maxiter=save_iter, **run_kwargs)
current_results = dsampler.results
file = os.path.join(self.savedir, f"dynesty_sampler_{self.like_fn_name}_run{run_number}.pkl")
# pickle dynesty sampler object
print(f"Caching model to {file}...")
with open(file, "wb") as f:
pickle.dump(dsampler, f)
# check if converged (i.e. hasn't run for more iterations)
if dsampler.results.niter > last_iter:
last_iter = dsampler.results.niter
run_sampler = True
else:
run_sampler = False
else:
dsampler.run_nested(**run_kwargs)
current_results = dsampler.results
# Record current run statistics
current_runtime = time.time() - run_start_time
all_run_times.append(current_runtime)
# Get samples and weights for this run
current_samples = current_results.samples
current_weights = np.exp(current_results.logwt - current_results.logz[-1])
current_logz = current_results.logz[-1]
# Resample weighted samples for this run
current_resampled = dyfunc.resample_equal(current_samples, current_weights)
all_samples.append(current_resampled)
all_weights.append(current_weights)
all_logz.append(current_logz)
current_nsamples = current_resampled.shape[0]
accumulated_samples += current_nsamples
if min_ess > 0:
print(f"Run {run_number} complete: {current_nsamples} samples, logZ = {current_logz:.3f}")
print(f"Total accumulated samples: {accumulated_samples}")
# If min_ess requirement met, break
if accumulated_samples >= min_ess:
break
run_number += 1
# Setup for next run
if run_number <= 10: # Limit to prevent infinite loops
# Create a new sampler for the next run
if mode == "dynamic":
dsampler = dynesty.DynamicNestedSampler(self.like_fn,
self.prior_transform,
ndim=self.ndim,
**sampler_kwargs)
else:
dsampler = dynesty.NestedSampler(self.like_fn,
self.prior_transform,
ndim=self.ndim,
**sampler_kwargs)
else:
print(f"WARNING: Reached maximum of 10 runs, stopping with {accumulated_samples} samples")
break
# Combine results from all runs
if len(all_samples) > 1:
self.dynesty_samples = np.vstack(all_samples)
# Use the best (highest) log evidence
self.dynesty_logz = max(all_logz)
print(f"\nCombined {len(all_samples)} runs into {self.dynesty_samples.shape[0]} total samples")
print(f"Best log evidence: {self.dynesty_logz:.3f}")
else:
self.dynesty_samples = all_samples[0]
self.dynesty_logz = all_logz[0]
# Use final run for primary results
self.dynesty_sampler = dsampler
self.dynesty_results = current_results
self.dynesty_logz_err = current_results.logzerr[-1]
self.dynesty_runtime = sum(all_run_times)
if self.like_fn_name == "true":
self.dynesty_samples_true = self.dynesty_samples
elif self.like_fn_name == "surrogate":
self.dynesty_samples_surrogate = self.dynesty_samples
# record that dynesty has been run
self.dynesty_run = True
# record dynesty runtime
self.dynesty_runtime = time.time() - dynesty_t0
if self.cache:
try:
self.save()
except:
pass
if samples_file is not None:
fname = f"{self.savedir}/{samples_file}"
else:
if self.like_fn_name == "true":
fname = f"{self.savedir}/dynesty_samples_final_{self.like_fn_name}.npz"
else:
if len(self.training_results["iteration"]) == 0:
current_iter = 0
else:
current_iter = self.training_results["iteration"][-1]
fname = f"{self.savedir}/dynesty_samples_final_{self.like_fn_name}_iter_{current_iter}.npz"
print(f"Saved dynesty samples to {fname}")
np.savez(fname, samples=self.dynesty_samples)
[docs]
def run_pymultinest(self, like_fn=None, prior_transform=None, sampler_kwargs={}, multi_proc=True,
prior_transform_comment=None, samples_file=None, prefix=None, resume=False,
n_clustering_params=None, outputfiles_basename=None, min_ess=int(1e4)):
"""
Sample the posterior using the PyMultiNest nested sampling algorithm.
This method uses the PyMultiNest package (Python wrapper for MultiNest) to perform
nested sampling on either the trained GP surrogate model or the true likelihood function.
MultiNest is particularly effective for multi-modal posteriors and computing Bayesian
evidence with high accuracy.
:param like_fn: (*callable, str, or None, optional*)
Likelihood function to sample. Options:
- None (default): Uses the trained GP surrogate model (self.surrogate_log_likelihood)
- "surrogate", "gp": Uses the GP surrogate model explicitly
- "true": Uses the true likelihood function (self.true_log_likelihood)
- callable: Custom likelihood function with signature like_fn(theta)
Default is None.
:param prior_transform: (*callable or None, optional*)
Prior transformation function that maps from unit hypercube [0,1]^ndim
to the parameter space. Should have signature prior_transform(cube) where cube
is array of shape (ndim,) with values in [0,1]. The function should modify
cube in-place. If None, uses uniform prior with bounds from self.bounds.
Default is None.
:param sampler_kwargs: (*dict, optional*)
Additional keyword arguments passed to pymultinest.run().
Common options include:
- 'n_live_points': Number of live points (default: 1000)
- 'evidence_tolerance': Target evidence uncertainty (default: 0.5)
- 'sampling_efficiency': Sampling efficiency parameter (default: 0.8)
- 'n_iter_before_update': Iterations before evidence/posterior update (default: 100)
- 'null_log_evidence': Null evidence for model comparison (default: -1e90)
- 'max_modes': Maximum number of modes to find (default: 100)
- 'mode_tolerance': Mode separation tolerance (default: -1e90)
- 'seed': Random seed for reproducibility (default: -1, auto)
- 'verbose': Verbosity level (default: True)
- 'importance_nested_sampling': Use importance nested sampling (default: True)
- 'multimodal': Enable multimodal mode detection (default: True)
- 'const_efficiency_mode': Use constant efficiency mode (default: False)
Default is {}.
:param multi_proc: (*bool, optional*)
Whether to use multiprocessing. If True, uses self.ncore processes.
MultiNest handles parallelization internally when MPI is available.
.. note::
This parameter is ignored for PyMultiNest as it uses MPI for
parallelization, not Python's multiprocessing. When PyMultiNest
runs with MPI, other alabi functions automatically disable their
multiprocessing pools to avoid conflicts.
Default is True.
:param prior_transform_comment: (*str or None, optional*)
Comment describing the prior function for logging purposes. If None
and prior_transform is provided, attempts to extract function name.
Default is None.
:param samples_file: (*str or None, optional*)
If provided, saves the final samples to this file in NumPy .npz format.
Default is None.
:param prefix: (*str or None, optional*)
Prefix for MultiNest output files. If None, uses default based on
likelihood function name and current directory. Default is None.
:param resume: (*bool, optional*)
Whether to resume from previous run if output files exist.
Default is False.
:param n_clustering_params: (*int or None, optional*)
Number of parameters to use for mode clustering. If None, uses all
parameters (ndim). Set to lower value if some parameters are nuisance.
Default is None.
:param outputfiles_basename: (*str or None, optional*)
Base name for MultiNest output files. If None, constructs from savedir
and likelihood function name. Default is None.
:param min_ess: (*int, optional*)
Minimum effective sample size. If the number of final samples is less than
min_ess, will run additional sampling rounds and combine samples until the
total number of samples exceeds min_ess. Default is 0 (no minimum required).
Attributes Set
--------------
pymultinest_samples : ndarray of shape (nsamples, ndim)
Posterior samples from MultiNest
pymultinest_samples_true : ndarray of shape (nsamples, ndim)
Posterior samples when using true likelihood (like_fn="true")
pymultinest_samples_surrogate : ndarray of shape (nsamples, ndim)
Posterior samples when using surrogate likelihood
pymultinest_weights : ndarray of shape (nsamples,)
Sample weights from nested sampling
pymultinest_logz : float
Log Bayesian evidence estimate
pymultinest_logz_err : float
Uncertainty in log evidence estimate
pymultinest_run : bool
Flag indicating PyMultiNest has been successfully run
pymultinest_runtime : float
Wall-clock time taken for MultiNest sampling in seconds
pymultinest_analyzer : pymultinest.Analyzer
MultiNest analyzer object for accessing detailed results
like_fn_name : str
Name of likelihood function used ("true", "surrogate", or "custom")
prior_transform_comment : str
Description of prior function used
.. note::
PyMultiNest is particularly well-suited for:
- Multi-modal posterior exploration with automatic mode detection
- High-accuracy Bayesian evidence computation for model comparison
- Robust sampling without manual tuning of MCMC parameters
- Handling complex, irregular posterior shapes
MultiNest generates several output files including detailed posterior
samples, evidence estimates, and mode information. These files are
saved to the model's savedir for later analysis.
**MPI and Multiprocessing Compatibility:**
PyMultiNest uses MPI for parallelization across multiple nodes/cores.
When MPI is active, alabi automatically disables Python multiprocessing
in other functions (run_emcee, run_dynesty) to prevent conflicts.
This ensures that:
- PyMultiNest can run efficiently with MPI
- Other alabi functions fall back to serial execution when MPI is detected
- No deadlocks or resource conflicts occur between MPI and multiprocessing
To run PyMultiNest with MPI:
>>> # Single node, multiple cores
>>> sm.run_pymultinest() # Uses OpenMP if available
>>> # Multiple nodes with MPI (run from command line)
>>> # mpirun -n 4 python your_script.py
:example:
Sample surrogate model with default settings:
>>> sm.run_pymultinest()
Sample true likelihood with more live points:
>>> sm.run_pymultinest(like_fn="true",
... sampler_kwargs={'n_live_points': 2000})
Use custom prior with bounds [-10, 10] for each parameter:
>>> def my_prior(cube):
... for i in range(len(cube)):
... cube[i] = 20*cube[i] - 10 # maps [0,1] to [-10,10]
>>> sm.run_pymultinest(prior_transform=my_prior)
Enable multimodal mode detection with high accuracy:
>>> sm.run_pymultinest(sampler_kwargs={
... 'multimodal': True,
... 'evidence_tolerance': 0.1,
... 'max_modes': 20
... })
Run with custom output file prefix and resume capability:
>>> sm.run_pymultinest(prefix="my_run_", resume=True)
References
----------
PyMultiNest documentation: https://johannesbuchner.github.io/PyMultiNest/
Feroz et al. (2009): "MultiNest: an efficient and robust Bayesian inference
tool for cosmology and particle physics", MNRAS, 398, 1601-1614
"""
try:
import pymultinest
except ImportError:
raise ImportError("PyMultiNest is required but not installed. "
"Install with: pip install pymultinest")
# Start timing
pymultinest_t0 = time.time()
# Determine likelihood function and name
if like_fn is None:
print("Initializing PyMultiNest with self.surrogate_log_likelihood surrogate model as likelihood.")
self.like_fn_name = "surrogate"
self.like_fn = self.surrogate_log_likelihood
elif callable(like_fn):
# like_fn is a function/callable
if like_fn == self.surrogate_log_likelihood:
print("Initializing PyMultiNest with self.surrogate_log_likelihood surrogate model as likelihood.")
self.like_fn_name = "surrogate"
self.like_fn = self.surrogate_log_likelihood
elif like_fn == self.true_log_likelihood:
print("Initializing PyMultiNest with self.true_log_likelihood as likelihood.")
self.like_fn_name = "true"
self.like_fn = like_fn
else:
# Custom function provided
print("Initializing PyMultiNest with user-provided likelihood function.")
self.like_fn_name = "custom"
self.like_fn = like_fn
elif isinstance(like_fn, str):
if like_fn.lower() in ["surrogate", "gp"]:
print("Initializing PyMultiNest with self.surrogate_log_likelihood surrogate model as likelihood.")
self.like_fn_name = "surrogate"
self.like_fn = self.surrogate_log_likelihood
elif like_fn.lower() == "true":
print("Initializing PyMultiNest with self.true_log_likelihood as likelihood.")
self.like_fn_name = "true"
self.like_fn = self.true_log_likelihood
else:
raise ValueError(f"Invalid like_fn string: {like_fn}. "
"Must be 'surrogate', 'gp', 'true', or callable.")
else:
raise ValueError(f"like_fn must be callable, string, or None. Got {type(like_fn)}")
# Set up prior transformation function
if prior_transform is None:
# Default uniform prior using self.bounds
prior_transform_fn = partial(ut.prior_transform_uniform, bounds=self.bounds)
self.prior_transform_comment = f"Default uniform prior transform.\nPrior function: ut.prior_transform_uniform\nBounds: {self.bounds}"
else:
prior_transform_fn = prior_transform
if prior_transform_comment is None:
if hasattr(prior_transform, '__name__'):
self.prior_transform_comment = f"Custom prior function: {prior_transform.__name__}"
else:
self.prior_transform_comment = "Custom prior function"
else:
self.prior_transform_comment = prior_transform_comment
# Create PyMultiNest-compatible wrapper for prior transform
# PyMultiNest expects Prior(cube, ndim, nparams) where cube is modified in-place
def pymultinest_prior_wrapper(cube, ndim, nparams):
"""Wrapper to make prior transform compatible with PyMultiNest calling convention."""
# Extract values from ctypes array to regular Python list
cube_values = [cube[i] for i in range(ndim)]
cube_array = np.array(cube_values)
# Apply the prior transformation
transformed = prior_transform_fn(cube_array)
# Modify cube in-place as expected by PyMultiNest
for i in range(ndim):
cube[i] = transformed[i]
# Use the wrapper as the actual prior function
prior_transform = pymultinest_prior_wrapper
# Create PyMultiNest-compatible wrapper for likelihood function
# PyMultiNest expects LogLikelihood(cube, ndim, nparams) where cube contains parameters
def pymultinest_likelihood_wrapper(cube, ndim, nparams):
"""Wrapper to make likelihood function compatible with PyMultiNest calling convention."""
# Extract values from ctypes array to regular Python list
params_values = [cube[i] for i in range(ndim)]
params = np.array(params_values)
return self.like_fn(params)
# Use the wrapper as the actual likelihood function
likelihood_fn = pymultinest_likelihood_wrapper
# Set up output file basename
if outputfiles_basename is None:
if prefix is None:
if len(self.training_results.get("iteration", [])) == 0:
current_iter = 0
else:
current_iter = self.training_results["iteration"][-1]
prefix = f"pymultinest_{self.like_fn_name}_iter_{current_iter}_"
outputfiles_basename = f"{self.savedir}/{prefix}"
# Set default sampler kwargs
default_sampler_kwargs = {
'n_live_points': 1000,
'evidence_tolerance': 0.5,
'sampling_efficiency': 0.8,
'n_iter_before_update': 100,
'null_log_evidence': -1e90,
'max_modes': 100,
'mode_tolerance': -1e90,
'seed': -1,
'verbose': True,
'importance_nested_sampling': True,
'multimodal': True,
'const_efficiency_mode': False,
}
# Update with user-provided kwargs
final_sampler_kwargs = {**default_sampler_kwargs, **sampler_kwargs}
# Set clustering parameters
if n_clustering_params is None:
n_clustering_params = self.ndim
# Multi-run setup for minimum effective sample size
all_samples = []
all_weights = []
all_logz = []
all_logz_err = []
all_run_times = []
accumulated_samples = 0
run_number = 1
while accumulated_samples < max(min_ess, 1):
if min_ess > 0:
print(f"\nRun {run_number}: Need {max(0, min_ess - accumulated_samples)} more samples...")
print("="*50)
# Create unique output basename for this run
current_outputfiles_basename = f"{outputfiles_basename}_run{run_number:02d}" if min_ess > 0 else outputfiles_basename
print(f"Running PyMultiNest with {final_sampler_kwargs['n_live_points']} live points...")
print(f"Evidence tolerance: {final_sampler_kwargs['evidence_tolerance']}")
print(f"Output files: {current_outputfiles_basename}*")
print(f"Prior: {self.prior_transform_comment}")
# Start timing this run
current_pymultinest_t0 = time.time()
# Run MultiNest
pymultinest.run(
LogLikelihood=likelihood_fn,
Prior=prior_transform,
n_dims=self.ndim,
n_params=self.ndim,
n_clustering_params=n_clustering_params,
outputfiles_basename=current_outputfiles_basename,
resume=resume,
**final_sampler_kwargs
)
# Fix malformed scientific notation in PyMultiNest output files
self._fix_pymultinest_output_format(current_outputfiles_basename)
# Analyze results for this run
current_analyzer = pymultinest.Analyzer(
outputfiles_basename=current_outputfiles_basename,
n_params=self.ndim
)
# Get samples and evidence for this run
current_samples = current_analyzer.get_equal_weighted_posterior()
current_samples_only = current_samples[:, :-1] # Remove log-likelihood column
current_weights = np.ones(len(current_samples_only)) # Equal weights
# Get evidence estimates for this run
current_stats = current_analyzer.get_stats()
current_logz = current_stats['global evidence']
current_logz_err = current_stats['global evidence error']
# Record this run's results
all_samples.append(current_samples_only)
all_weights.append(current_weights)
all_logz.append(current_logz)
all_logz_err.append(current_logz_err)
current_nsamples = len(current_samples_only)
accumulated_samples += current_nsamples
# Record runtime for this run
current_run_time = time.time() - current_pymultinest_t0
all_run_times.append(current_run_time)
if min_ess > 0:
print(f"Run {run_number} complete: {current_nsamples} samples, logZ = {current_logz:.3f} ± {current_logz_err:.3f}")
print(f"Total accumulated samples: {accumulated_samples}")
# If min_ess requirement met, break
if accumulated_samples >= min_ess:
break
run_number += 1
# Limit to prevent infinite loops
if run_number > 10:
print(f"WARNING: Reached maximum of 10 runs, stopping with {accumulated_samples} samples")
break
# Combine results from all runs
if len(all_samples) > 1:
self.pymultinest_samples = np.vstack(all_samples)
self.pymultinest_weights = np.concatenate(all_weights)
# Use weighted average of log evidence (weights by number of samples)
sample_counts = np.array([len(s) for s in all_samples])
total_samples = np.sum(sample_counts)
weights = sample_counts / total_samples
self.pymultinest_logz = np.average(all_logz, weights=weights)
self.pymultinest_logz_err = np.sqrt(np.average(np.array(all_logz_err)**2, weights=weights))
print(f"\nCombined {len(all_samples)} runs into {len(self.pymultinest_samples)} total samples")
print(f"Combined log evidence: {self.pymultinest_logz:.3f} ± {self.pymultinest_logz_err:.3f}")
else:
self.pymultinest_samples = all_samples[0]
self.pymultinest_weights = all_weights[0]
self.pymultinest_logz = all_logz[0]
self.pymultinest_logz_err = all_logz_err[0]
# Set the final analyzer to the last run for compatibility
self.pymultinest_analyzer = current_analyzer
# Calculate total runtime
self.pymultinest_runtime = sum(all_run_times)
# Store samples based on likelihood function used
if self.like_fn_name == "true":
self.pymultinest_samples_true = self.pymultinest_samples.copy()
print(f"PyMultiNest complete. {len(self.pymultinest_samples_true)} posterior samples collected.")
else:
self.pymultinest_samples_surrogate = self.pymultinest_samples.copy()
print(f"PyMultiNest complete. {len(self.pymultinest_samples_surrogate)} posterior samples collected.")
print(f"Log Evidence: {self.pymultinest_logz:.3f} ± {self.pymultinest_logz_err:.3f}")
# Record that PyMultiNest has been run
self.pymultinest_run = True
print(f"PyMultiNest runtime: {self.pymultinest_runtime:.2f} seconds")
# Save results if caching is enabled
if self.cache:
try:
self.save()
except:
pass
# Save samples to file
if samples_file is not None:
fname = f"{self.savedir}/{samples_file}"
else:
if self.like_fn_name == "true":
fname = f"{self.savedir}/pymultinest_samples_final_{self.like_fn_name}.npz"
else:
if len(self.training_results.get("iteration", [])) == 0:
current_iter = 0
else:
current_iter = self.training_results["iteration"][-1]
fname = f"{self.savedir}/pymultinest_samples_final_{self.like_fn_name}_iter_{current_iter}.npz"
print(f"Saved PyMultiNest samples to {fname}")
np.savez(fname,
samples=self.pymultinest_samples,
weights=self.pymultinest_weights,
logz=self.pymultinest_logz,
logz_err=self.pymultinest_logz_err)
[docs]
def run_ultranest(self, like_fn=None, prior_transform=None, sampler_kwargs={}, run_kwargs={},
multi_proc=False, prior_transform_comment=None, samples_file=None,
log_dir=None, resume="overwrite", min_ess=int(1e4), slice_steps=0):
"""
Sample the posterior using the UltraNest nested sampling algorithm.
This method uses the UltraNest package to perform nested sampling on either
the trained GP surrogate model or the true likelihood function. UltraNest
is a highly robust nested sampling algorithm that automatically adapts to
the problem complexity and provides reliable evidence computation.
:param like_fn: (*callable, str, or None, optional*)
Likelihood function to sample. Options:
- None (default): Uses the trained GP surrogate model (self.surrogate_log_likelihood)
- "surrogate", "gp": Uses the GP surrogate model explicitly
- "true": Uses the true likelihood function (self.true_log_likelihood)
- callable: Custom likelihood function with signature like_fn(theta)
Default is None.
:param prior_transform: (*callable or None, optional*)
Prior transformation function that maps from unit hypercube [0,1]^ndim
to the parameter space. Should have signature prior_transform(cube) where
cube is array of shape (ndim,) with values in [0,1]. Must return transformed
parameters as array. If None, creates uniform prior from self.bounds.
Default is None.
:param sampler_kwargs: (*dict, optional*)
Additional keyword arguments passed to UltraNest ReactiveNestedSampler().
Common options include:
- 'derived_param_names': List of derived parameter names (default: [])
- 'wrapped_params': List of bool indicating circular parameters (default: None)
- 'resume': Resume behavior 'resume'/'overwrite'/'subfolder' (default: 'subfolder')
- 'run_num': Run number for subdirectory creation (default: None)
- 'num_test_samples': Number of test samples for validation (default: 2)
- 'draw_multiple': Enable dynamic point drawing (default: True)
- 'num_bootstraps': Number of bootstrap samples (default: 30)
- 'vectorized': Whether functions accept arrays (default: False)
- 'ndraw_min': Minimum points to draw per iteration (default: 128)
- 'ndraw_max': Maximum points to draw per iteration (default: 65536)
- 'storage_backend': Storage format 'hdf5'/'csv'/'tsv' (default: 'hdf5')
- 'warmstart_max_tau': Warmstart maximum tau (default: -1)
Default is {}.
:param run_kwargs: (*dict, optional*)
Additional keyword arguments passed to sampler.run().
Common options include:
- 'update_interval_volume_fraction': Volume fraction for region updates (default: 0.8)
- 'update_interval_ncall': Number of calls between updates (optional, omit for auto)
- 'log_interval': Iterations between status updates (optional, omit for auto)
- 'show_status': Show integration progress (default: True)
- 'viz_callback': Visualization callback function (default: False, disabled)
- 'dlogz': Target log-evidence uncertainty (default: 0.5)
- 'dKL': Target posterior uncertainty in nats (default: 0.5)
- 'frac_remain': Fraction of evidence remaining to terminate (default: 0.01)
- 'Lepsilon': Likelihood contour tolerance (default: 0.001)
- 'min_ess': Target effective sample size (default: 400)
- 'max_iters': Maximum number of iterations (optional, omit for unlimited)
- 'max_ncalls': Maximum number of likelihood calls (optional, omit for unlimited)
- 'max_num_improvement_loops': Maximum improvement loops (default: -1)
- 'min_num_live_points': Minimum number of live points (default: 400)
- 'cluster_num_live_points': Live points per cluster (default: 40)
- 'insertion_test_zscore_threshold': Z-score threshold for insertion test (default: 4)
- 'insertion_test_window': Window size for insertion test (default: 10)
- 'region_class': Region sampling class (optional, can be passed via kwargs)
- 'widen_before_initial_plateau_num_warn': Warning threshold for plateau (default: 10000)
- 'widen_before_initial_plateau_num_max': Maximum plateau points (optional, omit for auto)
Default is {}.
:param multi_proc: (*bool, optional*)
**Deprecated and ignored.** This parameter is kept for backwards compatibility
but no longer has any effect. UltraNest now runs in MPI-compatible mode without
multiprocessing pools to avoid conflicts with MPI environments.
Default is False.
:param prior_transform_comment: (*str or None, optional*)
Comment describing the prior transform for logging purposes. If None
and prior_transform is provided, attempts to extract function name.
Default is None.
:param samples_file: (*str or None, optional*)
If provided, saves the final samples to this file in NumPy .npz format.
Default is None.
:param log_dir: (*str or None, optional*)
Directory to store UltraNest output files and logs. If None, uses
a subdirectory in self.savedir. Default is None.
:param resume: (*str, optional*)
Resume behavior for interrupted runs. Options:
- 'resume': Resume if possible, otherwise start fresh
- 'resume-similar': Resume with similar but not identical setup
- 'overwrite': Always start fresh, overwriting existing files (default)
- 'subfolder': Create new timestamped subfolder
Default is 'overwrite'.
:param min_ess: (*int, optional*)
Minimum effective sample size. If the number of final samples is less than
min_ess, will run additional sampling rounds and combine samples until the
total number of samples exceeds min_ess. Default is 0 (no minimum required).
Attributes Set
--------------
ultranest_results : ultranest.integrator.Result
Complete UltraNest results object with samples, evidence, etc.
ultranest_samples : ndarray of shape (nsamples, ndim)
Equally weighted posterior samples from UltraNest
ultranest_samples_true : ndarray of shape (nsamples, ndim)
Posterior samples when using true likelihood (like_fn="true")
ultranest_samples_surrogate : ndarray of shape (nsamples, ndim)
Posterior samples when using surrogate likelihood
ultranest_weights : ndarray of shape (nsamples,)
Sample weights (typically all equal after resampling)
ultranest_logz : float
Log Bayesian evidence estimate
ultranest_logz_err : float
Uncertainty in log evidence estimate
ultranest_run : bool
Flag indicating UltraNest has been successfully run
ultranest_runtime : float
Wall-clock time taken for UltraNest sampling in seconds
ultranest_sampler : ultranest.ReactiveNestedSampler
UltraNest sampler object for accessing detailed information
like_fn_name : str
Name of likelihood function used ("true", "surrogate", or "custom")
prior_transform_comment : str
Description of prior transform used
.. note::
UltraNest is particularly well-suited for:
- Robust nested sampling without manual tuning
- Automatic adaptation to problem complexity
- High-dimensional and multi-modal problems
- Reliable evidence computation for model comparison
- Problems with complex, irregular likelihood shapes
- MPI environments (runs in serial mode to avoid multiprocessing conflicts)
UltraNest automatically determines the number of live points and
adapts its sampling strategy based on the problem characteristics.
It provides excellent performance across a wide range of problems
without requiring parameter tuning.
**MPI Compatibility:** This function runs UltraNest in serial mode,
making it fully compatible with MPI environments. While UltraNest
itself can use MPI for parallelization, this implementation avoids
multiprocessing pools that can conflict with MPI.
:example:
Sample surrogate model with default settings:
>>> sm.run_ultranest()
Sample true likelihood with custom termination criteria:
>>> sm.run_ultranest(like_fn="true",
... run_kwargs={'dlogz': 0.1, 'min_ess': 1000})
Use custom prior transform with bounds [-5, 5] for each parameter:
>>> def my_prior_transform(cube):
... return 10 * cube - 5 # maps [0,1] to [-5,5]
>>> sm.run_ultranest(prior_transform=my_prior_transform)
Run with increased live points for better accuracy:
>>> sm.run_ultranest(like_fn="true",
... run_kwargs={'min_num_live_points': 800})
Customize output directory and resume behavior:
>>> sm.run_ultranest(log_dir="ultranest_output",
... resume="overwrite")
References
----------
UltraNest documentation: https://johannesbuchner.github.io/UltraNest/
Buchner (2021): "UltraNest - a robust, general purpose Bayesian inference
library for cosmology and particle physics", Journal of Open Source Software
"""
try:
import ultranest
from ultranest import ReactiveNestedSampler
except ImportError:
raise ImportError("UltraNest is required but not installed. "
"Install with: pip install ultranest")
# Start timing
ultranest_t0 = time.time()
# Determine likelihood function and name
if like_fn is None:
print("Initializing UltraNest with self.surrogate_log_likelihood surrogate model as likelihood.")
self.like_fn_name = "surrogate"
self.like_fn = self.surrogate_log_likelihood
elif callable(like_fn):
# like_fn is a function/callable
if like_fn == self.surrogate_log_likelihood:
print("Initializing UltraNest with self.surrogate_log_likelihood surrogate model as likelihood.")
self.like_fn_name = "surrogate"
self.like_fn = self.surrogate_log_likelihood
elif like_fn == self.true_log_likelihood:
print("Initializing UltraNest with self.true_log_likelihood as likelihood.")
self.like_fn_name = "true"
self.like_fn = like_fn
else:
# Custom function provided
print("Initializing UltraNest with user-provided likelihood function.")
self.like_fn_name = "custom"
self.like_fn = like_fn
elif isinstance(like_fn, str):
if like_fn.lower() in ["surrogate", "gp"]:
print("Initializing UltraNest with self.surrogate_log_likelihood surrogate model as likelihood.")
self.like_fn_name = "surrogate"
self.like_fn = self.surrogate_log_likelihood
elif like_fn.lower() == "true":
print("Initializing UltraNest with self.true_log_likelihood as likelihood.")
self.like_fn_name = "true"
self.like_fn = self.true_log_likelihood
else:
raise ValueError(f"Invalid like_fn string: {like_fn}. "
"Must be 'surrogate', 'gp', 'true', or callable.")
else:
raise ValueError(f"like_fn must be callable, string, or None. Got {type(like_fn)}")
# Set up prior transformation function
if prior_transform is None:
# Default uniform prior using self.bounds
prior_transform_fn = partial(ut.prior_transform_uniform, bounds=self.bounds)
self.prior_transform_comment = f"Uniform prior with bounds {self.bounds}"
else:
prior_transform_fn = prior_transform
if prior_transform_comment is None:
if hasattr(prior_transform, '__name__'):
self.prior_transform_comment = f"Custom prior transform: {prior_transform.__name__}"
else:
self.prior_transform_comment = "Custom prior transform"
else:
self.prior_transform_comment = prior_transform_comment
# Set up log directory
if log_dir is None:
if len(self.training_results.get("iteration", [])) == 0:
current_iter = 0
else:
current_iter = self.training_results["iteration"][-1]
log_dir = f"{self.savedir}/ultranest_{self.like_fn_name}_iter_{current_iter}"
# Set default sampler kwargs (these go to ReactiveNestedSampler constructor)
default_sampler_kwargs = {
'derived_param_names': [],
'wrapped_params': None,
'num_test_samples': 2,
'draw_multiple': True,
'num_bootstraps': 30,
'vectorized': False,
'ndraw_min': 128,
'ndraw_max': 65536,
'storage_backend': 'hdf5',
'warmstart_max_tau': -1,
}
# Update with user-provided kwargs
final_sampler_kwargs = {**default_sampler_kwargs, **sampler_kwargs}
# Set default run kwargs (these go to sampler.run() method)
default_run_kwargs = {
'update_interval_volume_fraction': 0.8,
'show_status': True,
'viz_callback': False, # Disable visualization to avoid ipywidgets dependency
'dlogz': 0.5,
'dKL': 0.5,
'frac_remain': 0.01,
'Lepsilon': 0.001,
'min_ess': 400,
'max_num_improvement_loops': 1,
'min_num_live_points': 400,
'cluster_num_live_points': 40,
'insertion_test_zscore_threshold': 4,
'insertion_test_window': 10,
'widen_before_initial_plateau_num_warn': 10000,
}
# Update with user-provided kwargs
final_run_kwargs = {**default_run_kwargs, **run_kwargs}
# Check if MPI is active for informational purposes
if parallel_utils.is_mpi_active():
print("MPI environment detected. UltraNest will run in MPI-compatible mode.")
elif multi_proc:
print("Warning: multi_proc=True ignored. UltraNest now runs in MPI-compatible serial mode.")
# Define wrapped likelihood function for UltraNest
def ultranest_likelihood(params):
"""Likelihood function wrapper for UltraNest."""
return self.like_fn(params)
print(f"Running UltraNest with {final_run_kwargs['min_num_live_points']} minimum live points...")
print(f"Log directory: {log_dir}")
print(f"Execution mode: MPI-compatible (no multiprocessing pools)")
print(f"Prior: {self.prior_transform_comment}")
print(f"Termination: dlogz={final_run_kwargs['dlogz']}, min_ess={final_run_kwargs['min_ess']}\n")
# Multi-run setup for minimum effective sample size
all_samples = []
all_weights = []
all_logz = []
all_logz_err = []
all_run_times = []
accumulated_samples = 0
run_number = 1
param_names = [f"param_{i}" for i in range(self.ndim)]
while accumulated_samples < min_ess:
if min_ess > 0:
print(f"\nRun {run_number}: Need {max(0, min_ess - accumulated_samples)} more samples...")
print("="*50)
# Set timing for this run
run_start_time = ultranest_t0 if run_number == 1 else time.time()
# Create a new sampler for each run (with updated log_dir if needed)
current_log_dir = log_dir if run_number == 1 else f"{log_dir}_run{run_number}" if log_dir else None
# Create UltraNest sampler instance
self.ultranest_sampler = ReactiveNestedSampler(
param_names=param_names,
loglike=ultranest_likelihood,
transform=prior_transform_fn,
log_dir=current_log_dir,
resume=resume if run_number == 1 else "overwrite", # Only resume on first run
**final_sampler_kwargs
)
if slice_steps > 0:
from ultranest import stepsampler
self.ultranest_sampler.stepsampler = stepsampler.SliceSampler(
nsteps=slice_steps,
generate_direction=stepsampler.generate_mixture_random_direction,
)
# Run UltraNest sampling (always in serial/MPI-compatible mode)
current_results = self.ultranest_sampler.run(
**final_run_kwargs
)
# Record current run statistics
current_runtime = time.time() - run_start_time
all_run_times.append(current_runtime)
# Extract results for this run
current_samples = current_results['samples']
# Extract weights from weighted_samples if available, otherwise use equal weights
if 'weighted_samples' in current_results:
current_weights = current_results['weighted_samples']['weights']
else:
# For equally weighted samples, create uniform weights
current_weights = np.ones(len(current_samples)) / len(current_samples)
current_logz = current_results['logz']
current_logz_err = current_results['logzerr']
# Store results from this run
all_samples.append(current_samples)
all_weights.append(current_weights)
all_logz.append(current_logz)
all_logz_err.append(current_logz_err)
current_nsamples = len(current_samples)
accumulated_samples += current_nsamples
if min_ess > 0:
print(f"Run {run_number} complete: {current_nsamples} samples, logZ = {current_logz:.3f} ± {current_logz_err:.3f}")
print(f"Total accumulated samples: {accumulated_samples}")
# If min_ess requirement met, break
if accumulated_samples >= min_ess:
break
run_number += 1
# Limit to prevent infinite loops
if run_number > 10:
print(f"WARNING: Reached maximum of 10 runs, stopping with {accumulated_samples} samples")
break
# Combine results from all runs
if len(all_samples) > 1:
self.ultranest_samples = np.vstack(all_samples)
self.ultranest_weights = np.concatenate(all_weights)
# Use the best (highest) log evidence
best_run_idx = np.argmax(all_logz)
self.ultranest_logz = all_logz[best_run_idx]
self.ultranest_logz_err = all_logz_err[best_run_idx]
print(f"\nCombined {len(all_samples)} runs into {len(self.ultranest_samples)} total samples")
print(f"Best log evidence: {self.ultranest_logz:.3f} ± {self.ultranest_logz_err:.3f}")
else:
self.ultranest_samples = all_samples[0]
self.ultranest_weights = all_weights[0]
self.ultranest_logz = all_logz[0]
self.ultranest_logz_err = all_logz_err[0]
# Use final run for primary results
self.ultranest_results = current_results
self.ultranest_runtime = sum(all_run_times)
# Store samples based on likelihood function used
if self.like_fn_name == "true":
self.ultranest_samples_true = self.ultranest_samples.copy()
print(f"UltraNest complete. {len(self.ultranest_samples_true)} posterior samples collected.")
else:
self.ultranest_samples_surrogate = self.ultranest_samples.copy()
print(f"UltraNest complete. {len(self.ultranest_samples_surrogate)} posterior samples collected.")
print(f"Log Evidence: {self.ultranest_logz:.3f} ± {self.ultranest_logz_err:.3f}")
print(f"Effective Sample Size: {self.ultranest_results.get('ess', 'N/A')}")
print(f"Number of likelihood evaluations: {self.ultranest_results.get('ncall', 'N/A')}")
# Record that UltraNest has been run
self.ultranest_run = True
print(f"UltraNest runtime: {self.ultranest_runtime:.2f} seconds\n")
# Save results if caching is enabled
if self.cache:
try:
self.save()
except:
pass
# Save samples to file
if samples_file is not None:
fname = f"{self.savedir}/{samples_file}"
else:
if self.like_fn_name == "true":
fname = f"{self.savedir}/ultranest_samples_final_true.npz"
else:
if len(self.training_results.get("iteration", [])) == 0:
current_iter = 0
else:
current_iter = self.training_results["iteration"][-1]
fname = f"{self.savedir}/ultranest_samples_final_surrogate_iter_{current_iter}.npz"
print(f"Saved UltraNest samples to {fname}")
np.savez(fname,
samples=self.ultranest_samples,
weights=self.ultranest_weights,
logz=self.ultranest_logz,
logz_err=self.ultranest_logz_err)
[docs]
def plot(self, plots=None, show=False, cb_rng=[None, None], log_scale=False):
"""
Generate diagnostic plots for training progress, GP performance, and MCMC results.
This method creates various diagnostic plots to assess the quality of the surrogate
model training, GP hyperparameter optimization, and MCMC sampling results. Plots
are automatically saved to the model's save directory.
:param plots: (*list of str, optional*)
List of plot types to generate. Each plot requires specific data to be available
(e.g., 'emcee_corner' requires run_emcee() to have been called first). If None,
no plots are generated. Available options:
**Training diagnostics:**
- 'test_mse': Mean squared error vs training iteration
- 'test_scaled_mse': Scaled MSE vs training iteration
- 'test_log_mse': Log-scale MSE vs training iteration
- 'gp_hyperparameters': GP hyperparameter evolution during training
- 'gp_train_time': GP training time vs iteration
- 'gp_train_corner': Corner plot of final training samples
- 'gp_train_scatter': Scatter plot of training samples vs predictions
**GP visualization:**
- 'gp_fit_2D': 2D contour plot of GP surrogate surface (2D only)
- 'gp_predictions_1D': 1D slices of GP mean and variance through the surrogate maximum
**MCMC diagnostics:**
- 'emcee_corner': Corner plot of emcee posterior samples
- 'emcee_walkers': Walker trajectories for emcee chains
- 'dynesty_corner': Corner plot of dynesty posterior samples
- 'dynesty_corner_kde': KDE version of dynesty corner plot
- 'dynesty_traceplot': Trace plot of dynesty sampling
- 'dynesty_runplot': Dynesty convergence diagnostics
**Comparison plots:**
- 'mcmc_comparison': Compare emcee and dynesty posteriors
**Convenience options:**
- 'gp_all': Generate all available GP training plots
Default is None.
:param show: (*bool, optional*)
Whether to display plots interactively in addition to saving them.
If False, plots are only saved to disk. Default is False.
:param cb_rng: (*list of [float, float], optional*)
Colorbar range for 2D contour plots as [vmin, vmax]. If [None, None],
uses automatic range determination. Only applies to plots with colorbars
like 'gp_fit_2D'. Default is [None, None].
:param log_scale: (*bool, optional*)
Whether to use logarithmic color scale for 2D contour plots. If True,
applies matplotlib.colors.LogNorm to the colorbar. Only applies to
plots with colorbars. Default is False.
:returns: *None or matplotlib.figure.Figure*
Some individual plots may return figure objects for further customization.
:raises NameError:
If required data for a requested plot is not available (e.g., requesting
'emcee_corner' before running run_emcee()).
:raises AttributeError:
If the model has not been properly initialized or trained.
Notes
-----
Plots are automatically saved to the model's save directory (self.savedir)
with descriptive filenames. The save directory is created if it doesn't exist.
Training diagnostic plots help assess:
- Convergence of active learning process
- Quality of GP hyperparameter optimization
- Efficiency of training sample selection
MCMC diagnostic plots help assess:
- Posterior sampling convergence
- Chain mixing and autocorrelation
- Comparison between different sampling methods
Examples
--------
Generate all GP training plots:
>>> sm.plot(plots=['gp_all'])
Create MCMC comparison plots:
>>> sm.run_emcee()
>>> sm.run_dynesty()
>>> sm.plot(plots=['emcee_corner', 'dynesty_corner', 'mcmc_comparison'])
Generate 2D GP visualization with custom colorbar:
>>> sm.plot(plots=['gp_fit_2D'], cb_rng=[-10, 0], log_scale=True)
Show plots interactively:
>>> sm.plot(plots=['test_mse', 'gp_hyperparameters'], show=True)
"""
# ================================
# GP training plots
# ================================
if "gp_all" in plots:
gp_plots = ["test_mse", "test_scaled_mse", "test_log_mse", "gp_hyperparam", "gp_timing", "gp_train_scatter",
"gp_predictions_1D"]
if self.ndim == 2:
gp_plots.append("gp_fit_2D")
for pl in gp_plots:
plots.append(pl)
# GP mean squared error vs iteration
if "test_mse" in plots:
savename = "gp_mse_vs_iteration.png"
if hasattr(self, "training_results"):
print(f"Plotting the gp mean squared error with {self.ntest} test samples...")
print(f"Saving to {self.savedir}/{savename}")
iarray = np.array(self.training_results["iteration"])
# MSE
return vis.plot_error_vs_iteration(iarray,
self.training_results["training_mse"],
self.training_results["test_mse"],
metric="Mean Squared Error",
log=False,
title=f"{self.kernel_name} surrogate",
savedir=self.savedir,
savename=savename,
show=show)
else:
raise NameError("Must run active_train before plotting test_mse.")
# GP scaled mean squared error vs iteration
if "test_scaled_mse" in plots:
savename = "gp_scaled_mse_vs_iteration.png"
if hasattr(self, "training_results"):
print(f"Plotting the scaled gp mean squared error with {self.ntest} test samples...")
print(f"Saving to {self.savedir}/{savename}")
iarray = np.array(self.training_results["iteration"])
# Scaled MSE
return vis.plot_error_vs_iteration(iarray,
self.training_results["training_scaled_mse"],
self.training_results["test_scaled_mse"],
metric="Mean Squared Error / Variance",
log=False,
title=f"{self.kernel_name} surrogate",
savedir=self.savedir,
savename=savename,
show=show)
else:
raise NameError("Must run active_train before plotting test_scaled_mse.")
# GP mean squared error vs iteration
if "test_log_mse" in plots:
savename = "gp_mse_vs_iteration_log.png"
if hasattr(self, "training_results"):
print(f"Plotting the log gp mean squared error with {self.ntest} test samples...")
print(f"Saving to {self.savedir}/{savename}")
iarray = np.array(self.training_results["iteration"])
# Log MSE
return vis.plot_error_vs_iteration(iarray,
self.training_results["training_mse"],
self.training_results["test_mse"],
metric="Log(Mean Squared Error)",
log=True,
title=f"{self.kernel_name} surrogate",
savedir=self.savedir,
savename=savename,
show=show)
else:
raise NameError("Must run active_train before plotting test_log_mse.")
# GP hyperparameters vs iteration
if "gp_hyperparameters" in plots:
if hasattr(self, "training_results"):
print("Plotting gp hyperparameters...")
return vis.plot_hyperparam_vs_iteration(self, title=f"{self.kernel_name} surrogate", show=show)
else:
raise NameError("Must run active_train before plotting gp_hyperparameters.")
# GP training time vs iteration
if "gp_train_time" in plots:
if hasattr(self, "training_results"):
print("Plotting gp timing...")
return vis.plot_train_time_vs_iteration(self, title=f"{self.kernel_name} surrogate", show=show)
else:
raise NameError("Must run active_train before plotting gp_timing.")
# N-D scatterplots and histograms colored by function value
if "gp_train_corner" in plots:
if hasattr(self, "_theta") and hasattr(self, "_y"):
print("Plotting training sample corner plot...")
return vis.plot_corner_lnp(self, show=show);
else:
raise NameError("Must run init_train and/or active_train before plotting gp_train_corner.")
# N-D scatterplots and histograms
if "gp_train_scatter" in plots:
if hasattr(self, "_theta") and hasattr(self, "_y"):
print("Plotting training sample corner plot...")
return vis.plot_corner_scatter(self, show=show);
else:
raise NameError("Must run init_train and/or active_train before plotting gp_train_corner.")
# GP fit (only for 2D functions)
if "gp_fit_2D" in plots:
if hasattr(self, "_theta") and hasattr(self, "_y"):
print("Plotting gp fit 2D...")
if self.ndim == 2:
return vis.plot_gp_fit_2D(self, ngrid=60, title=f"{self.kernel_name} surrogate", show=show,
vmin=cb_rng[0], vmax=cb_rng[1], log_scale=log_scale)
else:
print("theta must be 2D to use gp_fit_2D!")
else:
raise NameError("Must run init_train and/or active_train before plotting gp_fit_2D.")
# Objective function contour plot
if "obj_fn_2D" in plots:
if hasattr(self, "_theta") and hasattr(self, "_y") and hasattr(self, "gp"):
print("Plotting objective function contours 2D...")
return vis.plot_utility_2D(self, ngrid=60, show=show, vmin=cb_rng[0], vmax=cb_rng[1], log_scale=log_scale)
else:
raise NameError("Must run init_train and init_gp before plotting obj_fn_2D.")
if "true_fn_2D" in plots:
if self.ndim == 2:
print("Plotting true function contours 2D...")
return vis.plot_true_fit_2D(self, ngrid=60, show=show, vmin=cb_rng[0], vmax=cb_rng[1], log_scale=log_scale)
else:
raise print("theta must be 2D to use true_fn_2D!")
# 1D GP prediction slices through the surrogate maximum
if "gp_predictions_1D" in plots:
if hasattr(self, "_theta") and hasattr(self, "gp"):
print("Plotting 1D GP predictions...")
theta_max, _ = self.find_max_surrogate()
return vis.plot_gp_predictions_1D(self, theta_max,
savedir=self.savedir, show=show)
else:
raise NameError("Must run init_train and init_gp before plotting gp_predictions_1D.")
# ================================
# emcee plots
# ================================
if "emcee_all" in plots:
emcee_plots = ["emcee_corner", "emcee_walkers"]
for pl in emcee_plots:
plots.append(pl)
# emcee posterior samples
if "emcee_corner" in plots:
if hasattr(self, "emcee_samples"):
print("Plotting emcee posterior...")
return vis.plot_corner(self, self.emcee_samples, sampler="emcee_", show=show);
else:
raise NameError("Must run run_emcee before plotting emcee_corner.")
# emcee walkers
if "emcee_walkers" in plots:
if hasattr(self, "emcee_samples"):
print("Plotting emcee walkers...")
return vis.plot_emcee_walkers(self, show=show)
else:
raise NameError("Must run run_emcee before plotting emcee_walkers.")
# ================================
# dynesty plots
# ================================
if "dynesty_all" in plots:
dynesty_plots = ["dynesty_corner", "dynesty_corner_kde",
"dynesty_traceplot", "dynesty_runplot"]
for pl in dynesty_plots:
plots.append(pl)
# dynesty posterior samples
if "dynesty_corner" in plots:
if hasattr(self, "dynesty_samples"):
print("Plotting dynesty posterior...")
return vis.plot_corner(self, self.dynesty_samples, sampler="dynesty_", show=show);
else:
raise NameError("Must run run_dynesty before plotting dynesty_corner.")
if "dynesty_corner_kde" in plots:
if hasattr(self, "dynesty_samples"):
print("Plotting dynesty posterior kde...")
return vis.plot_corner_kde(self, show=show)
else:
raise NameError("Must run run_dynesty before plotting dynesty_corner.")
if "dynesty_traceplot" in plots:
if hasattr(self, "dynesty_samples"):
print("Plotting dynesty traceplot...")
return vis.plot_dynesty_traceplot(self, show=show)
else:
raise NameError("Must run run_dynesty before plotting dynesty_traceplot.")
if "dynesty_runplot" in plots:
if hasattr(self, "dynesty_samples"):
print("Plotting dynesty runplot...")
return vis.plot_dynesty_runplot(self, show=show)
else:
raise NameError("Must run run_dynesty before plotting dynesty_runplot.")
# ================================
# MCMC comparison plots
# ================================
if "mcmc_comparison" in plots:
if hasattr(self, "emcee_samples") and hasattr(self, "dynesty_samples"):
print("Plotting emcee vs dynesty posterior comparison...")
return vis.plot_emcee_dynesty_comparison(self, show=show)
else:
raise NameError("Must run run_emcee and run_dynesty before plotting emcee_comparison.")
def _run_chain_worker(self, chain_id, niter=5, algorithm="bape", gp_opt_freq=1,
obj_opt_method="lbfgsb", nopt=10,
use_grad_opt=True, optimizer_kwargs=None, show_progress=True,
allow_opt_multiproc=False):
"""
Worker function to run a single active learning chain.
This function is designed to be pickled for multiprocessing.
:param chain_id: (*int*)
Identifier for this chain
Other parameters same as active_train()
:returns: *tuple or None*
(new_theta, new_y, training_results) if successful, None if failed
"""
try:
# Create a copy of the current model for this chain
chain = self._create_chain_copy(chain_id=chain_id)
# Store initial state
initial_theta = chain._theta.copy()
initial_y = chain._y.copy()
# Run active learning on this chain (explicitly disable multiprocessing)
chain.active_train(niter=niter, algorithm=algorithm, gp_opt_freq=gp_opt_freq,
save_progress=False, obj_opt_method=obj_opt_method,
nopt=nopt, use_grad_opt=use_grad_opt,
optimizer_kwargs=optimizer_kwargs, show_progress=show_progress,
allow_opt_multiproc=allow_opt_multiproc)
# Extract only the new samples (excluding initial training data)
initial_len = len(initial_theta)
new_theta = chain._theta[initial_len:]
new_y = chain._y[initial_len:]
return new_theta, new_y, chain.training_results
except Exception as e:
print(f"Chain {chain_id} failed with error: {e}")
return None
def _create_chain_copy(self, chain_id):
"""Create a copy of the current SurrogateModel for a parallel chain."""
import copy
# Create a deep copy of the current model
chain = copy.deepcopy(self)
# Modify savedir to avoid conflicts
chain.savedir = f"{self.savedir}/chain_{chain_id}"
if not os.path.exists(chain.savedir):
os.makedirs(chain.savedir)
# Reset training results for this chain
chain.training_results = {"iteration" : [],
"gp_hyperparameters" : [],
"training_mse" : [],
"test_mse" : [],
"training_scaled_mse" : [],
"test_scaled_mse" : [],
"gp_kl_divergence" : [],
"gp_train_time" : [],
"obj_fn_opt_time" : [],
"gp_hyperparameter_opt_iteration" : [],
"gp_hyperparam_opt_time" : []}
# Ensure chain copies use single-threaded operations
chain.ncore = 1 # Force single-core for all operations in chain copies
if hasattr(chain, 'opt_gp_kwargs'):
chain.opt_gp_kwargs = chain.opt_gp_kwargs.copy()
chain.opt_gp_kwargs["multi_proc"] = False
# Try to control threading at the library level
try:
# Set NumPy to single-threaded if possible
import numpy as np
if hasattr(np, 'random') and hasattr(np.random, 'seed'):
# Some NumPy operations respect thread limits better after reseeding
np.random.seed(chain_id + 42) # Different seed per chain
except:
pass
try:
# Control scikit-learn backend
import sklearn
if hasattr(sklearn, 'get_config'):
current_config = sklearn.get_config()
sklearn.set_config(assume_finite=True, working_memory=128)
except:
pass
return chain
def _combine_chain_results(self, all_new_theta, all_new_y, chain_results):
"""Combine results from all chains into the main model."""
# Concatenate all new training samples
if len(all_new_theta) > 0 and any(len(theta) > 0 for theta in all_new_theta):
combined_new_theta = np.vstack([theta for theta in all_new_theta if len(theta) > 0])
combined_new_y = np.hstack([y for y in all_new_y if len(y) > 0])
# CRITICAL: Remove near-duplicate points to prevent numerical instability
# Check for points that are very close to each other (within 1e-6)
tolerance = 1e-6
keep_indices = []
for i, point in enumerate(combined_new_theta):
is_duplicate = False
# Check against previously kept points
for j in keep_indices:
if np.allclose(point, combined_new_theta[j], atol=tolerance):
is_duplicate = True
break
# Also check against existing training data
if not is_duplicate:
for existing_point in self._theta:
if np.allclose(point, existing_point, atol=tolerance):
is_duplicate = True
break
if not is_duplicate:
keep_indices.append(i)
if len(keep_indices) < len(combined_new_theta):
print(f"Warning: Removed {len(combined_new_theta) - len(keep_indices)} near-duplicate points from parallel chains")
combined_new_theta = combined_new_theta[keep_indices]
combined_new_y = combined_new_y[keep_indices]
# Add to main model (only if we have points left)
if len(combined_new_theta) > 0:
self._theta = np.vstack([self._theta, combined_new_theta])
self._y = np.hstack([self._y, combined_new_y])
# Update counters
self.ntrain = len(self._theta)
self.nactive = self.ntrain - self.ninit_train
# Refit GP with all combined data while preserving hyperparameters
# Store current hyperparameters before refitting
if hasattr(self, 'gp') and self.gp is not None:
try:
# Refit GP with combined data using current hyperparameters
self.gp, _ = self._fit_gp(_theta=self._theta, _y=self._y, hyperparameters=self.gp.get_parameter_vector())
# Check for numerical stability by computing a test prediction
try:
test_pred = self.gp.predict(self._y, self._theta[:5], return_var=False, return_cov=False)
if np.any(np.isnan(test_pred)) or np.any(np.isinf(test_pred)):
raise ValueError("GP predictions contain NaN or Inf")
except:
print("Warning: GP numerically unstable after hyperparameter restoration, re-optimizing...")
# If unstable, re-optimize hyperparameters
self.gp, _ = self._opt_gp(**self.opt_gp_kwargs)
except Exception as e:
print(f"Warning: Hyperparameter preservation failed ({e}), falling back to standard fitting")
# If hyperparameter preservation fails, fall back to standard fitting
self.gp, _ = self._fit_gp(_theta=self._theta, _y=self._y)
else:
# No existing GP, use standard fitting
self.gp, _ = self._fit_gp(_theta=self._theta, _y=self._y)
else:
print("Warning: No new points added after duplicate removal")
# Combine training results from all chains
if chain_results:
self._merge_training_results(chain_results)
# Ensure training results have at least one entry to avoid index errors
if not self.training_results.get("test_mse"):
self.training_results["test_mse"] = [np.nan]
if not self.training_results.get("training_mse"):
self.training_results["training_mse"] = [np.nan]
if not self.training_results.get("training_scaled_mse"):
self.training_results["training_scaled_mse"] = [np.nan]
def _merge_training_results(self, chain_results):
"""Merge training results from multiple chains."""
# Get the starting iteration number
if len(self.training_results["iteration"]) == 0:
start_iter = 0
else:
start_iter = max(self.training_results["iteration"]) + 1
# Merge results from all chains
for chain_result in chain_results:
if not chain_result: # Skip empty results
continue
for key in self.training_results.keys():
if key in chain_result:
if key == "iteration":
# Adjust iteration numbers to be sequential
adjusted_iters = [iter_num + start_iter for iter_num in chain_result[key]]
self.training_results[key].extend(adjusted_iters)
start_iter = max(adjusted_iters) + 1 if adjusted_iters else start_iter
else:
self.training_results[key].extend(chain_result[key])
[docs]
def get_chain_diversity_metrics(self):
"""
Calculate diversity metrics for the combined training samples.
Useful for assessing the effectiveness of parallel chains.
"""
if not hasattr(self, '_theta') or len(self._theta) <= self.ninit_train:
return {}
# Get only the active learning samples (exclude initial training)
active_theta = self._theta[self.ninit_train:]
active_y = self._y[self.ninit_train:]
# Calculate diversity metrics
metrics = {}
# Parameter space coverage
for i in range(self.ndim):
param_range = active_theta[:, i].max() - active_theta[:, i].min()
bound_range = self._bounds[i][1] - self._bounds[i][0]
metrics[f'param_{i}_coverage'] = param_range / bound_range
# Function value diversity
if len(active_y) > 1:
metrics['function_value_std'] = np.std(active_y)
metrics['function_value_range'] = active_y.max() - active_y.min()
# Average pairwise distance in parameter space
if len(active_theta) > 1:
from scipy.spatial.distance import pdist
distances = pdist(active_theta)
metrics['avg_pairwise_distance'] = np.mean(distances)
metrics['min_pairwise_distance'] = np.min(distances)
return metrics
def _fix_pymultinest_output_format(self, outputfiles_basename):
"""
Fix malformed scientific notation in PyMultiNest output files.
PyMultiNest sometimes writes very small numbers in malformed scientific
notation (e.g., '1.23-100' instead of '1.23E-100'), which causes numpy
to fail when loading the files. This method fixes such formatting issues.
:param outputfiles_basename: (*str*)
Base name of PyMultiNest output files to fix
"""
import re
import os
# Common PyMultiNest output file extensions that may contain malformed numbers
suffixes = [
'post_equal_weights.dat', # Equal weighted posterior samples
'phys_live.points', # Live points
'post_separate.dat', # Separate mode samples
'stats.dat', # Statistics file
'live.points', # Live points file
'ev.dat', # Evidence file
'summary.txt', # Summary file
'.txt', # Generic text output
'points.dat', # Points file
'posterior_samples.dat', # Alternative posterior samples name
]
for suffix in suffixes:
filepath = f"{outputfiles_basename}{suffix}"
if os.path.exists(filepath):
try:
# Read the file
with open(filepath, 'r') as f:
content = f.read()
# Fix malformed scientific notation using regex
# Pattern matches various forms of malformed scientific notation:
# 1. "1.234567890123456789-123" -> "1.234567890123456789E-123"
# 2. "-0.139060048608094152-308" -> "-0.139060048608094152E-308"
# 3. "1.23+45" -> "1.23E+45" (if exponent is large)
# More comprehensive pattern for malformed scientific notation
pattern = r'(-?\d+\.?\d*)([+-]\d+)(?=\s|$|,|\t)'
# Function to fix scientific notation
def fix_scientific(match):
number = match.group(1)
exponent = match.group(2)
# Only fix if this looks like malformed scientific notation
# (exponent magnitude suggests it's not just arithmetic)
exp_value = int(exponent)
if abs(exp_value) > 10: # Likely scientific notation
return f"{number}E{exponent}"
else:
return match.group(0) # Leave unchanged
# Apply the fix
fixed_content = re.sub(pattern, fix_scientific, content)
# Write back only if changes were made
if fixed_content != content:
with open(filepath, 'w') as f:
f.write(fixed_content)
except Exception as e:
if getattr(self, 'verbose', False):
print(f"Warning: Could not fix formatting in {filepath}: {e}")
def _get_pickleable_state(self):
"""
Get a pickleable representation of the model state for multiprocessing.
:returns: *dict*
Dictionary containing all necessary state information including GP hyperparameters
"""
import copy
# Create a simplified state dictionary
state = {
'theta': self._theta.copy(),
'y': self._y.copy(),
'bounds': copy.deepcopy(self._bounds),
'ndim': self.ndim,
'ninit_train': self.ninit_train,
'ntrain': self.ntrain,
'function': self.true_log_likelihood, # Use the correct function attribute
'verbose': self.verbose,
'ncore': self.ncore
}
# Include GP hyperparameters if GP exists and is trained
if hasattr(self, 'gp') and self.gp is not None:
try:
state['gp_hyperparameters'] = self.gp.get_parameter_vector()
# Also include GP initialization settings for consistency
state['gp_kernel'] = getattr(self, '_gp_kernel', 'ExpSquaredKernel')
state['gp_fit_amp'] = getattr(self, '_gp_fit_amp', True)
state['gp_fit_mean'] = getattr(self, '_gp_fit_mean', True)
state['gp_fit_white_noise'] = getattr(self, '_gp_fit_white_noise', True)
state['gp_white_noise'] = getattr(self, '_gp_white_noise', -12)
except:
# If we can't get hyperparameters, workers will initialize fresh GP
state['gp_hyperparameters'] = None
else:
state['gp_hyperparameters'] = None
return state
def _run_chain_worker_mp(args):
"""
Multiprocessing worker function to run a single active learning chain.
This function is designed to work with multiprocessing.Pool and must be
defined at module level to be pickleable.
:param args: (*tuple*)
Tuple containing (chain_state, niter, algorithm, gp_opt_freq,
obj_opt_method, nopt, use_grad_opt, optimizer_kwargs)
:returns: *tuple or None*
(new_theta, new_y, training_results) if successful, None if failed
"""
import os
import numpy as np
import copy
try:
# Unpack arguments
(chain_state, niter, algorithm, gp_opt_freq, obj_opt_method,
nopt, use_grad_opt, optimizer_kwargs) = args
chain_id = chain_state['chain_id']
# Reconstruct the model from the pickled state
from alabi import SurrogateModel
# Create a new model instance with the saved state
surrogate = SurrogateModel(
lnlike_fn=chain_state['function'],
bounds=chain_state['bounds'],
ncore=1, # Force single-core for this process
verbose=False # Disable verbose to avoid cluttering output
)
# Restore the training data
surrogate._theta = chain_state['theta'].copy()
surrogate._y = chain_state['y'].copy()
surrogate.ndim = chain_state['ndim']
surrogate.ninit_train = chain_state['ninit_train']
surrogate.ntrain = chain_state['ntrain']
# Set up the save directory for this chain
surrogate.savedir = chain_state['savedir']
if not os.path.exists(surrogate.savedir):
os.makedirs(surrogate.savedir)
# CRITICAL: Set unique random seed for this chain to prevent identical point generation
import time
# Use chain_id and current time to ensure unique randomization per chain
chain_random_seed = int((time.time() * 1000000) % (2**31)) + chain_id * 1000
np.random.seed(chain_random_seed)
# Initialize GP for this chain using preserved hyperparameters
use_preserved_hyperparams = (chain_state['gp_hyperparameters'] is not None)
if use_preserved_hyperparams:
# Initialize GP with same settings as parent process
surrogate.init_gp(
kernel=chain_state.get('gp_kernel', 'ExpSquaredKernel'),
fit_amp=chain_state.get('gp_fit_amp', True),
fit_mean=chain_state.get('gp_fit_mean', True),
fit_white_noise=chain_state.get('gp_fit_white_noise', True),
white_noise=chain_state.get('gp_white_noise', -12)
)
# Set the optimized hyperparameters from parent process
surrogate.gp.set_parameter_vector(chain_state['gp_hyperparameters'])
# CRITICAL: Compute the GP with the restored hyperparameters
surrogate.gp.compute(surrogate._theta)
# Add small random perturbation to hyperparameters to prevent identical chains
# This helps avoid numerical issues while preserving the optimization quality
current_hyperparams = surrogate.gp.get_parameter_vector()
# Add 1% random noise to hyperparameters
perturbation = np.random.normal(0, 0.01 * np.abs(current_hyperparams))
perturbed_hyperparams = current_hyperparams + perturbation
surrogate.gp.set_parameter_vector(perturbed_hyperparams)
surrogate.gp.compute(surrogate._theta)
# IMPROVED: Allow limited GP re-optimization for accuracy
# Increase gp_opt_freq to reduce re-optimization but don't disable it completely
# This balances preserved hyperparameters with GP accuracy as data grows
effective_gp_opt_freq = max(gp_opt_freq * 3, 50) # At least 3x original freq, min 50
else:
# Fallback to fresh initialization if hyperparameters not available
surrogate.init_gp()
effective_gp_opt_freq = gp_opt_freq
# Store initial state for comparison
initial_theta = surrogate._theta.copy()
initial_y = surrogate._y.copy()
# Run active learning on this chain (force single-core execution)
surrogate.active_train(
niter=niter,
algorithm=algorithm,
gp_opt_freq=effective_gp_opt_freq,
save_progress=False, # Don't save intermediate progress in parallel chains
obj_opt_method=obj_opt_method,
nopt=nopt,
use_grad_opt=use_grad_opt,
optimizer_kwargs=optimizer_kwargs,
show_progress=False, # Don't show progress bars in parallel
allow_opt_multiproc=False, # Critical: disable nested multiprocessing
)
# Extract only the new samples (excluding initial training data)
initial_len = len(initial_theta)
new_theta = surrogate._theta[initial_len:]
new_y = surrogate._y[initial_len:]
return new_theta, new_y, surrogate.training_results
except Exception as e:
print(f"Multiprocessing chain {chain_state.get('chain_id', '?')} failed with error: {e}")
import traceback
traceback.print_exc()
return None