# Source code for smt_optim.acquisition_strategies.mfsego

from time import perf_counter
from typing import Callable

import numpy as np
from scipy import optimize as so, stats as stats

from smt_optim.acquisition_functions import log_ei
from smt_optim.acquisition_strategies import AcquisitionStrategy
# from smt_optim.surrogate_models.smt import SmtMFK

from smt_optim.core.state import State

from smt_optim.utils.get_fmin import get_fmin

from smt_optim.subsolvers import multistart_minimize


class MFSEGO(AcquisitionStrategy):
    """Multi-fidelity SEGO acquisition strategy.

    Maximizes a (log) expected-improvement criterion over the scaled design
    space and selects the fidelity levels at which the infill is evaluated.
    """

    def __init__(self, state: State, **kwargs):
        """Store the optimization state and pop the strategy options.

        Parameters
        ----------
        state : State
            Optimization state; also used to derive the default number of
            multistart points (``10 * num_dim``) when ``n_start`` is not
            given explicitly.
        **kwargs
            Strategy options; any unrecognized keyword raises TypeError.
        """
        super().__init__()
        self.acq_context = state

        # Option name -> default. Inline comments list the accepted choices.
        option_defaults = {
            "acq_func": log_ei,           # expected_improvement, log_ei
            "fmin_crit": "min_rscv",      # min_rscv, fmin, mean_rscv
            "n_start": None,
            "fidelity_crit": "obj-only",  # obj-only, average, optimistic, pessimistic
            "select_fidelity": True,
            "min_rscv_first": False,
            "filter_rscv": False,
            "optimize_best": False,
            "relax_constraints": False,
            "cr_override": None,          # override optimizer Cost Ratio
            "seed": None,
        }
        for option, default in option_defaults.items():
            setattr(self, option, kwargs.pop(option, default))

        # Anything left in kwargs was not a recognized option.
        if kwargs:
            raise TypeError(f"Unexpected keyword arguments: {list(kwargs.keys())}")

        # Default multistart budget scales with the problem dimension.
        if state and self.n_start is None:
            self.n_start = 10 * state.problem.num_dim
[docs] def validate_config(self, acq_context: State) -> None: if acq_context.problem.num_obj > 1: raise Exception("Multi-objective not implemented.") if not isinstance(acq_context.design_space, np.ndarray): raise Exception("Design space must be a numpy array.") obj_required_methods = [ "predict_values", "predict_variances", ] for method in obj_required_methods: if not callable(getattr(acq_context.obj_models[0], method, None)): raise TypeError(f"Objective model requires: '{method}' method.") cstr_required_methods = [ "predict_values", ] for c_id in range(acq_context.problem.num_cstr): for method in cstr_required_methods: if not callable(getattr(acq_context.cstr_models[c_id], method, None)): raise TypeError(f"Constraint model requires: '{method}' method.")
def get_infill(self, acq_context: State) -> list[np.ndarray]:
    """Compute the next infill point(s), one entry per fidelity level.

    Returns a list of length ``num_fidelity``: the optimized point
    (shape (1, num_dim)) for every level up to the selected fidelity,
    ``None`` for the levels above it. Timing and fmin diagnostics are
    written into ``acq_context.iter_log["acquisition"]``.
    """
    acq_data = dict()
    # Current best feasible value on the highest fidelity level.
    # TODO: correct .dataset -> .scaled_dataset
    y = acq_context.scaled_dataset.export_data([0], acq_context.problem.num_fidelity-1)
    if acq_context.problem.num_cstr > 0:
        # Constraint columns follow the objective columns in the dataset.
        indices = [idx for idx in range(acq_context.problem.num_obj, acq_context.problem.num_obj+acq_context.problem.num_cstr)]
        # NOTE(review): objectives are read from `scaled_dataset` but
        # constraints from `dataset` — the TODO above suggests this may be
        # the line to change; confirm whether constraint feasibility must
        # be judged on unscaled values before touching it.
        c = acq_context.dataset.export_data(indices, acq_context.problem.num_fidelity-1)
        self.fmin = get_fmin(y, c, acq_context.cstr_types)
    else:
        self.fmin = get_fmin(y)
    acq_data["fmin"] = self.fmin
    # acq_data["fmin_descaled"] = self.fmin * optimizer.yt[-1].std(axis=0) + optimizer.yt[-1].mean()

    # generate starting points for the multistart optimization
    gen_t0 = perf_counter()
    # multi_x0 = self.generate_multistart_points(optimizer)
    # LHS seeded with the iteration counter, presumably so the start
    # points differ per iteration while staying reproducible.
    # NOTE(review): `rng=` requires a recent scipy (older releases used
    # `seed=`) — confirm against the project's scipy pin.
    sampler = stats.qmc.LatinHypercube(d=acq_context.problem.num_dim, rng=acq_context.iter)
    multi_x0 = sampler.random(self.n_start)
    gen_t1 = perf_counter()
    acq_data["generate_init_points_time"] = gen_t1 - gen_t0

    # scipy objective wrapper
    scipy_obj = self.build_scipy_objective(acq_context)
    # scipy constraint wrapper (for scipy, the feasible domain is g >= 0)
    scipy_cstr = self.build_scipy_constraints(acq_context)

    # Optimization runs in the unit hypercube (scaled design space).
    res = multistart_minimize(scipy_obj, bounds=np.array([[0, 1]] * acq_context.problem.num_dim), multi_x0=multi_x0, constraints=scipy_cstr, seed=self.seed)
    next_x = res.x

    # select highest fidelity level to sample
    fid_crit_t0 = perf_counter()
    level = self.get_fidelity(next_x.reshape(1, -1), acq_context)[0]
    # The same point is evaluated on every level up to `level`; higher
    # levels get None (no evaluation).
    infills = []
    for lvl in range(acq_context.problem.num_fidelity):
        if lvl <= level:
            infills.append(next_x.copy().reshape(1, -1))
        else:
            infills.append(None)
    fid_crit_t1 = perf_counter()
    acq_data["fid_crit_time"] = fid_crit_t1 - fid_crit_t0

    acq_context.iter_log["acquisition"] = acq_data
    return infills
