import numpy as np

from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern

from PolicyModel.GaussianModelMultiDim import GaussianPolicy

from AcquistionFunctions.ExpectedImprovement import ExpectedImprovement
from AcquistionFunctions.ProbabilityOfImprovement import ProbabilityOfImprovement
from AcquistionFunctions.ConfidenceBound import ConfidenceBound
from AcquistionFunctions.PreferenceExpectedImprovement import PreferenceExpectedImprovement

class BayesianOptimization:
    """Bayesian optimization over the weights of a multi-dimensional Gaussian policy."""

    def __init__(self, nr_steps, nr_dims, nr_weights, acq="Expected Improvement", seed=None):
        self.acq = acq
        self.episode = 0

        self.nr_steps = nr_steps
        self.nr_dims = nr_dims
        self.nr_weights = nr_weights
        # Total number of optimized parameters: weights per dimension times dimensions.
        self.weights = self.nr_weights * self.nr_dims

        self.lower_bound = -1.0
        self.upper_bound = 1.0
        self.seed = seed

        self.X = None
        self.Y = None
        self.gp = None

        self.policy_model = GaussianPolicy(self.nr_steps, self.nr_weights, self.nr_dims,
                                           lowerb=self.lower_bound, upperb=self.upper_bound, seed=seed)

        self.acq_sample_size = 100

        self.best_reward = np.empty((1, 1))

        # Only the preference-based acquisition function keeps internal state,
        # so it is the only one instantiated up front.
        if acq == "Preference Expected Improvement":
            self.acq_fun = PreferenceExpectedImprovement(self.weights,
                                                         self.acq_sample_size,
                                                         self.lower_bound,
                                                         self.upper_bound,
                                                         initial_variance=10.0,
                                                         update_variance=0.05,
                                                         seed=seed)

        self.reset_bo()
    def reset_bo(self):
        """Reset the surrogate model and all recorded observations."""
        self.gp = GaussianProcessRegressor(Matern(nu=1.5), n_restarts_optimizer=5)  # length_scale=(1e-8, 1e5)
        self.best_reward = np.empty((1, 1))
        self.X = np.zeros((1, self.weights), dtype=np.float64)
        self.Y = np.zeros((1, 1), dtype=np.float64)
        self.episode = 0
    def next_observation(self):
        """Maximize the selected acquisition function and return the next parameter vector to evaluate."""
        if self.acq == "Expected Improvement":
            x_next = ExpectedImprovement(self.gp,
                                         self.X,
                                         self.acq_sample_size,
                                         self.weights,
                                         kappa=0,
                                         seed=self.seed,
                                         lower=self.lower_bound,
                                         upper=self.upper_bound)

        elif self.acq == "Probability of Improvement":
            x_next = ProbabilityOfImprovement(self.gp,
                                              self.X,
                                              self.acq_sample_size,
                                              self.weights,
                                              kappa=0,
                                              seed=self.seed,
                                              lower=self.lower_bound,
                                              upper=self.upper_bound)

        elif self.acq == "Upper Confidence Bound":
            x_next = ConfidenceBound(self.gp,
                                     self.acq_sample_size,
                                     self.weights,
                                     beta=2.576,
                                     seed=self.seed,
                                     lower=self.lower_bound,
                                     upper=self.upper_bound)

        elif self.acq == "Preference Expected Improvement":
            x_next = self.acq_fun.expected_improvement(self.gp,
                                                       self.X,
                                                       kappa=0)

        else:
            raise NotImplementedError(f"Unknown acquisition function: {self.acq}")

        return x_next
    def add_observation(self, reward, x):
        """Store an evaluated parameter vector with its reward and refit the Gaussian process."""
        if self.episode == 0:
            # The first observation overwrites the zero-initialized buffers.
            self.X[0, :] = x
            self.Y[0] = reward
            self.best_reward[0] = np.max(self.Y)
        else:
            # The dtype keyword of np.vstack requires NumPy >= 1.24.
            self.X = np.vstack((self.X, np.around(x, decimals=8)), dtype=np.float64)
            self.Y = np.vstack((self.Y, reward), dtype=np.float64)
            self.best_reward = np.vstack((self.best_reward, np.max(self.Y)), dtype=np.float64)

        self.gp.fit(self.X, self.Y)
        self.episode += 1
    def get_best_result(self):
        """Return the best observed reward, its parameter vector, and the episode index it was found in."""
        y_max = np.max(self.Y)
        idx = np.argmax(self.Y)
        x_max = self.X[idx, :]

        return y_max, x_max, idx
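
# Minimal usage sketch (illustrative only): it assumes the custom PolicyModel and
# AcquistionFunctions modules imported above are available, and it replaces the real
# policy rollout with a hypothetical toy reward function just to show the loop of
# proposing, evaluating, and recording observations.
if __name__ == "__main__":
    bo = BayesianOptimization(nr_steps=50, nr_dims=2, nr_weights=4,
                              acq="Expected Improvement", seed=0)

    def toy_reward(weights):
        # Stand-in for evaluating the Gaussian policy on a task; peaks at the origin.
        return -float(np.sum(np.square(weights)))

    for _ in range(20):
        x_next = bo.next_observation()      # propose the next weight vector
        reward = toy_reward(x_next)         # evaluate it (normally: run the policy)
        bo.add_observation(reward, x_next)  # record it and refit the GP surrogate

    y_best, x_best, idx = bo.get_best_result()
    print(f"best reward {y_best:.4f} found in episode {idx}")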