testing of new acquisition function

This commit is contained in:
Niko Feith 2023-07-05 13:24:03 +02:00
parent 54a2eb9bba
commit 2ffaa3aead
6 changed files with 407 additions and 9 deletions

View File

@ -0,0 +1,77 @@
import numpy as np
from scipy.stats import norm
def generate_samples(n_samples, use_gaussian, means, variances, lower, upper, seed):
"""
Generate samples from a uniform or Gaussian distribution based on a boolean array.
Args:
n_samples: The number of samples to generate.
use_gaussian: A boolean array of shape (n_dims,). If use_gaussian[i] is True,
generate the i-th dimension of the samples from a Gaussian distribution.
means: An array of shape (n_dims,) giving the means of the Gaussian distributions.
variances: An array of shape (n_dims,) giving the variances of the Gaussian distributions.
lower: lower bound of the samples.
upper: upper bound of the samples.
seed: seed of the random generator.
Returns:
samples: An array of shape (n_samples, n_dims) containing the generated samples.
"""
n_dims = len(use_gaussian)
samples = np.empty((n_samples, n_dims))
rng = np.random.default_rng(seed=seed)
for i in range(n_dims):
if use_gaussian[i]:
samples[:, i] = rng.normal(means[i], np.sqrt(variances[i]), n_samples).clip(min=lower, max=upper)
else:
samples[:, i] = rng.uniform(lower, upper, n_samples)
return samples
def expected_improvement(gp, X, n_samples, use_gaussian, means, variances, kappa=0.01, lower=-1.0, upper=1.0, seed=None):
"""
Compute the expected improvement at points X based on existing samples X_sample
using a Gaussian process surrogate model.
Args:
X: Points at which the objective function has already been sampled.
n_samples: The number of samples to generate.
gp: GaussianProcessRegressor object.
use_gaussian: A boolean array of shape (n_dims,). If use_gaussian[i] is True,
generate the i-th dimension of the samples from a Gaussian distribution.
means: An array of shape (n_dims,) giving the means of the Gaussian distributions.
variances: An array of shape (n_dims,) giving the variances of the Gaussian distributions.
kappa: Exploitation-exploration trade-off parameter.
lower: lower bound of the samples.
upper: upper bound of the samples.
seed: seed of the random generator.
Returns:
next best observation
"""
# Generate samples
X_test = generate_samples(n_samples, use_gaussian, means, variances, lower, upper, seed)
mu, sigma = gp.predict(X_test, return_std=True)
mu_sample = gp.predict(X)
sigma = sigma.reshape(-1, 1)
# Needed for noise-based model,
# otherwise use np.max(Y_sample).
mu_sample_opt = np.max(mu_sample)
with np.errstate(divide='warn'):
imp = mu - mu_sample_opt - kappa
imp = imp.reshape(-1, 1)
Z = imp / sigma
ei = imp * norm.cdf(Z) + sigma * norm.pdf(Z)
ei[sigma == 0.0] = 0.0
# Return the sample with the maximum expected improvement
idx = np.argmax(ei)
X_next = X_test[idx, :]
return X_next

View File