[docs] def build_scipy_objective(self, acq_context: State) -> Callable: def scipy_acq_func(x): x = x.reshape(1, -1) mu = acq_context.obj_models[0].predict_values(x) s2 = acq_context.obj_models[0].predict_variances(x) return -self.acq_func(mu, s2, self.fmin).item() return scipy_acq_func
[docs] def build_scipy_constraints(self, acq_context: State) -> list[dict]: scipy_cstr = [] for c_id, c_type in enumerate(acq_context.cstr_types): if c_type in ["less", "greater"]: c_type = "ineq" elif c_type == "equal": c_type = "eq" else: raise Exception(f"Unexpected constraint type: {c_type.type}") def wrap_constraint(mu_func, s2_func, type, relax=False): def sp_constraint(x): x = x.reshape(1, -1) mu = mu_func(x).item() if relax: s = np.sqrt(s2_func(x).item()) if type == "ineq": return -(mu - 3*s) elif type == "eq": return -(np.abs(mu) - 3*s) return -mu return sp_constraint mu_func = acq_context.cstr_models[c_id].predict_values s2_func = acq_context.cstr_models[c_id].predict_variances sp_cstr_func = wrap_constraint(mu_func, s2_func, c_type, relax=self.relax_constraints) if self.relax_constraints: c_type = "ineq" scipy_cstr.append( { "type": c_type, "fun": lambda x, f=sp_cstr_func: f(x), } ) return scipy_cstr
[docs] def get_fidelity(self, next_x: np.ndarray, state: State) -> int: num_points = next_x.shape[0] num_dim = next_x.shape[1] if state.problem.num_fidelity > 1 and self.select_fidelity: all_surrogates = state.obj_models for c_surrogate in state.cstr_models: all_surrogates.append(c_surrogate) if self.cr_override is not None: costs = self.cr_override else: costs = state.problem.costs levels, s2_red_norm = self.select_fidelity_level(next_x, costs, all_surrogates, self.fidelity_crit) else: levels = [(state.problem.num_fidelity - 1) for _ in range(num_points)] return levels self.acq_log["infill_level"] = level if state.problem.num_fidelity > 1 and self.select_fidelity: self.acq_log["normalized_s2_reduction"] = s2_red_norm
# acq_data["multi_idx"] = idx # acq_data["multi_x"] = multi_x # acq_data["multi_f"] = multi_f # acq_data["multi_c"] = multi_c # acq_data["acq_success"] = success_rate # if optimizer.num_cstr > 0: # acq_data["rscv"] = rscv # if self.optimize_best: # # if optimizer.num_cstr > 0: # optimized_cstr = np.empty(optimizer.num_cstr) # # res = so.minimize(scipy_acq_func, # x0=x_min, # bounds=optimizer.domain_scaled, # constraints=scipy_cstr, # method="SLSQP", # tol=1e-15, # options={"maxiter": 50 * optimizer.num_dim}) # # # optimized_x = np.clip(res.x, optimizer.domain_scaled[:, 0], optimizer.domain_scaled[:, 1]) # acq_data["optimized_x"] = optimized_x # # optimized_l2 = np.linalg.norm(optimized_x - x_min) # acq_data["optimized_l2"] = optimized_l2 # # optimized_acq = scipy_acq_func(optimized_x) # acq_data["optimized_acq"] = optimized_acq # # if optimizer.num_cstr > 0: # for c_id in range(len(scipy_cstr)): # optimized_cstr[c_id] = -scipy_cstr[c_id]["fun"](optimized_x) # acq_data["optimized_cstr"] = optimized_cstr # # # update x_min # x_min = optimized_x # log expected values # expected_values = np.empty(acq_context.problem.num_cstr+1) # expected_values[0] = acq_context.obj_models[0].predict_values(next_x.reshape(1, -1)).item() # if acq_context.scaling: # yt_scaled, yt_mean, yt_std = acq_context._standardize_data(acq_context.yt[-1]) # expected_values[0] *= yt_std # expected_values[0] += yt_mean # # for c_id, c_surrogate in enumerate(acq_context.cstr_models): # expected_values[c_id+1] = c_surrogate.predict_values(x_min.reshape(1, -1)).item() # if acq_context.scaling: # yt_scaled, yt_mean, yt_std = acq_context._standardize_data(acq_context.ct[-1][:, c_id]) # expected_values[c_id+1] *= yt_std # # acq_data["expected_values"] = expected_values # # acq_context.iter_data["acquisition"] = acq_data # def generate_multistart_points(self, optimizer) -> np.ndarray: # # sampler = stats.qmc.LatinHypercube(d=optimizer.domain_scaled.shape[0]) # # # LHS filter # large_x0 = 
sampler.random(10*self.n_start) # # large_x0 = sampler.random(self.n_start) # large_x0 = stats.qmc.scale(large_x0, optimizer.domain_scaled[:, 0], optimizer.domain_scaled[:, 1]) # # mu = optimizer.obj_models.predict_values(large_x0) # s2 = optimizer.obj_models.predict_variances(large_x0) # large_f = -self.acq_func(mu, s2, self.fmin) # # # no constraints -> selects the best starting points # if optimizer.num_cstr == 0: # sorted_idx = np.argsort(large_f.ravel()) # # # with constraints -> selects the best points with the lowest constraint violation # else: # large_c = np.empty((large_x0.shape[0], optimizer.num_cstr)) # # for c_id, c_surrogate in enumerate(optimizer.cstr_models): # large_c[:, c_id] = c_surrogate.predict_values(large_x0).ravel() # # rscv = self.compute_rscv(large_c, optimizer.cstr_config) # sorted_idx = np.lexsort((large_f.ravel(), rscv)) # rscv = rscv[sorted_idx][:self.n_start] # # multi_x0 = large_x0[sorted_idx][:self.n_start, :] # # # with constraints -> try to reduce the starting point RSCV # if optimizer.num_cstr > 0 and self.min_rscv_first: # # def min_rscv(x): # cstr_values = np.empty((1, len(optimizer.cstr_models))) # for c_id, c_surrogate in enumerate(optimizer.cstr_models): # cstr_values[0, c_id] = c_surrogate.predict_values(x.reshape(1, -1)).item() # return self.compute_rscv(cstr_values, optimizer.cstr_config).item() # # # for i in range(multi_x0.shape[0]): # # try to reduce the constraint violation if the starting point is not feasible # if rscv[i] != 0.0: # res = so.minimize(min_rscv, # x0=multi_x0[i, :], # bounds=optimizer.domain_scaled, # method="COBYLA", # tol=1e-8, # options={"maxiter": 50*optimizer.num_dim}) # # rscv[i] = res.fun # multi_x0[i, :] = res.x # # return multi_x0
def compute_sigma2_red(self, x_pred: np.ndarray, surrogate) -> np.ndarray:
    """Per-level variance reduction, scaled toward the highest level.

    Each level's predicted variance is multiplied by the product of the
    rho^2 factors of all levels above it, so that candidate reductions
    are comparable at the top fidelity.
    """
    # s2: (num_points, num_levels); rho2: list of per-point arrays,
    # as returned by the multi-fidelity model.
    s2, rho2 = surrogate.model.predict_variances_all_levels(x_pred)
    num_points = x_pred.shape[0]
    num_levels = s2.shape[1]
    s2_red = np.empty((num_points, num_levels))
    for level in range(num_levels):
        # Accumulate rho^2 of every level above `level`.
        propagation = np.ones(num_points)
        for higher in range(level, num_levels - 1):
            propagation = propagation * rho2[higher][:]
        s2_red[:, level] = s2[:, level] * propagation
    # np.ndarray(num_points, num_levels)
    return s2_red
