import numpy as np import torch from botorch.models import SingleTaskGP from botorch.optim import optimize_acqf from gpytorch.kernels import MaternKernel from botorch.fit import fit_gpytorch_mll from gpytorch.mlls import ExactMarginalLogLikelihood from botorch.acquisition import UpperConfidenceBound, ExpectedImprovement, ProbabilityOfImprovement from PolicyModel.GaussianModel import GaussianPolicy from ToyTask.MountainCarGym import Continuous_MountainCarEnv import matplotlib.pyplot as plt torch.set_default_dtype(torch.float64) class BayesianOptimization: def __init__(self, env, nr_steps, nr_init=3, acq="Expected Improvement", nr_weights=6, policy_seed=None): self.env = env self.nr_init = nr_init self.acq = acq self.X = None self.Y = None self.GP = None self.episode = 0 self.counter_array = np.empty((1, 1)) self.best_reward = np.empty((1, 1)) self.distance_penalty = 0 self.nr_policy_weights = nr_weights self.nr_steps = nr_steps self.policy_seed = policy_seed self.lower_bound = -1.0 self.upper_bound = 1.0 self.bounds = torch.t(torch.tensor([[self.lower_bound, self.upper_bound]]*self.nr_policy_weights)) self.policy_model = GaussianPolicy(self.nr_policy_weights, self.nr_steps, self.policy_seed, self.lower_bound, self.upper_bound) self.eval_X = 512 def reset_bo(self): self.counter_array = np.empty((1, 1)) self.GP = None self.episode = 0 self.best_reward = np.empty((1, 1)) def runner(self, policy): env_reward = 0.0 step_count = 0 for i in range(len(policy)): action = policy[i] output = self.env.step(action) env_reward += output[1] done = output[2] step_count += 1 if done: self.counter_array = np.vstack((self.counter_array, step_count)) break if not done and i == len(policy): distance = -(self.env.goal_position - output[0][0]) env_reward += distance * self.distance_penalty self.counter_array = np.vstack((self.counter_array, step_count)) self.env.reset() return env_reward, step_count def initialize(self): self.env.reset() self.reset_bo() self.X = torch.zeros((self.nr_init, self.nr_policy_weights)) self.Y = torch.zeros((self.nr_init, 1)) for i in range(self.nr_init): self.policy_model.random_policy() self.X[i, :] = torch.tensor(self.policy_model.weights.T) policy = self.policy_model.policy_rollout() reward, step_count = self.runner(policy) self.Y[i] = reward self.GP = SingleTaskGP(train_X=self.X, train_Y=self.Y, covar_module=MaternKernel(nu=1.5)) mll = ExactMarginalLogLikelihood(self.GP.likelihood, self.GP) fit_gpytorch_mll(mll) def next_observation(self): if self.acq == "Expected Improvement": ei = ExpectedImprovement(self.GP, best_f=self.Y.max()) x_next, _ = optimize_acqf(ei, bounds=self.bounds, num_restarts=5, raw_samples=self.eval_X, q=1) elif self.acq == "Probability of Improvement": poi = ProbabilityOfImprovement(self.GP, best_f=self.Y.max()) x_next, _ = optimize_acqf(poi, bounds=self.bounds, num_restarts=5, raw_samples=self.eval_X, q=1) elif self.acq == "Upper Confidence Bound": ucb = UpperConfidenceBound(self.GP, beta=2.576) x_next, _ = optimize_acqf(ucb, bounds=self.bounds, num_restarts=5, raw_samples=self.eval_X, q=1) else: raise NotImplementedError return x_next def eval_new_observation(self, x_next): self.policy_model.weights = x_next.detach().numpy() policy = self.policy_model.policy_rollout() reward, step_count = self.runner(policy) self.X = torch.vstack((self.X, x_next.reshape(1, -1))) self.Y = torch.vstack((self.Y, torch.tensor(reward).reshape(1, -1))) self.GP = SingleTaskGP(train_X=self.X, train_Y=self.Y, covar_module=MaternKernel(nu=1.5)) mll = ExactMarginalLogLikelihood(self.GP.likelihood, self.GP) fit_gpytorch_mll(mll) if self.episode == 0: self.best_reward[0] = torch.max(self.Y, 1).detach().numpy() else: self.best_reward = np.vstack((self.best_reward, torch.max(self.Y, 1).detach().numpy())) self.episode += 1 return step_count def add_new_observation(self, reward, x_new): self.X = torch.vstack((self.X, torch.tensor(x_new))) self.Y = torch.vstack((self.Y, torch.tensor(reward))) if self.episode == 0: self.best_reward[0] = torch.max(self.Y, 1).detach().numpy() else: self.best_reward = np.vstack((self.best_reward, torch.max(self.Y, 1).detach().numpy())) self.episode += 1 def get_best_result(self): y_hat = self.GP.posterior(self.X) idx = torch.argmax(y_hat) x_max = self.X[idx, :].detach().numpy() self.policy_model.weights = x_max best_policy = self.policy_model.policy_rollout().reshape(-1, ) return best_policy, y_hat[idx].detach().numpy(), x_max def main(): nr_steps = 100 env = Continuous_MountainCarEnv() # render_mode='human' bo = BayesianOptimization(env, nr_steps, acq="Expected Improvement") bo.initialize() iteration_steps = 200 for i in range(iteration_steps): x_next = bo.next_observation() step_count = bo.eval_new_observation(x_next) print(bo.episode, bo.best_reward[-1][0], step_count) if __name__ == "__main__": main()