MF-SM Source Code


MfModel classes

mf_sm.mf_models.MfModel

Bases: ABC

Wrapping class around a multifidelity model to define it, train it and evaluate it.

Attributes:
  • dim (int) –

    problem dimension.

  • model_dict (dict) –

    dictionary containing all necessary model parameters.

  • outdir (str) –

    path to the model folder.

  • seed (int) –

    seed to enforce reproducibility.

  • x_lf_DOE (ndarray) –

    the lf_DOE the model was trained with.

  • x_hf_DOE (ndarray) –

    the hf_DOE the model was trained with.

  • y_lf_DOE (ndarray) –

    the lf_DOE the model was trained with.

  • y_hf_DOE (ndarray) –

    the hf_DOE the model was trained with.

Source code in aero_optim/mf_sm/mf_models.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
class MfModel(ABC):
    """
    Wrapping class around a multifidelity model to define it, train it and evaluate it.

    Attributes:
        dim (int): problem dimension.
        model_dict (dict): dictionary containing all necessary model parameters.
        outdir (str): path to the model folder.
        seed (int): seed to enforce reproducibility.
        x_lf_DOE (np.ndarray): the lf_DOE the model was trained with.
        x_hf_DOE (np.ndarray): the hf_DOE the model was trained with.
        y_lf_DOE (np.ndarray): the lf_DOE the model was trained with.
        y_hf_DOE (np.ndarray): the hf_DOE the model was trained with.
    """
    def __init__(self, dim: int, model_dict: dict, outdir: str, seed: int):
        self.dim = dim
        self.model_dict = model_dict
        self.outdir = outdir
        self.seed = seed
        self.requires_nested_doe: bool
        self.x_lf_DOE: np.ndarray | None = None
        self.x_hf_DOE: np.ndarray | None = None
        self.y_lf_DOE: np.ndarray | None = None
        self.y_hf_DOE: np.ndarray | None = None
        # seed numpy
        np.random.seed(seed)

    @abstractmethod
    def train(self):
        """
        Trains a model with low and high fidelity data.
        """

    @abstractmethod
    def evaluate(self, x: np.ndarray):
        """
        Returns a model prediction for a given input x.
        """

    def evaluate_std(self, x: np.ndarray):
        raise Exception("Not implemented")

    def get_DOE(self) -> np.ndarray:
        """
        Returns the DOE the model was trained with.
        """
        assert self.x_lf_DOE is not None and self.x_hf_DOE is not None
        if self.requires_nested_doe:
            return np.copy(self.x_lf_DOE)
        else:
            return np.vstack((self.x_lf_DOE, self.x_hf_DOE))

    def set_DOE(self, x_lf: np.ndarray, x_hf: np.ndarray, y_lf: np.ndarray, y_hf: np.ndarray):
        """
        Sets the hf and lf DOEs the model was trained with.
        """
        if self.x_lf_DOE is None:
            self.x_lf_DOE = x_lf
            self.x_hf_DOE = x_hf
            self.y_lf_DOE = y_lf
            self.y_hf_DOE = y_hf
        else:
            assert (
                self.x_lf_DOE is not None
                and self.x_hf_DOE is not None
                and self.y_lf_DOE is not None
                and self.y_hf_DOE is not None
            )
            self.x_lf_DOE = np.vstack((self.x_lf_DOE, x_lf))
            self.x_hf_DOE = np.vstack((self.x_hf_DOE, x_hf))
            if self.y_lf_DOE.ndim == 2:
                self.y_lf_DOE = np.vstack((self.y_lf_DOE, y_lf))
                self.y_hf_DOE = np.vstack((self.y_hf_DOE, y_hf))
            else:
                self.y_lf_DOE = np.hstack((self.y_lf_DOE, y_lf))
                self.y_hf_DOE = np.hstack((self.y_hf_DOE, y_hf))

    def get_y_star(self) -> float | np.ndarray:
        """
        Returns the current best lf fitness (SOO).
        """
        # single objective, y_star is simply the min value
        assert self.y_hf_DOE is not None
        return min(self.y_hf_DOE)

evaluate(x: np.ndarray) abstractmethod

Returns a model prediction for a given input x.

Source code in aero_optim/mf_sm/mf_models.py
55
56
57
58
59
@abstractmethod
def evaluate(self, x: np.ndarray):
    """
    Returns a model prediction for a given input x.
    """

get_DOE() -> np.ndarray

Returns the DOE the model was trained with.

