import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern

from PolicyModel.GaussianModelMultiDim import GaussianPolicy
from AcquistionFunctions.ExpectedImprovement import ExpectedImprovement
from AcquistionFunctions.ProbabilityOfImprovement import ProbabilityOfImprovement
from AcquistionFunctions.ConfidenceBound import ConfidenceBound
from AcquistionFunctions.PreferenceExpectedImprovement import PreferenceExpectedImprovement


class BayesianOptimization:
    """Bayesian optimization over the weights of a multi-dimensional Gaussian policy."""

    def __init__(self, nr_steps, nr_dims, nr_weights, acq="Expected Improvement", seed=None):
        # NOTE: the default used to be 'ei', which no branch of next_observation()
        # recognizes; it now matches the branch name used there.
        self.acq = acq
        self.episode = 0
        self.nr_steps = nr_steps
        self.nr_dims = nr_dims
        self.nr_weights = nr_weights
        # Total number of optimization variables: one weight set per dimension.
        self.weights = self.nr_weights * self.nr_dims
        self.lower_bound = -1.0
        self.upper_bound = 1.0
        self.seed = seed

        self.X = None
        self.Y = None
        self.gp = None

        self.policy_model = GaussianPolicy(self.nr_steps,
                                           self.nr_weights,
                                           self.nr_dims,
                                           lowerb=self.lower_bound,
                                           upperb=self.upper_bound,
                                           seed=seed)

        # Number of candidate points sampled when maximizing the acquisition function.
        self.acq_sample_size = 100

        self.best_reward = np.empty((1, 1))

        # Preference Expected Improvement keeps internal state (its preference
        # proposal distribution), so it is instantiated once up front.
        if acq == "Preference Expected Improvement":
            self.acq_fun = PreferenceExpectedImprovement(self.weights,
                                                         self.acq_sample_size,
                                                         self.lower_bound,
                                                         self.upper_bound,
                                                         initial_variance=10.0,
                                                         update_variance=0.05,
                                                         seed=seed)

        self.reset_bo()

    def reset_bo(self):
        """Reset the surrogate model and all recorded observations."""
        # Matern(nu=1.5) surrogate; wider bounds such as
        # length_scale_bounds=(1e-8, 1e5) could be passed here if needed.
        self.gp = GaussianProcessRegressor(Matern(nu=1.5), n_restarts_optimizer=5)
        self.best_reward = np.empty((1, 1))
        self.X = np.zeros((1, self.weights), dtype=np.float64)
        self.Y = np.zeros((1, 1), dtype=np.float64)
        self.episode = 0

    def next_observation(self):
        """Return the next query point proposed by the configured acquisition function."""
        if self.acq == "Expected Improvement":
            x_next = ExpectedImprovement(self.gp,
                                         self.X,
                                         self.acq_sample_size,
                                         self.weights,
                                         kappa=0,
                                         seed=self.seed,
                                         lower=self.lower_bound,
                                         upper=self.upper_bound)
        elif self.acq == "Probability of Improvement":
            x_next = ProbabilityOfImprovement(self.gp,
                                              self.X,
                                              self.acq_sample_size,
                                              self.weights,
                                              kappa=0,
                                              seed=self.seed,
                                              lower=self.lower_bound,
                                              upper=self.upper_bound)
        elif self.acq == "Upper Confidence Bound":
            x_next = ConfidenceBound(self.gp,
                                     self.acq_sample_size,
                                     self.weights,
                                     beta=2.576,
                                     seed=self.seed,
                                     lower=self.lower_bound,
                                     upper=self.upper_bound)
        elif self.acq == "Preference Expected Improvement":
            x_next = self.acq_fun.expected_improvement(self.gp, self.X, kappa=0)
        else:
            raise NotImplementedError(f"Unknown acquisition function: {self.acq!r}")

        return x_next

    def add_observation(self, reward, x):
        """Record an observed (weights, reward) pair and refit the GP."""
        if self.episode == 0:
            # The first observation overwrites the zero-initialized placeholder row.
            self.X[0, :] = x
            self.Y[0] = reward
            self.best_reward[0] = np.max(self.Y)
        else:
            # np.vstack's dtype argument requires NumPy >= 1.24.
            self.X = np.vstack((self.X, np.around(x, decimals=8)), dtype=np.float64)
            self.Y = np.vstack((self.Y, reward), dtype=np.float64)
            self.best_reward = np.vstack((self.best_reward, np.max(self.Y)), dtype=np.float64)

        self.gp.fit(self.X, self.Y)
        self.episode += 1

    def get_best_result(self):
        """Return the best observed reward, its weight vector, and its episode index."""
        y_max = np.max(self.Y)
        idx = np.argmax(self.Y)
        x_max = self.X[idx, :]

        return y_max, x_max, idx
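
# ---------------------------------------------------------------------------
# Minimal usage sketch. Assumptions: the AcquistionFunctions modules imported
# above are on the path and return a flat weight vector; `toy_reward` is a
# hypothetical stand-in for evaluating a policy rollout with the sampled
# weights (in the real pipeline the reward would come from executing
# self.policy_model). Not part of the class's API.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    rng = np.random.default_rng(0)

    def toy_reward(x):
        # Hypothetical objective: reward peaks at the origin of weight space.
        return -float(np.sum(np.square(x)))

    bo = BayesianOptimization(nr_steps=50, nr_dims=2, nr_weights=4,
                              acq="Expected Improvement", seed=0)

    # Seed the model with one random sample before querying the acquisition
    # function, since the GP cannot propose points from an empty dataset.
    x0 = rng.uniform(bo.lower_bound, bo.upper_bound, bo.weights)
    bo.add_observation(toy_reward(x0), x0)

    # Standard BO loop: propose, evaluate, record, refit.
    for _ in range(20):
        x_next = bo.next_observation()
        bo.add_observation(toy_reward(x_next), x_next)

    y_best, x_best, idx = bo.get_best_result()
    print(f"best reward {y_best:.4f} found at episode {idx}")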