"""
:py:mod:`gp_utils.py`
-------------------------------------------------
Gaussian process utility functions for initializing GPs and optimizing their
hyperparameters.
"""
import alabi
import numpy as np
import george
from scipy.optimize import minimize
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
from functools import partial
import copy
import warnings
from tqdm import tqdm
from scipy.stats import lognorm
__all__ = ["configure_gp",
"optimize_gp",
"optimize_gp_kfold_cv",
"regularization_term",
"regularization_gradient"]
[docs]
def regularization_term(hparams, lengthscale_indices, amp_0=1.0, mu_0=1.0, sigma_0=2.0):
"""
Compute the regularization term (negative log prior) from Hvafner 2024 Equation 4.
This implements the dimensionality-scaled LogNormal prior:
p(ℓ_i | d) = LogNormal(μ_0 + log(√d), σ_0)
The regularization term is -log p(ℓ | d) = -Σ log p(ℓ_i | d)
Parameters:
-----------
mu_0 : float, default=0.0
The base location parameter for the 1D LogNormal prior
sigma_0 : float, default=1.0
The scale parameter for the LogNormal prior
Returns:
--------
float
The regularization term (negative log prior)
"""
ndim = len(hparams)
log_lengthscales = hparams[lengthscale_indices]
# Scaled location parameter: μ = μ_0 + log(√d)
mu = mu_0 + 0.5 * np.log(ndim)
# Compute negative log likelihood for each lengthscale
# For LogNormal(μ, σ), if X ~ LogNormal(μ, σ), then log(X) ~ Normal(μ, σ)
# log p(ℓ_i) = -log(ℓ_i) - log(σ√(2π)) - (log(ℓ_i) - μ)²/(2σ²)
# Negative log prior for each dimension
neg_log_prior = (
log_lengthscales + # log(ℓ_i) term from the Jacobian
0.5 * np.log(2 * np.pi * sigma_0**2) + # normalization constant
(log_lengthscales - mu)**2 / (2 * sigma_0**2) # quadratic term
)
# Sum over all dimensions
reg = amp_0 * np.sum(neg_log_prior)
return reg
[docs]
def regularization_gradient(hparams, lengthscale_indices, amp_0=1.0, mu_0=1.0, sigma_0=2.0):
"""
Compute the gradient of the regularization term with respect to lengthscales.
Parameters:
-----------
lengthscales : array-like, shape (d,)
The lengthscale parameters for each dimension
d : int
The dimensionality of the problem
mu_0 : float, default=0.0
The base location parameter for the 1D LogNormal prior
sigma_0 : float, default=1.0
The scale parameter for the LogNormal prior
Returns:
--------
ndarray, shape (d,)
The gradient of the regularization term with respect to each lengthscale
"""
ndim = len(hparams)
gradient_vector = np.zeros_like(hparams)
log_lengthscales = hparams[lengthscale_indices]
lengthscales = np.exp(log_lengthscales)
# Scaled location parameter
mu = mu_0 + 0.5 * np.log(ndim)
# d/dℓ_i [-log p(ℓ_i)] = d/dℓ_i [log(ℓ_i) + C + (log(ℓ_i) - μ)²/(2σ²)]
# = 1/ℓ_i + (log(ℓ_i) - μ)/(σ² ℓ_i)
# = [1 + (log(ℓ_i) - μ)/σ²] / ℓ_i
gradient_vector[lengthscale_indices] = (1.0 + (log_lengthscales - mu) / sigma_0**2) / lengthscales
return amp_0 * gradient_vector
def _nll(p, gp, y, gp_hyper_prior):
"""
Given parameters and data, compute the negative log likelihood of the data
under the george Gaussian process.
Parameters
----------
p : array
GP hyperparameters
gp : george.GP
y : array
data to condition GP on
Returns
-------
nll : float
negative log-likelihood of y under gp
"""
if not np.isfinite(gp_hyper_prior(p)):
return np.inf
# Catch singular matrices
try:
gp.set_parameter_vector(p)
except np.linalg.LinAlgError:
return np.inf
ll = gp.log_likelihood(y, quiet=True)
return -ll if np.isfinite(ll) else np.inf
def _grad_nll(p, gp, y):
"""
Given parameters and data, compute the gradient of the negative log
likelihood of the data under the george Gaussian process.
Parameters
----------
p : array
GP hyperparameters
gp : george.GP
y : array
data to condition GP on
Returns
-------
gnll : array
gradient of the negative log-likelihood of y under gp
"""
gp.set_parameter_vector(p)
# Negative gradient of log likelihood
grad_nll = -gp.grad_log_likelihood(y, quiet=True)
return grad_nll
[docs]
def optimize_gp(gp, _theta, _y, gp_hyper_prior, p0, bounds=None,
method="l-bfgs-b", optimizer_kwargs=None,
regularize=True, amp_0=1.0, mu_0=1.0, sigma_0=2.0, lengthscale_indices=None):
"""
Optimize Gaussian Process hyperparameters by maximizing marginal likelihood.
Performs hyperparameter optimization for a Gaussian Process using scipy's
minimize function. Supports multiple optimization restarts and automatically
selects the result with highest marginal likelihood.
:param gp: (*george.GP*)
Configured Gaussian Process object. Should be computed with training data.
:param _theta: (*array-like, shape (n_samples, n_features)*)
Training input locations (parameters). Will be squeezed if 1D.
:param _y: (*array-like, shape (n_samples,)*)
Training target values (function evaluations). Will be squeezed if 1D.
:param gp_hyper_prior: (*callable*)
Prior function for hyperparameters. Should return log-probability density
for given hyperparameter vector. Used to constrain optimization.
:param p0: (*array-like, shape (n_restarts, n_hyperparams) or (n_hyperparams,)*)
Initial guesses for hyperparameter optimization. If 2D, performs multiple
restarts with different initializations.
:param bounds: (*list of tuples, optional, default=None*)
Bounds for hyperparameter optimization as [(min, max), ...]. Only used
for methods that support bounds (e.g., 'l-bfgs-b').
:param method: (*str, optional, default="l-bfgs-b"*)
Scipy optimization method. Supported methods:
- 'l-bfgs-b': L-BFGS-B with bounds support (default)
- 'newton-cg': Newton-CG with gradients
- 'bfgs': BFGS (no bounds support)
- 'powell': Powell method (derivative-free)
:param optimizer_kwargs: (*dict, optional, default=None*)
Additional keyword arguments passed to scipy.optimize.minimize.
If None, uses method-specific defaults optimized for GP optimization.
:param max_iter: (*int, optional, default=50*)
Maximum number of iterations for optimization. Used as default in
optimizer_kwargs if not specified.
:param regularize: (*bool, optional, default=False*)
Whether to apply Hvarfner dimensionality-scaled regularization (Equation 4
from "Vanilla Bayesian Optimization Performs Great in High Dimensions").
When True, lengthscale priors are scaled as LogNormal(μ_0 + log(√d), σ_0).
:param mu_0: (*float, optional, default=0.0*)
Base location parameter for Hvarfner regularization. Only used if regularize=True.
:param sigma_0: (*float, optional, default=1.0*)
Scale parameter for Hvarfner regularization. Only used if regularize=True.
:param lengthscale_indices: (*list of int, optional, default=None*)
Indices in the parameter vector corresponding to lengthscale parameters.
Only used if regularize=True. If None, attempts to infer from kernel.
:returns: *george.GP or None*
GP object with optimized hyperparameters, or None if optimization failed.
The returned GP is recomputed with optimal hyperparameters.
"""
warnings.filterwarnings("ignore", category=RuntimeWarning)
# Collapse arrays if 1D
_theta = _theta.squeeze()
_y = _y.squeeze()
# Get dimensionality for regularization
ndim = _theta.shape[-1] if _theta.ndim > 1 else 1
# initial hyperparameters
init_hp = gp.get_parameter_vector()
nhparam = init_hp.shape[0]
# Infer lengthscale indices if regularization is enabled
if regularize and lengthscale_indices is None:
param_names = gp.kernel.get_parameter_names()
lengthscale_indices = []
other_indices = []
for ii, name in enumerate(param_names):
# Common patterns for lengthscale parameters in george kernels
if any(pattern in name.lower() for pattern in ['metric:log_m']):
lengthscale_indices.append(ii)
else:
other_indices.append(ii)
if len(lengthscale_indices) == 0:
warnings.warn("Could not infer lengthscale indices from kernel. "
"Regularization gradient will not be applied. Please specify "
"lengthscale_indices explicitly.")
valid_methods = ["newton-cg", "bfgs", "l-bfgs-b", "powell", "nelder-mead"]
if method not in valid_methods:
print(f"Warning: {method} not in valid methods {valid_methods}. Using 'l-bfgs-b' optimizer instead.")
method = "l-bfgs-b"
# methods without bounds arg
if method in ["bfgs"]:
bounds = None
# configure objective function
if regularize:
obj_fn = lambda p: _nll(p, gp, _y, gp_hyper_prior) + regularization_term(p, lengthscale_indices, amp_0=amp_0, mu_0=mu_0, sigma_0=sigma_0)
else:
obj_fn = lambda p: _nll(p, gp, _y, gp_hyper_prior)
# configure gradient function
if method in ["newton-cg", "l-bfgs-b"]:
if regularize:
# Use custom gradient function that includes regularization
jac = lambda p: _grad_nll(p, gp, _y) + regularization_gradient(p, lengthscale_indices, amp_0=amp_0, mu_0=mu_0, sigma_0=sigma_0)
else:
jac = lambda p: _grad_nll(p, gp, _y)
else:
jac = None
# # Set improved default optimizer_kwargs for faster convergence
# if optimizer_kwargs is None:
# default_optimizer_kwargs = {
# 'newton-cg': {'maxiter': 200},
# 'bfgs': {'maxiter': 200},
# 'l-bfgs-b': {'maxiter': 200, 'factr': 1e12},
# 'powell': {'maxiter': 200},
# 'nelder-mead': {'maxiter': 200},
# }
# optimizer_kwargs = default_optimizer_kwargs.get(method.lower(), {})
nopt = p0.shape[0] if p0.ndim > 1 else 1
if nopt > 1:
# Run the optimization routine nopt times
res = []
mll = []
for ii, x0 in enumerate(p0):
try:
result = minimize(obj_fn, x0, method=method, jac=jac, bounds=bounds, options=optimizer_kwargs)
# print("opt iterations:", result.nit, result.success)
if np.isfinite(gp_hyper_prior(result.x)):
# Compute marginal log likelihood for this set of kernel hyperparameters
test_gp = copy.copy(gp)
test_gp.set_parameter_vector(result.x)
test_gp.recompute()
current_mll = test_gp.log_likelihood(_y, quiet=True)
res.append(result.x)
mll.append(current_mll)
else:
warning_msg = f"\nWarning: GP hyperparameter optimization restart {ii} failed. Solution failed prior bounds.\n"
res.append(init_hp)
mll.append(-np.inf)
except Exception as e:
warning_msg = f"\nWarning: GP hyperparameter optimization restart {ii} failed with error: {e}\n"
res.append(init_hp)
mll.append(-np.inf)
# Pick result with largest marginal log likelihood
if len(mll) > 0 and max(mll) > -np.inf:
ind = np.argmax(mll)
try:
gp.set_parameter_vector(res[ind])
gp.recompute()
except:
warning_msg = "\nWarning: Failed to set best hyperparameters. Using initial values.\n"
gp.set_parameter_vector(init_hp)
gp.recompute()
else:
warning_msg = "\nWarning: All hyperparameter optimizations failed. Using initial values.\n"
gp.set_parameter_vector(init_hp)
else:
# Single optimization run
try:
result = minimize(obj_fn, p0, method=method, jac=jac, bounds=bounds, options=optimizer_kwargs)
if result.success and np.isfinite(gp_hyper_prior(result.x)):
gp.set_parameter_vector(result.x)
gp.recompute()
else:
warning_msg = "\nWarning: GP hyperparameter optimization failed. Using initial values.\n"
gp.set_parameter_vector(init_hp)
gp.recompute()
except Exception as e:
warning_msg = f"\nWarning: GP hyperparameter optimization failed with error: {e}. Using initial values.\n"
gp.set_parameter_vector(init_hp)
gp.recompute()
warning_msg = warning_msg if 'warning_msg' in locals() else None
return gp, warning_msg
def weighted_mse_by_probability(y_true, y_pred, weight_method='exponential', temperature=1.0):
"""
Compute weighted MSE where higher probability values get larger weights.
Parameters:
-----------
y_true : array-like
True log likelihood values (more negative = lower probability)
y_pred : array-like
Predicted log likelihood values
weight_method : str
Method for computing weights from log likelihood:
- 'exponential': w = exp(y_true / temperature)
- 'linear': w = y_true - min(y_true) + epsilon
- 'softmax': w = softmax(y_true / temperature)
- 'rank': w based on rank order of y_true
temperature : float
Temperature parameter for exponential/softmax weighting
Returns:
--------
weighted_mse : float
Weighted mean squared error
weights : array
The weights used for each point
"""
y_true = np.asarray(y_true)
y_pred = np.asarray(y_pred)
if weight_method == 'exponential':
# Higher log likelihood = higher probability = larger weight
weights = np.exp(y_true / temperature)
elif weight_method == 'linear':
# Shift to make all weights positive, higher y_true gets higher weight
epsilon = 1e-6
weights = y_true - np.min(y_true) + epsilon
elif weight_method == 'softmax':
# Softmax weighting - probabilistic approach
weights = np.exp(y_true / temperature)
weights = weights / np.sum(weights) * len(weights) # Normalize to maintain scale
elif weight_method == 'rank':
# Rank-based weighting - highest log likelihood gets highest weight
ranks = np.argsort(np.argsort(y_true)) # Get ranks (0 to n-1)
weights = ranks + 1 # Make weights 1 to n
else:
raise ValueError(f"Unknown weight_method: {weight_method}")
# Normalize weights to have mean = 1 (maintains MSE scale)
weights = weights / np.mean(weights)
# Compute weighted MSE
mse_values = (y_true - y_pred) ** 2
weighted_mse = np.average(mse_values, weights=weights)
return weighted_mse
def _evaluate_candidate_worker(args):
"""
Worker function for parallel evaluation of a single hyperparameter candidate.
:param args: tuple containing (cand_idx, hyperparams, gp, theta, y,
k_folds, scoring, weighted_mse_method, weighted_mse_factor)
:returns: tuple (cand_idx, fold_scores, success_flag)
"""
import copy
import numpy as np
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
(cand_idx, hyperparams, gp, _theta, _y, y_scaler, k_folds, scoring, weighted_mse_method, weighted_mse_factor) = args
try:
# Check for basic hyperparameter validity
if np.any(np.isnan(hyperparams)) or np.any(np.isinf(hyperparams)):
return (cand_idx, None, "Invalid hyperparameters (NaN/Inf)")
# Set up cross-validation
kf = KFold(n_splits=k_folds, shuffle=True, random_state=None)
fold_scores = []
fold_errors = [] # Track errors from each fold
fold_idx = 0
# Perform k-fold cross validation
for train_idx, val_idx in kf.split(_theta):
try:
# Split data
_theta_train, _theta_val = _theta[train_idx], _theta[val_idx]
_y_train, _y_val = _y[train_idx], _y[val_idx]
# Check training data validity
if np.any(np.isnan(_y_train)) or np.any(np.isinf(_y_train)):
raise ValueError(f"Training targets contain NaN/Inf values")
if np.any(np.isnan(_theta_train)) or np.any(np.isinf(_theta_train)):
raise ValueError(f"Training inputs contain NaN/Inf values")
# Ensure proper shapes for george GP
_theta_train = np.atleast_2d(_theta_train)
_theta_val = np.atleast_2d(_theta_val)
# Handle case where we have single samples
if _theta_train.shape[0] == 1 and _theta_train.shape[1] != _theta.shape[1]:
_theta_train = _theta_train.T
if _theta_val.shape[0] == 1 and _theta_val.shape[1] != _theta.shape[1]:
_theta_val = _theta_val.T
# Create and configure GP for this fold
gp_fold = copy.deepcopy(gp)
# Check for invalid hyperparameters before setting
if np.any(np.isnan(hyperparams)) or np.any(np.isinf(hyperparams)):
raise ValueError(f"Invalid hyperparameters contain NaN/Inf: {hyperparams}")
gp_fold.set_parameter_vector(hyperparams)
gp_fold.compute(_theta_train)
# Check if GP computation was successful
current_hp = gp_fold.get_parameter_vector()
if np.any(np.isnan(current_hp)) or np.any(np.isinf(current_hp)):
raise ValueError(f"GP parameters became invalid after compute: {current_hp}")
# Check GP log-likelihood for numerical stability
try:
log_like = gp_fold.log_likelihood(_y_train)
if np.isnan(log_like) or np.isinf(log_like):
raise ValueError(f"GP log-likelihood is invalid: {log_like} with hyperparams: {hyperparams}")
except Exception as e:
raise ValueError(f"Failed to compute GP log-likelihood: {str(e)} with hyperparams: {hyperparams}")
# Make predictions on validation set
try:
_y_pred = gp_fold.predict(_y_train, _theta_val, return_var=False, return_cov=False)
except Exception as e:
raise ValueError(f"GP prediction failed: {str(e)} with hyperparams: {hyperparams} "
f"train_shape: {_theta_train.shape}, val_shape: {_theta_val.shape}")
# Check for NaN or infinite values
if len(_y_pred) == 0:
raise ValueError(f"GP predictions are empty (no predictions returned)")
elif np.any(np.isnan(_y_pred)) or np.any(np.isinf(_y_pred)):
mean_val = np.nanmean(_y_pred) if len(_y_pred) > 0 else float('nan')
std_val = np.nanstd(_y_pred) if len(_y_pred) > 0 else float('nan')
raise ValueError(f"GP predictions contain NaN or Inf values: _y_pred stats: "
f"mean={mean_val}, std={std_val}, "
f"n_nan={np.sum(np.isnan(_y_pred))}, n_inf={np.sum(np.isinf(_y_pred))}")
if np.any(np.isnan(_y_val)) or np.any(np.isinf(_y_val)):
raise ValueError(f"Validation targets contain NaN or Inf values")
# Compute error using unscaled y values
y_val = y_scaler.inverse_transform(_y_val.reshape(-1, 1)).flatten()
y_pred = y_scaler.inverse_transform(_y_pred.reshape(-1, 1)).flatten()
# Calculate fold score
if scoring == 'mse':
fold_score = mean_squared_error(y_val, y_pred)
elif scoring == 'mae':
fold_score = mean_absolute_error(y_val, y_pred)
elif scoring == 'r2':
fold_score = -r2_score(y_val, y_pred) # Negative because we minimize
elif scoring == 'weighted_mse':
fold_score = weighted_mse_by_probability(y_val, y_pred,
weight_method=weighted_mse_method,
temperature=weighted_mse_factor)
else:
raise ValueError(f"Unsupported scoring method: {scoring}")
fold_scores.append(fold_score)
except Exception as e:
fold_scores.append(np.inf)
fold_errors.append(f"Fold {fold_idx}: {str(e)}")
fold_idx += 1
# If all folds failed, return error message with details
if all(np.isinf(fold_scores)):
error_msg = f"All folds failed. First error: {fold_errors[0] if fold_errors else 'Unknown'}"
return (cand_idx, fold_scores, error_msg)
return (cand_idx, fold_scores, "success")
except Exception as e:
return (cand_idx, None, str(e))
[docs]
def optimize_gp_kfold_cv(gp, _theta, _y, hyperparameter_candidates, y_scaler,
k_folds=5, scoring="mse", pool=None,
stage2_candidates=None, stage2_width=0.5,
stage3_candidates=None, stage3_width=0.2,
weighted_mse_method="exponential", weighted_mse_factor=1.0,
verbose=True):
"""
Optimize Gaussian Process hyperparameters using k-fold cross-validation.
This function evaluates different hyperparameter configurations using k-fold
cross-validation to select the configuration that generalizes best to unseen data.
This approach helps prevent overfitting compared to standard marginal likelihood
maximization.
:param gp: (*george.GP*)
Configured Gaussian Process object template. Will be copied for each CV fold.
:param _theta: (*array-like, shape (n_samples, n_features)*)
Training input locations (parameters). Must have at least k_folds samples.
:param _y: (*array-like, shape (n_samples,)*)
Training target values (function evaluations). Must match _theta length.
:param gp_hyper_prior: (*callable*)
Prior function for hyperparameters. Should return log-probability density
for given hyperparameter vector. Used to constrain search space.
:param hyperparameter_candidates: (*array-like, shape (n_candidates, n_hyperparams)*)
Array of hyperparameter vectors to evaluate via cross-validation.
Each row represents one hyperparameter configuration to test.
:param k_folds: (*int, optional, default=5*)
Number of folds for cross-validation. Must be >= 2 and <= n_samples.
Common choices: 5 or 10 for good bias-variance tradeoff.
:param scoring: (*str, optional, default='mse'*)
Scoring metric for cross-validation. Supported options:
- 'mse': Mean Squared Error (lower is better)
- 'mae': Mean Absolute Error (lower is better)
- 'r2': R-squared coefficient (higher is better)
- 'weighted_mse': Weighted MSE giving higher weight to high-probability regions
:param pool: (*multiprocessing.Pool, optional, default=None*)
Multiprocessing pool for parallel evaluation of hyperparameter candidates.
If None, evaluation runs sequentially. If provided, candidates are
evaluated in parallel using the pool's worker processes.
:param stage2_candidates: (*int, optional, default=None*)
Number of candidates for stage 2 grid search. If None, uses
len(hyperparameter_candidates) // 2.
:param stage2_width: (*float, optional, default=0.5*)
Width factor for stage 2 search around best parameters.
Smaller values = tighter search, larger values = wider search.
:param stage3_candidates: (*int, optional, default=None*)
Number of candidates for stage 3 ultra-fine search. If None, uses
max(stage2_candidates // 2, 3).
:param stage3_width: (*float, optional, default=0.2*)
Width factor for stage 3 search around stage 2 best parameters.
Should be smaller than stage2_width for finer refinement.
:param weighted_mse_method: (*str, optional, default='exponential'*)
Weighting method for weighted_mse scoring. Options:
- 'exponential': w = exp(y_true / temperature)
- 'linear': w = y_true - min(y_true) + epsilon
- 'softmax': w = softmax(y_true / temperature)
- 'rank': w based on rank order of y_true
:param weighted_mse_factor: (*float, optional, default=1.0*)
Temperature parameter for exponential/softmax weighting.
Lower values emphasize high-probability regions more strongly.
:returns:
- **best_gp** (*george.GP*) -- GP with optimal hyperparameters set
- **best_hyperparams** (*array*) -- Best hyperparameter vector
- **cv_results** (*dict*) -- Detailed CV results with scores and statistics
:raises:
- **ValueError** -- If insufficient data, invalid parameters, or no valid hyperparameters
- **RuntimeError** -- If all hyperparameter evaluations fail
"""
if stage2_candidates is not None:
two_stage = True
else:
two_stage = False
if stage3_candidates is not None:
three_stage = True
else:
three_stage = False
# Input validation
_theta = np.asarray(_theta)
_y = np.asarray(_y)
hyperparameter_candidates = np.asarray(hyperparameter_candidates)
if _theta.ndim == 1:
_theta = _theta.reshape(-1, 1)
if _y.ndim != 1:
_y = _y.squeeze()
n_samples = len(_theta)
if len(_y) != n_samples:
raise ValueError(f"_theta and _y must have same length, got {len(_theta)} and {len(_y)}")
if n_samples < k_folds:
raise ValueError(f"Number of samples ({n_samples}) must be >= k_folds ({k_folds})")
if k_folds < 2:
raise ValueError(f"k_folds must be >= 2, got {k_folds}")
if hyperparameter_candidates.ndim == 1:
hyperparameter_candidates = hyperparameter_candidates.reshape(1, -1)
n_candidates, n_hyperparams = hyperparameter_candidates.shape
# Set up cross-validation
kf = KFold(n_splits=k_folds, shuffle=True, random_state=None)
# Storage for results
cv_scores = np.zeros((n_candidates, k_folds))
cv_scores.fill(np.inf) # Initialize with worst possible score
# Two-stage optimization: explore then exploit
if two_stage and verbose:
print("=== STAGE 1: EXPLORATION (Random Search) ===")
print(f"Evaluating {n_candidates} hyperparameter candidates using {k_folds}-fold CV...")
if pool is not None:
# Parallel evaluation using multiprocessing pool
print(f"Using multiprocessing pool with {pool._processes} processes")
# Prepare arguments for all candidates
worker_args = []
for cand_idx, hyperparams in enumerate(hyperparameter_candidates):
args = (cand_idx, hyperparams, gp, _theta, _y, y_scaler,
k_folds, scoring, weighted_mse_method, weighted_mse_factor)
worker_args.append(args)
# Evaluate all candidates in parallel with progress bar
failed_candidate_errors = [] # Track errors for diagnostics
with tqdm(total=len(worker_args), desc="Evaluating candidates", unit="candidate") as pbar:
# Use pool.imap for progress tracking
results = []
for result in pool.imap(_evaluate_candidate_worker, worker_args):
results.append(result)
pbar.update(1)
# Process results
for cand_idx, fold_scores, status in results:
if status == "success" and fold_scores is not None:
# Store fold scores
for fold_idx, score in enumerate(fold_scores):
cv_scores[cand_idx, fold_idx] = score
# Calculate and print statistics
valid_scores = [s for s in fold_scores if np.isfinite(s)]
if len(valid_scores) > 0:
mean_score = np.mean(valid_scores)
std_score = np.std(valid_scores)
if verbose:
print(f" Candidate {cand_idx+1}/{n_candidates}: {scoring}={mean_score:.4f}±{std_score:.4f} "
f"({len(valid_scores)}/{k_folds} folds successful)")
else:
if verbose:
print(f" Candidate {cand_idx+1}/{n_candidates}: All folds failed")
failed_candidate_errors.append((cand_idx, hyperparameter_candidates[cand_idx], "All folds failed"))
else:
# Candidate failed completely
if verbose:
print(f" Candidate {cand_idx+1}/{n_candidates}: {status}")
if "hyperparams" in status.lower() or "prior" in status.lower():
print(f" Hyperparams: {hyperparameter_candidates[cand_idx]}")
failed_candidate_errors.append((cand_idx, hyperparameter_candidates[cand_idx], status))
else:
# Sequential evaluation using the worker function
failed_candidate_errors = [] # Track errors for diagnostics
for cand_idx, hyperparams in enumerate(hyperparameter_candidates):
args = (cand_idx, hyperparams, gp, _theta, _y, y_scaler, k_folds, scoring, weighted_mse_method, weighted_mse_factor)
results = _evaluate_candidate_worker(args)
cand_idx_result, fold_scores, status = results
if status == "success" and fold_scores is not None:
# Store fold scores
for fold_idx, score in enumerate(fold_scores):
cv_scores[cand_idx, fold_idx] = score
# Calculate and print statistics
valid_scores = [s for s in fold_scores if np.isfinite(s)]
if len(valid_scores) > 0:
mean_score = np.mean(valid_scores)
std_score = np.std(valid_scores)
if verbose:
print(f" Candidate {cand_idx+1}/{n_candidates}: {scoring}={mean_score:.4f}±{std_score:.4f} "
f"({len(valid_scores)}/{k_folds} folds successful)")
else:
if verbose:
print(f" Candidate {cand_idx+1}/{n_candidates}: All folds failed")
failed_candidate_errors.append((cand_idx, hyperparams, "All folds failed"))
else:
# Candidate failed completely
if verbose:
print(f" Candidate {cand_idx+1}/{n_candidates}: {status}")
if "hyperparams" in status.lower() or "prior" in status.lower():
print(f" Hyperparams: {hyperparams}")
failed_candidate_errors.append((cand_idx, hyperparams, status))
# Find best hyperparameters
# Calculate mean CV score for each candidate (only over successful folds)
mean_cv_scores = []
std_cv_scores = []
for cand_idx in range(n_candidates):
fold_scores = cv_scores[cand_idx, :]
valid_scores = fold_scores[np.isfinite(fold_scores)]
if len(valid_scores) > 0:
mean_cv_scores.append(np.mean(valid_scores))
std_cv_scores.append(np.std(valid_scores))
else:
mean_cv_scores.append(np.inf)
std_cv_scores.append(np.inf)
mean_cv_scores = np.array(mean_cv_scores)
std_cv_scores = np.array(std_cv_scores)
# Find best candidate (lowest score for most metrics, highest for R²)
if not np.all(np.isinf(mean_cv_scores)):
best_idx = np.argmin(mean_cv_scores)
best_hyperparams = hyperparameter_candidates[best_idx]
best_score = mean_cv_scores[best_idx]
best_score_std = std_cv_scores[best_idx]
if verbose:
print(f"\nStage 1 best hyperparameters (candidate {best_idx+1}): {best_hyperparams}")
print(f"Stage 1 best CV {scoring}: {best_score:.4f} ± {best_score_std:.4f}")
# Store stage 1 results
stage1_results = {
'best_score': best_score,
'best_score_std': best_score_std,
'best_hyperparams': best_hyperparams,
'best_candidate_idx': best_idx,
'all_scores': mean_cv_scores,
'all_scores_std': std_cv_scores,
'cv_scores_matrix': cv_scores
}
else:
# All candidates failed
if verbose:
print("\n" + "="*80)
print("ERROR: All hyperparameter candidates failed CV evaluation")
print("="*80)
print("\nDiagnostics:")
print(f" Number of candidates: {n_candidates}")
print(f" Number of folds: {k_folds}")
print(f" Training data shape: {_theta.shape}")
print(f" Training targets shape: {_y.shape}")
print(f" Training targets stats: min={np.min(_y):.4f}, max={np.max(_y):.4f}, mean={np.mean(_y):.4f}, std={np.std(_y):.4f}")
# Report errors from failed candidates
if 'failed_candidate_errors' in locals() and len(failed_candidate_errors) > 0:
print(f"\nShowing first 5 candidate errors:")
for i, (cand_idx, hp, error) in enumerate(failed_candidate_errors[:5]):
print(f" Candidate {cand_idx+1}: {error}")
print(f" Hyperparams: {hp}")
print("\nPossible causes:")
print(" - Hyperparameters produce numerically unstable GP")
print(" - Training data has issues (NaN, inf, extreme values)")
print(" - GP kernel is incompatible with data")
print(" - Insufficient training data for cross-validation")
print("="*80)
return None
# === STAGE 2: EXPLOITATION (Grid Search around best) ===
if two_stage:
if verbose:
print(f"\n=== STAGE 2: EXPLOITATION (Grid Search around best) ===")
# Determine number of stage 2 candidates
if stage2_candidates is None:
stage2_candidates = max(n_candidates // 2, 5)
# Generate grid around best hyperparameters
stage2_hyperparam_candidates = _generate_stage2_candidates(
best_hyperparams, stage2_candidates, stage2_width, gp=gp
)
# Evaluate stage 2 candidates
n_candidates_s2 = len(stage2_hyperparam_candidates)
cv_scores_s2 = np.zeros((n_candidates_s2, k_folds))
cv_scores_s2.fill(np.inf)
if verbose:
print(f"Evaluating {n_candidates_s2} stage 2 candidates using {k_folds}-fold CV...")
if pool is not None:
# Parallel evaluation for stage 2
worker_args_s2 = []
for cand_idx, hyperparams in enumerate(stage2_hyperparam_candidates):
args = (cand_idx, hyperparams, gp, _theta, _y, y_scaler,
k_folds, scoring, weighted_mse_method, weighted_mse_factor)
worker_args_s2.append(args)
with tqdm(total=len(worker_args_s2), desc="Stage 2 candidates", unit="candidate") as pbar:
results_s2 = []
for result in pool.imap(_evaluate_candidate_worker, worker_args_s2):
results_s2.append(result)
pbar.update(1)
# Process stage 2 results
for cand_idx, fold_scores, status in results_s2:
if status == "success" and fold_scores is not None:
for fold_idx, score in enumerate(fold_scores):
cv_scores_s2[cand_idx, fold_idx] = score
valid_scores = [s for s in fold_scores if np.isfinite(s)]
if len(valid_scores) > 0:
mean_score = np.mean(valid_scores)
std_score = np.std(valid_scores)
if verbose:
print(f" Stage 2 Candidate {cand_idx+1}/{n_candidates_s2}: {scoring}={mean_score:.4f}±{std_score:.4f} "
f"({len(valid_scores)}/{k_folds} folds successful)")
else:
if verbose:
print(f" Stage 2 Candidate {cand_idx+1}/{n_candidates_s2}: All folds failed")
else:
if verbose:
print(f" Stage 2 Candidate {cand_idx+1}/{n_candidates_s2}: {status}")
else:
# Sequential evaluation for stage 2
for cand_idx, hyperparams in enumerate(stage2_hyperparam_candidates):
args = (cand_idx, hyperparams, gp, _theta, _y, y_scaler, k_folds, scoring, weighted_mse_method, weighted_mse_factor)
results_s2 = _evaluate_candidate_worker(args)
cand_idx_result, fold_scores, status = results_s2
if status == "success" and fold_scores is not None:
for fold_idx, score in enumerate(fold_scores):
cv_scores_s2[cand_idx, fold_idx] = score
valid_scores = [s for s in fold_scores if np.isfinite(s)]
if len(valid_scores) > 0:
mean_score = np.mean(valid_scores)
std_score = np.std(valid_scores)
if verbose:
print(f" Stage 2 Candidate {cand_idx+1}/{n_candidates_s2}: {scoring}={mean_score:.4f}±{std_score:.4f} "
f"({len(valid_scores)}/{k_folds} folds successful)")
else:
if verbose:
print(f" Stage 2 Candidate {cand_idx+1}/{n_candidates_s2}: All folds failed")
else:
if verbose:
print(f" Stage 2 Candidate {cand_idx+1}/{n_candidates_s2}: {status}")
# Find best from stage 2
mean_cv_scores_s2 = []
std_cv_scores_s2 = []
for cand_idx in range(n_candidates_s2):
fold_scores = cv_scores_s2[cand_idx, :]
valid_scores = fold_scores[np.isfinite(fold_scores)]
if len(valid_scores) > 0:
mean_cv_scores_s2.append(np.mean(valid_scores))
std_cv_scores_s2.append(np.std(valid_scores))
else:
mean_cv_scores_s2.append(np.inf)
std_cv_scores_s2.append(np.inf)
mean_cv_scores_s2 = np.array(mean_cv_scores_s2)
std_cv_scores_s2 = np.array(std_cv_scores_s2)
# Compare stage 2 best with stage 1 best
if not np.all(np.isinf(mean_cv_scores_s2)):
best_idx_s2 = np.argmin(mean_cv_scores_s2)
best_hyperparams_s2 = stage2_hyperparam_candidates[best_idx_s2]
best_score_s2 = mean_cv_scores_s2[best_idx_s2]
best_score_std_s2 = std_cv_scores_s2[best_idx_s2]
if verbose:
print(f"\nStage 2 best hyperparameters (candidate {best_idx_s2+1}): {best_hyperparams_s2}")
print(f"Stage 2 best CV {scoring}: {best_score_s2:.4f} ± {best_score_std_s2:.4f}")
# Use stage 2 results if better
if best_score_s2 < best_score: # Assuming lower is better for most metrics
best_hyperparams = best_hyperparams_s2
best_score = best_score_s2
best_score_std = best_score_std_s2
best_idx = best_idx_s2 # Note: this is index within stage 2 candidates
if verbose:
print(f"✓ Stage 2 improved results!")
else:
if verbose:
print(f"✓ Stage 1 results remain best.")
else:
if verbose:
print("Stage 2 failed - using Stage 1 results")
# === STAGE 3: REFINEMENT (Ultra-fine search around stage 2 best) ===
if three_stage and two_stage:
if verbose:
print(f"\n=== STAGE 3: REFINEMENT (Ultra-fine search around best) ===")
# Determine number of stage 3 candidates
if stage3_candidates is None:
stage3_candidates = max((stage2_candidates if stage2_candidates else n_candidates // 2) // 2, 3)
# Generate ultra-fine grid around current best hyperparameters
stage3_hyperparam_candidates = _generate_stage3_candidates(
best_hyperparams, stage3_candidates, stage3_width, gp=gp
)
# Evaluate stage 3 candidates
n_candidates_s3 = len(stage3_hyperparam_candidates)
cv_scores_s3 = np.zeros((n_candidates_s3, k_folds))
cv_scores_s3.fill(np.inf)
if verbose:
print(f"Evaluating {n_candidates_s3} stage 3 candidates using {k_folds}-fold CV...")
if pool is not None:
# Parallel evaluation for stage 3
worker_args_s3 = []
for cand_idx, hyperparams in enumerate(stage3_hyperparam_candidates):
args = (cand_idx, hyperparams, gp, _theta, _y, y_scaler,
k_folds, scoring, weighted_mse_method, weighted_mse_factor)
worker_args_s3.append(args)
with tqdm(total=len(worker_args_s3), desc="Stage 3 candidates", unit="candidate") as pbar:
results_s3 = []
for result in pool.imap(_evaluate_candidate_worker, worker_args_s3):
results_s3.append(result)
pbar.update(1)
# Process stage 3 results
for cand_idx, fold_scores, status in results_s3:
if status == "success" and fold_scores is not None:
for fold_idx, score in enumerate(fold_scores):
cv_scores_s3[cand_idx, fold_idx] = score
valid_scores = [s for s in fold_scores if np.isfinite(s)]
if len(valid_scores) > 0:
mean_score = np.mean(valid_scores)
std_score = np.std(valid_scores)
if verbose:
print(f" Stage 3 Candidate {cand_idx+1}/{n_candidates_s3}: {scoring}={mean_score:.4f}±{std_score:.4f} "
f"({len(valid_scores)}/{k_folds} folds successful)")
else:
if verbose:
print(f" Stage 3 Candidate {cand_idx+1}/{n_candidates_s3}: All folds failed")
else:
if verbose:
print(f" Stage 3 Candidate {cand_idx+1}/{n_candidates_s3}: {status}")
else:
# Sequential evaluation for stage 3
for cand_idx, hyperparams in enumerate(stage3_hyperparam_candidates):
args = (cand_idx, hyperparams, gp, _theta, _y, y_scaler, k_folds, scoring, weighted_mse_method, weighted_mse_factor)
results_s3 = _evaluate_candidate_worker(args)
cand_idx_result, fold_scores, status = results_s3
if status == "success" and fold_scores is not None:
for fold_idx, score in enumerate(fold_scores):
cv_scores_s3[cand_idx, fold_idx] = score
valid_scores = [s for s in fold_scores if np.isfinite(s)]
if len(valid_scores) > 0:
mean_score = np.mean(valid_scores)
std_score = np.std(valid_scores)
if verbose:
print(f" Stage 3 Candidate {cand_idx+1}/{n_candidates_s3}: {scoring}={mean_score:.4f}±{std_score:.4f} "
f"({len(valid_scores)}/{k_folds} folds successful)")
else:
if verbose:
print(f" Stage 3 Candidate {cand_idx+1}/{n_candidates_s3}: All folds failed")
else:
if verbose:
print(f" Stage 3 Candidate {cand_idx+1}/{n_candidates_s3}: {status}")
# Find best from stage 3
mean_cv_scores_s3 = []
std_cv_scores_s3 = []
for cand_idx in range(n_candidates_s3):
fold_scores = cv_scores_s3[cand_idx, :]
valid_scores = fold_scores[np.isfinite(fold_scores)]
if len(valid_scores) > 0:
mean_cv_scores_s3.append(np.mean(valid_scores))
std_cv_scores_s3.append(np.std(valid_scores))
else:
mean_cv_scores_s3.append(np.inf)
std_cv_scores_s3.append(np.inf)
mean_cv_scores_s3 = np.array(mean_cv_scores_s3)
std_cv_scores_s3 = np.array(std_cv_scores_s3)
# Compare stage 3 best with previous best
if not np.all(np.isinf(mean_cv_scores_s3)):
best_idx_s3 = np.argmin(mean_cv_scores_s3)
best_hyperparams_s3 = stage3_hyperparam_candidates[best_idx_s3]
best_score_s3 = mean_cv_scores_s3[best_idx_s3]
best_score_std_s3 = std_cv_scores_s3[best_idx_s3]
if verbose:
print(f"\nStage 3 best hyperparameters (candidate {best_idx_s3+1}): {best_hyperparams_s3}")
print(f"Stage 3 best CV {scoring}: {best_score_s3:.4f} ± {best_score_std_s3:.4f}")
# Use stage 3 results if better
if best_score_s3 < best_score: # Assuming lower is better for most metrics
best_hyperparams = best_hyperparams_s3
best_score = best_score_s3
best_score_std = best_score_std_s3
best_idx = best_idx_s3 # Note: this is index within stage 3 candidates
if verbose:
print(f"✓ Stage 3 improved results!")
else:
if verbose:
print(f"✓ Previous results remain best.")
else:
if verbose:
print("Stage 3 failed - using previous results")
elif three_stage and not two_stage:
if verbose:
print("Warning: three_stage=True requires two_stage=True. Ignoring three_stage option.")
# Configure final GP with best hyperparameters
try:
if verbose:
print(f"\nSetting best hyperparameters on GP...")
print(f"Best hyperparams shape: {np.array(best_hyperparams).shape}")
print(f"GP current params shape: {gp.get_parameter_vector().shape}")
gp.set_parameter_vector(best_hyperparams)
gp.compute(_theta)
if verbose:
print(f"Successfully configured GP with best hyperparameters")
except Exception as e:
if verbose:
print(f"Failed to set best hyperparameters: {str(e)}")
# raise RuntimeError(f"Failed to set best hyperparameters: {str(e)}")
# If optimization fails, return original GP without changes
return gp
# Compile detailed results
cv_results = {
"best_score": best_score,
"best_score_std": best_score_std,
"best_hyperparams": best_hyperparams,
"best_candidate_idx": best_idx,
"all_scores": mean_cv_scores,
"all_scores_std": std_cv_scores,
"cv_scores_matrix": cv_scores,
"scoring_method": scoring,
"n_folds": k_folds,
"n_candidates": n_candidates,
"two_stage": two_stage,
"three_stage": three_stage
}
# Add stage-specific results if multi-stage was used
if two_stage:
cv_results["stage1_results"] = stage1_results
if "mean_cv_scores_s2" in locals():
cv_results["stage2_results"] = {
"all_scores": mean_cv_scores_s2,
"all_scores_std": std_cv_scores_s2,
"cv_scores_matrix": cv_scores_s2,
"candidates": stage2_hyperparam_candidates
}
# Add stage 3 results if three-stage was used
if three_stage and "mean_cv_scores_s3" in locals():
cv_results["stage3_results"] = {
"all_scores": mean_cv_scores_s3,
"all_scores_std": std_cv_scores_s3,
"cv_scores_matrix": cv_scores_s3,
"candidates": stage3_hyperparam_candidates
}
if verbose:
print(f"CV optimization completed. Best {scoring}: {cv_results['best_score']:.4f} ± {cv_results['best_score_std']:.4f}")
return gp
def _generate_stage2_candidates(best_params, n_candidates, width_factor, gp=None):
"""
Generate stage 2 candidates around best parameters from stage 1.
:param best_params: Best hyperparameters from stage 1
:param n_candidates: Number of candidates to generate
:param width_factor: Width of search around best parameters
:param gp: Optional GP object to infer parameter structure from kernel
:returns: Array of stage 2 hyperparameter candidates
"""
best_params = np.asarray(best_params)
n_params = len(best_params)
# Infer lengthscale indices from GP kernel if available
lengthscale_indices = None
if gp is not None:
param_names = gp.kernel.get_parameter_names()
lengthscale_indices = [i for i, name in enumerate(param_names)
if 'metric:log_m' in name.lower()]
# Check if we have uniform length scales
if lengthscale_indices is not None and len(lengthscale_indices) > 1:
# Check if all lengthscale values are identical
length_scales = best_params[lengthscale_indices]
uniform_scales = np.allclose(length_scales, length_scales[0])
else:
# Fallback: assume indices 2+ are length scales if n_params > 2
if n_params > 2:
length_scales = best_params[2:]
uniform_scales = np.allclose(length_scales, length_scales[0])
lengthscale_indices = list(range(2, n_params))
else:
uniform_scales = False
lengthscale_indices = []
# Generate candidates using normal distribution around best parameters
candidates = []
# Always include the best parameters as first candidate
candidates.append(best_params.copy())
# Generate remaining candidates
for i in range(n_candidates - 1):
if uniform_scales and len(lengthscale_indices) > 0:
# For uniform scales: perturb each non-lengthscale parameter independently
# and apply single perturbation to all lengthscales
candidate = best_params.copy()
# Perturb non-lengthscale parameters independently
non_lengthscale_indices = [j for j in range(n_params) if j not in lengthscale_indices]
for idx in non_lengthscale_indices:
candidate[idx] += np.random.normal(0, width_factor)
# Apply same perturbation to all length scales
lengthscale_perturbation = np.random.normal(0, width_factor)
for idx in lengthscale_indices:
candidate[idx] += lengthscale_perturbation
else:
# For independent scales: perturb all parameters independently
perturbations = np.random.normal(0, width_factor, n_params)
candidate = best_params + perturbations
candidates.append(candidate)
return np.array(candidates)
def _generate_stage3_candidates(best_params, n_candidates, width_factor, gp=None):
"""
Generate stage 3 candidates around best parameters from stage 2.
Uses tighter search with smaller perturbations for ultra-fine optimization.
:param best_params: Best hyperparameters from stage 2
:param n_candidates: Number of candidates to generate
:param width_factor: Width of search around best parameters (should be smaller than stage2)
:param gp: Optional GP object to infer parameter structure from kernel
:returns: Array of stage 3 hyperparameter candidates
"""
best_params = np.asarray(best_params)
n_params = len(best_params)
# Infer lengthscale indices from GP kernel if available
lengthscale_indices = None
if gp is not None:
param_names = gp.kernel.get_parameter_names()
lengthscale_indices = [i for i, name in enumerate(param_names)
if 'metric:log_m' in name.lower()]
# Check if we have uniform length scales
if lengthscale_indices is not None and len(lengthscale_indices) > 1:
# Check if all lengthscale values are identical
length_scales = best_params[lengthscale_indices]
uniform_scales = np.allclose(length_scales, length_scales[0])
else:
# Fallback: assume indices 2+ are length scales if n_params > 2
if n_params > 2:
length_scales = best_params[2:]
uniform_scales = np.allclose(length_scales, length_scales[0])
lengthscale_indices = list(range(2, n_params))
else:
uniform_scales = False
lengthscale_indices = []
# Generate candidates using tighter normal distribution around best parameters
candidates = []
# Always include the best parameters as first candidate
candidates.append(best_params.copy())
# Generate remaining candidates with smaller perturbations for fine-tuning
for i in range(n_candidates - 1):
if uniform_scales and len(lengthscale_indices) > 0:
# For uniform scales: perturb each non-lengthscale parameter independently
# and apply single perturbation to all lengthscales
candidate = best_params.copy()
# Perturb non-lengthscale parameters independently
non_lengthscale_indices = [j for j in range(n_params) if j not in lengthscale_indices]
for idx in non_lengthscale_indices:
candidate[idx] += np.random.normal(0, width_factor)
# Apply same perturbation to all length scales
lengthscale_perturbation = np.random.normal(0, width_factor)
for idx in lengthscale_indices:
candidate[idx] += lengthscale_perturbation
else:
# For independent scales: perturb all parameters independently
perturbations = np.random.normal(0, width_factor, n_params)
candidate = best_params + perturbations
candidates.append(candidate)
return np.array(candidates)