Source code in aero_optim/mf_sm/mf_models.py
64
65
66
67
68
69
70
71
72
def get_DOE(self) -> np.ndarray:
    """
    Returns the DOE the model was trained with.
    """
    assert self.x_lf_DOE is not None and self.x_hf_DOE is not None
    if self.requires_nested_doe:
        return np.copy(self.x_lf_DOE)
    else:
        return np.vstack((self.x_lf_DOE, self.x_hf_DOE))

get_y_star() -> float | np.ndarray

Returns the current best lf fitness (SOO).

Source code in aero_optim/mf_sm/mf_models.py
 99
100
101
102
103
104
105
def get_y_star(self) -> float | np.ndarray:
    """
    Returns the current best lf fitness (SOO).
    """
    # single objective, y_star is simply the min value
    assert self.y_hf_DOE is not None
    return min(self.y_hf_DOE)

set_DOE(x_lf: np.ndarray, x_hf: np.ndarray, y_lf: np.ndarray, y_hf: np.ndarray)

Sets the hf and lf DOEs the model was trained with.

Source code in aero_optim/mf_sm/mf_models.py
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def set_DOE(self, x_lf: np.ndarray, x_hf: np.ndarray, y_lf: np.ndarray, y_hf: np.ndarray):
    """
    Sets the hf and lf DOEs the model was trained with.
    """
    if self.x_lf_DOE is None:
        self.x_lf_DOE = x_lf
        self.x_hf_DOE = x_hf
        self.y_lf_DOE = y_lf
        self.y_hf_DOE = y_hf
    else:
        assert (
            self.x_lf_DOE is not None
            and self.x_hf_DOE is not None
            and self.y_lf_DOE is not None
            and self.y_hf_DOE is not None
        )
        self.x_lf_DOE = np.vstack((self.x_lf_DOE, x_lf))
        self.x_hf_DOE = np.vstack((self.x_hf_DOE, x_hf))
        if self.y_lf_DOE.ndim == 2:
            self.y_lf_DOE = np.vstack((self.y_lf_DOE, y_lf))
            self.y_hf_DOE = np.vstack((self.y_hf_DOE, y_hf))
        else:
            self.y_lf_DOE = np.hstack((self.y_lf_DOE, y_lf))
            self.y_hf_DOE = np.hstack((self.y_hf_DOE, y_hf))

train() abstractmethod

Trains a model with low and high fidelity data.

Source code in aero_optim/mf_sm/mf_models.py
49
50
51
52
53
@abstractmethod
def train(self):
    """
    Trains a model with low and high fidelity data.
    """

mf_sm.mf_models.MfDNN

Bases: MfModel

Wrapping class around a torch multifidelity deep neural network model.