@ -0,0 +1,187 @@
import numpy as np
from scipy.optimize import minimize
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from AcquistionFunctions.ExpectedImprovementwithPreference import expected_improvement
from sklearn.exceptions import ConvergenceWarning
import warnings
warnings.filterwarnings("ignore", category=ConvergenceWarning)
# Define the Branin function
def branin(X):
x, y = X
result = (y - (5.1 / (4*np.pi**2)) * x**2 +
5 * x / np.pi - 6)**2 + 10 * (1 - 1 / (8*np.pi)) * np.cos(x) + 10
return result
# Define the Rosenbrock function
def rosenbrock(X):
x, y = X
return (1 - x)**2 + 100 * (y - x**2)**2
# Function to plot the Test function and the samples
def plot_function_and_samples(func, X_samples, Y_samples, iteration):
fig = plt.figure(figsize=(10, 8))
ax = fig.add_subplot(111, projection='3d')
# Create a grid of points
x = np.linspace(-2, 2, 100)
y = np.linspace(-1, 3, 100)
X, Y = np.meshgrid(x, y)
Z = func([X, Y])
# Plot the function
ax.plot_surface(X, Y, Z, rstride=1, cstride=1, color='b', alpha=0.2, linewidth=0)
# Plot the samples
ax.scatter(X_samples[:, 0], X_samples[:, 1], Y_samples, color='r', s=50)
ax.set_title(f"Iteration {iteration}")
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('f(X, Y)')
ax.view_init(elev=30, azim=-30)
plt.show()
def bayesian_optimization(n_iter, change_to_gaussian_after=10):
# # The bounds of the function
# bounds = np.array([[-5.0, 10.0], [0.0, 15.0]]) #brainin
bounds = np.array([[-2.0, 2.0], [-1.0, 3.0]]) #rosenbrock
# Initial samples
X_init = np.random.uniform([b[0] for b in bounds], [b[1] for b in bounds], size=(2, len(bounds)))
Y_init = np.array([[rosenbrock(x)] for x in X_init])
# Gaussian process with Matérn kernel as surrogate model
m52 = Matern(length_scale=[1.0, 1.0], nu=2.5)
gp = GaussianProcessRegressor(kernel=m52, alpha=1e-5)
# Boolean array indicating which dimensions should be sampled from a Gaussian distribution
use_gaussian = np.array([False, False])
# Means and variances for the Gaussian distributions
means = np.array([0.0, 0.0])
variances = np.array([(b[1] - b[0]) * 0.25 for b in bounds])**2
# Array to store the best result from each iteration
best_results = np.empty(n_iter)
# Bayesian optimization loop
for i_ in range(n_iter):
# Update Gaussian process with existing samples
gp.fit(X_init, Y_init)
# Obtain next sampling point from the acquisition function (expected_improvement)
n_samples = 1000
X_next = expected_improvement(X_init, n_samples, gp, use_gaussian, means, variances)
# Obtain next noisy sample from the objective function
Y_next = rosenbrock(X_next)
# Add sample to previous samples
X_init = np.vstack((X_init, X_next))
Y_init = np.vstack((Y_init, Y_next))
# After change_to_gaussian_after iterations, start using Gaussian distribution for sampling
if i_ >= change_to_gaussian_after:
use_gaussian = np.array([True, True])
# Update means with the mean of the best 5 results
best_5_idx = np.argsort(Y_init, axis=0)[:5].flatten()
means = np.mean(X_init[best_5_idx], axis=0)
# Reduce variances by 5 percent
variances *= 0.95
# Store the best result from this iteration
best_results[i_] = np.min(Y_init)
return best_results
# Run the benchmarking test and store the results
results_with_gaussian = np.empty((50, 20))
results_without_gaussian = np.empty((50, 20))
for i in range(50):
results_with_gaussian[i, :] = bayesian_optimization(20, 10)
results_without_gaussian[i, :] = bayesian_optimization(20, 50)
# Compute the mean and standard deviation of the results
mean_with_gaussian = np.mean(results_with_gaussian, axis=0)
std_with_gaussian = np.std(results_with_gaussian, axis=0)
mean_without_gaussian = np.mean(results_without_gaussian, axis=0)
std_without_gaussian = np.std(results_without_gaussian, axis=0)
# Plot the results
plt.figure(figsize=(10, 6))
plt.plot(mean_with_gaussian, label='With Gaussian after 10 iterations')
plt.fill_between(range(20), mean_with_gaussian-std_with_gaussian, mean_with_gaussian+std_with_gaussian, alpha=0.2)
plt.plot(mean_without_gaussian, label='Without Gaussian')
plt.fill_between(range(20), mean_without_gaussian-std_without_gaussian, mean_without_gaussian+std_without_gaussian, alpha=0.2)
plt.xlabel('Iteration')
plt.ylabel('Best Result')
plt.legend()
plt.show()
#
# # Initial samples
# X_init = np.random.uniform([b[0] for b in bounds], [b[1] for b in bounds], size=(2, len(bounds)))
# # Y_init = np.array([[branin(x)] for x in X_init])
# Y_init = np.array([[rosenbrock(x)] for x in X_init])
#
# # Gaussian process with Matérn kernel as surrogate model
# m52 = Matern(length_scale=[1.0, 1.0], nu=2.5)
# gp = GaussianProcessRegressor(kernel=m52, alpha=1e-5)
#
# # Number of iterations
# n_iter = 50
#
# # Boolean array indicating which dimensions should be sampled from a Gaussian distribution
# use_gaussian = np.array([False, False])
#
# # Means and variances for the Gaussian distributions
# means = np.array([0.0, 0.0])
# variances = np.array([(b[1] - b[0]) * 0.25 for b in bounds])**2
#
# # Bayesian optimization loop
# for i in range(n_iter):
# # Update Gaussian process with existing samples
# gp.fit(X_init, Y_init)
#
# # Obtain next sampling point from the acquisition function (expected_improvement)
# n_samples = 1000
# X_next = expected_improvement(X_init, n_samples, gp, use_gaussian, means, variances)
#
# # Obtain next noisy sample from the objective function
# # Y_next = branin(X_next)
# Y_next = rosenbrock(X_next)
#
# # Add sample to previous samples
# X_init = np.vstack((X_init, X_next))
# Y_init = np.vstack((Y_init, Y_next))
#
#
# # After 10 iterations, start using Gaussian distribution for sampling
# if i >= 29:
# use_gaussian = np.array([True, True])
# # Update means with the mean of the best 5 results
# best_5_idx = np.argsort(Y_init, axis=0)[:5].flatten()
# means = np.mean(X_init[best_5_idx], axis=0)
# # Reduce variances by 10 percent
# variances *= 0.9
#
# # Plot the function and the samples
# # plot_function_and_samples(rosenbrock, X_init, Y_init, i)
#
# print("Final samples: ", X_init)
# print("Final function values: ", Y_init)
# print(f"Minimum function values: {np.min(Y_init)}, idx: {np.argmin(Y_init)}")

