import numpy as np
import scipy.stats as stats
from dataclasses import dataclass
import warnings
import time
import json
import csv
import copy
import os
from typing import Callable, Type
import pickle
from smt_optim.core.state import State
from smt_optim.surrogate_models import Surrogate
from smt_optim.acquisition_strategies import AcquisitionStrategy
from smt_optim.core import Sample, OptimizationDataset, Evaluator
from smt_optim.core import State
from smt_optim.utils.initial_design import generate_initial_design
from smt_optim.utils.stop_criteria import check_stop_criteria
from smt_optim.utils.logger import ConsoleLogger, JsonLogger
def wrap_func(func: Callable, factor: float = 1, step: float = 0) -> Callable:
    """
    Build a callable computing ``factor * (func(x) - step)``.

    :param func: Function to wrap.
    :type func: Callable
    :param factor: Multiplicative factor applied after the shift.
    :type factor: float
    :param step: Additive shift subtracted from the function value.
    :type step: float
    :return: The wrapped function.
    :rtype: Callable
    """
    # Bind ``func`` as a default argument so the wrapper keeps its own
    # reference even if the outer name is rebound later.
    def _scaled(x, _f=func):
        return factor * (_f(x) - step)

    return _scaled
def wrap_array(
array: np.ndarray, factor: float | np.ndarray = 1.0, step: float | np.ndarray = 0.0
) -> np.ndarray:
return factor * (array - step)
def check_bounds(x: np.ndarray, bounds: np.ndarray) -> np.ndarray:
    """
    Apply L1 correction to x point to make sure it's within the problem's bounds.

    :param x: Infill point.
    :type x: np.ndarray
    :param bounds: Problem boundaries, shape (num_dim, 2) with columns (lower, upper).
    :type bounds: np.ndarray
    :return: The bounds corrected infill point.
    :rtype: np.ndarray
    """
    # Clamp below to the lower bound, then above to the upper bound
    # (np.clip applies them in exactly that order).
    clipped = np.clip(x, bounds[:, 0], bounds[:, 1])
    # Warn whenever any coordinate was moved by the correction.
    if np.any(x != clipped):
        warnings.warn(
            f"Infill point was outside of the bounds. L1 correction was applied: (initial = {x}; corrected = {clipped})."
        )
    return clipped
# def compute_rscv(self, cstr_array: np.ndarray, cstr_config: list[ConstraintConfig], g_tol: float = 0., h_tol: float = 0.) -> np.ndarray:
def compute_rscv(
    cstr_array: np.ndarray, cstr_config: list, g_tol: float = 0.0, h_tol: float = 0.0
) -> np.ndarray:
    """
    Compute the Root Squared Constraint Violation (RSCV) per sample.

    :param cstr_array: Constraint values, shape (num_samples, num_cstr).
    :param cstr_config: Constraint configurations (one per column); each must
        expose a ``type`` attribute in {"less", "greater", "equal"}.
    :param g_tol: Tolerance for inequality ("less"/"greater") constraints.
    :param h_tol: Tolerance for equality constraints.
    :return: RSCV per sample, shape (num_samples,).
    :raises Exception: If a constraint type is not recognized.

    NOTE(review): "greater" constraints are treated exactly like "less"
    (violated when value > g_tol); this presumes greater-type values were
    already sign-flipped upstream — verify against call sites.
    """
    sq_violation = np.zeros_like(cstr_array)  # Square Constraint Violation
    for col, cfg in enumerate(cstr_config):
        values = cstr_array[:, col]
        if cfg.type in ("less", "greater"):
            violated = ~(values <= g_tol)
        elif cfg.type == "equal":
            violated = ~(np.abs(values) <= h_tol)
        else:
            raise Exception(
                f"{cfg.type} is not a valid constraint type. It must be 'less', 'greater' or 'equal'."
            )
        sq_violation[violated, col] = values[violated] ** 2
    return np.sqrt(sq_violation.sum(axis=1))
@dataclass
class ObjectiveConfig:
    """
    Configuration of the objective function used in the optimization problem.

    This class stores the objective callable(s) together with surrogate
    modeling information used to approximate the objective during optimization.

    Attributes
    ----------
    objective : list[Callable]
        List of objective functions. Each callable must accept a decision
        variable vector ``x`` and return a scalar objective value. The functions
        must be ordered in increasing level of fidelity.
    surrogate : type[Surrogate]
        Surrogate model class used to approximate the objective function
        (required; no default).
    type : {"minimize", "maximize"}, default="minimize"
        Specifies whether the objective should be minimized or maximized.
    surrogate_kwargs : dict or None, default=None
        Optional keyword arguments passed to the surrogate model.
    """

    objective: list[Callable]
    surrogate: type[Surrogate]
    type: str = "minimize"  # problem's type -> "minimize" or "maximize"
    surrogate_kwargs: dict | None = None
