Source code for smt_optim.core.sample

from dataclasses import dataclass, field
from typing import Callable
import time
import warnings
import csv
import os

import numpy as np


[docs] @dataclass class Sample: """ Store sample data. Attributes ---------- x : np.ndarray Variable obj : np.ndarray Objective value(s). Array dimension: (num_obj,) cstr : np.ndarray Contraint value(s). Array dimension: (num_cstr,) eval_time : np.ndarray Evaluation times of each QoI. Array dimension: (num_obj+num_cstr,) metadata : dict Dictionary with sample metadata such as iter, budget and fidelity. """ x: np.ndarray # (num_dim,) fidelity: int obj: np.ndarray | None # (num_obj,) cstr: np.ndarray | None # (num_cstr,) eval_time: np.ndarray | None # (num_obj + num_cstr,) metadata: dict = field(default_factory=dict) def __repr__(self): string = f"======= sample data =======\n" string += f"x = {self.x}\n" string += f"obj = {self.obj}\n" string += f"cstr = {self.cstr}\n" string += f"eval_time = {self.eval_time}\n" string+= f"------- meta data -------\n" for key, value in self.metadata.items(): string += f"{key} = {value}\n" string += f"===========================\n" return string
[docs] class OptimizationDataset: """ Store samples. Attributes ---------- samples : list[Sample] num_obj: int Number of objectives num_cstr: int Number of constraints num_fidelity: int Number of fidelity levels fidelities: list Fidelity levels sorted in increasing order. num_samples: dict Number of samples for each fidelity levels. """ def __init__(self): self.samples: list[Sample] = [] self.num_obj: int | None = None self.num_cstr: int | None = None self.num_fidelity: int = 0 self.fidelities: list = [] self.num_samples: dict = dict()
[docs] def add(self, sample: Sample): self.samples.append(sample) if self.num_obj is None: self.num_obj = len(sample.obj) self.num_cstr = len(sample.cstr) if sample.cstr is not None else 0 else: if len(sample.obj) != self.num_obj or len(sample.cstr) != self.num_cstr: raise Exception("Sample data does not match dataset.") if sample.fidelity not in self.fidelities: self.fidelities.append(sample.fidelity) self.num_samples[sample.fidelity] = 0 self.num_fidelity += 1 self.num_samples[sample.fidelity] += 1
[docs] def get_by_fidelity(self, lvl: int): return [s for s in self.samples if s.fidelity == lvl]
[docs] def export_data(self, idx: int | list[int], lvl: int) -> np.ndarray: if isinstance(idx, int): idx = [idx] data = [] samples = self.get_by_fidelity(lvl) for s in samples: row = [] for i, qoi_idx in enumerate(idx): if qoi_idx < self.num_obj: row.append(s.obj[qoi_idx]) else: row.append(s.cstr[qoi_idx-self.num_obj]) data.append(row) return np.array(data)
[docs] def export_as_dict(self) -> dict: """ Exports the samples data as a dictionary. Each `attribute` corresponds to a key in the dictionary. Returns ------- dict Dictionary containing all sample data. """ num_sample = len(self.samples) fidelity = np.empty((num_sample, 1)) eval_time = np.empty((num_sample, self.num_obj+self.num_cstr)) nvar = len(self.samples[0].x) xt = np.empty((num_sample, nvar)) # inputs yt = np.empty((num_sample, self.num_obj)) # objectives ct = np.empty((num_sample, self.num_cstr)) # constraints for idx, sample in enumerate(self.samples): fidelity[idx, 0] = sample.fidelity eval_time[idx, :] = sample.eval_time xt[idx, :] = sample.x yt[idx, :] = sample.obj ct[idx, :] = sample.cstr data = { "fidelity": fidelity, "eval_time": eval_time, "x": xt, "obj": yt, "cstr": ct, } return data
def sample_func(x_new: np.ndarray, func: Callable) -> tuple[float, float]:
    """
    Evaluates the function `func` at `x_new`.

    Returns the function value and the elapsed time. Real scalars (Python
    ``int``/``float`` and NumPy integer/floating scalars) and single-element
    arrays are converted to a Python ``float``. Any other output (including
    arrays with zero or several elements) triggers a warning and is replaced
    by ``nan``.

    Parameters
    ----------
    x_new: np.ndarray
        Point to sample.
    func
        Function to sample.

    Returns
    -------
    float
        Function value at `x_new`, or ``nan`` if the output is invalid.
    float
        Elapsed time for sampling the function.
    """
    # Time only the call itself, not the output normalisation below.
    t0 = time.perf_counter()
    output = func(x_new)
    elapsed_time = time.perf_counter() - t0

    # Unwrap single-element arrays of any shape to their scalar content.
    if isinstance(output, np.ndarray) and output.size == 1:
        output = output.item()

    # np.float32 and NumPy integers are not subclasses of `float`, so they
    # must be listed explicitly to be accepted as valid scalar outputs.
    if isinstance(output, (int, float, np.integer, np.floating)):
        value = float(output)
    else:
        warnings.warn(f"Invalid function output: {output}", stacklevel=2)
        value = float("nan")

    return value, elapsed_time