View File

@ -6,6 +6,7 @@ from PolicyModel.GaussianModel import GaussianPolicy
from AcquistionFunctions.ExpectedImprovement import ExpectedImprovement from AcquistionFunctions.ExpectedImprovement import ExpectedImprovement
from AcquistionFunctions.ProbabilityOfImprovement import ProbabilityOfImprovement from AcquistionFunctions.ProbabilityOfImprovement import ProbabilityOfImprovement
from AcquistionFunctions.ConfidenceBound import ConfidenceBound from AcquistionFunctions.ConfidenceBound import ConfidenceBound
from AcquistionFunctions.ExpectedImprovementwithPreference import expected_improvement
from ToyTask.MountainCarGym import Continuous_MountainCarEnv from ToyTask.MountainCarGym import Continuous_MountainCarEnv
from ToyTask.Pendulum import PendulumEnv from ToyTask.Pendulum import PendulumEnv
@ -42,6 +43,9 @@ class BayesianOptimization:
self.upperb) self.upperb)
self.nr_test = 100 self.nr_test = 100
self.mean_pref = np.zeros((self.nr_policy_weights, 1))
self.var_pref = np.ones(self.mean_pref.shape) * 0.75
self.use_gaussian = np.array([False] * self.nr_policy_weights)
def reset_bo(self): def reset_bo(self):
self.counter_array = np.zeros((1, 1)) self.counter_array = np.zeros((1, 1))
@ -139,6 +143,27 @@ class BayesianOptimization:
return x_next return x_next
elif self.acq == 'ei_gauss':
if self.episode >= 25:
self.use_gaussian = np.array([True] * self.nr_policy_weights)
best_5_idx = np.argsort(self.Y, axis=0)[:5].flatten()
self.mean_pref = np.mean(self.X[best_5_idx], axis=0)
self.var_pref *= 0.95
x_next = expected_improvement(self.gp,
self.X,
self.nr_test,
self.use_gaussian,
self.mean_pref,
self.var_pref,
kappa=0,
seed=self.policy_seed,
lower=self.lowerb,
upper=self.upperb)
return x_next
else: else:
raise NotImplementedError raise NotImplementedError
@ -169,7 +194,8 @@ class BayesianOptimization:
def plot_objective_function(self): def plot_objective_function(self):
plt.scatter(self.X[:, 0], self.Y) plt.scatter(self.X[:, 0], self.Y)
x_dom = np.linspace(self.lowerb * np.ones((1, 6)), self.upperb * np.ones((1, 6)), 100).reshape(-1, self.nr_policy_weights) x_dom = np.linspace(self.lowerb * np.ones((1, 6)), self.upperb * np.ones((1, 6)), 100).reshape(-1,
self.nr_policy_weights)
y_plot, y_sigma = self.gp.predict(x_dom, return_std=True) y_plot, y_sigma = self.gp.predict(x_dom, return_std=True)
y_plot = y_plot.reshape(-1) y_plot = y_plot.reshape(-1)
@ -199,8 +225,8 @@ class BayesianOptimization:
def main(): def main():
nr_steps = 100 nr_steps = 100
env = PendulumEnv(render_mode='human') # render_mode='human' env = Continuous_MountainCarEnv(render_mode='human') # render_mode='human'
bo = BayesianOptimization(env, nr_steps, nr_weights=10, acq='ei', env_seed=1234) bo = BayesianOptimization(env, nr_steps, nr_weights=10, acq='ei_gauss', env_seed=1234)
bo.initialize() bo.initialize()
iteration_steps = 100 iteration_steps = 100
for i in range(iteration_steps): for i in range(iteration_steps):

