# ActiveBOToytask/BayesianOptimization/BOwithDM.py
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern

from PolicyModel.GaussianModelMultiDim import GaussianPolicy
from AcquistionFunctions.ExpectedImprovement import ExpectedImprovement
from AcquistionFunctions.ProbabilityOfImprovement import ProbabilityOfImprovement
from AcquistionFunctions.ConfidenceBound import ConfidenceBound
from AcquistionFunctions.PreferenceExpectedImprovement import PreferenceExpectedImprovement
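
# Bayesian optimization over the flattened policy weights (nr_weights * nr_dims):
# a GaussianProcessRegressor with a Matern kernel acts as the surrogate model, and
# new candidates are proposed by one of the acquisition functions imported above
# (Expected Improvement, Probability of Improvement, Upper Confidence Bound, or
# Preference Expected Improvement).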
class BayesianOptimization:
    def __init__(self, nr_steps, nr_dims, nr_weights, acq='Expected Improvement', seed=None):
        # The acquisition function is selected by its full name (see next_observation()).
        self.acq = acq
        self.episode = 0

        self.nr_steps = nr_steps
        self.nr_dims = nr_dims
        self.nr_weights = nr_weights
        self.weights = self.nr_weights * self.nr_dims  # total number of optimized parameters

        self.lower_bound = -1.0
        self.upper_bound = 1.0
        self.seed = seed

        self.X = None
        self.Y = None
        self.gp = None

        self.policy_model = GaussianPolicy(self.nr_steps, self.nr_weights, self.nr_dims,
                                           lowerb=self.lower_bound, upperb=self.upper_bound, seed=seed)

        self.acq_sample_size = 100
        self.best_reward = np.empty((1, 1))

        if acq == "Preference Expected Improvement":
            self.acq_fun = PreferenceExpectedImprovement(self.weights,
                                                         self.acq_sample_size,
                                                         self.lower_bound,
                                                         self.upper_bound,
                                                         initial_variance=10.0,
                                                         update_variance=0.05,
                                                         seed=seed)

        self.reset_bo()

    def reset_bo(self):
        # Fresh GP surrogate and empty observation buffers for a new optimization run.
        self.gp = GaussianProcessRegressor(Matern(nu=1.5), n_restarts_optimizer=5)  # length_scale=(1e-8, 1e5)
        self.best_reward = np.empty((1, 1))
        self.X = np.zeros((1, self.weights), dtype=np.float64)
        self.Y = np.zeros((1, 1), dtype=np.float64)
        self.episode = 0

    def next_observation(self):
        # Query the selected acquisition function for the next candidate weight vector.
        if self.acq == "Expected Improvement":
            x_next = ExpectedImprovement(self.gp,
                                         self.X,
                                         self.acq_sample_size,
                                         self.weights,
                                         kappa=0,
                                         seed=self.seed,
                                         lower=self.lower_bound,
                                         upper=self.upper_bound)

        elif self.acq == "Probability of Improvement":
            x_next = ProbabilityOfImprovement(self.gp,
                                              self.X,
                                              self.acq_sample_size,
                                              self.weights,
                                              kappa=0,
                                              seed=self.seed,
                                              lower=self.lower_bound,
                                              upper=self.upper_bound)

        elif self.acq == "Upper Confidence Bound":
            x_next = ConfidenceBound(self.gp,
                                     self.acq_sample_size,
                                     self.weights,
                                     beta=2.576,
                                     seed=self.seed,
                                     lower=self.lower_bound,
                                     upper=self.upper_bound)

        elif self.acq == "Preference Expected Improvement":
            x_next = self.acq_fun.expected_improvement(self.gp,
                                                       self.X,
                                                       kappa=0)

        else:
            raise NotImplementedError(f"Unknown acquisition function: {self.acq}")

        return x_next

    def add_observation(self, reward, x):
        if self.episode == 0:
            # First observation overwrites the zero-initialized buffers from reset_bo().
            self.X[0, :] = x
            self.Y[0] = reward
            self.best_reward[0] = np.max(self.Y)
        else:
            # np.vstack's dtype keyword requires NumPy >= 1.24.
            self.X = np.vstack((self.X, np.around(x, decimals=8)), dtype=np.float64)
            self.Y = np.vstack((self.Y, reward), dtype=np.float64)
            self.best_reward = np.vstack((self.best_reward, np.max(self.Y)), dtype=np.float64)

        self.gp.fit(self.X, self.Y)
        self.episode += 1

    def get_best_result(self):
        # Best reward seen so far, the corresponding weight vector, and its episode index.
        y_max = np.max(self.Y)
        idx = np.argmax(self.Y)
        x_max = self.X[idx, :]
        return y_max, x_max, idx
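

# --- Minimal usage sketch (not part of the original file) ---------------------------
# Assumptions: `rollout_reward` is a hypothetical stand-in for however the surrounding
# task scores a weight vector (e.g. by rolling out the GaussianPolicy); only the methods
# defined above are used.
if __name__ == "__main__":
    rng = np.random.default_rng(0)

    def rollout_reward(weights):
        # Placeholder objective; replace with the real episode rollout.
        return -float(np.sum(np.square(weights)))

    bo = BayesianOptimization(nr_steps=50, nr_dims=2, nr_weights=5, acq="Expected Improvement")
    for _ in range(20):
        if bo.episode == 0:
            # The GP has no data yet, so start from a random point inside the bounds.
            x = rng.uniform(bo.lower_bound, bo.upper_bound, bo.weights)
        else:
            x = bo.next_observation()
        bo.add_observation(rollout_reward(x), x)

    best_y, best_x, best_idx = bo.get_best_result()
    print(f"best reward {best_y:.4f} found in episode {best_idx}")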