[docs] class Evaluator: """ Evaluate the expensive-to-evaluate functions. Attributes ---------- problem: Problem Optimization problem. res_path: str Logging directory path. """ def __init__(self, problem, res_path: str | None = None): self.problem = problem self.res_path = res_path
[docs] def sample_func(self, infill: list[np.ndarray | None], state) -> None: """ Sample the problem functions at requested query points. Parameters ---------- infill: list[np.ndarray | None] Query points. Each np.ndarray in the list corresponds to a fidelity level. The np.ndarray must have the shape (num_points, num_dim). state: State Optimization state. Returns ------- None """ for lvl, x_lvl in enumerate(infill): if x_lvl is None: continue else: for idx in range(x_lvl.shape[0]): x_new = x_lvl[idx, :] obj_values = np.empty(self.problem.num_obj) cstr_values = np.empty(self.problem.num_cstr) times = np.empty(self.problem.num_obj + self.problem.num_cstr) for obj_idx in range(self.problem.num_obj): obj_values[obj_idx], times[obj_idx] = sample_func(x_new, self.problem.obj_funcs[obj_idx][lvl]) for cstr_idx in range(self.problem.num_cstr): cstr_values[cstr_idx], times[self.problem.num_obj + cstr_idx] = sample_func(x_new, self.problem.cstr_funcs[cstr_idx][lvl]) state.budget += state.problem.costs[lvl] sample = Sample( x=x_new, fidelity=lvl, obj=obj_values, cstr=cstr_values, eval_time=times, metadata={ "iter": state.iter, "budget": state.budget, "fidelity": lvl, } ) state.dataset.add(sample) if self.res_path is not None: self.log_sample(sample)
[docs] def log_sample(self, sample) -> None: """ Log the sample data. Parameters ---------- sample: Sample The sample to log. Returns ------- None """ try: row = dict() row["iter"] = sample.metadata.get("iter", np.nan) row["budget"] = sample.metadata.get("budget", np.nan) # self.compute_used_budget() # self.budget row["fidelity"] = sample.metadata.get("fidelity", np.nan) # self.compute_used_budget() # self.budget # save variables for i in range(len(sample.x)): row[f"x{i}"] = sample.x[i] # save objectives for i in range(len(sample.obj)): row[f"f{i}"] = sample.obj[i] # save constraints for i in range(len(sample.cstr)): row[f"c{i}"] = sample.cstr[i] row["time"] = np.sum(sample.eval_time) path = os.path.join(self.res_path, "doe.csv") file_exists = os.path.isfile(path) # possibly does not work on Windows -> to be tested with open(path, 'a') as file: writer = csv.DictWriter(file, fieldnames=row.keys()) if not file_exists: writer.writeheader() writer.writerow(row) except Exception as e: print(f"Error while saving the DoE: {e}")