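# Bayesian optimization of open-loop policy weights for the continuous
# Mountain Car task, using BoTorch GPs and standard acquisition functions.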
import numpy as np
import torch

from botorch.models import SingleTaskGP
from botorch.optim import optimize_acqf
from botorch.fit import fit_gpytorch_mll
from botorch.acquisition import UpperConfidenceBound, ExpectedImprovement, ProbabilityOfImprovement
from botorch.exceptions.warnings import InputDataWarning, BadInitialCandidatesWarning
from gpytorch.kernels import MaternKernel
from gpytorch.mlls import ExactMarginalLogLikelihood

import warnings

from PolicyModel.GaussianModel import GaussianPolicy
from ToyTask.MountainCarGym import Continuous_MountainCarEnv

import matplotlib.pyplot as plt

torch.set_default_dtype(torch.float64)
warnings.filterwarnings("ignore", category=InputDataWarning)
warnings.filterwarnings("ignore", category=BadInitialCandidatesWarning)
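

# Policy-search BO: the GP models the episode return as a function of the
# policy weights (rescaled to the unit box), and an acquisition function
# proposes the next weight vector to evaluate.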
class BayesianOptimization:
    def __init__(self, env, nr_steps, nr_init=5, acq="Expected Improvement", nr_weights=6, policy_seed=None):
        self.env = env
        self.nr_init = nr_init
        self.acq = acq

        self.X = None      # GP inputs: policy weights normalized to [0, 1]
        self.X_np = None   # raw policy weights in [-1, 1] (numpy mirror of X)
        self.Y_np = None   # observed episode rewards
        self.GP = None

        self.episode = 0
        # np.zeros rather than np.empty: the first row is a placeholder and
        # should not hold uninitialized memory
        self.counter_array = np.zeros((1, 1))
        self.best_reward = np.zeros((1, 1))
        self.distance_penalty = 0

        self.nr_policy_weights = nr_weights
        self.nr_steps = nr_steps
        self.policy_seed = policy_seed

        self.lower_bound = 0.0
        self.upper_bound = 1.0

        # optimize_acqf expects bounds of shape (2, d): row 0 lower, row 1 upper
        self.bounds = torch.t(torch.tensor([[self.lower_bound, self.upper_bound]] * self.nr_policy_weights))

        self.policy_model = GaussianPolicy(self.nr_policy_weights,
                                           self.nr_steps,
                                           self.policy_seed,
                                           self.lower_bound,
                                           self.upper_bound)

        self.eval_X = 200       # raw samples for acquisition optimization
        self.eval_restarts = 5  # restarts for acquisition optimization
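
    # Reset all optimization state so the same instance can run a fresh experiment.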
    def reset_bo(self):
        self.counter_array = np.zeros((1, 1))
        self.GP = None
        self.episode = 0
        self.best_reward = np.zeros((1, 1))
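
    # Roll an action sequence out in the environment and return the accumulated
    # reward plus the number of steps taken. distance_penalty (0 by default)
    # shapes the reward when the goal is not reached.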
    def runner(self, policy):
        env_reward = 0.0
        step_count = 0
        done = False

        for i in range(len(policy)):
            action = policy[i]
            output = self.env.step(action)  # (observation, reward, done, ...)

            env_reward += output[1]
            done = output[2]
            step_count += 1

            if done:
                self.counter_array = np.vstack((self.counter_array, step_count))
                break

        # i stops at len(policy) - 1 when the loop runs to completion, so
        # compare against the last index
        if not done and i == len(policy) - 1:
            # rollout ended without reaching the goal: optionally penalize the
            # remaining distance to the goal position
            distance = -(self.env.goal_position - output[0][0])
            env_reward += distance * self.distance_penalty
            self.counter_array = np.vstack((self.counter_array, step_count))

        self.env.reset()
        return env_reward, step_count
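
    # Evaluate nr_init random policies and fit the initial GP on them.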
    def initialize(self):
        self.env.reset()
        self.reset_bo()

        self.X = torch.zeros((self.nr_init, self.nr_policy_weights))
        self.X_np = np.zeros((self.nr_init, self.nr_policy_weights))
        self.Y_np = np.zeros((self.nr_init, 1))

        for i in range(self.nr_init):
            self.policy_model.random_policy()
            weights = self.policy_model.weights.T.clip(min=-1.0, max=1.0)
            self.X_np[i, :] = weights
            # map the weights from [-1, 1] to the GP's [0, 1] input domain
            self.X[i, :] = torch.tensor((weights + 1) / 2)
            policy = self.policy_model.policy_rollout()

            reward, step_count = self.runner(policy)

            self.Y_np[i] = reward

        Y = torch.tensor(self.Y_np)

        self.GP = SingleTaskGP(train_X=self.X, train_Y=Y, covar_module=MaternKernel(nu=1.5))
        mll = ExactMarginalLogLikelihood(self.GP.likelihood, self.GP)
        fit_gpytorch_mll(mll)

        # seed the incumbent so the first acquisition call sees a valid best_f
        self.best_reward[0] = np.max(self.Y_np)
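
    # Maximize the selected acquisition function over the weight box to
    # obtain the next candidate.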
    def next_observation(self):
        if self.acq == "Expected Improvement":
            acqf = ExpectedImprovement(self.GP, best_f=self.best_reward[-1][0], maximize=True)
        elif self.acq == "Probability of Improvement":
            acqf = ProbabilityOfImprovement(self.GP, best_f=self.best_reward[-1][0], maximize=True)
        elif self.acq == "Upper Confidence Bound":
            # beta = 2.576 corresponds to a 99% two-sided normal confidence level
            acqf = UpperConfidenceBound(self.GP, beta=2.576, maximize=True)
        else:
            raise NotImplementedError(f"Unknown acquisition function: {self.acq}")

        x_next, _ = optimize_acqf(acqf,
                                  bounds=self.bounds,
                                  q=1,
                                  num_restarts=self.eval_restarts,
                                  raw_samples=self.eval_X)

        return torch.t(x_next)  # column vector of shape (nr_weights, 1)
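
    # Evaluate a candidate in the environment, append the observation, refit
    # the GP, and track the best reward seen so far.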
    def eval_new_observation(self, x_next):
        # map the candidate from the GP's [0, 1] domain back to weight space [-1, 1]
        new_weight = x_next.detach().numpy() * 2 - 1
        self.policy_model.weights = new_weight
        policy = self.policy_model.policy_rollout()

        reward, step_count = self.runner(policy)

        self.X_np = np.vstack((self.X_np, new_weight.reshape(1, -1)))
        self.X = torch.vstack((self.X, x_next.reshape(1, -1)))
        self.Y_np = np.vstack((self.Y_np, reward))

        Y = torch.tensor(self.Y_np)

        # refit the GP from scratch on the augmented data set
        self.GP = SingleTaskGP(train_X=self.X, train_Y=Y, covar_module=MaternKernel(nu=1.5))
        mll = ExactMarginalLogLikelihood(self.GP.likelihood, self.GP)
        fit_gpytorch_mll(mll)

        if self.episode == 0:
            self.best_reward[0] = np.max(self.Y_np)
        else:
            self.best_reward = np.vstack((self.best_reward, np.max(self.Y_np)))

        self.episode += 1
        return step_count
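
    # Record an externally evaluated (reward, x_new) pair. Note that this
    # does not refit the GP or update X_np.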
    def add_new_observation(self, reward, x_new):
        self.X = torch.vstack((self.X, torch.tensor(x_new)))
        self.Y_np = np.vstack((self.Y_np, reward))

        if self.episode == 0:
            self.best_reward[0] = np.max(self.Y_np)
        else:
            self.best_reward = np.vstack((self.best_reward, np.max(self.Y_np)))

        self.episode += 1
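
    # Return the rollout, posterior-mean value, and (normalized) weights of
    # the best training input under the current GP.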
    def get_best_result(self):
        # rank the training inputs by the GP posterior mean; posterior() returns
        # a posterior object, so extract .mean before taking the argmax
        y_hat = self.GP.posterior(self.X).mean
        idx = torch.argmax(y_hat)
        x_max = self.X[idx, :].detach().numpy()

        # map from the GP's [0, 1] domain back to weight space, matching
        # eval_new_observation (column vector of shape (nr_weights, 1))
        self.policy_model.weights = x_max.reshape(-1, 1) * 2 - 1
        best_policy = self.policy_model.policy_rollout().reshape(-1, )

        return best_policy, y_hat[idx].detach().numpy(), x_max
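

# Minimal driver: EI-based BO on Mountain Car with 5 policy weights.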
def main():
    nr_steps = 100
    env = Continuous_MountainCarEnv()  # render_mode='human'
    bo = BayesianOptimization(env, nr_steps, nr_weights=5, acq="Expected Improvement")
    bo.initialize()
    iteration_steps = 200
    for i in range(iteration_steps):
        x_next = bo.next_observation()
        step_count = bo.eval_new_observation(x_next)

        print(bo.episode, bo.best_reward[-1][0], step_count)
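
    # Sketch: visualize the best policy found; assumes get_best_result()
    # returns a 1-D action sequence (as implemented above) and uses the
    # matplotlib import at the top of the file.
    best_policy, best_value, _ = bo.get_best_result()
    print("posterior-mean value of best policy:", best_value)
    plt.plot(best_policy)
    plt.xlabel("step")
    plt.ylabel("action")
    plt.show()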

    print(bo.Y_np)
    print(bo.X_np)


if __name__ == "__main__":
    main()