Source code in aero_optim/mf_sm/mf_models.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
class MfDNN(MfModel):
    """
    Wrapping class around a torch multifidelity deep neural network model.
    """
    def __init__(self, dim: int, model_dict: dict, outdir: str, seed: int):
        super().__init__(dim, model_dict, outdir, seed)
        check_dir(outdir)
        self.requires_nested_doe = model_dict.get("nested_doe", False)
        self.name = model_dict.get("name", "MFDNN")
        # seed torch
        torch.manual_seed(seed)
        # select device
        self.device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # the first layer of both network is created here
        self.model_dict["NNL"]["layer_sizes_NNL"].insert(0, dim)
        self.n_obj = self.model_dict["NNL"]["layer_sizes_NNL"][-1]
        self.model_dict["NNH"]["layer_sizes_NNH1"].insert(0, dim + self.n_obj)
        self.model_dict["NNH"]["layer_sizes_NNH2"].insert(0, dim + self.n_obj)
        # low fidelity network
        self.NNL = NNL(self.model_dict["NNL"]["layer_sizes_NNL"])
        # high fidelity network
        self.NNH = NNH(
            self.model_dict["NNH"]["layer_sizes_NNH1"],
            self.model_dict["NNH"]["layer_sizes_NNH2"]
        )
        # to device
        self.NNL.to(self.device)
        self.NNH.to(self.device)
        # init weights
        self.NNL.apply(weights_init)
        self.NNH.apply(weights_init)

    def train(self):
        # pretrain low fidelity model
        weight_decay_NNL = self.model_dict["NNL"]["optimizer"].get("weight_decay", 0)
        if self.model_dict["pretraining"]:
            loss_target = self.model_dict["NNL"].get("loss_target", 1e-3)
            niter = self.model_dict["NNL"].get("niter", 10000)
            # define optimizer
            lr = self.model_dict["NNL"]["optimizer"].get("lr", 1e-3)
            NNL_optimizer = torch.optim.Adam(
                self.NNL.parameters(),
                lr=lr,
                weight_decay=weight_decay_NNL
            )
            # define scheduler
            if "scheduler" in self.model_dict["NNL"]:
                NNL_scheduler = torch.optim.lr_scheduler.MultiStepLR(
                    NNL_optimizer, **self.model_dict["NNL"]["scheduler"]
                )
            else:
                NNL_scheduler = None
            # pretrain
            assert self.x_lf_DOE is not None and self.y_lf_DOE is not None
            NNL_pretrain(
                self.NNL,
                NNL_optimizer,
                self.x_lf_DOE, self.y_lf_DOE,
                loss_target,
                niter,
                self.device,
                scheduler=NNL_scheduler
            )
        # coupled training
        loss_target = self.model_dict["NNH"].get("loss_target", 1e-5)
        niter = self.model_dict["NNH"].get("niter", 15000)
        # define optimizer
        lr = self.model_dict["NNH"]["optimizer"].get("lr", 1e-4)
        weight_decay_NNH1 = (
            self.model_dict["NNH"]["optimizer"].get("weight_decay_NNH1", 0)
        )
        weight_decay_NNH2 = (
            self.model_dict["NNH"]["optimizer"].get("weight_decay_NNH2", 0)
        )
        MfDNN_optimizer = torch.optim.Adam(
            [{'params': self.NNH.NNH1.parameters(), 'weight_decay': weight_decay_NNH1},
             {'params': self.NNH.NNH2.parameters(), 'weight_decay': weight_decay_NNH2},
             {'params': self.NNH.alpha},
             {'params': self.NNL.parameters(), 'weight_decay': weight_decay_NNL}], lr=lr
        )
        # define scheduler
        if "scheduler" in self.model_dict["NNH"]:
            MfDNN_scheduler = torch.optim.lr_scheduler.MultiStepLR(
                MfDNN_optimizer,
                **self.model_dict["NNH"]["scheduler"]
            )
        else:
            MfDNN_scheduler = None
        # train
        assert self.x_lf_DOE is not None and self.y_lf_DOE is not None
        assert self.x_hf_DOE is not None and self.y_hf_DOE is not None
        MfDNN_train(
            self.NNL,
            self.NNH,
            MfDNN_optimizer,
            self.x_lf_DOE, self.y_lf_DOE,
            self.x_hf_DOE, self.y_hf_DOE,
            loss_target,
            niter,
            self.device,
            self.outdir,
            scheduler=MfDNN_scheduler
        )

    def evaluate(self, x: np.ndarray) -> np.ndarray | list[np.ndarray]:
        x_torch = torch.from_numpy(x).float()
        x_torch = x_torch.to(self.device)
        y_lo_hi = self.NNL.eval()(x_torch)
        y = self.NNH.eval()(torch.cat((x_torch, y_lo_hi), dim=1))
        y_np = y.cpu().detach().numpy()
        return y_np if self.n_obj == 1 else [y_np[:, c_][:, None] for c_ in range(self.n_obj)]

mf_sm.mf_models.MfSMT

Bases: MfModel

Wrapping class around an smt multifidelity cokriging model.

Source code in aero_optim/mf_sm/mf_models.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class MfSMT(MfModel):
    """
    Wrapping class around an smt multifidelity cokriging model.
    """
    def __init__(self, dim: int, model_dict: dict, outdir: str, seed: int):
        super().__init__(dim, model_dict, outdir, seed)
        self.requires_nested_doe = model_dict.get("nested_doe", True)
        self.name = model_dict.get("name", "AR1")
        self.eval_noise = model_dict.get("eval_noise", False)
        self.model = MFK(theta0=dim * [1.0], eval_noise=self.eval_noise, print_global=False)

    def train(self):
        self.model.set_training_values(self.x_lf_DOE, self.y_lf_DOE, name=0)
        self.model.set_training_values(self.x_hf_DOE, self.y_hf_DOE)
        self.model.train()

    def evaluate(self, x: np.ndarray) -> np.ndarray:
        y = self.model.predict_values(x)
        return y

    def evaluate_std(self, x: np.ndarray) -> np.ndarray:
        return np.sqrt(self.model.predict_variances(x))

mf_sm.mf_models.MultiObjectiveModel

Bases: MfModel

Multi-objective surrogate model.