def compute_norm_squared_cost(self, costs: list[float]) -> np.ndarray:
    """Squared cumulative per-level costs, normalized to a max of 1.

    Sampling level k implies sampling every cheaper level below it, so
    the aggregate cost of level k is the cumulative sum of costs[0..k].
    """
    cumulative = np.cumsum(np.asarray(costs, dtype=float))
    tot_costs2 = cumulative ** 2
    # Normalize so the most expensive level maps to 1.
    return tot_costs2 / np.max(tot_costs2)
def compute_norm_sigma2_red(self, x_pred: np.ndarray, norm_costs2: list[float], surrogate) -> np.ndarray:
    """Cost-normalized variance reduction for one surrogate.

    Divides each level's variance reduction by that level's normalized
    squared aggregate cost, trading information gain against sampling
    cost.
    """
    s2_red = self.compute_sigma2_red(x_pred, surrogate)
    s2_norm = np.empty_like(s2_red)
    for level, level_cost2 in enumerate(norm_costs2):
        s2_norm[:, level] = s2_red[:, level] / level_cost2
    return s2_norm
def compute_all_s2_red_norm(self, x_pred: np.ndarray, costs: list[float], surrogates: list) -> list[np.ndarray]:
    """Cost-normalized variance reductions for every surrogate.

    Returns one (num_points, num_levels) array per surrogate, each
    normalized by the same per-level cost factors.
    """
    norm_costs2 = self.compute_norm_squared_cost(costs)
    return [
        self.compute_norm_sigma2_red(x_pred, norm_costs2, surrogate)
        for surrogate in surrogates
    ]