@dataclass
class ConstraintConfig:
    """
    Configuration of a constraint function used in the optimization problem.

    This class stores a constraint callable(s) together with surrogate
    modeling information used to approximate the constraint during optimization.

    Attributes
    ----------
    constraint : list[Callable]
        List of constraint functions. Each callable must accept a decision
        variable vector ``x`` and return a scalar constraint value. The functions
        must be ordered in increasing level of fidelity.
    type : {"less", "greater", "equal"}, default="less"
        Specifies whether the feasible domain is defined where the constraint
        value is less than, greater than, or equal to ``value``.
    value : float, default=0
        Specifies the value defining the feasible domain.
    surrogate : Surrogate or None, default=None
        Surrogate model used to approximate the constraint function.
    surrogate_kwargs : dict or None, default=None
        Optional keyword arguments passed to the surrogate model.
    """

    constraint: list[Callable]
    type: str = "less"  # "less" -> g <= value; "greater" -> g >= value
    value: float = 0  # g <= value (or g >= value if type is "greater")
    surrogate: Surrogate | None = None
    surrogate_kwargs: dict | None = None
@dataclass
class DriverConfig:
    """
    Optimization driver configuration.

    Attributes
    ----------
    ctol : float, default=1e-4
        Feasibility tolerance applied to all constraints.
    max_iter : int or None, default=None
        Maximum number of iterations.
    max_budget : float, default=inf
        Maximum budget before termination of the optimization process.
    max_time : float, default=inf
        Maximum time before termination of the optimization process.
    nt_init : int or None, default=None
        Number of samples in the initial DoE.
    xt_init : np.ndarray or None, default=None
        Initial DoE to use. The NumPy array must be of shape
        (num_sample, num_dimension). By providing an initial DoE, the driver
        will not generate an initial DoE. Cannot be used with ``nt_init``.
    results_dir : str or None, default="bo_result"
        Name of the logging directory.
    verbose : bool, default=False
        Print optimization information.
    log_doe : bool, default=False
        Log the function values as soon as they are sampled. The values are
        stored in a .csv file.
    log_stats : bool, default=False
        Log optimization statistics at the end of each iteration. The stats
        are stored in a .jsonl file.
    callback_func : list[Callable], Callable or None, default=None
        Additional method(s) to call at the end of each iteration.
    scaling : bool, default=True
        Scale the data. The objective is standardized. The constraints are
        divided by their standard deviation. The design variables are
        normalized between 0 and 1.
    seed : int or None, default=None
        Seed for experiment reproducibility.
    """

    ctol: float = 1e-4  # tolerance for all constraints
    max_iter: int | None = None  # max number of BO iterations
    max_budget: float = float("inf")  # max BO budget
    max_time: float = float("inf")  # max BO elapsed time
    nt_init: int | None = None  # number of samples in initial DOE (with LHS)
    xt_init: np.ndarray | None = (
        None  # initial training data [np.ndarray(nt, dim), np.ndarray(nt, dim)]
    )
    results_dir: str | None = "bo_result"  # name for the results directory
    verbose: bool = False  # True/False print each iteration informations
    log_doe: bool = False  # log each sample to a per-fidelity CSV file
    log_stats: bool = False  # log per-iteration stats to a .jsonl file
    callback_func: list[Callable] | Callable | None = (
        None  # additional method to call at the end of each iteration
    )
    scaling: bool = True  # standardize the training data
    seed: int | None = None  # seed for experiment reproducibility
