Deepmind suite tested

Niko Feith 2023-08-17 18:26:23 +02:00
parent 886514f9e6
commit e574a5ca2c
7 changed files with 327 additions and 166 deletions


@@ -1,13 +1,11 @@
import numpy as np
from scipy.stats import norm
# Upper Confidence Bound acquisition: sample nr_test random candidates and pick the one maximizing mu + beta * sigma
def ConfidenceBound(gp, nr_test, nr_weights, beta=1.2, seed=None, lower=-1.0, upper=1.0):
rng = np.random.default_rng(seed=seed)
X_test = rng.uniform(lower, upper, (nr_test, nr_weights))
mu, sigma = gp.predict(X_test, return_std=True)
cb = mu + beta * sigma
idx = np.argmax(cb)
X_next = X_test[idx, :]
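
For context, a minimal usage sketch of this UCB acquisition, assuming the remainder of the file (not shown in this hunk) returns X_next, and that a GP has already been fitted on observed weight vectors; the BayesianOptimization class added in this commit calls it with beta=2.576:

import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern

# toy training data: 5 observed weight vectors with 3 weights each (illustrative only)
rng = np.random.default_rng(0)
X_train = rng.uniform(-1.0, 1.0, (5, 3))
y_train = -np.sum(X_train ** 2, axis=1)

gp = GaussianProcessRegressor(Matern(nu=1.5), n_restarts_optimizer=5)
gp.fit(X_train, y_train)

# pick the random test point with the largest mu + beta * sigma
x_next = ConfidenceBound(gp, nr_test=100, nr_weights=3, beta=2.576, seed=0)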


@@ -1,26 +1,25 @@
import numpy as np
from scipy.stats import norm, multivariate_normal
import matplotlib.pyplot as plt
class PreferenceExpectedImprovement:
def __init__(self, nr_dims, nr_samples, lower_bound, upper_bound, initial_variance, update_variance, seed=None):
self.nr_dims = nr_dims
self.nr_samples = int(nr_samples)
# check if upper_bound and lower_bound are numpy arrays of shape (nr_dims, 1) or (nr_dims,) or if they are float
self.lower_bound = lower_bound
self.upper_bound = upper_bound
self.init_var = initial_variance
self.update_var = update_variance
self.rng = np.random.default_rng(seed=seed)
# initial proposal distribution
self.proposal_mean = np.zeros((nr_dims, 1))
self.proposal_cov = np.diag(np.ones((nr_dims,)) * self.init_var)
def rejection_sampling(self):
samples = np.empty((0, self.nr_dims))
while samples.shape[0] < self.nr_samples:
@@ -34,18 +33,10 @@ class PreferenceExpectedImprovement:
check = True
samples = np.append(samples, sample, axis=0)
# sample = self.rng.multivariate_normal(
# mean,
# self.proposal_cov
# )
#
# # check if the sample is within the bounds
# if np.all(sample >= self.lower_bound) and np.all(sample <= self.upper_bound):
# samples = np.append(samples, [sample], axis=0)
return samples
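
The hunk above only shows fragments of rejection_sampling; judging from the visible lines and the commented-out variant being removed, the loop presumably draws candidates from the current proposal Gaussian and keeps those inside the bounds. A rough sketch of that idea (an assumption, not the repository's exact code):

import numpy as np

def rejection_sampling_sketch(rng, proposal_mean, proposal_cov, lower, upper, nr_samples, nr_dims):
    # keep drawing from the proposal distribution until enough in-bounds samples are collected
    samples = np.empty((0, nr_dims))
    while samples.shape[0] < nr_samples:
        sample = rng.multivariate_normal(proposal_mean.flatten(), proposal_cov).reshape(1, -1)
        if np.all(sample >= lower) and np.all(sample <= upper):
            samples = np.append(samples, sample, axis=0)
    return samples
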
def expected_improvement(self, gp, X, kappa=0.01):
X_sample = self.rejection_sampling()
mu_sample, sigma_sample = gp.predict(X_sample, return_std=True)
@@ -66,88 +57,21 @@ class PreferenceExpectedImprovement:
return x_next
def likelihood(self, preference):
covariance_diag = np.ones((self.nr_dims,)) * self.initial_variance
covariance_diag[preference] = 0.05
covariance = np.diag(covariance_diag)
return covariance
def update_proposal_model(self, preference_mean, preference_bool):
# likelihood covariance: tight (update_var) along dimensions the user marked as preferred, broad (init_var) elsewhere
cov_diag = np.ones((self.nr_dims,)) * self.init_var
cov_diag[preference_bool] = self.update_var
preference_cov = np.diag(cov_diag)
preference_mean = preference_mean.reshape(-1, 1)
# fuse proposal prior and preference likelihood (product of two Gaussians)
posterior_mean = np.linalg.inv(np.linalg.inv(self.proposal_cov) + np.linalg.inv(preference_cov))\
.dot(np.linalg.inv(self.proposal_cov).dot(self.proposal_mean)
+ np.linalg.inv(preference_cov).dot(preference_mean))
posterior_cov = np.linalg.inv(np.linalg.inv(self.proposal_cov) + np.linalg.inv(preference_cov))
self.proposal_mean = posterior_mean
self.proposal_cov = posterior_cov
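
The update above is the standard precision-weighted product of two Gaussians, with the current proposal as prior and the preference feedback as likelihood:

\Sigma_\text{post} = \left(\Sigma_\text{prop}^{-1} + \Sigma_\text{pref}^{-1}\right)^{-1}, \qquad \mu_\text{post} = \Sigma_\text{post}\left(\Sigma_\text{prop}^{-1}\mu_\text{prop} + \Sigma_\text{pref}^{-1}\mu_\text{pref}\right)

Computing posterior_cov first and reusing it for posterior_mean would avoid repeating the matrix inversions, but the result is identical.
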
def plot_2D(self, mean, cov):
print(mean.shape, cov.shape)
if mean.shape == (2, 1):
mean = mean.squeeze()
gaussian = multivariate_normal(mean, cov)
x = np.random.uniform(self.lower_bound, self.upper_bound, (self.nr_likelihood_samples, self.nr_dims))
pdf = gaussian.pdf(x)
pdf = pdf / pdf.sum()
plt.scatter(x[:, 0], x[:, 1], c=pdf, cmap='viridis')
# Add a color bar
plt.colorbar(label='PDF value')
# Add labels
plt.xlabel('x1')
plt.ylabel('x2')
# Show the plot
plt.show()
if __name__ == '__main__':
# acquisition = PreferenceExpectedImprovement(10, 10, 10e4, -1.0, 1.0, 5.0)
# sample_res = acquisition.rejection_sampling()
# print(f"finished: {sample_res}")
acquisition = PreferenceExpectedImprovement(10, 2, 10e4, -1.0, 1.0, 10.0)
mean_ = np.array([0.5, 0.23])
preference_ = [False, True]
likelihood_cov = acquisition.likelihood(preference_)
acquisition.plot_2D(acquisition.proposal_mean, acquisition.proposal_cov)
acquisition.plot_2D(mean_, likelihood_cov)
acquisition.update_proposal_model(mean_, preference_)
acquisition.plot_2D(acquisition.proposal_mean, acquisition.proposal_cov)
mean2 = np.array([0.33, 0.24])
preference2 = [True, False]
likelihood_cov2 = acquisition.likelihood(preference2)
acquisition.plot_2D(mean2, likelihood_cov2)
acquisition.update_proposal_model(mean2, preference2)
acquisition.plot_2D(acquisition.proposal_mean, acquisition.proposal_cov)
mean2 = np.array([-0.66, -0.5])
preference2 = [False, False]
likelihood_cov2 = acquisition.likelihood(preference2)
acquisition.plot_2D(mean2, likelihood_cov2)
acquisition.update_proposal_model(mean2, preference2)
acquisition.update_proposal_model(mean2, preference2)
acquisition.update_proposal_model(mean2, preference2)
acquisition.update_proposal_model(mean2, preference2)
acquisition.update_proposal_model(mean2, preference2)
acquisition.plot_2D(acquisition.proposal_mean, acquisition.proposal_cov)


@@ -0,0 +1,115 @@
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern
from PolicyModel.GaussianModelMultiDim import GaussianPolicy
from AcquistionFunctions.ExpectedImprovement import ExpectedImprovement
from AcquistionFunctions.ProbabilityOfImprovement import ProbabilityOfImprovement
from AcquistionFunctions.ConfidenceBound import ConfidenceBound
from AcquistionFunctions.PreferenceExpectedImprovement import PreferenceExpectedImprovement
class BayesianOptimization:
def __init__(self, nr_steps, nr_dims, nr_weights, acq='ei', seed=None):
self.acq = acq
self.episode = 0
self.nr_steps = nr_steps
self.nr_dims = nr_dims
self.nr_weights = nr_weights
self.weights = self.nr_weights * self.nr_dims  # total number of optimization variables (weights per dimension times dimensions)
self.lower_bound = -1.0
self.upper_bound = 1.0
self.seed = seed
self.X = None
self.Y = None
self.gp = None
self.policy_model = GaussianPolicy(self.nr_steps, self.nr_weights, self.nr_dims,
lowerb=self.lower_bound, upperb=self.upper_bound, seed=seed)
self.acq_sample_size = 100
self.best_reward = np.empty((1, 1))
if acq == "Preference Expected Improvement":
self.acq_fun = PreferenceExpectedImprovement(self.weights,
self.acq_sample_size,
self.lower_bound,
self.upper_bound,
initial_variance=10.0,
update_variance=0.05,
seed=seed)
self.reset_bo()
def reset_bo(self):
self.gp = GaussianProcessRegressor(Matern(nu=1.5), n_restarts_optimizer=5)  # length_scale=(1e-8, 1e5)
self.best_reward = np.empty((1, 1))
self.X = np.zeros((1, self.weights), dtype=np.float64)
self.Y = np.zeros((1, 1), dtype=np.float64)
self.episode = 0
def next_observation(self):
if self.acq == "Expected Improvement":
x_next = ExpectedImprovement(self.gp,
self.X,
self.acq_sample_size,
self.weights,
kappa=0,
seed=self.seed,
lower=self.lower_bound,
upper=self.upper_bound)
elif self.acq == "Probability of Improvement":
x_next = ProbabilityOfImprovement(self.gp,
self.X,
self.acq_sample_size,
self.weights,
kappa=0,
seed=self.seed,
lower=self.lower_bound,
upper=self.upper_bound)
elif self.acq == "Upper Confidence Bound":
x_next = ConfidenceBound(self.gp,
self.acq_sample_size,
self.weights,
beta=2.576,
seed=self.seed,
lower=self.lower_bound,
upper=self.upper_bound)
elif self.acq == "Preference Expected Improvement":
x_next = self.acq_fun.expected_improvement(self.gp,
self.X,
kappa=0)
else:
raise NotImplementedError
return x_next
def add_observation(self, reward, x):
if self.episode == 0:
self.X[0, :] = x
self.Y[0] = reward
self.best_reward[0] = np.max(self.Y)
else:
self.X = np.vstack((self.X, np.around(x, decimals=8)), dtype=np.float64)
self.Y = np.vstack((self.Y, reward), dtype=np.float64)
self.best_reward = np.vstack((self.best_reward, np.max(self.Y)), dtype=np.float64)
self.gp.fit(self.X, self.Y)
self.episode += 1
def get_best_result(self):
y_max = np.max(self.Y)
idx = np.argmax(self.Y)
x_max = self.X[idx, :]
return y_max, x_max, idx
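
For orientation, a minimal sketch of the intended call sequence for this class (evaluate() is a placeholder for an environment rollout; the BODmRunner.py script added in this commit does the same thing against the dm_control reacher task):

bo = BayesianOptimization(nr_steps=100, nr_dims=2, nr_weights=15, acq="Expected Improvement", seed=0)

# a few random initial observations
for _ in range(3):
    bo.policy_model.random_weights()
    reward = evaluate(bo.policy_model.rollout())
    bo.add_observation(reward, bo.policy_model.get_x().ravel())

# Bayesian optimization loop
for _ in range(50):
    x_next = bo.next_observation()
    bo.policy_model.set_weights(x_next)
    reward = evaluate(bo.policy_model.rollout())
    bo.add_observation(reward, x_next.ravel())

best_reward, best_x, _ = bo.get_best_result()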


@@ -109,7 +109,6 @@ class BayesianOptimization:
plt.show()
def main():
max_step = 50
car = MountainCarEnv(max_step)
@@ -123,5 +122,6 @@ def main():
bo.plot_reward()
bo.policy_model.plot_policy()
if __name__ == "__main__":
main()


@@ -0,0 +1,49 @@
import numpy as np
class GaussianPolicy:
def __init__(self, nr_steps, nr_weights, nr_dims, lowerb=-1.0, upperb=1.0, seed=None):
self.nr_weights = nr_weights
self.nr_steps = nr_steps
self.nr_dims = nr_dims
self.weights = None
self.trajectory = None
self.lowerb = lowerb
self.upperb = upperb
self.rng = np.random.default_rng(seed=seed)
# initialize
self.mid_points = np.linspace(0, self.nr_steps, self.nr_weights)
if nr_weights > 1:
self.std = self.mid_points[1] / (2 * np.sqrt(2 * np.log(2))) # Full width at half maximum
else:
self.std = self.nr_steps / 2
self.reset()
def reset(self):
self.weights = np.zeros((self.nr_weights, self.nr_dims))
self.trajectory = np.zeros((self.nr_steps, self.nr_dims))
def random_weights(self):
for dim in range(self.nr_dims):
self.weights[:, dim] = self.rng.uniform(self.lowerb, self.upperb, self.nr_weights)
def rollout(self):
self.trajectory = np.zeros((self.nr_steps, self.nr_dims))
for step in range(self.nr_steps):
for weight in range(self.nr_weights):
base_fun = np.exp(-0.5 * (step - self.mid_points[weight]) ** 2 / self.std ** 2)
for dim in range(self.nr_dims):
self.trajectory[step, dim] += base_fun * self.weights[weight, dim]
return self.trajectory
def set_weights(self, x):
self.weights = x.reshape(self.nr_weights, self.nr_dims)
def get_x(self):
return self.weights.reshape(self.nr_weights * self.nr_dims, 1)
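
Each trajectory point in rollout is a weighted sum of Gaussian radial basis functions centred at mid_points; a vectorized equivalent of the triple loop, shown as a standalone sketch with the same initialization as the class (illustrative sizes matching the runner below):

import numpy as np

nr_steps, nr_weights, nr_dims = 100, 15, 2
mid_points = np.linspace(0, nr_steps, nr_weights)
std = mid_points[1] / (2 * np.sqrt(2 * np.log(2)))
weights = np.random.default_rng(0).uniform(-1.0, 1.0, (nr_weights, nr_dims))

steps = np.arange(nr_steps)[:, None]                              # (nr_steps, 1)
features = np.exp(-0.5 * (steps - mid_points) ** 2 / std ** 2)    # (nr_steps, nr_weights)
trajectory = features @ weights                                   # (nr_steps, nr_dims)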

runner/BODmRunner.py (new file)

@@ -0,0 +1,142 @@
# Control Suite
from dm_control import suite
# General
import copy
import numpy as np
# Graphics-related
import matplotlib.pyplot as plt
# Bayesian Optimization
from BayesianOptimization.BOwithDM import BayesianOptimization
import warnings
from sklearn.exceptions import ConvergenceWarning
warnings.filterwarnings("ignore", category=ConvergenceWarning)
seed = None
random_state = np.random.RandomState(seed=seed)
env = suite.load('reacher', 'hard', task_kwargs={'random': random_state})
spec = env.action_spec()
time_step = env.reset()
nr_steps = 100
nr_runs = 10
nr_dims = spec.shape[0]
iteration_steps = 50
acquisition_fun = "Expected Improvement"
nr_weights = 15
nr_inits = 3
# storage arrays
best_weights = np.zeros((nr_runs, nr_weights, nr_dims))
best_rewards = np.zeros((nr_runs, iteration_steps))
rewards = np.zeros((nr_runs, iteration_steps))
def post_processing(reward):
reward_mean = np.mean(reward, axis=0)
reward_std = np.std(reward, axis=0)
return reward_mean, reward_std
def plot_reward(mean, std):
eps = np.linspace(0, mean.shape[0], mean.shape[0])
plt.plot(eps, mean)
plt.fill_between(
eps,
mean - 1.96 * std,
mean + 1.96 * std,
alpha=0.5
)
plt.show()
def runner(env_, policy_):
reward = 0
env_.reset()
for step in range(nr_steps):
action = policy_[step]
output = env_.step(action)
if output.reward != 0:
reward += output.reward * 10
break
reward += -1.0
return reward
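
The shaping in runner is: -1 for every step before the sparse dm_control reward fires, then ten times that reward, after which the episode stops early. A purely illustrative example of the resulting return:

# illustrative numbers only: target first reached on the 31st step with sparse reward 1.0
steps_before_hit = 30
sparse_reward = 1.0
episode_return = -1.0 * steps_before_hit + 10 * sparse_reward   # -20.0
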
def main():
global best_weights, rewards
bo = BayesianOptimization(nr_steps, nr_dims, nr_weights, acq=acquisition_fun, seed=seed)
for run in range(nr_runs):
bo.reset_bo()
print(f'Run: {run}')
# initialization
for init in range(nr_inits):
bo.policy_model.random_weights()
policy = bo.policy_model.rollout()
reward = runner(env, policy)
x = bo.policy_model.get_x()
x = x.reshape(nr_weights * nr_dims, )
bo.add_observation(reward, x)
# Bayesian Optimization
for n in range(iteration_steps):
x_next = bo.next_observation()
bo.policy_model.set_weights(x_next)
policy = bo.policy_model.rollout()
reward = runner(env, policy)
x_next = x_next.reshape(nr_weights * nr_dims, )
bo.add_observation(reward, x_next)
rewards[run, n] = reward
y_max, _, _ = bo.get_best_result()
best_rewards[run, n] = y_max
reward_mean, reward_std = post_processing(best_rewards)
plot_reward(reward_mean, reward_std)
if __name__ == '__main__':
main()
# for i in range(nr_steps):
# action = random_state.uniform(spec.minimum, spec.maximum, spec.shape)
# time_step = env.step(action)
#
# camera0 = env.physics.render(camera_id=0, height=400, width=600)
# frames.append(camera0) # Directly append the frame without any modification
# rewards.append(time_step.reward)
# observations.append(copy.deepcopy(time_step.observation))
# ticks.append(env.physics.data.time)
#
# # Show video and plot reward and observations
# for i in range(len(frames)):
# if i % 20 == 0: # Display every 20th frame for example purposes
# print(frames[i].shape)
# fig, ax = plt.subplots(1, 1)
# ax.imshow(frames[i])
# ax.axis('off') # Turn off the axis
#
# # Remove any whitespace from the edges
# ax.set_xticks([])
# ax.set_yticks([])
# plt.subplots_adjust(left=0, right=1, top=1, bottom=0, wspace=0, hspace=0)
# plt.margins(0, 0)
# plt.gca().xaxis.set_major_locator(plt.NullLocator())
# plt.gca().yaxis.set_major_locator(plt.NullLocator())
#
# plt.show()


@@ -1,67 +0,0 @@
# Control Suite
from dm_control import suite
# General
import copy
import numpy as np
# Graphics-related
import matplotlib.pyplot as plt
# Bayesian Optimization
from BayesianOptimization.BOwithGym import BayesianOptimization
import warnings
from sklearn.exceptions import ConvergenceWarning
warnings.filterwarnings("ignore", category=ConvergenceWarning)
random_state = np.random.RandomState()
env = suite.load('reacher', 'hard', task_kwargs={'random': random_state})
nr_steps = 100
nr_runs = 10
iteration_steps = 50
acquisition_fun = 'ei'
# storage arrays
finished_store = np.zeros((1, nr_runs))
best_policy = np.zeros((nr_steps, nr_runs, 2))
reward_store = np.zeros((iteration_steps, nr_runs))
spec = env.action_spec()
time_step = env.reset()
def main():
global finished_store, best_policy, reward_store
for i in range(nr_steps):
action = random_state.uniform(spec.minimum, spec.maximum, spec.shape)
time_step = env.step(action)
camera0 = env.physics.render(camera_id=0, height=400, width=600)
frames.append(camera0) # Directly append the frame without any modification
rewards.append(time_step.reward)
observations.append(copy.deepcopy(time_step.observation))
ticks.append(env.physics.data.time)
# Show video and plot reward and observations
for i in range(len(frames)):
if i % 20 == 0: # Display every 20th frame for example purposes
print(frames[i].shape)
fig, ax = plt.subplots(1, 1)
ax.imshow(frames[i])
ax.axis('off') # Turn off the axis
# Remove any whitespace from the edges
ax.set_xticks([])
ax.set_yticks([])
plt.subplots_adjust(left=0, right=1, top=1, bottom=0, wspace=0, hspace=0)
plt.margins(0, 0)
plt.gca().xaxis.set_major_locator(plt.NullLocator())
plt.gca().yaxis.set_major_locator(plt.NullLocator())
plt.show()