"""Bayesian optimization of Gaussian policy weights for the continuous MountainCar toy task using BoTorch."""

import numpy as np
import torch
from botorch.models import SingleTaskGP
from botorch.optim import optimize_acqf
from gpytorch.kernels import MaternKernel, RBFKernel
from botorch.fit import fit_gpytorch_mll
from gpytorch.mlls import ExactMarginalLogLikelihood
from botorch.acquisition import UpperConfidenceBound, ExpectedImprovement, ProbabilityOfImprovement

import warnings
from botorch.exceptions.warnings import InputDataWarning, BadInitialCandidatesWarning

from PolicyModel.GaussianModel import GaussianPolicy
from ToyTask.MountainCarGym import Continuous_MountainCarEnv

import matplotlib.pyplot as plt

torch.set_default_dtype(torch.float64)

warnings.filterwarnings("ignore", category=InputDataWarning)
warnings.filterwarnings("ignore", category=BadInitialCandidatesWarning)


class BayesianOptimization:
    def __init__(self, env, nr_steps, nr_init=5, acq="Expected Improvement", nr_weights=6, policy_seed=None):
        self.env = env
        self.nr_init = nr_init
        self.acq = acq

        self.X = None
        self.X_np = None
        self.Y_np = None
        self.GP = None

        self.episode = 0
        self.counter_array = np.empty((1, 1))
        self.best_reward = np.empty((1, 1))
        self.distance_penalty = 0

        self.nr_policy_weights = nr_weights
        self.nr_steps = nr_steps
        self.policy_seed = policy_seed
        self.lower_bound = 0
        self.upper_bound = 1.0

        # The acquisition function is optimized in the unit cube; policy weights
        # are mapped back to [-1, 1] at rollout time.
        self.bounds = torch.t(torch.tensor([[self.lower_bound, self.upper_bound]] * self.nr_policy_weights))

        self.policy_model = GaussianPolicy(self.nr_policy_weights,
                                           self.nr_steps,
                                           self.policy_seed,
                                           self.lower_bound,
                                           self.upper_bound)

        self.eval_X = 200
        self.eval_restarts = 10

    def reset_bo(self):
        self.counter_array = np.empty((1, 1))
        self.GP = None
        self.episode = 0
        self.best_reward = np.empty((1, 1))

    def runner(self, policy):
        env_reward = 0.0
        step_count = 0
        for i in range(len(policy)):
            action = policy[i]
            output = self.env.step(action)
            env_reward += output[1]
            done = output[2]
            step_count += 1
            if done:
                self.counter_array = np.vstack((self.counter_array, step_count))
                break
            if not done and i == len(policy) - 1:
                # Episode ran out of steps: penalize the remaining distance to the goal.
                distance = -(self.env.goal_position - output[0][0])
                env_reward += distance * self.distance_penalty
                self.counter_array = np.vstack((self.counter_array, step_count))

        self.env.reset()
        return env_reward, step_count

    def initialize(self):
        self.env.reset()
        self.reset_bo()

        self.X = torch.zeros((self.nr_init, self.nr_policy_weights))
        self.X_np = np.zeros((self.nr_init, self.nr_policy_weights))
        self.Y_np = np.zeros((self.nr_init, 1))

        for i in range(self.nr_init):
            self.policy_model.random_policy()
            # GP inputs are stored in [0, 1]; policy weights live in [-1, 1].
            self.X_np[i, :] = self.policy_model.weights.T.clip(min=-1.0, max=1.0)
            self.X[i, :] = torch.tensor((self.policy_model.weights.T.clip(min=-1.0, max=1.0) + 1) / 2)
            policy = self.policy_model.policy_rollout()

            reward, step_count = self.runner(policy)

            self.Y_np[i] = reward

        # Record the incumbent best of the initial design so best_f is defined
        # for the first acquisition step.
        self.best_reward[0] = max(self.Y_np)

        Y = torch.tensor(self.Y_np)
        self.GP = SingleTaskGP(train_X=self.X, train_Y=Y, covar_module=RBFKernel())
        mll = ExactMarginalLogLikelihood(self.GP.likelihood, self.GP)
        fit_gpytorch_mll(mll)

    def next_observation(self):
        if self.acq == "Expected Improvement":
            ei = ExpectedImprovement(self.GP, best_f=self.best_reward[-1][0], maximize=True)
            x_next, _ = optimize_acqf(ei,
                                      bounds=self.bounds,
                                      num_restarts=self.eval_restarts,
                                      raw_samples=self.eval_X,
                                      q=1)
        elif self.acq == "Probability of Improvement":
            poi = ProbabilityOfImprovement(self.GP, best_f=self.best_reward[-1][0], maximize=True)
            x_next, _ = optimize_acqf(poi,
                                      bounds=self.bounds,
                                      num_restarts=self.eval_restarts,
                                      raw_samples=self.eval_X,
                                      q=1)
        elif self.acq == "Upper Confidence Bound":
            ucb = UpperConfidenceBound(self.GP, beta=2.576, maximize=True)
            x_next, _ = optimize_acqf(ucb,
                                      bounds=self.bounds,
                                      num_restarts=self.eval_restarts,
                                      raw_samples=self.eval_X,
                                      q=1)
        else:
            raise NotImplementedError

        return torch.t(x_next)

    def eval_new_observation(self, x_next):
        # Map the candidate from the GP's [0, 1] space to the [-1, 1] weight space.
        new_weight = x_next.detach().numpy() * 2 - 1
        self.policy_model.weights = new_weight
        policy = self.policy_model.policy_rollout()

        reward, step_count = self.runner(policy)

        self.X_np = np.vstack((self.X_np, new_weight.reshape(1, -1)))
        self.X = torch.vstack((self.X, x_next.reshape(1, -1)))
        self.Y_np = np.vstack((self.Y_np, reward))
        Y = torch.tensor(self.Y_np)

        self.GP = SingleTaskGP(train_X=self.X, train_Y=Y, covar_module=RBFKernel())
        mll = ExactMarginalLogLikelihood(self.GP.likelihood, self.GP)
        fit_gpytorch_mll(mll)

        if self.episode == 0:
            self.best_reward[0] = max(self.Y_np)
        else:
            self.best_reward = np.vstack((self.best_reward, max(self.Y_np)))

        self.episode += 1
        return step_count

    def add_new_observation(self, reward, x_new):
        self.X = torch.vstack((self.X, torch.tensor(x_new)))
        self.Y_np = np.vstack((self.Y_np, reward))

        if self.episode == 0:
            self.best_reward[0] = max(self.Y_np)
        else:
            self.best_reward = np.vstack((self.best_reward, max(self.Y_np)))

        self.episode += 1

    def get_best_result(self):
        Y = torch.tensor(self.Y_np)
        self.GP = SingleTaskGP(train_X=self.X, train_Y=Y, covar_module=RBFKernel())
        mll = ExactMarginalLogLikelihood(self.GP.likelihood, self.GP)
        fit_gpytorch_mll(mll)

        y_hat = self.GP.posterior(self.X)
        idx = torch.argmax(y_hat.mean)
        x_max = self.X[idx, :].detach().numpy()
        # Debug output: compare the GP posterior argmax with the best observed reward index.
        print(idx, np.argmax(self.Y_np))

        # x_max lies in the GP's [0, 1] space; map it back to [-1, 1] before the rollout.
        self.policy_model.weights = x_max * 2 - 1
        best_policy = self.policy_model.policy_rollout().reshape(-1)

        return best_policy, y_hat.mean[idx].detach().numpy(), x_max


def main():
    nr_steps = 100
    env = Continuous_MountainCarEnv()  # render_mode='human'
    bo = BayesianOptimization(env, nr_steps, nr_weights=10, acq="Expected Improvement")
    bo.initialize()

    iteration_steps = 500
    for i in range(iteration_steps):
        x_next = bo.next_observation()
        step_count = bo.eval_new_observation(x_next)
        print(bo.episode, bo.best_reward[-1][0], bo.Y_np[-1][0], step_count)

    _, a, _ = bo.get_best_result()
    print(a)


if __name__ == "__main__":
    main()