"""Bayesian optimisation of Gaussian-policy weights on the continuous Mountain Car task."""

import time

import gym  # not referenced in this module; kept because other code may rely on the import
import matplotlib.pyplot as plt
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern

from AcquistionFunctions.ExpectedImprovement import ExpectedImprovement
from PolicyModel.GaussianModel import GaussianPolicy
from ToyTask.MountainCarGym import Continuous_MountainCarEnv


class BayesianOptimization:
    """Bayesian-optimisation loop over the weights of a ``GaussianPolicy``.

    A Gaussian-process surrogate with a Matern(nu=1.5) kernel is fitted to
    (policy weights, episode reward) pairs. Each iteration asks the
    acquisition function for a new weight vector, rolls the resulting policy
    out in the environment and refits the surrogate on the enlarged data set.

    Attributes:
        X: Evaluated weight vectors, one row per rollout
            (``None`` until ``initialize`` is called).
        Y: Rewards matching the rows of ``X``, stored as a column.
        gp: The fitted ``GaussianProcessRegressor`` (``None`` until
            ``initialize`` is called).
        episode: Number of completed calls to ``eval_new_observation``.
        best_reward: Column array with one row per episode holding the best
            reward observed so far (initial random rollouts included).
    """

    def __init__(self, env, nr_step, nr_init=3, acq='ei', nr_weights=6, policy_seed=None):
        """Store the configuration and build the policy model.

        Args:
            env: Environment exposing ``reset``, ``render``, ``step`` and a
                ``goal_position`` attribute.
            nr_step: Maximum number of environment steps per rollout.
            nr_init: Number of random policies evaluated by ``initialize``.
            acq: Acquisition function name; only ``'ei'`` is implemented.
            nr_weights: Number of policy weights (search-space dimension).
            policy_seed: Seed forwarded to the policy model and to the
                acquisition function.
        """
        self.env = env
        self.nr_init = nr_init
        self.acq = acq

        self.X = None
        self.Y = None
        self.gp = None

        self.episode = 0
        # Start with zero rows so the history never contains uninitialised
        # values; one row is appended per evaluated episode.
        self.best_reward = np.empty((0, 1))
        self.distance_penalty = 100

        self.nr_policy_weights = nr_weights
        self.nr_steps = nr_step
        self.policy_seed = policy_seed
        self.lowerb = -1.0
        self.upperb = 1.0
        self.policy_model = GaussianPolicy(
            self.nr_policy_weights,
            self.nr_steps,
            self.policy_seed,
            self.lowerb,
            self.upperb,
        )

        self.nr_test = 100

    def initialize(self):
        """Evaluate ``nr_init`` random policies and fit the initial surrogate."""
        self.env.reset()
        self.env.render()

        self.X = np.zeros((self.nr_init, self.nr_policy_weights))
        self.Y = np.zeros((self.nr_init, 1))

        for i in range(self.nr_init):
            self.policy_model.random_policy()
            self.X[i, :] = self.policy_model.weights.T
            policy = self.policy_model.policy_rollout()
            self.Y[i] = self.runner(policy)

        self.gp = GaussianProcessRegressor(Matern(nu=1.5))
        self.gp.fit(self.X, self.Y)

    def runner(self, policy):
        """Roll out ``policy`` for one episode and return its shaped reward.

        Args:
            policy: Indexable sequence of actions; element ``k`` is applied
                at step ``k``. It must provide an action for every step that
                is executed (at most ``nr_steps``).

        Returns:
            The sum of the per-step rewards plus ``distance_penalty`` times
            the negated gap between ``env.goal_position`` and the first
            component of the last observation.
        """
        done = False
        step_count = 0
        env_reward = 0.0

        while not done:
            action = policy[step_count]
            # ``output`` is indexed as [0] observation, [1] reward, [2] done flag.
            output = self.env.step(action)
            env_reward += output[1]
            done = output[2]
            time.sleep(0.0001)  # NOTE(review): presumably paces rendering - confirm
            step_count += 1
            if step_count >= self.nr_steps:
                done = True

        # Shaping term: zero when the first observation component equals the
        # goal position, increasingly negative the further it is below it.
        distance = -(self.env.goal_position - output[0][0])
        env_reward += distance * self.distance_penalty

        time.sleep(0.25)
        self.env.reset()
        return env_reward

    def next_observation(self):
        """Ask the acquisition function for the next weights to evaluate.

        Returns:
            Whatever ``ExpectedImprovement`` returns for the current
            surrogate; it is used as the next policy weight vector.

        Raises:
            NotImplementedError: If ``acq`` is anything other than ``'ei'``.
        """
        if self.acq != 'ei':
            raise NotImplementedError(
                f"acquisition function {self.acq!r} is not implemented"
            )
        return ExpectedImprovement(
            self.gp,
            self.X,
            self.nr_test,
            self.nr_policy_weights,
            seed=self.policy_seed,
            lower=self.lowerb,
            upper=self.upperb,
        )

    def eval_new_observation(self, x_next):
        """Evaluate ``x_next``, extend the data set and refit the surrogate.

        Args:
            x_next: Policy weights to evaluate; it is stacked as a new row
                of ``X``.
        """
        self.policy_model.weights = x_next
        policy = self.policy_model.policy_rollout()
        reward = self.runner(policy)

        self.X = np.vstack((self.X, x_next))
        self.Y = np.vstack((self.Y, reward))
        self.gp.fit(self.X, self.Y)

        # Running best over every reward seen so far, including the random
        # initial rollouts. ``np.max`` reduces the column to a scalar, which
        # ``vstack`` appends as a new row.
        self.best_reward = np.vstack((self.best_reward, np.max(self.Y)))

        self.episode += 1
        self.policy_model.plot_policy()

    def plot_reward(self):
        """Plot the best reward seen so far against the episode number."""
        # Integer, 1-based episode numbers; one x value per stored row.
        episodes = np.arange(1, self.episode + 1)
        plt.plot(episodes, self.best_reward)
        plt.show()


def main():
    """Run Bayesian optimisation on the continuous Mountain Car environment."""
    nr_steps = 100
    env = Continuous_MountainCarEnv(render_mode='human')
    bo = BayesianOptimization(env, nr_steps)
    bo.initialize()

    iteration_steps = 200
    for _ in range(iteration_steps):
        x_next = bo.next_observation()
        bo.eval_new_observation(x_next)
        print(bo.episode, bo.best_reward[-1][0])

    bo.plot_reward()
    bo.policy_model.plot_policy()


if __name__ == "__main__":
    main()