Source code for smt_optim.core.state
import copy
import time
import numpy as np
from smt_optim.core import OptimizationDataset
from smt_optim.utils.constraints import compute_rscv
# from smt_optim.core import Problem
[docs]
class State:
    """
    State of the optimization process at a given moment.

    The optimization state holds information about the optimization process
    at a given moment: the problem definition, the surrogate models, the
    raw and scaled datasets and per-iteration logging data.

    Parameters
    ----------
    problem: Problem
        The problem to be optimized.

    Attributes
    ----------
    problem: Problem
        The problem to be optimized.
    iter: int
        Current iteration number.
    budget: float
        Current used budget.
    bo_start: float
        Start time of the optimization problem.
    bo_time: float
        Elapsed time of the optimization driver.
    obj_models: list[Surrogate]
        List containing the surrogates modeling the objective function(s).
    cstr_models: list[Surrogate]
        List containing the surrogates modeling the constraint function(s).
    cstr_types: list[str]
        List containing the constraint types.
    dataset: OptimizationDataset
        The dataset containing all samples from the expensive-to-evaluate
        functions.
    scaled_dataset: OptimizationDataset or None
        The scaled dataset. ``None`` until `scale_dataset` has been called.
    qoi_factor: list[numpy.ndarray] or None
        Per-fidelity scaling factors (one array per fidelity level, one entry
        per objective then per constraint). ``None`` until `scale_dataset`
        has been called.
    qoi_step: list[numpy.ndarray] or None
        Per-fidelity offsets, laid out like `qoi_factor`. ``None`` until
        `scale_dataset` has been called.
    iter_log: dict
        Dictionary containing logging data.

    Methods
    -------
    scale_dataset(unit_std: bool)
        Scale data in the dataset. The scaled dataset is accessible using the
        `scaled_dataset` attribute.
    build_models()
        Builds the surrogate models based on the scaled dataset.
    get_best_sample()
        Returns the best sample in the dataset.
    """

    def __init__(self, problem):
        self.problem = problem

        self.iter = 0
        self.budget = 0
        self.bo_start = 0
        self.bo_time = 0

        # One surrogate instance per objective, in configuration order.
        self.obj_models: list = [
            obj_config.surrogate() for obj_config in self.problem.obj_configs
        ]

        # One surrogate instance and one type string per constraint.
        self.cstr_models: list = []
        self.cstr_types: list[str] = []
        for cstr_config in self.problem.cstr_configs:
            self.cstr_models.append(cstr_config.surrogate())
            self.cstr_types.append(cstr_config.type)

        self.dataset = OptimizationDataset()
        self.scaled_dataset = None

        # Populated by scale_dataset(); declared here so the attributes
        # always exist on a State instance.
        self.qoi_factor = None
        self.qoi_step = None

        self.iter_log = dict()

    @staticmethod
    def _safe_std_mean(data):
        """Return ``(std, mean)`` of `data` with safe fallbacks.

        A standard deviation that is zero or not finite (constant, empty or
        NaN-contaminated data) is replaced by 1 so that it can be used as a
        divisor; a mean that is not finite is replaced by 0.
        """
        values = np.asarray(data, dtype=float)
        if values.size == 0:
            return 1.0, 0.0

        std = float(np.std(values))
        mean = float(np.mean(values))
        if not np.isfinite(std) or std == 0.0:
            std = 1.0
        if not np.isfinite(mean):
            mean = 0.0
        return std, mean

    def scale_dataset(self, unit_std: bool = False):
        """
        Scales the dataset.

        Design variables are mapped with the bounds stored in
        ``problem.design_space`` (column 0 is subtracted, then the result is
        divided by column 1 minus column 0). Each objective and constraint
        value is transformed as ``(value - step) / factor`` where:

        * for objectives, ``step`` is the data mean and ``factor`` the data
          standard deviation when `unit_std` is True (0 and 1 otherwise);
          the factor is negated for "maximize" objectives;
        * for constraints, ``step`` is the constraint ``value`` and
          ``factor`` the data standard deviation when `unit_std` is True
          (1 otherwise); the factor is negated for "greater" constraints.

        The factors and steps are stored in `qoi_factor` and `qoi_step`, and
        the result in `scaled_dataset`.

        Parameters
        ----------
        unit_std : bool, optional
            If True, normalize by standard deviation.

        Returns
        -------
        None

        Raises
        ------
        ValueError
            If an objective type is neither "minimize" nor "maximize", or a
            constraint type is not one of "less", "equal", "greater".
        """
        problem = self.problem
        num_obj = problem.num_obj
        num_cstr = problem.num_cstr
        num_qoi = num_obj + num_cstr

        # Build one independent array per fidelity level. ``[arr] * n`` would
        # repeat a reference to a single array shared by every level.
        qoi_factor = [np.ones(num_qoi) for _ in range(problem.num_fidelity)]
        qoi_step = [np.zeros(num_qoi) for _ in range(problem.num_fidelity)]

        for lvl in range(problem.num_fidelity):
            for obj_idx in range(num_obj):
                if unit_std:
                    factor, step = self._safe_std_mean(
                        self.dataset.export_data(obj_idx, lvl)
                    )
                else:
                    factor, step = 1.0, 0.0

                obj_type = problem.obj_configs[obj_idx].type
                if obj_type == "minimize":
                    qoi_factor[lvl][obj_idx] = factor
                elif obj_type == "maximize":
                    # Negative factor flips the sign of maximized objectives.
                    qoi_factor[lvl][obj_idx] = -factor
                else:
                    raise ValueError(
                        f"Unknown objective type {obj_type!r} for objective {obj_idx}."
                    )
                qoi_step[lvl][obj_idx] = step

            for cstr_idx in range(num_cstr):
                c_config = problem.cstr_configs[cstr_idx]
                qoi_idx = num_obj + cstr_idx

                if unit_std:
                    factor, _ = self._safe_std_mean(
                        self.dataset.export_data(qoi_idx, lvl)
                    )
                else:
                    factor = 1.0

                if c_config.type in ("less", "equal"):
                    qoi_factor[lvl][qoi_idx] = factor
                elif c_config.type == "greater":
                    # Negative factor flips the sign of "greater" constraints.
                    qoi_factor[lvl][qoi_idx] = -factor
                else:
                    raise ValueError(
                        f"Unknown constraint type {c_config.type!r} for constraint {cstr_idx}."
                    )
                # Constraints are shifted by their bound, not by the data mean.
                qoi_step[lvl][qoi_idx] = c_config.value

        self.qoi_factor = qoi_factor
        self.qoi_step = qoi_step

        lower = problem.design_space[:, 0]
        width = problem.design_space[:, 1] - problem.design_space[:, 0]

        scaled_dataset = OptimizationDataset()
        for sample in self.dataset.samples:
            # Work on a deep copy so the raw dataset is left untouched.
            scaled_sample = copy.deepcopy(sample)
            lvl = scaled_sample.fidelity

            # NOTE: should only normalize real variables
            scaled_sample.x -= lower
            scaled_sample.x /= width

            scaled_sample.obj[:] -= qoi_step[lvl][:num_obj]
            scaled_sample.obj[:] /= qoi_factor[lvl][:num_obj]

            scaled_sample.cstr[:] -= qoi_step[lvl][num_obj:num_qoi]
            scaled_sample.cstr[:] /= qoi_factor[lvl][num_obj:num_qoi]

            scaled_dataset.add(scaled_sample)

        self.scaled_dataset = scaled_dataset

    def build_models(self):
        """
        Builds the surrogate models.

        Trains every objective and constraint surrogate on the scaled
        dataset, passing the training inputs and outputs as lists with one
        entry per fidelity level. The elapsed training time is stored in
        ``iter_log["gp_training_time"]``.

        Returns
        -------
        None

        Raises
        ------
        RuntimeError
            If `scale_dataset` has not been called beforehand.
        """
        if self.scaled_dataset is None:
            raise RuntimeError(
                "scale_dataset() must be called before build_models()."
            )

        data = self.scaled_dataset.export_as_dict()

        fidelity = data["fidelity"]
        all_xt = data["x"]
        all_yt = data["obj"]
        all_ct = data["cstr"]

        # Boolean row masks selecting the samples of each fidelity level.
        fidelity_masks = [
            (fidelity == lvl).ravel() for lvl in range(self.problem.num_fidelity)
        ]
        xt = [all_xt[mask, :] for mask in fidelity_masks]

        t0 = time.perf_counter()

        for idx in range(self.problem.num_obj):
            yt = [all_yt[mask, idx].reshape(-1, 1) for mask in fidelity_masks]
            kwargs = self.problem.obj_configs[idx].surrogate_kwargs
            if kwargs is None:
                kwargs = dict()
            self.obj_models[idx].train(xt, yt, **kwargs)

        for idx in range(self.problem.num_cstr):
            ct = [all_ct[mask, idx].reshape(-1, 1) for mask in fidelity_masks]
            kwargs = self.problem.cstr_configs[idx].surrogate_kwargs
            if kwargs is None:
                kwargs = dict()
            self.cstr_models[idx].train(xt, ct, **kwargs)

        t1 = time.perf_counter()
        self.iter_log["gp_training_time"] = t1 - t0

    def get_best_sample(self, ctol=1e-4, fidelity=-1):
        """
        Returns the best sample based on the objective function value.

        Only the first objective is considered. When the problem has
        constraints, a sample is eligible only if the violation measure
        returned by ``compute_rscv`` does not exceed `ctol`; if no sample is
        eligible, the sample with the smallest violation measure is returned
        instead.

        Parameters
        ----------
        ctol : float, optional
            Tolerance for constraint violation. Default is 1e-4.
        fidelity : int, optional
            Fidelity level to consider. If -1, uses the highest fidelity.
            Default is -1.

        Returns
        -------
        sample : Sample
            The best sample based on the objective function value, or None
            if no sample is available at the requested fidelity.
        """
        if fidelity == -1:
            fidelity = self.problem.num_fidelity - 1

        # Compare ``sign * objective`` so that smaller is always better,
        # whether the objective is minimized or maximized.
        sign = 1 if self.problem.obj_configs[0].type == "minimize" else -1
        has_constraints = self.problem.num_cstr != 0

        best_sample = None
        best_value = np.inf

        least_violating = None
        min_rscv = np.inf

        for s in self.dataset.get_by_fidelity(fidelity):
            if has_constraints:
                rscv = compute_rscv(s.cstr.reshape(1, -1), self.cstr_types)

                # Track the least-violating sample as a fallback.
                if rscv < min_rscv:
                    min_rscv = rscv
                    least_violating = s

                if not (rscv <= ctol):
                    continue

            value = sign * s.obj[0]
            if value < best_value:
                best_value = value
                best_sample = s

        if best_sample is None:
            return least_violating
        return best_sample