class Driver:
def __init__(self, problem, config, strategy, strategy_kwargs=dict()):
self.problem = problem
self.config = config
self.state = State(problem)
self.state.dataset.log_data = True
self.strategy_kwargs = strategy_kwargs
self.strategy_kwargs["seed"] = config.seed
self.strategy = strategy(self.state, **self.strategy_kwargs)
if self.config.log_doe or self.config.log_stats:
self.config.results_dir = self.make_res_dir(self.config.results_dir)
else:
self.config.results_dir = None
self.evaluator = Evaluator(problem, config.results_dir)
# setup loggers
self.loggers = []
if self.config.verbose:
self.loggers.append(ConsoleLogger(self.config))
if self.config.log_stats:
self.loggers.append(JsonLogger(self.config))
[docs]
def iteration(self, state):
"""
Perform an optimization iteration. An iteration consists of:
- scaling all the training data
- building the surrogate models
- acquiring points to sample (and their associated fidelity level)
- sampling the expensive-to-evaluate functions
Parameters
----------
state: State
Optimization state on which to perform an iteration.
Returns
-------
State
Return optimization state on which an iteration was performed.
"""
state.iter += 1
# scale data
state.scale_dataset(self.config.scaling)
# build models
state.build_models()
# get infill
t0 = time.perf_counter()
infill = self.strategy.get_infill(state)
t1 = time.perf_counter()
state.iter_log["acq_opt_time"] = t1 - t0
for i in range(len(infill)):
if infill[i] is not None:
infill[i] *= (
self.problem.design_space[:, 1] - self.problem.design_space[:, 0]
)
infill[i] += self.problem.design_space[:, 0]
state.iter_log["fidelity"] = i + 1
# evaluate infill points
self.evaluator.sample_func(infill, state)
# log iteration data
self.call_loggers(state)
return state
[docs]
def optimize(self):
"""
Perform the optimization process which consists of:
- generating a DoE if the initial dataset is empty
- while no termination criteria is met, perform iterations
Returns
-------
State
optimization state on which the optimization process was performed.
"""
# generate initial design
if len(self.state.dataset.samples) == 0:
generate_initial_design(self.state, self.evaluator, self.config)
# loop - check stop criteria
while check_stop_criteria(self.state, self.config):
# iteration
self.iteration(self.state)
return self.state
[docs]
def finalize(self):
pass
[docs]
def call_loggers(self, state):
# if self.loggers is not None:
for logger in self.loggers:
try:
logger.on_iter_end(state)
except Exception as e:
print(f"Error while logging: {e}")
[docs]
def make_res_dir(self, res_dir: str | None) -> str | None:
og_res_dir = res_dir
if res_dir is not None:
# if results_dir already exists, append '_idx' to it to avoid overwriting existing data
idx = 1
while os.path.exists(res_dir):
res_dir = og_res_dir + f"_{idx}"
idx += 1
# create results_dir directory
os.makedirs(res_dir, exist_ok=False)
return res_dir
else:
return None
# # ======= old =======
# class OptimizerOld():
#
# def __init__(self, obj_config: ObjectiveConfig, config: OptimizerConfig, strategy: AcquisitionStrategy, strategy_params: dict | None = None):
#
# # initialize print setting
# self.verbose = config.verbose
#
# self.opt_data = {}
#
# self.results_dir = config.results_dir
# self.log_types = ["doe", "json"]
#
# # initialize objective function
# self.obj_config = obj_config
# self.obj_func = obj_config.objective
# self.domain = obj_config.design_space
# self.obj_type = obj_config.type
# self.obj_models = obj_config.surrogate
# self.costs = obj_config.costs
#
# # get constraint configurations
# self.cstr_config = config.constraints
# self.cstr_funcs = []
# self.ctol = config.ctol
#
# self.iter = 0
#
# # get stopping criteria
# self.max_iter = config.max_iter
# self.max_budget = config.max_budget
# self.max_time = config.max_time
#
# # get initial training data / setup
# self.nt_init = config.nt_init
# self.xt_init = config.xt_init
#
# # get misc configuration
# self.callback_func = config.callback_func
# self.scaling = config.scaling
# self.dynamic_costs = config.dynamic_costs
#
# self._check_optimizer_config()
# self._setup_logging()
#
# #
# self.strategy = strategy
# self.strategy_params = strategy_params
#
# self.num_dim = 0
# self.num_fidelity = 0
# self.num_obj = 0
# self.num_cstr = 0
#
# self.yt_factor = None
# self.ct_factor = None
# self.ct_step = None
#
# self._check_objective()
# self._check_constraints()
# self._setup_stopping_criteria()
# self._check_init_points()
#
# self.obj_models = []
# self.cstr_models = []
# self.g_models = []
# self.h_models = []
#
# self._initialize_surrogates()
#
# self.dataset = OptimizationDataset()
# self.scaled_dataset = None
# self.data = []
# self.xt = []
# self.yt = []
# self.ct = []
# self.f_min = np.inf
# self.rscv_min = np.inf
# self.x_min = None
# self.c_min = None
# self.samples_time = []
#
# self.xt_scaled = []
# self.yt_scaled = []
# self.ct_scaled = []
# self.f_min_scaled = np.inf
#
# # self._check_init_points()
# # self._gen_init_train_data()
# # self.update_f_min()
#
# self.acq_strategy = None
# self._initialize_acq_strategy()
#
# self.opt_data = {}
# self.iter_data = {}
#
# def _check_optimizer_config(self) -> None:
#
# if self.dynamic_costs is not None and self.dynamic_costs not in ["samples"]:
# raise Exception(f"Dynamic costs '{self.dynamic_costs}' is not supported.")
#
# if callable(self.callback_func):
# self.callback_func = [self.callback_func]
#
#
# def _setup_logging(self):
#
# results_dir = self.results_dir
#
# if results_dir is not None:
# # if results_dir already exists, append '_idx' to it to avoid overwriting existing data
# idx = 1
# while os.path.exists(results_dir):
# results_dir = self.results_dir + f"_{idx}"
# idx += 1
# self.results_dir = results_dir
#
# # create results_dir directory
# os.makedirs(self.results_dir)
#
# # create the DOE subdirectory (used to save the DOEs from each level)
# doe_path = os.path.join(self.results_dir, "DOE/")
# os.makedirs(doe_path, exist_ok=True)
#
#
# def _check_objective(self) -> None:
#
# if callable(self.obj_func):
# self.obj_func = [self.obj_func]
# elif type(self.obj_func) is list:
# pass
# else:
# raise Exception("ObjectiveConfig.objective must be of type list.")
#
# if self.obj_type == "minimize":
# maximize = False
# self.yt_factor = 1.
# elif self.obj_type == "maximize":
# maximize = True
# self.yt_factor = -1.
# else:
# raise Exception("ObjectiveConfig.type must be 'minimize' or 'maximize'.")
#
# # self.obj_func = self._wrap_objectives(self.obj_func, maximize=maximize)
#
# self.num_dim = self.domain.shape[0]
# self.num_obj += 1
#
# self.num_fidelity = len(self.obj_func)
#
# self.obj_func = [self.obj_func]
#
# if len(self.costs) != self.num_fidelity:
# raise Exception("ObjectiveConfig.costs must have the same number of levels as the objective.")
#
# # TODO: check costs are in ascending order
#
# def _check_constraints(self) -> None:
#
# if self.cstr_config is None:
# self.cstr_config = []
#
# self.num_cstr = len(self.cstr_config)
#
# if self.num_cstr > 0:
#
# self.ct_factor = np.empty(self.num_cstr)
# self.ct_step = np.empty(self.num_cstr)
#
# for c_id, c_config in enumerate(self.cstr_config):
#
# # self.cstr_funcs.append([])
#
# if c_config.type == "greater":
# self.ct_factor[c_id] = -1.
# else:
# self.ct_factor[c_id] = 1.
#
# self.ct_step[c_id] = c_config.value
#
# if callable(c_config.constraint):
# c_config.constraint = [c_config.constraint]
# elif type(c_config.constraint) is list:
# pass
# else:
# raise Exception("ConstraintConfig.constraint must be of type list.")
#
# if len(c_config.constraint) != self.num_fidelity:
# raise Exception("ConstraintConfig.constraint must have the same number of levels as the objective.")
#
# self.cstr_funcs.append( c_config.constraint )
# # self._wrap_constraints(c_config.constraint, c_config.type, c_config.value)
# # )
#
# def _setup_stopping_criteria(self):
#
# if self.num_dim == 0:
# raise Exception("Problem must have at least one dimension.")
#
# if self.max_iter is None:
# warnings.warn("Max number of iterations not specified. Set to 100.")
# self.max_iter = 10*self.num_dim
#
# if self.max_budget is None:
# self.max_budget = np.inf
#
# if self.max_time is None:
# self.max_time = np.inf
#
# def _check_init_points(self):
#
# if self.nt_init is not None and self.xt_init is not None:
# raise Exception("Define nt_init or xt_init, but not both.")
#
# elif self.nt_init is None:
# self.nt_init = 3*self.num_dim
#
# if self.xt_init is None:
# sampler = stats.qmc.LatinHypercube(self.num_dim)
# xt = sampler.random(self.nt_init)
# xt = stats.qmc.scale(xt, self.domain[:, 0], self.domain[:, 1])
# self.xt_init = [xt for _ in range(self.num_fidelity)]
#
# def _initialize_surrogates(self):
#
# self.obj_models = [self.obj_config.surrogate(optimizer=self)]
#
# for c_id, c_config in enumerate(self.cstr_config):
# self.cstr_models.append(
# c_config.surrogate(optimizer=self)
# )
#
# if c_config.type in ["less", "greater"]:
# self.g_models.append(self.cstr_models[c_id])
# elif c_config.type == "equal":
# self.h_models.append(self.cstr_models[c_id])
#
# def _gen_init_train_data(self):
#
# for lvl in range(self.num_fidelity):
# for idx in range(self.xt_init[lvl].shape[0]):
# self.sample_point(self.xt_init[lvl][idx, :], lvl)
#
# def _initialize_acq_strategy(self):
#
# acq_context = self.generate_acq_context()
#
# if type(self.strategy_params) is dict:
# self.acq_strategy = self.strategy(acq_context, **self.strategy_params)
# else:
# self.acq_strategy = self.strategy(acq_context)
#
# self.acq_strategy.validate_config(acq_context)
#
# # def update_f_min(self):
# # # feasible_mask = np.any(self.ct[-1] <= 1e-4, axis=1) # use cstr_tol in ConstraintConfig
# # # self.f_min = np.min(np.where(feasible_mask == True, self.yt[-1], np.inf))
# # # print(f"f_min = {self.f_min}")
# #
# # previous_f_min = self.f_min
# #
# # #feas_mask = np.all(self.ct[-1] <= self.ctol, axis=1)
# # valid_cstr = np.full((self.ct[-1].shape[0], self.num_cstr), False)
# #
# # for c_id, c_config in enumerate(self.cstr_config):
# # if c_config.type in ["less", "greater"]:
# # valid_cstr[:, c_id] = np.where(self.ct[-1][:, c_id] <= self.ctol, True, False)
# # elif c_config.type == "equal":
# # valid_cstr[:, c_id] = np.where(np.abs(self.ct[-1][:, c_id]) <= self.ctol, True, False)
# #
# # feas_mask = np.all(valid_cstr, axis=1)
# #
# # if np.any(feas_mask):
# # local_min_id = self.yt[-1][feas_mask].argmin()
# # # global_min_id = feas_mask[local_min_id]
# #
# # self.f_min = self.yt[-1][feas_mask][local_min_id].item()
# # self.x_min = self.xt[-1][feas_mask][local_min_id]
# # self.c_min = self.ct[-1][feas_mask][local_min_id]
# #
# # else:
# # self.f_min = np.inf
# # self.x_min = None
# # self.c_min = None
# #
# # self._check_f_min_decreasing(self.f_min, previous_f_min)
# #
# # def update_rscv_min(self):
# # rscv = compute_rscv(self.ct[-1], self.cstr_config, g_tol=0.0, h_tol=0.0)
# # self.rscv_min = rscv.min()
# #
# #
# # def _check_f_min_decreasing(self, next_f_min, previous_f_min):
# # if previous_f_min < next_f_min:
# # warnings.warn("f_min is increasing.")
#
# def sample_point(self, x_new: np.ndarray, level: int) -> None:
#
# x_new = check_bounds(x_new, self.domain)
#
# obj_values = np.empty(self.num_obj)
# cstr_values = np.empty(self.num_cstr)
# times = np.empty(self.num_obj+self.num_cstr)
#
# def sample_func(x_new: np.ndarray, func: Callable) -> tuple[float, float]:
# t0 = time.perf_counter()
# value = func(x_new)
# t1 = time.perf_counter()
# return value, t1-t0
#
# for obj_idx in range(self.num_obj):
# obj_values[obj_idx], times[obj_idx] = sample_func(x_new, self.obj_func[obj_idx][level])
#
# for cstr_idx in range(self.num_cstr):
# cstr_values[cstr_idx], times[self.num_obj+cstr_idx] = sample_func(x_new, self.cstr_funcs[cstr_idx][level])
#
# sample = Sample(
# x=x_new,
# fidelity=level,
# obj=obj_values,
# cstr=cstr_values,
# eval_time=times,
# metadata={"iter": self.iter}
# )
#
# self.dataset.add(sample)
#
# if "doe" in self.log_types:
# self.save_sample(sample)
#
# def _standardize_data(self, data: np.ndarray) -> tuple[np.ndarray | float]:
#
# mean = data.mean()
# std = data.std()
# std_data = (data - mean)/std
#
# return std_data, mean, std
#
# def scale_data(self):
#
# num_qoi = self.num_obj + self.num_cstr
# qoi_factor = np.empty(num_qoi)
# qoi_step = np.empty(num_qoi)
#
# for obj_idx in range(self.num_obj):
# if self.obj_config.type == "minimize":
# qoi_factor[obj_idx] = 1
# elif self.obj_config.type == "maximize":
# qoi_factor[obj_idx] = -1
#
# qoi_step[obj_idx] = 0
#
# for cstr_idx in range(self.num_cstr):
# c_config = self.cstr_config[cstr_idx]
#
# if c_config.type in ["less", "equal"]:
# qoi_factor[self.num_obj+cstr_idx] = 1
# elif c_config.type in ["greater"]:
# qoi_factor[self.num_obj+cstr_idx] = -1
#
# qoi_step[self.num_obj+cstr_idx] = c_config.value
#
# self.qoi_factor = qoi_factor
# self.qoi_step = qoi_step
#
# self.scaled_dataset = OptimizationDataset()
#
# for sample in self.dataset.samples:
#
# scaled_sample = copy.deepcopy(sample)
#
# # should only normalize real variables
# scaled_sample.x -= self.domain[:, 0]
# scaled_sample.x /= (self.domain[:, 1] - self.domain[:, 0])
# scaled_sample.obj[:] *= self.qoi_factor[:self.num_obj]
# scaled_sample.cstr[:] *= self.qoi_factor[self.num_obj:self.num_obj+self.num_cstr]
#
# self.scaled_dataset.add(scaled_sample)
#
#
# def build_models(self):
#
# def group_by_fidelity() -> tuple[list[np.ndarray], list[np.ndarray]]:
#
# x = []
# qoi = []
#
# for lvl in range(self.num_fidelity):
#
# samples = self.scaled_dataset.get_by_fidelity(lvl)
#
# x_lvl = np.empty((len(samples), self.num_dim))
# qoi_lvl = np.empty((len(samples), self.num_obj + self.num_cstr))
#
# for idx, sample in enumerate(samples):
# x_lvl[idx, :] = sample.x
# qoi_lvl[idx, :self.num_obj] = sample.obj
# qoi_lvl[idx, self.num_obj:] = sample.cstr
#
# x.append(x_lvl)
# qoi.append(qoi_lvl)
#
# return x, qoi
#
# x_train, qoi_train = group_by_fidelity()
#
# qoi_models = self.obj_models + self.cstr_models
#
# for qoi_idx in range(self.num_obj+self.num_cstr):
#
# idx_train = []
#
# for lvl in range(self.num_fidelity):
# idx_train.append(qoi_train[lvl][:, qoi_idx].reshape(-1, 1))
#
# qoi_models[qoi_idx].train(x_train, idx_train)
#
#
# def generate_acq_context(self):
#
# cstr_types = []
# for c_id, c_config in enumerate(self.cstr_config):
# if c_config.type in ["less", "greater"]:
# cstr_types.append("less")
# elif c_config.type == "equal":
# cstr_types.append("equal")
#
# acq_context = State(
# num_dim=self.num_dim,
# num_obj=1,
# num_cstr=self.num_cstr,
# num_fidelity=self.num_fidelity,
# design_space=np.array([[0, 1]] * self.num_dim),
# obj_models=self.obj_models,
# cstr_models=self.cstr_models,
# cstr_types=cstr_types,
# dataset=self.scaled_dataset,
# )
#
# return acq_context
#
# def sample_infills(self, next_x: list[np.ndarray]) -> None:
#
# # Convert the single fidelity acquisition function output to a list as
# # to make it compatible with the multi-fidelity approach
# if type(next_x) is not list:
# next_x = [next_x]
#
# if len(next_x) != self.num_fidelity:
# warnings.warn("")
#
# # Sample each fidelity level sequentially
# for k in range(self.num_fidelity):
#
# # if None -> the fidelity k is not to be sampled
# if self.next_x[k] is None:
# continue
#
# # unscale infill point
# if self.scaling:
# self.next_x[k] *= (self.domain[:, 1] - self.domain[:, 0])
# self.next_x[k] += self.domain[:, 0]
#
# # Check if the infill point is already in the training data
# # TODO: what to do if the next infill location is already in the training data?
# # if np.any(np.all(self.xt[k] == self.next_x[k], axis=1)):
# # warnings.warn("Infill point is already in the training data.")
# # continue
#
# # sample objective function and constraints
# self.sample_point(self.next_x[k], k)
#
#
# def perform_iteration(self):
#
# self.iter_data = dict() # reset iteration data dictionary
#
# # ------- Wrap training data -------
# # self._wrap_training_data()
#
# # ------- Scale training data -------
# # self.scale_training_data()
# self.scale_data()
#
# # ------- Update cost ratio -------
# self.update_costs()
#
# # ------- Surrogate models training -------
# gp_t0 = time.perf_counter()
#
# self.build_models()
#
# gp_t1 = time.perf_counter()
# gp_time = gp_t1 - gp_t0 # elapsed time for training the models
#
# # log gp training elapsed time
# self.iter_data["gp_training_time"] = gp_time
#
# # # ------- Acquisition function optimization -------
# acq_context = self.generate_acq_context()
#
# acq_t0 = time.perf_counter()
#
# # Find enrichment location
# self.next_x, acq_data = self.acq_strategy.get_infill(acq_context)
# self.iter_data["acq_data"] = acq_data
#
# acq_t1 = time.perf_counter()
# acq_time = acq_t1 - acq_t0 # elapsed time for finding the next acquisition point
#
# # log acquisition function maximization time
# self.iter_data["acq_opt_time"] = acq_time
#
# # ------- Sample infill location -------
# self.sample_infills(self.next_x)
#
# if self.callback_func is not None:
# for func in self.callback_func:
# func(self)
#
#
# def optimize(self):
#
# bo_start = time.perf_counter()
#
# # generate initial doe
# self._gen_init_train_data()
#
# # self.scale_training_data()
# self.scale_data()
#
# # update f_min
# # self.update_f_min()
# # self.update_rscv_min()
#
# iter_id = 0
# self.iter = iter_id
#
# # for l in range(self.num_fidelity):
# # self.iter_data[f"n{l + 1}"] = len(self.yt[l])
# #
# # self.update_costs()
# # self.budget = self.compute_used_budget()
# # self.iter_data["budget"] = self.compute_used_budget()
# # self.iter_data["f_min"] = self.f_min
# # self.iter_data["x_min"] = self.x_min
# # self.iter_data["c_min"] = self.c_min
#
# self.opt_data[0] = self.iter_data
# self.save_data()
#
# self.continue_bo = True
#
# # if self.verbose: print(
# # f"| iter= {iter_id}/{self.max_iter} | budget={self.budget:.3f}/{self.max_budget:.3f} | f_min={self.f_min:.3e} | rscv_min={self.rscv_min:.3e} |"
# # )
#
# while self.continue_bo:
#
# iter_id += 1
# self.iter = iter_id
#
# self.perform_iteration()
#
#
# # update f_min
# # self.update_f_min()
# # self.update_rscv_min()
#
# # for l in range(self.num_fidelity):
# # self.iter_data[f"n{l + 1}"] = len(self.yt[l])
# #
# # self.iter_data["f_min"] = self.f_min
# # self.iter_data["x_min"] = self.x_min
# # self.iter_data["c_min"] = self.c_min
#
# self.budget = self.compute_used_budget()
# self.iter_data["budget"] = self.budget
#
# self.iter_data["costs"] = self.costs
#
# # elapsed time since optimization start
# self.bo_time = time.perf_counter() - bo_start
# self.iter_data["bo_time"] = self.bo_time
#
# self.continue_bo = self.check_stop_criteria()
# self.iter_data["continue"] = self.continue_bo
#
# # add iteration data to the optimization data dictionary
# self.opt_data[iter_id] = self.iter_data
#
# # self.dump_pikle_log()
# self.save_data()
#
# # Display the iteration number, best feasible objective and fidelity level sampled
# # if self.verbose : print(
# # f"| iter= {iter_id}/{self.max_iter} | budget={self.budget:.3f}/{self.max_budget:.3f} | f_min={self.f_min:.3e} | rscv_min={self.rscv_min:.3e} | lvl={max_level}/{self.num_fidelity - 1} | gp_time={gp_time:.3f} | acq_time={acq_time:.3f}")
#
# if self.verbose : print(
# f"| iter= {iter_id}/{self.max_iter} | budget={self.budget:.3f}/{self.max_budget:.3f}")
#
# # ------- End of optimization loop -------
#
# return self.opt_data
#
# # def dump_pikle_log(self):
# # try:
# #
# # path = os.path.join(self.results_dir, "opt_data.pkl")
# #
# # with open(path, 'wb') as file:
# # pickle.dump(self.opt_data, file)
# #
# # except Exception as e:
# # # TODO: use warnings.warn
# # warnings.warn(f"Error while saving optimization data: {e}")
#
#
# def dump_json_log(self):
# try:
#
# path = os.path.join(self.results_dir, "opt_data.json")
#
# with open(path, 'w') as file:
# safe_data = json_safe(self.opt_data)
# json.dump(safe_data, file, indent=2)
#
# except Exception as e:
# # TODO: use warnings.warn
# warnings.warn(f"Error while saving optimization data: {e}")
#
#
# def save_data(self):
#
# if "json" in self.log_types:
# self.dump_json_log()
#
#
#
#
# def save_sample(self, sample: Sample) -> None:
#
# try:
# row = dict()
#
# row["iter"] = self.iter
# row["budget"] = self.compute_used_budget() # self.budget
#
# # save variable
# for i in range(self.num_dim):
# row[f"x{i}"] = sample.x[i]
#
# # save objectives
# for i in range(self.num_obj):
# row[f"{i}"] = sample.obj[i]
#
# # save constraints
# for i in range(self.num_cstr):
# row[f"c{i}"] = sample.cstr[i]
#
# row["time"] = np.sum(sample.eval_time)
#
# path = os.path.join(self.results_dir, "DOE", f"doe_fidelity_{sample.fidelity}.csv")
# file_exists = os.path.isfile(path)
#
# # possibly does not work on Windows -> to be tested
# with open(path, 'a') as file:
# writer = csv.DictWriter(file, fieldnames=row.keys())
#
# if not file_exists:
# writer.writeheader()
#
# writer.writerow(row)
#
# except Exception as e:
# print(f"Error while saving the DoE: {e}")
#
#
# def dump_csv_doe(self, level):
# pass
#
# try:
# row = dict()
#
# row["iter"] = self.iter
# row["budget"] = self.budget
#
# x = self.xt[level][-1, :]
# print(f"x = {x}")
# for i in range(len(x)):
# row[f"x{i}"] = x[i]
#
# row["f"] = self.data[level][-1, 0]
#
# for i in range(self.num_cstr):
# row[f"c{i}"] = self.data[level][-1, i+1]
#
# path = f"DoE/{self.log_filename}_{level}.csv"
# file_exists = os.path.isfile(path)
# print(f"file exists = {file_exists}")
#
# with open(path, 'w') as file:
# writer = csv.DictWriter(file, fieldnames=row.keys())
#
# if not file_exists:
# writer.writeheader()
#
# writer.writerow(row)
#
# except Exception as e:
# print(f"Error while logging the DoE: {e}")
#
# def compute_used_budget(self):
#
# budget = 0
#
# for k in range(self.num_fidelity):
# samples = self.dataset.get_by_fidelity(k)
# budget += self.costs[k] * len(samples)
#
# return budget
#
# def check_stop_criteria(self):
#
# if self.iter >= self.max_iter:
# return False
# elif self.budget >= self.max_budget:
# return False
# elif self.bo_time >= self.max_time:
# return False
# else:
# return True
#
# def update_costs(self) -> None:
# """
# Update the costs of each level.
# - If set to None, the costs are never updated.
# - If set to 'samples', the cost of each level corresponds to it's average time to be sampled.
#
# :return: None
# """
#
# if self.dynamic_costs == "samples":
# for lvl in range(self.num_fidelity):
# # average sampling time per level
# self.costs[lvl] = self.samples_time[lvl].sum(axis=1).mean().item()
#
# # def scale_training_data(self):
# #
# # self.xt_scaled = []
# # self.yt_scaled = []
# # self.ct_scaled = []
# #
# # for lvl in range(self.num_fidelity):
# #
# # self.xt_scaled.append(self.xt[lvl].copy())
# #
# # # transform the objective into a minimization problem
# # self.yt_scaled.append(wrap_array(self.yt[lvl], factor=self.yt_factor))
# #
# # if self.num_cstr >= 1:
# # # transform the constraints to define the feasible domain as: g <= 0 and h == 0
# # self.ct_scaled.append(wrap_array(self.ct[lvl], factor=self.ct_factor, step=self.ct_step))
# #
# # if self.scaling:
# # # scale xt between 0 and 1
# # self.xt_scaled[lvl][:] -= self.domain[:, 0]
# # self.xt_scaled[lvl][:] /= (self.domain[:, 1] - self.domain[:, 0])
# #
# # # update scaled domain boundaries
# # self.domain_scaled = np.empty((self.num_dim, 2))
# # self.domain_scaled[:, 0] = 0.
# # self.domain_scaled[:, 1] = 1.
# #
# #
# # # scaled objective to unit std
# # yt_scaled, yt_mean, yt_std = self._standardize_data(self.yt[lvl])
# # self.yt_scaled[lvl] = yt_scaled
# #
# # # update minimum objective
# # self.f_min_scaled = (self.f_min - yt_mean) / yt_std
# #
# # # scaled constraints to unit std
# # if self.num_cstr >= 1:
# # for c_id in range(self.ct[lvl].shape[1]):
# # self.ct_scaled[lvl][:, c_id] /= self.ct[lvl][:, c_id].std()
# # else:
# # self.domain_scaled = self.domain.copy()
# No script entry point: this module is meant to be imported as a library.
if __name__ == "__main__":
    pass