Source code in aero_optim/mf_sm/mf_models.py
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
class MultiObjectiveModel(MfModel):
    """
    Multi-objective surrogate model.
    """
    def __init__(self, list_of_models: list[MfSMT | MfKPLS | SfSMT | MfLGP]):
        self.models: list[MfSMT | MfKPLS | SfSMT | MfLGP] = list_of_models
        self.requires_nested_doe = self.models[0].requires_nested_doe
        self.name = self.models[0].name

    def train(self):
        for model in self.models:
            model.train()

    def evaluate(self, x: np.ndarray) -> list[np.ndarray]:
        return [model.evaluate(x) for model in self.models]

    def evaluate_std(self, x: np.ndarray) -> list[np.ndarray]:
        return [model.evaluate_std(x) for model in self.models]

    def get_DOE(self) -> np.ndarray:
        return self.models[0].get_DOE()

    def set_DOE(
            self,
            x_lf: np.ndarray, x_hf: np.ndarray,
            y_lf: np.ndarray | list[np.ndarray], y_hf: np.ndarray | list[np.ndarray]
    ):
        for model, yy_lf, yy_hf in zip(self.models, y_lf, y_hf):
            model.set_DOE(x_lf, x_hf, yy_lf, yy_hf)

    def get_y_star(self) -> float | np.ndarray:
        """
        Returns the current best lf fitness (SO) or pareto front (MO).
        """
        raise Exception("Not implemented")

get_y_star() -> float | np.ndarray

Returns the current best lf fitness (SO) or pareto front (MO).

Source code in aero_optim/mf_sm/mf_models.py
362
363
364
365
366
def get_y_star(self) -> float | np.ndarray:
    """
    Returns the current best lf fitness (SO) or pareto front (MO).
    """
    raise Exception("Not implemented")

Infill problems

mf_sm.mf_infill.EDProblem

Bases: Problem

Euclidean Distance problem.

Source code in aero_optim/mf_sm/mf_infill.py
 97
 98
 99
100
101
102
103
104
105
106
class EDProblem(Problem):
    """
    Euclidean Distance problem.
    """
    def __init__(self, DOE: np.ndarray, n_var: int, bound: list):
        super().__init__(n_var=n_var, n_obj=1, xl=bound[0], xu=bound[-1])
        self.DOE = DOE

    def _evaluate(self, x: np.ndarray, out: np.ndarray, *args, **kwargs):
        out["F"] = -ED_acquisition_function(x, self.DOE)

mf_sm.mf_infill.AcquisitionFunctionProblem

Bases: Problem

Generic class for Bayesian acquisition function optimization problems.

Source code in aero_optim/mf_sm/mf_infill.py
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
class AcquisitionFunctionProblem(Problem):
    """
    Generic class for Bayesian acquisition function optimization problems.
    """
    def __init__(
            self,
            function: Callable,
            model: MfLGP | MfKPLS | MfSMT | SfSMT | MultiObjectiveModel,
            n_var: int,
            bound: list,
            min: bool = True
    ):
        Problem.__init__(
            self, n_var=n_var, n_ieq_constr=0, xl=bound[0], xu=bound[1]
        )
        self.model = model
        self.function = function
        self.min = min

    def _evaluate(self, X: np.ndarray, out: np.ndarray, *args, **kwargs):
        out["F"] = self.function(X, self.model) if self.min else -self.function(X, self.model)

mf_sm.mf_infill.RegCritProblem

Bases: Problem

Regularized infill criterion problem: see R. Grapin et al. (2022): https://doi.org/10.2514/6.2022-4053

Source code in aero_optim/mf_sm/mf_infill.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
class RegCritProblem(Problem):
    """
    Regularized infill criterion problem:
    see R. Grapin et al. (2022): https://doi.org/10.2514/6.2022-4053
    """
    def __init__(
            self, function: Callable, model: MultiObjectiveModel,
            n_var: int, bound: list, gamma: float
    ):
        super().__init__(n_var=n_var, n_obj=1, xl=bound[0], xu=bound[-1])
        self.model = model
        self.gamma = gamma
        self.function = function

    def _evaluate(self, x: np.ndarray, out: np.ndarray):
        u1 = self.model.models[0].evaluate(x)
        u2 = self.model.models[1].evaluate(x)
        out["F"] = -(self.gamma * self.function(x, self.model)
                     - np.sum(np.column_stack([u1, u2]), axis=1))

Infill acquisition functions

mf_sm.mf_infill.ED_acquisition_function(x: np.ndarray, DOE: np.ndarray) -> np.ndarray

Euclidean Distance: see X. Zhang et al. (2021): https://doi.org/10.1016/j.cma.2020.113485

