"""Source code for smt_optim.surrogate_models.smt."""

import copy
import warnings

import numpy as np

from scipy.linalg import solve_triangular

from smt.surrogate_models import KRG
from smt.applications import MFK, MFCK

from smt.design_space import (
    DesignSpace,
    CategoricalVariable,
    FloatVariable,
    IntegerVariable,
    OrdinalVariable,
)

from smt_optim.surrogate_models import Surrogate


# Machine epsilon for float64. NOTE(review): currently only referenced by the
# commented-out check_theta_bounds helper above (via sqrt(EPSILON) as a safety
# margin off the hyperparameter bounds); kept for when that code is revived.
EPSILON = np.finfo(float).eps


# def check_theta_bounds(theta: np.ndarray, theta_bounds: np.ndarray) -> np.ndarray:
#     """
#     Apply corrections to GP hyperparameters that are on or outside of their boundaries,
#     with a small overhead to avoid triggering SMT warnings.
#
#     :param theta: GP hyperparameters.
#     :type theta: np.ndarray
#
#     :param theta_bounds: Hyperparameter bounds. np.ndarray[lower, upper].
#     :type theta_bounds: np.ndarray
#
#     :return: The corrected GP hyperparameters.
#     :rtype: np.ndarray
#     """
#     lower_disc = (theta <= theta_bounds[0])
#     theta = np.where(lower_disc, theta_bounds[0] + np.sqrt(EPSILON), theta)
#
#     upper_disc = (theta >= theta_bounds[1])
#     theta = np.where(upper_disc, theta_bounds[1] - np.sqrt(EPSILON), theta)
#
#     return theta


# def filter_nan_values(x: np.ndarray, y: np.ndarray) -> tuple[np.ndarray]:
#     # TODO: filter inf and similar non numeric values and add doc
#     valid_mask = ~np.isnan(y).ravel()
#     x_val = x[valid_mask]
#     y_val = y[valid_mask]
#     return x_val, y_val

# def clean_training_data(x: list[np.ndarray], y: list[np.ndarray]) -> tuple[np.ndarray]:
#     # TODO: add doc
#     num_level = len(x)
#
#     for lvl in range(num_level):
#         x[lvl], y[lvl] = filter_nan_values(x[lvl], y[lvl])
#
#     return x, y


# class SmtKRG(Surrogate):
#
#     def __init__(self, optimizer=None, name=None):
#         super().__init__()
#
#         # TODO: implement a way to control the random_state
#
#         self.optimizer = optimizer
#         self.name = name
#
#         self.num_dim = 0
#         self.krg = None
#         self.krg_initialized = False
#
#         if optimizer is not None:
#             self._init_smt(optimizer)
#
#         # TODO: should use optimizer.domain
#
#         else:
#             pass
#             # raise Exception("Unsupported domain type.")
#
#     def _init_smt(self, optimizer):
#
#         self.num_dim = optimizer.num_dim
#         self.costs = optimizer.costs
#
#         self.n_start = 3*optimizer.num_dim
#         self.previous_theta = np.ones(self.num_dim)
#
#         # KRG for continuous domain
#         if type(self.optimizer.design_space) is np.ndarray:
#             self.krg = KRG(print_global=False,
#                            n_start=self.n_start,
#                            hyper_opt="Cobyla",
#                            random_state=None)
#
#         # KRG for mixed integer domain
#         # elif type(optimizer.domain) is DesignSpace:
#         #     self.dim = optimizer.domain.n_dv
#         #     self.krg = KRG(design_space=domain,
#         #                     categorical_kernel=MixIntKernelType.CONT_RELAX,
#         #                     hyper_opt="Cobyla",
#         #                     corr="abs_exp",
#         #                     n_start=3*self.dim,
#         #                     print_global=False,
#         #     )
#
#             self.theta_bounds = self.krg.options["theta_bounds"]
#         else:
#             pass
#             # raise Exception("Unsupported domain type.")
#
#         self.krg_initialized = True
#
#     def train(self, xt: list, yt: list, **kwargs):
#         """
#         Train the GP on the training data.
#
#         Args:
#             xt (list[np.ndarray]): training data variables
#             yt (list[np.ndarray]): training data values
#         """
#
#         # if not self.krg_initialized:
#         #     raise Exception("KRG must be initialized before training.")
#
#         # print(f"xt= \n{xt}")
#         try:
#             if type(self.optimizer.design_space) is np.ndarray:
#                 self.previous_theta = check_theta_bounds(self.previous_theta, self.theta_bounds)
#                 self.krg.options["theta0"] = self.previous_theta
#                 self.krg.options["n_start"] = self.n_start
#         except:
#             warnings.warn("Error changing KRG parameters.")
#
#
#         self.xt = xt[-1].copy()
#         self.yt = yt[-1].copy()
#         self.xt, self.yt = filter_nan_values(self.xt, self.yt)
#
#         self.krg.set_training_values(self.xt, self.yt)
#         self.krg.train()
#
#         # store the optimize theta vector for the next iteration
#         self.previous_theta = self.krg.optimal_theta
#         if self.name: self.optimizer.iter_data[f"{self.name}_opt_theta"] = self.previous_theta
#
#     def predict_values(self, x_pred):
#         y_pred = self.krg.predict_values(x_pred)
#         return y_pred
#
#     def predict_variances(self, x_pred):
#         s2_pred = self.krg.predict_variances(x_pred)
#         return s2_pred