View File

@ -0,0 +1,81 @@
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal
def importance_sampling(target_pdf, proposal_pdf, proposal_sample, num_samples):
"""
Perform importance sampling.
Parameters:
target_pdf: The PDF of the target distribution.
proposal_pdf: The PDF of the proposal distribution.
proposal_sample: A function that generates a sample from the proposal distribution.
num_samples: The number of samples to generate.
Returns:
samples: The generated samples.
weights: The importance weights of the samples.
"""
# Generate samples from the proposal distribution
samples = np.array([proposal_sample() for _ in range(num_samples)])
# Calculate the importance weights
weights = np.array([target_pdf(sample) / proposal_pdf(sample) for sample in samples])
# Normalize the weights
weights /= np.sum(weights)
return samples, weights
# Define the target and proposal distributions
target_pdf = lambda x: multivariate_normal.pdf(x, mean=[0, 0], cov=[[1, 0], [0, 1]])
proposal_pdf = lambda x: multivariate_normal.pdf(x, mean=[1, 1], cov=[[2, 0], [0, 2]])
proposal_sample = lambda: np.random.multivariate_normal([1, 1], [[2, 0], [0, 2]])
# Perform importance sampling
samples, weights = importance_sampling(target_pdf, proposal_pdf, proposal_sample, 1000)
# Define the target and proposal distributions
target_pdf = lambda x: multivariate_normal.pdf(x, mean=[0, 0], cov=[[1, 0], [0, 1]])
proposal_pdf = lambda x: multivariate_normal.pdf(x, mean=[1, 1], cov=[[2, 0], [0, 2]])
# Generate a grid of points
x = np.linspace(-3, 3, 100)
y = np.linspace(-3, 3, 100)
X, Y = np.meshgrid(x, y)
pos = np.dstack((X, Y))
# Calculate the PDFs of the target and proposal distributions at each point
target_Z = np.array([[target_pdf([x, y]) for x, y in zip(X_row, Y_row)] for X_row, Y_row in zip(X, Y)])
proposal_Z = np.array([[proposal_pdf([x, y]) for x, y in zip(X_row, Y_row)] for X_row, Y_row in zip(X, Y)])
new_samples = np.zeros(samples.shape)
for i in range(weights.shape[0]):
new_samples[i, :] = samples[i, 0] * weights[i], samples[i, 1] * weights[i]
print(new_samples)
# Plot the target distribution
plt.figure()
plt.contourf(X, Y, target_Z, cmap='viridis')
plt.scatter(new_samples[:, 0], new_samples[:, 1], color='r', s=10) # Add the sample points
plt.title('Target Distribution')
plt.xlabel('x')
plt.ylabel('y')
plt.colorbar(label='PDF')
plt.show()
# Plot the proposal distribution
plt.figure()
plt.contourf(X, Y, proposal_Z, cmap='viridis')
plt.scatter(samples[:, 0], samples[:, 1], color='r', s=10) # Add the sample points
plt.title('Proposal Distribution')
plt.xlabel('x')
plt.ylabel('y')
plt.colorbar(label='PDF')
plt.show()