Source code in aero_optim/mf_sm/mf_infill.py
22
23
24
25
26
27
28
29
30
def ED_acquisition_function(x: np.ndarray, DOE: np.ndarray) -> np.ndarray:
    """
    Euclidean Distance:
    see X. Zhang et al. (2021): https://doi.org/10.1016/j.cma.2020.113485
    """
    f1 = np.min(cdist(np.atleast_2d(x), DOE, "euclidean"), axis=1)
    f1 = np.expand_dims(f1, axis=1)
    assert f1.shape == (x.shape[0], 1), f"f1 shape {f1.shape} x shape {x.shape}"
    return f1

mf_sm.mf_infill.LCB_acquisition_function(x: np.ndarray, model: MfSMT, alpha: float = 1) -> np.ndarray

Lower Confidence Bound acquisition function.

Source code in aero_optim/mf_sm/mf_infill.py
33
34
35
36
37
def LCB_acquisition_function(x: np.ndarray, model: MfSMT, alpha: float = 1) -> np.ndarray:
    """
    Lower Confidence Bound acquisition function.
    """
    return model.evaluate(x) - alpha * model.evaluate_std(x)

mf_sm.mf_infill.EI_acquisition_function(x: np.ndarray, model: MfSMT) -> np.ndarray

Expected Improvement acquisition function.

Source code in aero_optim/mf_sm/mf_infill.py
40
41
42
43
44
45
46
47
48
49
def EI_acquisition_function(x: np.ndarray, model: MfSMT) -> np.ndarray:
    """
    Expected Improvement acquisition function.
    """
    u = model.get_y_star() - model.evaluate(x)
    std = model.evaluate_std(x)
    f1 = u * norm.cdf(u / (std + EPSILON)) + std * norm.pdf(u / (std + EPSILON))
    zero_std_idx = np.argwhere(std < EPSILON)
    f1[zero_std_idx] = 0.
    return f1

mf_sm.mf_infill.PI_acquisition_function(x: np.ndarray, model: MultiObjectiveModel) -> np.ndarray

Bi-objective Probability of Improvement: see A. J. Keane (2006): https://doi.org/10.2514/1.16875

Source code in aero_optim/mf_sm/mf_infill.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def PI_acquisition_function(x: np.ndarray, model: MultiObjectiveModel) -> np.ndarray:
    """
    Bi-objective Probability of Improvement:
    see A. J. Keane (2006): https://doi.org/10.2514/1.16875
    """
    assert model.models[0].y_hf_DOE is not None
    assert model.models[1].y_hf_DOE is not None
    pareto = compute_pareto(model.models[0].y_hf_DOE, model.models[1].y_hf_DOE)

    u1 = model.models[0].evaluate(x)
    u2 = model.models[1].evaluate(x)
    std1 = model.models[0].evaluate_std(x) + EPSILON
    std2 = model.models[1].evaluate_std(x) + EPSILON

    PIaug = norm.cdf((pareto[0, 0] - u1) / std1)
    for i in range(len(pareto) - 1):
        PIaug += (
            (norm.cdf((pareto[i + 1, 0] - u1) / std1) - norm.cdf((pareto[i, 0] - u1) / std1))
            * norm.cdf((pareto[i, 1] - u2) / std2)
        )
    PIaug += (1 - norm.cdf((pareto[-1, 0] - u1) / std1)) * norm.cdf((pareto[-1, 1] - u2) / std2)
    return PIaug

mf_sm.mf_infill.MPI_acquisition_function(x: np.ndarray, model: MultiObjectiveModel) -> np.ndarray

Bi-objective Minimal Probability of Improvement: see A. A. Rahat (2017): https://dl.acm.org/doi/10.1145/3071178.3071276

Source code in aero_optim/mf_sm/mf_infill.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def MPI_acquisition_function(x: np.ndarray, model: MultiObjectiveModel) -> np.ndarray:
    """
    Bi-objective Minimal Probability of Improvement:
    see A. A. Rahat (2017): https://dl.acm.org/doi/10.1145/3071178.3071276
    """
    assert model.models[0].y_hf_DOE is not None
    assert model.models[1].y_hf_DOE is not None
    pareto = compute_pareto(model.models[0].y_hf_DOE, model.models[1].y_hf_DOE)

    u1 = model.models[0].evaluate(x)
    u2 = model.models[1].evaluate(x)
    std1 = model.models[0].evaluate_std(x) + EPSILON
    std2 = model.models[1].evaluate_std(x) + EPSILON

    MPI = np.min(np.column_stack(
        [1 - norm.cdf((u1 - pp[0]) / std1) * norm.cdf((u2 - pp[1]) / std2)
            for pp in pareto]
    ), axis=1)
    return MPI