class SmtAutoModel(Surrogate):
    """Surrogate that automatically selects the SMT model from the data:
    KRG for single-fidelity training data, MFK for multi-fidelity data.

    A fresh model is built on every call to :meth:`train`.
    """

    def __init__(self):
        super().__init__()
        # Underlying SMT model; (re)created on every call to train().
        self.model = None
        # Number of completed trainings; also used to vary the SMT seed.
        self.train_counter = 0

    def train(self, xt: list[np.ndarray], yt: list[np.ndarray], **kwargs) -> None:
        """
        Train the GP on the training data.

        Args:
            xt (list[np.ndarray]): training data variables, one array per
                fidelity level (lowest first, highest-fidelity last)
            yt (list[np.ndarray]): training data values, same ordering
        """
        num_dim = xt[-1].shape[1]
        num_fidelity = len(xt)

        if num_fidelity == 1:
            self.model = KRG(print_global=False,
                             n_start=3 * num_dim,
                             hyper_opt="Cobyla",
                             seed=self.train_counter)
        else:
            self.model = MFK(print_global=False,
                             n_start=3 * num_dim,
                             hyper_opt="Cobyla",
                             seed=self.train_counter)

        # Lower-fidelity datasets are registered by level index; the
        # highest-fidelity set is registered without a name (empty range
        # when num_fidelity == 1).
        for lvl in range(num_fidelity - 1):
            self.model.set_training_values(xt[lvl], yt[lvl], name=lvl)
        self.model.set_training_values(xt[-1], yt[-1])

        self.model.train()
        self.train_counter += 1

    def predict_values(self, x_pred: np.ndarray) -> np.ndarray:
        """Predict the surrogate mean at ``x_pred``."""
        return self.model.predict_values(x_pred)

    def predict_variances(self, x_pred: np.ndarray) -> np.ndarray:
        """Predict the surrogate variance at ``x_pred``."""
        return self.model.predict_variances(x_pred)
