Source code for smt_optim.acquisition_strategies.mfsego

from time import perf_counter
from typing import Callable

import numpy as np
from scipy import optimize as so, stats as stats
from scipy.spatial.distance import cdist

import smt.design_space as ds

from smt_optim.acquisition_functions import log_ei
from smt_optim.surrogate_models.base import Surrogate
from smt_optim.acquisition_strategies import AcquisitionStrategy
# from smt_optim.surrogate_models.smt import SmtMFK

from smt_optim.core.state import State
from smt_optim.subsolvers.multistart import mixvar_multistart_minimize

from smt_optim.utils.get_fmin import get_fmin

from smt_optim.subsolvers import multistart_minimize


[docs] class MFSEGO(AcquisitionStrategy): """ Multi-Fidelity Super Efficient Global Optimization (MF-SEGO) strategy. This acquisition strategy can perform Efficient Global Optimization (EGO) (unconstrained optimization), SEGO (constrained optimization), and MF-SEGO (multi-fidelity unconstrained or constrained optimization). It is compatible with various acquisition functions, including: - expected improvement, - log expected improvement, - probability of improvement, and - log probability of improvement. The constraints are handled by maximizing the acquisition function with respect to predictions from constraint surrogate models, instead of using the Probability-of-Improvement approach. In the multi-fidelity setting, the acquisition function is first maximized, followed by fidelity level selection. This strategy maintains a nested Design of Experiments (DoE), meaning that for each new fidelity level sampled, all lower-fidelity levels are also requested to be sampled. MF-SEGO offers different fidelity selection criteria: - obj-only, - optimistic, - pessimistic, and - average. Parameters ---------- state : State Optimization state containing surrogate models, data, and problem definition. Other Parameters ---------------- acq_func: callable, optional Acquisition function used to rank candidate points (default: log_ei). n_start: int, optional Number of multistart initializations for the inner optimizer. Default: 20. fidelity_crit: {"obj-only", "average", "optimistic", "pessimistic"}, optional Strategy used to select fidelity level. select_fidelity: bool, optional If False, always evaluate all fidelity levels. sp_method: str, optional Optimization method passed to SciPy (e.g., "SLSQP", "COBYLA"). Default = "SLSQP". sp_tol: float, optional Tolerance for the SciPy optimizer. Default = sqrt(machine epsilon). Notes ----- When optimizing a high-dimensional problem, it is recommended to increase the number of starting points (`n_start`). The default setting may be insufficient for problems with higher dimensions or many constraints. This acquisition strategy is designed to work with SMT's surrogate models. In the multi-fidelity setting, SMT's MFK model must be used. """ def __init__(self, state: State, **kwargs): """ Initialize the MFSEGO acquisition strategy. Parameters ---------- state : State Optimization state. **kwargs Optional configuration parameters. See class docstring for full list. Raises ------ TypeError If unexpected keyword arguments are provided. """ super().__init__() self.acq_context = state self.acq_func = kwargs.pop("acq_func", log_ei) # expected_improvement, log_ei self.fmin_crit = kwargs.pop("fmin_crit", "min_rscv") # broken -> to be removed (min_rscv, fmin, mean_rscv) # self.sub_optimizer = kwargs.pop("sub_optimizer", "COBYLA") self.n_start = kwargs.pop("n_start", None) # optimizer multistart self.fidelity_crit = kwargs.pop("fidelity_crit", "obj-only") # obj-only, average, optimistic, pessimistic self.select_fidelity = kwargs.pop("select_fidelity", True) # if set to False, will always sample (LF+HF) self.min_rscv_first = kwargs.pop("min_rscv_first", False) # broken -> to be fixed! self.filter_rscv = kwargs.pop("filter_rscv", False) # broken -> to be fixed! self.optimize_best = kwargs.pop("optimize_best", False) # broken -> to be fixed! self.relax_constraints = kwargs.pop("relax_constraints", False) # broken -> to be fixed! self.cr_override = kwargs.pop("cr_override", None) # override optimizer Cost Ratio self.sp_method = kwargs.pop("sp_method", "SLSQP") # SciPy optimizer method self.sp_tol = kwargs.pop("sp_tol", np.sqrt(np.finfo(float).eps)) # SciPy optimizer tolerance self.seed = kwargs.pop("seed", None) if kwargs: raise TypeError(f"Unexpected keyword arguments: {list(kwargs.keys())}") if state and self.n_start is None: self.n_start = 20 # * state.problem.num_dim self.fmin: float | None = None # current best feasible objective value
[docs] def validate_config(self, acq_context: State) -> None: if acq_context.problem.num_obj > 1: raise Exception("Multi-objective not implemented.") if not isinstance(acq_context.design_space, np.ndarray): raise Exception("Design space must be a numpy array.") obj_required_methods = [ "predict_values", "predict_variances", ] for method in obj_required_methods: if not callable(getattr(acq_context.obj_models[0], method, None)): raise TypeError(f"Objective model requires: '{method}' method.") cstr_required_methods = [ "predict_values", ] for c_id in range(acq_context.problem.num_cstr): for method in cstr_required_methods: if not callable(getattr(acq_context.cstr_models[c_id], method, None)): raise TypeError(f"Constraint model requires: '{method}' method.")
[docs] def get_infill(self, acq_context: State) -> list[np.ndarray]: """ Compute the next infill point(s) using the acquisition strategy. Parameters ---------- acq_context : State Current optimization state, including surrogate models and data. Returns ------- list of ndarray List of selected infill points. Each entry corresponds to a point (and potentially a fidelity level, depending on configuration). Notes ----- This method: - Optimizes the acquisition function using a multistart strategy - Applies the selected fidelity criterion if `select_fidelity=True` - Uses SciPy optimizers (controlled via `sp_method`, `sp_tol`) """ if isinstance(self.seed, int) or isinstance(self.seed, float): self.seed += 1 acq_data = dict() # gets the current best feasible objective value from the scaled dataset best_sample = acq_context.get_best_sample(ctol=0., scaled=True) self.fmin = best_sample.obj[0] # mono-objective only acq_data["fmin"] = self.fmin # scipy objective wrapper scipy_obj = self.build_scipy_objective(acq_context) # scipy constraint wrapper (for scipy, the feasible domain is g >= 0) scipy_cstr = self.build_scipy_constraints(acq_context) mix_var = False for dv in acq_context.problem.design_space.design_variables: if not isinstance(dv, ds.FloatVariable): mix_var = True break # TODO: merge continuous and mixvar multistart optimization if not mix_var: # generate starting points for the multistart optimization gen_t0 = perf_counter() # multi_x0 = self.generate_multistart_points(optimizer) # TODO: initialize sampler in init class method sampler = stats.qmc.LatinHypercube(d=acq_context.problem.num_dim, rng=acq_context.iter) multi_x0 = sampler.random(self.n_start) gen_t1 = perf_counter() acq_data["generate_init_points_time"] = gen_t1 - gen_t0 res = multistart_minimize(scipy_obj, bounds=np.array([[0, 1]] * acq_context.problem.num_dim), multi_x0=multi_x0, constraints=scipy_cstr, seed=self.seed, tol=self.sp_tol, method=self.sp_method,) else: res = mixvar_multistart_minimize(scipy_obj, design_space=acq_context.problem.design_space, constraints=scipy_cstr, n_start=self.n_start, method=self.sp_method, tol=self.sp_tol, seed=self.seed) # next infill location next_x = res.x # selects highest fidelity level to sample fid_crit_t0 = perf_counter() level = self.get_fidelity(next_x.reshape(1, -1), acq_context)[0] # keeps the DoE nested -> requests sampling all lower fidelity levels infills = [] for lvl in range(acq_context.problem.num_fidelity): if lvl <= level: infills.append(next_x.copy().reshape(1, -1)) else: infills.append(None) fid_crit_t1 = perf_counter() acq_data["fid_crit_time"] = fid_crit_t1 - fid_crit_t0 acq_context.iter_log["acquisition"] = acq_data return infills
[docs] def build_scipy_objective(self, acq_context: State) -> Callable: def scipy_acq_func(x): x = x.reshape(1, -1) mu = acq_context.obj_models[0].predict_values(x).item() s2 = acq_context.obj_models[0].predict_variances(x).item() return -float(self.acq_func(mu, s2, self.fmin)) return scipy_acq_func
[docs] def build_scipy_constraints(self, state: State) -> list[dict]: scipy_cstr = [] def append_sp_cstr(func: Callable, type: str) -> None: scipy_cstr.append({ "fun": func, "type": type, }) # TODO: re-implement constraint relaxation # def sp_constraint(x): # x = x.reshape(1, -1) # mu = mu_func(x).item() # # if relax: # s = np.sqrt(s2_func(x).item()) # if type == "ineq": # return -(mu - 3 * s) # elif type == "eq": # return -(np.abs(mu) - 3 * s) # # return -mu # # return sp_constraint def sp_constraint(x, model): x = x.reshape(1, -1) mu = model.predict_values(x) return mu.item() for c_id, c_config in enumerate(state.problem.cstr_configs): if c_config.equal is not None: func = lambda x, f=sp_constraint, value=state.cstr_equal[c_id], m=state.cstr_models[c_id]: f(x, m) - value append_sp_cstr(func, "eq") else: if c_config.lower is not None: func = lambda x, f=sp_constraint, value=state.cstr_lower[c_id], m=state.cstr_models[c_id]: - value + f(x, m) append_sp_cstr(func, "ineq") if c_config.upper is not None: func = lambda x, f=sp_constraint, value=state.cstr_upper[c_id], m=state.cstr_models[c_id]: - f(x, m) + value append_sp_cstr(func, "ineq") return scipy_cstr
[docs] def get_fidelity(self, next_x: np.ndarray, state: State) -> list[int]: """ Select the highest fidelity level to sample at the given point(s). Parameters ---------- next_x : np.ndarray The point(s) to sample at. state : State The current optimization state. Returns ------- levels : list[int] or array of int The selected fidelity level(s). If `state.problem.num_fidelity` is 1, returns the single fidelity level; otherwise, returns a list of fidelity levels, one for each point in `next_x`. Notes ----- This method takes into account the problem's cost model and the available surrogate models. """ num_points = next_x.shape[0] if state.problem.num_fidelity > 1 and self.select_fidelity: all_surrogates = [] for o_surrogate in state.obj_models: all_surrogates.append(o_surrogate) for c_surrogate in state.cstr_models: all_surrogates.append(c_surrogate) if self.cr_override is not None: costs = self.cr_override else: costs = state.problem.costs levels, s2_red_norm = select_fidelity_level(next_x, costs, all_surrogates, self.fidelity_crit) else: levels = [(state.problem.num_fidelity - 1) for _ in range(num_points)] return levels
[docs] def corrected_predict_variances_all_levels(x_pred: np.ndarray, model, method: str = "max") -> tuple[np.ndarray, list]: """ Predict the variance at all fidelity levels for given prediction points `x_pred`. Parameters ---------- x_pred: np.ndarray of shape (num_points, num_dim) Prediction points. model: MFK SMT's MFK model. method: str, optional Correction method for ill-conditioned models. Returns ------- np.ndarray Variances for all points at all fidelity levels. list[float] The squared correlation coefficient for each level. Notes ----- If `method` is set to `None`, no correction is performed. If set to `max`, the maximum variance for each level is subtracted to the variance of `x_pred`. If set to `closest`, for each prediction point and each fidelity level the variance of the closest training point is subtracted to the predicted variance. """ noise2, rho2 = model.predict_variances_all_levels(model.X[-1]) if method is None: noise2_corr = np.zeros((x_pred.shape[0], noise2.shape[1])) elif method == "max": noise2_corr = np.max(noise2, axis=0).reshape(1, -1).repeat(x_pred.shape[0], axis=0) elif method == "closest": min_dist_idx = np.argmin(cdist(x_pred, model.X[-1]), axis=1) noise2_corr = noise2[min_dist_idx, :] else: raise Exception(f"`{method}` is not a valid method.") s2, rho2 = model.predict_variances_all_levels(x_pred) s2_corr = np.clip(s2 - noise2_corr, 0, np.inf) return s2_corr, rho2
[docs] def compute_sigma2_red(x_pred: np.ndarray, surrogate, method="max") -> np.ndarray: r""" Compute the variance contribution of each fidelity level viewed from the highest fidelity level. For a given surrogate model, the output corresponds to .. math:: \sigma_{\text{red}}^2(\ell, \boldsymbol{x}_{i}) = \sum_{\ell'=1}^{\ell}\sigma^2_{(\delta, \ell')}(\boldsymbol{x}_{i}) \prod_{j=\ell'}^{L-1}{\rho^2_j}. where :math:`\ell` corresponds to the fidelity level evaluated and :math:`L` to the total number of fidelity level. Parameters ---------- x_pred : np.ndarray of shape (num_points, num_dim) Prediction points. surrogate : Surrogate SMT-Optim surrogate model with a model attribute corresponding to a SMT MFK model. method : str, optional `corrected_predict_variances_all_levels` correction method. Returns ------- np.ndarray of shape (num_points, num_level) Variance contribution of each level viewed from the highest fidelity level. """ # np.ndarray(num_points, num_levels), list[np.ndarray(num_points)] # s2, rho2 = surrogate.model.predict_variances_all_levels(x_pred) s2, rho2 = corrected_predict_variances_all_levels(x_pred, surrogate.model, method=method) num_levels = s2.shape[1] tot_rho2 = np.ones((x_pred.shape[0], num_levels)) s2_red = np.empty((x_pred.shape[0], num_levels)) for k in range(num_levels): for l in range(k, num_levels-1): tot_rho2[:, k] *= rho2[l][:] s2_red[:, k] = s2[:, k] * tot_rho2[:, k] # np.array(num_points, num_levels) return s2_red
[docs] def compute_norm_squared_cost(costs: list[float]) -> np.ndarray: r""" Compute the normalized total squared cost of each fidelity level. The output corresponds to: .. math:: \text{cost}_{\ell} = \left(\sum_{\ell'=1}^{\ell}{c_{\ell'}} \; \bigg/ \; \sum_{\ell'=1}^{L}{c_{\ell'}}\right)^{2}. where :math:`\ell` corresponds to the fidelity level evaluated and :math:`L` to the total number of fidelity levels. Parameters ---------- costs : list of float Evaluation cost of each fidelity level. Returns ------- np.ndarray Normalized total squared cost of each fidelity level. """ num_levels = len(costs) tot_costs2 = np.empty(num_levels) for k in range(num_levels): tot_costs2[k] = np.sum(costs[0:k+1])**2 # normalize the aggregate costs squared by its maximum tot_costs2 /= np.max(tot_costs2) return tot_costs2
[docs] def compute_norm_sigma2_red(x_pred: np.ndarray, norm_costs2: list[float], surrogate) -> np.ndarray: """ Normalize the variance reduction of each level by their corresponding normalized total squared costs. Parameters ---------- x_pred : np.ndarray of shape (num_points, num_dim) Prediction points. norm_costs2: list of float normalized total squared costs. surrogate : Surrogate SMT-Optim surrogate model with a model attribute corresponding to a SMT MFK model. Returns ------- np.ndarray of shape (num_points, num_level) """ num_levels = len(norm_costs2) s2_red = compute_sigma2_red(x_pred, surrogate) s2_norm = np.empty_like(s2_red) for k in range(num_levels): s2_norm[:, k] = s2_red[:, k] / norm_costs2[k] # if equal 0 somewhere override... # has_zero = np.any(s2_norm == 0, axis=1) # indices = s2_norm.shape[1] - 1 - np.argmax((s2_norm == 0)[:, ::-1], axis=1) # s2_norm[has_zero, indices[has_zero]] = np.inf all_zero = np.all(s2_norm == 0, axis=1) s2_norm[all_zero, -1] = np.inf return s2_norm
[docs] def compute_all_s2_red_norm(x_pred: np.ndarray, costs: list[float], surrogates: list) -> list[np.ndarray]: """ Compute the normalized the variance reduction of all models in the `surrogates` list. Parameters ---------- x_pred : np.ndarray of shape (num_points, num_dim) Prediction points. costs: list of float Evaluation cost of each fidelity level. surrogates : list of Surrogate List of SMT-Optim surrogate models. Returns ------- List of np.ndarray of shape (num_points, num_level). """ num_pts = x_pred.shape[0] num_levels = len(costs) norm_costs2 = compute_norm_squared_cost(costs) s2_red_norm = [np.empty((num_pts, num_levels)) for _ in range(len(surrogates))] for i, surrogate in enumerate(surrogates): s2_red_norm[i] = compute_norm_sigma2_red(x_pred, norm_costs2, surrogate) return s2_red_norm
[docs] def select_fidelity_level(x_pred: np.ndarray, costs: list[float], all_surrogates: list[Surrogate], criterion: str = "pessimistic") -> tuple[np.ndarray, np.ndarray]: """ Select the highest fidelity level to sample based on the `criterion`. Parameters ---------- x_pred : np.ndarray of shape(num_points, num_dim) Prediction points. costs : list of float Evaluation cost of each fidelity level. all_surrogates : list of Surrogate List of surrogate models. criterion : str, optional Fidelity criterion. The possible values are : "obj-only", "optimistic", "pessimistic", "average", and "cstr-only". Returns ------- np.ndarray of shape (num_points,) Highest fidelity level to sample based on the `criterion`. np.ndarray The criterion value for each fidelity level. The index corresponds to the corresponding fidelity level. Notes ----- The `obj-only` criterion only evaluates the variance reduction of the first objective model. The `optimistic` criterion selects the overall lowest fidelity level from all the models. The `pessimistic` criterion selects the overall highest fidelity level from all the models. The `average` criterion averages the variance reduction of all the models on a fidelity level basis, and selects the level with the highest averaged variance reduction. The `cstr-only` criterion only works for a single constraint. """ num_pts: int = x_pred.shape[0] # level: np.ndarray = np.zeros(num_pts) if criterion == "obj-only": surrogates = [all_surrogates[0]] s2_red_norm = compute_all_s2_red_norm(x_pred, costs, surrogates) level = s2_red_norm[0].argmax(axis=1) elif criterion == "optimistic": s2_red_norm = compute_all_s2_red_norm(x_pred, costs, all_surrogates) # TODO: make it compatible with multiple infill points level = s2_red_norm[0].argmax(axis=1) for i in range(1, len(all_surrogates)): level = np.vstack((level, s2_red_norm[i].argmax(axis=1))).min(axis=0) elif criterion == "pessimistic": s2_red_norm = compute_all_s2_red_norm(x_pred, costs, all_surrogates) level = s2_red_norm[0].argmax(axis=1) for i in range(1, len(all_surrogates)): level = np.vstack((level, s2_red_norm[i].argmax(axis=1))).max(axis=0) elif criterion == "average": # s2_red of each surrogate is normalized by the cost. Should it be normalized after the sum? # -> should be the same s2_red_norm = compute_all_s2_red_norm(x_pred, costs, all_surrogates) s2_red_avg = np.zeros((num_pts, s2_red_norm[0].shape[1])) # sum the s2_red from all surrogates for i in range(len(all_surrogates)): s2_red_avg[:, :] += s2_red_norm[i][:, :] level = s2_red_avg.argmax(axis=1) elif criterion == "cstr-only": if len(all_surrogates) == 1: raise Exception("cstr-only criterion requires one constraint surrogate.") surrogates = all_surrogates[1:] if len(surrogates) > 1: raise Exception("cstr-only is not implemented for more than 1 constraints.") s2_red_norm = compute_all_s2_red_norm(x_pred, costs, surrogates) level = s2_red_norm[0].argmax(axis=1) # np.ndarray(num_pts) -> fidelity level for each infill points return level, s2_red_norm