def select_fidelity_level(self, x_pred: np.ndarray, costs: list[float], all_surrogates: list, criterion: str) -> "tuple[np.ndarray, list[np.ndarray]]":
    """Pick the fidelity level maximizing cost-normalized variance reduction.

    Parameters
    ----------
    x_pred : np.ndarray
        Candidate points, shape (num_points, num_dim).
    costs : list[float]
        Per-level sampling costs.
    all_surrogates : list
        Objective surrogate first, then the constraint surrogates.
    criterion : str
        One of 'obj-only', 'optimistic', 'pessimistic', 'average',
        'cstr-only'.

    Returns
    -------
    (levels, s2_red_norm): one fidelity level per point, and the
    normalized variance reductions used to pick them. (Annotation fixed:
    the original claimed ``-> np.ndarray`` but returned a tuple.)

    Raises
    ------
    ValueError
        If the criterion is not recognized.
    Exception
        If 'cstr-only' is requested without exactly one constraint
        surrogate.
    """
    num_pts: int = x_pred.shape[0]
    if criterion == "obj-only":
        # Only the objective surrogate drives the choice.
        surrogates = [all_surrogates[0]]
        s2_red_norm = self.compute_all_s2_red_norm(x_pred, costs, surrogates)
        level = s2_red_norm[0].argmax(axis=1)
    elif criterion == "optimistic":
        s2_red_norm = self.compute_all_s2_red_norm(x_pred, costs, all_surrogates)
        # TODO: make it compatible with multiple infill points
        # Lowest per-surrogate argmax: the cheapest defensible level.
        level = s2_red_norm[0].argmax(axis=1)
        for i in range(1, len(all_surrogates)):
            level = np.vstack((level, s2_red_norm[i].argmax(axis=1))).min(axis=0)
    elif criterion == "pessimistic":
        s2_red_norm = self.compute_all_s2_red_norm(x_pred, costs, all_surrogates)
        # Highest per-surrogate argmax: the most conservative level.
        level = s2_red_norm[0].argmax(axis=1)
        for i in range(1, len(all_surrogates)):
            level = np.vstack((level, s2_red_norm[i].argmax(axis=1))).max(axis=0)
    elif criterion == "average":
        # s2_red of each surrogate is normalized by the cost. Should it be
        # normalized after the sum? -> should be the same
        s2_red_norm = self.compute_all_s2_red_norm(x_pred, costs, all_surrogates)
        s2_red_avg = np.zeros((num_pts, s2_red_norm[0].shape[1]))
        # sum the s2_red from all surrogates
        for i in range(len(all_surrogates)):
            s2_red_avg[:, :] += s2_red_norm[i][:, :]
        level = s2_red_avg.argmax(axis=1)
    elif criterion == "cstr-only":
        if len(all_surrogates) == 1:
            raise Exception("cstr-only criterion requires one constraint surrogate.")
        surrogates = all_surrogates[1:]
        if len(surrogates) > 1:
            raise Exception("cstr-only is not implemented for more than 1 constraints.")
        s2_red_norm = self.compute_all_s2_red_norm(x_pred, costs, surrogates)
        level = s2_red_norm[0].argmax(axis=1)
    else:
        # BUG FIX: an unknown criterion previously fell through and hit an
        # UnboundLocalError on `level`; fail fast with a clear message.
        raise ValueError(f"Unknown fidelity criterion: {criterion}")
    # np.ndarray(num_pts) -> fidelity level for each infill points
    return level, s2_red_norm