# class SmtMFK(Surrogate): # # def __init__(self, optimizer=None, name=None): # # self.optimizer = optimizer # # self.name = name # # self.num_dim = 0 # self.num_levels = 0 # self.costs = [] # self.mfk = None # self.mfk_initialized = False # # if optimizer is not None: # self._init_smt(optimizer) # # # def _init_smt(self, optimizer): # # self.num_dim = optimizer.num_dim # self.num_levels = optimizer.num_fidelity # self.costs = optimizer.costs # # self.n_start = 3*optimizer.num_dim # self.previous_theta = np.ones((self.num_levels, self.num_dim)) # # self.mfk = MFK(print_global=False, # n_start=self.n_start, # hyper_opt="Cobyla", # random_state=None) # # self.theta_bounds = self.mfk.options["theta_bounds"] # # self.mfk_initialized = True # # # def train(self, xt: list[np.ndarray], yt: list[np.ndarray], **kwargs) -> None: # # if not self.mfk_initialized: # raise Exception("MFK must be initialized before training.") # # try: # self.previous_theta = check_theta_bounds(self.previous_theta, self.theta_bounds) # self.mfk.options["theta0"] = self.previous_theta # self.mfk.options["n_start"] = self.n_start # except: # warn("Error changing MFK parameters.") # # # TODO: data should be cleaned in the Driver class # self.xt = copy.deepcopy(xt) # self.yt = copy.deepcopy(yt) # self.xt, self.yt = clean_training_data(self.xt, self.yt) # # for k in range(self.num_levels-1): # self.mfk.set_training_values(self.xt[k], self.yt[k], name=k) # # self.mfk.set_training_values(self.xt[-1], self.yt[-1]) # # self.mfk.train() # # self.previous_theta = np.array(self.mfk.optimal_theta).reshape(self.num_levels, self.num_dim) # # if self.name: self.optimizer.iter_data[f"{self.name}_opt_theta"] = self.previous_theta # # def predict_values(self, x_pred: np.ndarray) -> np.ndarray: # y_pred = self.mfk.predict_values(x_pred) # return y_pred # # def predict_variances(self, x_pred: np.ndarray) -> np.ndarray: # s2_pred = self.mfk.predict_variances(x_pred) # return s2_pred # # def 
predict_variances_all_levels(self, x_pred: np.ndarray) -> tuple[np.ndarray, np.ndarray]: # """ # Compute the uncertainty reduction and the square scale factor of each level. # # Args: # x_pred (np.ndarray): coordinates of the next infill location # # Returns: # s2_red (np.ndarray): uncertainty reduction of each fidelity level # rho2 (np.ndarray): square scale factor between each fidelity level # # """ # # # np.ndarray(num_points, num_levels), list[np.ndarray(num_points)] # s2_pred, rho2 = self.mfk.predict_variances_all_levels(x_pred) # s2_red = np.zeros(self.num_levels) # # # s2_red[0] = s2_pred[0, 0] # # # for k in range(1, self.num_levels): # # s2_red[k] = s2_pred[0, k] - rho2[k-1].item() * s2_pred[0, k-1] # # tot_rho2 = np.ones(self.num_levels-1) # # # TODO: add adjust variance reduction computation to account for the nugget # # for k in range(self.num_levels-1): # tot_rho2[k] = 1 # for l in range(k, self.num_levels-1): # tot_rho2[k] *= rho2[l].item() # # s2_red[k] = s2_pred[0, k]*tot_rho2[k] # # s2_red[-1] = s2_pred[0, -1] # # return s2_red, tot_rho2
class SmtMFCK(Surrogate):
    """Surrogate wrapping SMT's multi-fidelity co-Kriging (MFCK) model.

    A fresh model is built on every call to :meth:`train`.
    """

    def __init__(self):
        super().__init__()
        # Underlying MFCK model; (re)created on every call to train().
        self.model = None
        # Number of completed trainings; offsets the SMT seed.
        self.train_counter = 0

    def train(self, xt: list, yt: list, **kwargs):
        """
        Train the MFCK model on the multi-fidelity training data.

        Args:
            xt (list[np.ndarray]): training data variables, one array per
                fidelity level (lowest first, highest-fidelity last)
            yt (list[np.ndarray]): training data values, same ordering
        """
        num_dim = xt[-1].shape[1]
        num_fidelity = len(xt)

        self.model = MFCK(print_global=False,
                          n_start=3 * num_dim,
                          hyper_opt="Cobyla",
                          seed=42 + self.train_counter)

        # Lower-fidelity datasets are registered by level index; the
        # highest-fidelity set is registered without a name.
        for k in range(num_fidelity - 1):
            self.model.set_training_values(xt[k], yt[k], name=k)
        self.model.set_training_values(xt[-1], yt[-1])

        self.model.train()
        self.train_counter += 1

    def predict_values(self, x_pred: np.ndarray) -> np.ndarray:
        """Predict the surrogate mean at ``x_pred``."""
        return self.model.predict_values(x_pred)

    def predict_variances(self, x_pred: np.ndarray) -> np.ndarray:
        """Predict the surrogate variance at ``x_pred`` as an (n, 1) array."""
        s2_pred = self.model.predict_variances(x_pred)
        # makes variance shape consistent with value prediction
        return s2_pred.reshape(-1, 1)

    def predict_level_covariances(self, x: np.ndarray, lvli: int, lvlj: int = None):
        """
        Compute the posterior covariance between two fidelity levels at ``x``.

        Parameters
        ----------
        x : np.ndarray
            Input locations at which to evaluate the covariance.
        lvli : int
            First fidelity level.
        lvlj : int
            Second fidelity level. If not specified, defaults to the highest
            fidelity level of the trained model.

        Returns
        -------
        covariances : np.ndarray
            Posterior covariance diagonal, shape (n, 1).
        """
        # Normalize inputs the same way the model normalized its training data.
        x = (x - self.model.X_offset) / self.model.X_scale

        if self.model.lvl == 1:
            raise Exception("Fidelity covariances prediction is only for MFCK with multiple fidelity levels.")
        if self.model.options["eval_noise"]:
            # TODO
            raise Exception("Fidelity covariances not implemented for MFCK with noise.")

        if lvlj is None:
            # BUG FIX: was `self.lvl - 1`, but `lvl` is an attribute of the
            # underlying MFCK model, not of this wrapper; the old code raised
            # AttributeError whenever lvlj was omitted.
            lvlj = self.model.lvl - 1

        self.model.K = self.model.compute_blockwise_K(
            self.model.X_norma_all, self.model.X_norma_all, self.model.optimal_theta
        )
        # Add the nugget on the diagonal for numerical stability of the
        # Cholesky factorization.
        L = np.linalg.cholesky(
            self.model.K + self.model.options["nugget"] * np.eye(self.model.K.shape[0])
        )

        # NOTE(review): compute_cross_K appears to expect (higher, lower)
        # level ordering — confirm against SMT's MFCK implementation.
        l_max = max(lvli, lvlj)
        l_min = min(lvli, lvlj)
        k_xx = self.model.compute_cross_K(x, x, l_max, l_min, self.model.optimal_theta)

        # Cross-covariances between the prediction points and the training
        # points of every fidelity level, for each of the two target levels.
        k_xX_i = []
        k_xX_j = []
        for lvl in range(self.model.lvl):
            li_max = max(lvli, lvl)
            li_min = min(lvli, lvl)
            k_xX_i.append(
                self.model.compute_cross_K(
                    self.model.X_norma_all[lvl], x, li_max, li_min, self.model.optimal_theta
                ))
            lj_max = max(lvlj, lvl)
            lj_min = min(lvlj, lvl)
            k_xX_j.append(
                self.model.compute_cross_K(
                    self.model.X_norma_all[lvl], x, lj_max, lj_min, self.model.optimal_theta
                ))

        beta_i = solve_triangular(L, np.vstack(k_xX_i), lower=True)
        beta_j = solve_triangular(L, np.vstack(k_xX_j), lower=True)

        # Posterior covariance: prior cross-covariance minus the part
        # explained by the training data.
        covariances = k_xx - np.dot(beta_j.T, beta_i)
        covariances = np.diag(covariances)  # * self.model.y_std**2
        return covariances.reshape(-1, 1)