View File

@ -0,0 +1,26 @@
import numpy as np
class ImprovementQuery:
def __init__(self, threshold, period, rewards):
self.threshold = threshold
self.period = period
self.rewards = rewards
def query(self):
if self.rewards.shape[0] < self.period:
return False
else:
first = self.rewards[-self.period]
last = self.rewards[-1]
slope = (last - first) / self.period
print(slope)
return slope < self.threshold
if __name__ == "__main__":
rewards = np.array([0, 1, 2, 3, 4, 5])
Query = ImprovementQuery(0.05, 5, rewards)
print(Query.query())

View File

@ -5,7 +5,7 @@ import os
def plot_csv(paths, x_axis, y_axis): def plot_csv(paths, x_axis, y_axis):
for path_ in paths: for path_ in paths:
data = np.genfromtxt(path_, delimiter=',', skip_header=1, dtype=float) data = np.genfromtxt(path_, delimiter=',', skip_header=0, dtype=float)
mean = np.mean(data, axis=1) mean = np.mean(data, axis=1)
std = np.std(data, axis=1) std = np.std(data, axis=1)
@ -30,17 +30,18 @@ def plot_csv(paths, x_axis, y_axis):
plt.xlim([0, mean.shape[0]]) plt.xlim([0, mean.shape[0]])
plt.ylabel(y_axis) plt.ylabel(y_axis)
plt.grid(True) plt.grid(True)
plt.legend(loc="lower right") plt.legend(loc="upper left")
plt.show() plt.show()
if __name__ == '__main__': if __name__ == '__main__':
filenames = ['BO/mc-ei-bo--5-1685952362_3531659.csv', filenames = ['cp-ei-bo--20-1687348670_183786.csv',
'random-0_95/mc-ei-random-0_95-5-1685956146_775975.csv', 'cp-ei-random-0_95-20-1687351879_7839339.csv',
'regular-10_0/mc-ei-regular-10-5-1685968651_7080765.csv', 'cp-ei-regular-20_0-20-1687340326_8181093.csv',
'cp-ei-improvement-0_2-20-1687348464_2842393.csv',
] ]
home_dir = os.path.expanduser('~') home_dir = os.path.expanduser('~')
file_path = os.path.join(home_dir, 'Documents/IntRLResults/mc-e50r10') file_path = os.path.join(home_dir, 'Documents/IntRLResults/cp-e100r10-bf20')
paths = [os.path.join(file_path, filename) for filename in filenames] paths = [os.path.join(file_path, filename) for filename in filenames]
plot_csv(paths, 'Episodes', 'Reward') plot_csv(paths, 'Episodes', 'Reward')
# #