added more gym envs

This commit is contained in:
Niko Feith 2023-04-27 16:38:24 +02:00
parent 4fe3973a53
commit 69ae81d82d
5 changed files with 612 additions and 14 deletions

View File

@ -8,6 +8,7 @@ from AcquistionFunctions.ProbabilityOfImprovement import ProbabilityOfImprovement
from AcquistionFunctions.ConfidenceBound import ConfidenceBound
from ToyTask.MountainCarGym import Continuous_MountainCarEnv
from ToyTask.Pendulum import PendulumEnv
import time
@ -15,8 +16,9 @@ import matplotlib.pyplot as plt
class BayesianOptimization:
def __init__(self, env, nr_step, nr_init=3, acq='ei', nr_weights=6, policy_seed=None):
def __init__(self, env, nr_step, nr_init=3, acq='ei', nr_weights=6, policy_seed=None, env_seed=None):
self.env = env
self.env_seed = env_seed
self.nr_init = nr_init
self.acq = acq
self.X = None
@ -48,7 +50,7 @@ class BayesianOptimization:
self.best_reward = np.empty((1, 1))
def initialize(self):
self.env.reset()
self.env.reset(seed=self.env_seed)
self.reset_bo()
if self.env.render_mode == 'human':
self.env.render()
@ -69,12 +71,14 @@ class BayesianOptimization:
self.gp.fit(self.X, self.Y)
def runner(self, policy):
self.env.reset(seed=self.env_seed)
done = False
step_count = 0
env_reward = 0.0
while not done:
action = policy[step_count]
output = self.env.step(action)
action_clipped = action.clip(min=-1.0, max=1.0)
output = self.env.step(action_clipped.astype(np.float32))
env_reward += output[1]
done = output[2]
if self.env.render_mode == 'human':
@ -82,8 +86,6 @@ class BayesianOptimization:
step_count += 1
if step_count >= self.nr_steps:
done = True
distance = -(self.env.goal_position - output[0][0])
env_reward += distance * self.distance_penalty
if self.counter_array[0] == 0:
@ -98,7 +100,6 @@ class BayesianOptimization:
if self.env.render_mode == 'human':
time.sleep(0.25)
self.env.reset()
return env_reward, step_count
def next_observation(self):
@ -187,22 +188,21 @@ class BayesianOptimization:
def get_best_result(self, plotter=True):
y_hat = self.gp.predict(self.X)
idx = np.argmax(y_hat)
print(idx, np.argmax(self.Y))
x_max = self.X[idx, :]
self.policy_model.weights = x_max
self.policy_model.policy_rollout()
if plotter:
print(self.counter_array[idx], idx)
self.policy_model.plot_policy(finished=self.counter_array[idx])
else:
return self.counter_array[idx]
def main():
nr_steps = 100
env = Continuous_MountainCarEnv() # render_mode='human'
bo = BayesianOptimization(env, nr_steps, nr_weights=10, acq='ei')
env = PendulumEnv(render_mode='human') # render_mode='human'
bo = BayesianOptimization(env, nr_steps, nr_weights=10, acq='ei', env_seed=1234)
bo.initialize()
iteration_steps = 200
iteration_steps = 100
for i in range(iteration_steps):
x_next = bo.next_observation()
step_count = bo.eval_new_observation(x_next)
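The hunks above thread an `env_seed` through every `reset()` call and clip the raw policy output to `[-1, 1]` before stepping the environment. A minimal sketch of that rollout pattern, not part of the commit (the helper name `rollout` is illustrative, and a gym>=0.26-style five-tuple `step()` return is assumed, matching the bundled envs):
```
import numpy as np

def rollout(env, policy, env_seed=1234):
    # Reproducible episode: the same seed gives the same initial state.
    env.reset(seed=env_seed)
    env_reward, step_count, done = 0.0, 0, False
    while not done:
        # Clip to the policy-shaping range expected by the modified envs.
        action = np.asarray(policy[step_count], dtype=np.float32).reshape(1).clip(-1.0, 1.0)
        obs, reward, terminated, truncated, info = env.step(action)
        env_reward += reward
        step_count += 1
        done = terminated or truncated or step_count >= len(policy)
    return env_reward, step_count
```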

309
ToyTask/Cartpole.py Normal file
View File

@ -0,0 +1,309 @@
"""
Classic cart-pole system implemented by Rich Sutton et al.
Copied from http://incompleteideas.net/sutton/book/code/pole.c
permalink: https://perma.cc/C9ZM-652R
"""
import math
from typing import Optional, Union
import numpy as np
import gym
from gym import logger, spaces
from gym.spaces import Box
from gym.envs.classic_control import utils
from gym.error import DependencyNotInstalled
class CartPoleEnv(gym.Env[np.ndarray, Union[int, np.ndarray]]):
"""
### Description
This environment corresponds to the version of the cart-pole problem described by Barto, Sutton, and Anderson in
["Neuronlike Adaptive Elements That Can Solve Difficult Learning Control Problem"](https://ieeexplore.ieee.org/document/6313077).
A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track.
The pendulum is placed upright on the cart and the goal is to balance the pole by applying forces
in the left and right direction on the cart.
### Action Space
Due to the policy-shaping approach, the action is an `ndarray` with shape `(1,)` taking values in `[-1, 1]`. The action
is scaled by `force_mag`: it pushes the cart to the left if it is below 0, to the right if it is above 0, and does
nothing if it is exactly 0.
### Observation Space
The observation is a `ndarray` with shape `(4,)` with the values corresponding to the following positions and velocities:
| Num | Observation | Min | Max |
|-----|-----------------------|---------------------|-------------------|
| 0 | Cart Position | -4.8 | 4.8 |
| 1 | Cart Velocity | -Inf | Inf |
| 2 | Pole Angle | ~ -0.418 rad (-24°) | ~ 0.418 rad (24°) |
| 3 | Pole Angular Velocity | -Inf | Inf |
**Note:** While the ranges above denote the possible values for observation space of each element,
it is not reflective of the allowed values of the state space in an unterminated episode. Particularly:
- The cart x-position (index 0) can take values between `(-4.8, 4.8)`, but the episode terminates
if the cart leaves the `(-2.4, 2.4)` range.
- The pole angle can be observed between `(-.418, .418)` radians (or **±24°**), but the episode terminates
if the pole angle is not in the range `(-.2095, .2095)` (or **±12°**)
### Rewards
Since the goal is to keep the pole upright for as long as possible, a reward of `+1` for every step taken,
including the termination step, is allotted. The threshold for rewards is 475 for v1.
### Starting State
All observations are assigned a uniformly random value in `(-0.05, 0.05)`
### Episode End
The episode ends if any one of the following occurs:
1. Termination: Pole Angle is greater than ±12°
2. Termination: Cart Position is greater than ±2.4 (center of the cart reaches the edge of the display)
3. Truncation: Episode length is greater than 500 (200 for v0)
### Arguments
```
gym.make('CartPole-v1')
```
No additional arguments are currently supported.
"""
metadata = {
"render_modes": ["human", "rgb_array"],
"render_fps": 50,
}
def __init__(self, render_mode: Optional[str] = None):
self.gravity = 9.8
self.masscart = 1.0
self.masspole = 0.1
self.total_mass = self.masspole + self.masscart
self.length = 0.5 # actually half the pole's length
self.polemass_length = self.masspole * self.length
self.force_mag = 10.0
self.tau = 0.02 # seconds between state updates
self.kinematics_integrator = "euler"
# Angle at which to fail the episode
self.theta_threshold_radians = 12 * 2 * math.pi / 360
self.x_threshold = 2.4
# Angle limit set to 2 * theta_threshold_radians so failing observation
# is still within bounds.
high = np.array(
[
self.x_threshold * 2,
np.finfo(np.float32).max,
self.theta_threshold_radians * 2,
np.finfo(np.float32).max,
],
dtype=np.float32,
)
self.action_space = Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)
self.observation_space = spaces.Box(-high, high, dtype=np.float32)
self.render_mode = render_mode
self.screen_width = 600
self.screen_height = 400
self.screen = None
self.clock = None
self.isopen = True
self.state = None
self.steps_beyond_terminated = None
def step(self, action):
err_msg = f"{action!r} ({type(action)}) invalid"
assert self.action_space.contains(action), err_msg
assert self.state is not None, "Call reset before using step method."
x, x_dot, theta, theta_dot = self.state
# changed usage of action due to policy shaping approach
force = action * self.force_mag
costheta = math.cos(theta)
sintheta = math.sin(theta)
# For the interested reader:
# https://coneural.org/florian/papers/05_cart_pole.pdf
temp = (
force + self.polemass_length * theta_dot**2 * sintheta
) / self.total_mass
thetaacc = (self.gravity * sintheta - costheta * temp) / (
self.length * (4.0 / 3.0 - self.masspole * costheta**2 / self.total_mass)
)
xacc = temp - self.polemass_length * thetaacc * costheta / self.total_mass
if self.kinematics_integrator == "euler":
x = x + self.tau * x_dot
x_dot = x_dot + self.tau * xacc
theta = theta + self.tau * theta_dot
theta_dot = theta_dot + self.tau * thetaacc
else: # semi-implicit euler
x_dot = x_dot + self.tau * xacc
x = x + self.tau * x_dot
theta_dot = theta_dot + self.tau * thetaacc
theta = theta + self.tau * theta_dot
self.state = (x, x_dot[0], theta, theta_dot[0])
terminated = bool(
x < -self.x_threshold
or x > self.x_threshold
or theta < -self.theta_threshold_radians
or theta > self.theta_threshold_radians
)
if not terminated:
reward = 1.0
elif self.steps_beyond_terminated is None:
# Pole just fell!
self.steps_beyond_terminated = 0
reward = 1.0
else:
if self.steps_beyond_terminated == 0:
logger.warn(
"You are calling 'step()' even though this "
"environment has already returned terminated = True. You "
"should always call 'reset()' once you receive 'terminated = "
"True' -- any further steps are undefined behavior."
)
self.steps_beyond_terminated += 1
reward = 0.0
if self.render_mode == "human":
self.render()
return np.array(self.state, dtype=np.float32), reward, terminated, False, {}
def reset(
self,
*,
seed: Optional[int] = None,
options: Optional[dict] = None,
):
super().reset(seed=seed)
# Note that if you use custom reset bounds, it may lead to out-of-bound
# state/observations.
low, high = utils.maybe_parse_reset_bounds(
options, -0.05, 0.05 # default low
) # default high
self.state = self.np_random.uniform(low=low, high=high, size=(4,))
self.steps_beyond_terminated = None
if self.render_mode == "human":
self.render()
return np.array(self.state, dtype=np.float32), {}
def render(self):
if self.render_mode is None:
gym.logger.warn(
"You are calling render method without specifying any render mode. "
"You can specify the render_mode at initialization, "
f'e.g. gym("{self.spec.id}", render_mode="rgb_array")'
)
return
try:
import pygame
from pygame import gfxdraw
except ImportError:
raise DependencyNotInstalled(
"pygame is not installed, run `pip install gym[classic_control]`"
)
if self.screen is None:
pygame.init()
if self.render_mode == "human":
pygame.display.init()
self.screen = pygame.display.set_mode(
(self.screen_width, self.screen_height)
)
else: # mode == "rgb_array"
self.screen = pygame.Surface((self.screen_width, self.screen_height))
if self.clock is None:
self.clock = pygame.time.Clock()
world_width = self.x_threshold * 2
scale = self.screen_width / world_width
polewidth = 10.0
polelen = scale * (2 * self.length)
cartwidth = 50.0
cartheight = 30.0
if self.state is None:
return None
x = self.state
self.surf = pygame.Surface((self.screen_width, self.screen_height))
self.surf.fill((255, 255, 255))
l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2
axleoffset = cartheight / 4.0
cartx = x[0] * scale + self.screen_width / 2.0 # MIDDLE OF CART
carty = 100 # TOP OF CART
cart_coords = [(l, b), (l, t), (r, t), (r, b)]
cart_coords = [(c[0] + cartx, c[1] + carty) for c in cart_coords]
gfxdraw.aapolygon(self.surf, cart_coords, (0, 0, 0))
gfxdraw.filled_polygon(self.surf, cart_coords, (0, 0, 0))
l, r, t, b = (
-polewidth / 2,
polewidth / 2,
polelen - polewidth / 2,
-polewidth / 2,
)
pole_coords = []
for coord in [(l, b), (l, t), (r, t), (r, b)]:
coord = pygame.math.Vector2(coord).rotate_rad(-x[2])
coord = (coord[0] + cartx, coord[1] + carty + axleoffset)
pole_coords.append(coord)
gfxdraw.aapolygon(self.surf, pole_coords, (202, 152, 101))
gfxdraw.filled_polygon(self.surf, pole_coords, (202, 152, 101))
gfxdraw.aacircle(
self.surf,
int(cartx),
int(carty + axleoffset),
int(polewidth / 2),
(129, 132, 203),
)
gfxdraw.filled_circle(
self.surf,
int(cartx),
int(carty + axleoffset),
int(polewidth / 2),
(129, 132, 203),
)
gfxdraw.hline(self.surf, 0, self.screen_width, carty, (0, 0, 0))
self.surf = pygame.transform.flip(self.surf, False, True)
self.screen.blit(self.surf, (0, 0))
if self.render_mode == "human":
pygame.event.pump()
self.clock.tick(self.metadata["render_fps"])
pygame.display.flip()
elif self.render_mode == "rgb_array":
return np.transpose(
np.array(pygame.surfarray.pixels3d(self.screen)), axes=(1, 0, 2)
)
def close(self):
if self.screen is not None:
import pygame
pygame.display.quit()
pygame.quit()
self.isopen = False
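Not part of the commit: a short usage sketch of the modified `CartPoleEnv` under the policy-shaping action space described in the docstring, assuming `ToyTask` is importable as a package. `step()` multiplies the `[-1, 1]` action by `force_mag` internally, so a random policy simply jitters the cart:
```
from ToyTask.Cartpole import CartPoleEnv

env = CartPoleEnv()                      # no render_mode: runs headless, no pygame needed
obs, info = env.reset(seed=0)
episode_return = 0.0
for _ in range(200):
    action = env.action_space.sample()   # float32 ndarray, shape (1,), in [-1, 1]
    obs, reward, terminated, truncated, info = env.step(action)
    episode_return += reward
    if terminated or truncated:
        break
env.close()
print('return:', episode_return)
```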

273
ToyTask/Pendulum.py Normal file
View File

@ -0,0 +1,273 @@
__credits__ = ["Carlos Luis"]
from os import path
from typing import Optional
import numpy as np
import gym
from gym import spaces
from gym.envs.classic_control import utils
from gym.error import DependencyNotInstalled
DEFAULT_X = np.pi
DEFAULT_Y = 1.0
class PendulumEnv(gym.Env):
"""
### Description
The inverted pendulum swingup problem is based on the classic problem in control theory.
The system consists of a pendulum attached at one end to a fixed point, and the other end being free.
The pendulum starts in a random position and the goal is to apply torque on the free end to swing it
into an upright position, with its center of gravity right above the fixed point.
The diagram below specifies the coordinate system used for the implementation of the pendulum's
dynamic equations.
![Pendulum Coordinate System](./diagrams/pendulum.png)
- `x-y`: cartesian coordinates of the pendulum's end in meters.
- `theta` : angle in radians.
- `tau`: torque in `N m`. Defined as positive _counter-clockwise_.
### Action Space
The action is a `ndarray` with shape `(1,)` representing the torque applied to free end of the pendulum.
| Num | Action | Min | Max |
|-----|--------|------|-----|
| 0 | Torque | -1.0 | 1.0 |
### Observation Space
The observation is a `ndarray` with shape `(3,)` representing the x-y coordinates of the pendulum's free
end and its angular velocity.
| Num | Observation | Min | Max |
|-----|------------------|------|-----|
| 0 | x = cos(theta) | -1.0 | 1.0 |
| 1 | y = sin(theta) | -1.0 | 1.0 |
| 2 | Angular Velocity | -8.0 | 8.0 |
### Rewards
The reward function is defined as:
*r = -(theta<sup>2</sup> + 0.1 * theta_dt<sup>2</sup> + 0.001 * torque<sup>2</sup>)*
where `$\theta$` is the pendulum's angle normalized between *[-pi, pi]* (with 0 being in the upright position).
Based on the above equation, the minimum reward that can be obtained is
*-(pi<sup>2</sup> + 0.1 * 8<sup>2</sup> + 0.001 * 2<sup>2</sup>) = -16.2736044*,
while the maximum reward is zero (pendulum is upright with zero velocity and no torque applied).
### Starting State
The starting state is a random angle in *[-pi, pi]* and a random angular velocity in *[-1,1]*.
### Episode Truncation
The episode truncates at 200 time steps.
### Arguments
- `g`: acceleration of gravity measured in *(m s<sup>-2</sup>)* used to calculate the pendulum dynamics.
The default value is g = 10.0 .
```
gym.make('Pendulum-v1', g=9.81)
```
### Version History
* v1: Simplify the math equations, no difference in behavior.
* v0: Initial versions release (1.0.0)
"""
metadata = {
"render_modes": ["human", "rgb_array"],
"render_fps": 30,
}
def __init__(self, render_mode: Optional[str] = None, g=10.0):
self.max_speed = 8
self.max_torque = 2.0
self.dt = 0.05
self.g = g
self.m = 1.0
self.l = 1.0
self.render_mode = render_mode
self.screen_dim = 500
self.screen = None
self.clock = None
self.isopen = True
high = np.array([1.0, 1.0, self.max_speed], dtype=np.float32)
# This will throw a warning in tests/envs/test_envs in utils/env_checker.py as the space is not symmetric
# or normalised as max_torque == 2 by default. Ignoring the issue here as the default settings are too old
# to update to follow the openai gym api
self.action_space = spaces.Box(
low=-self.max_torque, high=self.max_torque, shape=(1,), dtype=np.float32
)
self.observation_space = spaces.Box(low=-high, high=high, dtype=np.float32)
def step(self, u):
th, thdot = self.state # th := theta
g = self.g
m = self.m
l = self.l
dt = self.dt
u = 2 * u # scaling the action to +/- 2 Nm
u = np.clip(u, -self.max_torque, self.max_torque)[0]
self.last_u = u # for rendering
costs = angle_normalize(th) ** 2 + 0.1 * thdot**2 + 0.001 * (u**2)
newthdot = thdot + (3 * g / (2 * l) * np.sin(th) + 3.0 / (m * l**2) * u) * dt
newthdot = np.clip(newthdot, -self.max_speed, self.max_speed)
newth = th + newthdot * dt
self.state = np.array([newth, newthdot])
if self.render_mode == "human":
self.render()
return self._get_obs(), -costs, False, False, {}
def reset(self, *, seed: Optional[int] = None, options: Optional[dict] = None):
super().reset(seed=seed)
if options is None:
high = np.array([DEFAULT_X, DEFAULT_Y])
else:
# Note that if you use custom reset bounds, it may lead to out-of-bound
# state/observations.
x = options.get("x_init") if "x_init" in options else DEFAULT_X
y = options.get("y_init") if "y_init" in options else DEFAULT_Y
x = utils.verify_number_and_cast(x)
y = utils.verify_number_and_cast(y)
high = np.array([x, y])
low = -high # We enforce symmetric limits.
self.state = self.np_random.uniform(low=low, high=high)
self.last_u = None
if self.render_mode == "human":
self.render()
return self._get_obs(), {}
def _get_obs(self):
theta, thetadot = self.state
return np.array([np.cos(theta), np.sin(theta), thetadot], dtype=np.float32)
def render(self):
if self.render_mode is None:
gym.logger.warn(
"You are calling render method without specifying any render mode. "
"You can specify the render_mode at initialization, "
f'e.g. gym("{self.spec.id}", render_mode="rgb_array")'
)
return
try:
import pygame
from pygame import gfxdraw
except ImportError:
raise DependencyNotInstalled(
"pygame is not installed, run `pip install gym[classic_control]`"
)
if self.screen is None:
pygame.init()
if self.render_mode == "human":
pygame.display.init()
self.screen = pygame.display.set_mode(
(self.screen_dim, self.screen_dim)
)
else: # mode in "rgb_array"
self.screen = pygame.Surface((self.screen_dim, self.screen_dim))
if self.clock is None:
self.clock = pygame.time.Clock()
self.surf = pygame.Surface((self.screen_dim, self.screen_dim))
self.surf.fill((255, 255, 255))
bound = 2.2
scale = self.screen_dim / (bound * 2)
offset = self.screen_dim // 2
rod_length = 1 * scale
rod_width = 0.2 * scale
l, r, t, b = 0, rod_length, rod_width / 2, -rod_width / 2
coords = [(l, b), (l, t), (r, t), (r, b)]
transformed_coords = []
for c in coords:
c = pygame.math.Vector2(c).rotate_rad(self.state[0] + np.pi / 2)
c = (c[0] + offset, c[1] + offset)
transformed_coords.append(c)
gfxdraw.aapolygon(self.surf, transformed_coords, (204, 77, 77))
gfxdraw.filled_polygon(self.surf, transformed_coords, (204, 77, 77))
gfxdraw.aacircle(self.surf, offset, offset, int(rod_width / 2), (204, 77, 77))
gfxdraw.filled_circle(
self.surf, offset, offset, int(rod_width / 2), (204, 77, 77)
)
rod_end = (rod_length, 0)
rod_end = pygame.math.Vector2(rod_end).rotate_rad(self.state[0] + np.pi / 2)
rod_end = (int(rod_end[0] + offset), int(rod_end[1] + offset))
gfxdraw.aacircle(
self.surf, rod_end[0], rod_end[1], int(rod_width / 2), (204, 77, 77)
)
gfxdraw.filled_circle(
self.surf, rod_end[0], rod_end[1], int(rod_width / 2), (204, 77, 77)
)
fname = path.join(path.dirname(__file__), "assets/clockwise.png")
img = pygame.image.load(fname)
if self.last_u is not None:
scale_img = pygame.transform.smoothscale(
img,
(scale * np.abs(self.last_u) / 2, scale * np.abs(self.last_u) / 2),
)
is_flip = bool(self.last_u > 0)
scale_img = pygame.transform.flip(scale_img, is_flip, True)
self.surf.blit(
scale_img,
(
offset - scale_img.get_rect().centerx,
offset - scale_img.get_rect().centery,
),
)
# drawing axle
gfxdraw.aacircle(self.surf, offset, offset, int(0.05 * scale), (0, 0, 0))
gfxdraw.filled_circle(self.surf, offset, offset, int(0.05 * scale), (0, 0, 0))
self.surf = pygame.transform.flip(self.surf, False, True)
self.screen.blit(self.surf, (0, 0))
if self.render_mode == "human":
pygame.event.pump()
self.clock.tick(self.metadata["render_fps"])
pygame.display.flip()
else: # mode == "rgb_array":
return np.transpose(
np.array(pygame.surfarray.pixels3d(self.screen)), axes=(1, 0, 2)
)
def close(self):
if self.screen is not None:
import pygame
pygame.display.quit()
pygame.quit()
self.isopen = False
def angle_normalize(x):
return ((x + np.pi) % (2 * np.pi)) - np.pi
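A quick numeric check, not part of the commit, of the reward bound quoted in the docstring above, evaluating the same cost expression as `step()` at the limits of angle, angular velocity, and torque:
```
import numpy as np

def pendulum_cost(theta, theta_dot, torque):
    # theta is assumed to be already normalized into [-pi, pi]
    return theta ** 2 + 0.1 * theta_dot ** 2 + 0.001 * torque ** 2

print(-pendulum_cost(np.pi, 8.0, 2.0))   # ~= -16.2736044, the documented minimum reward
```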

Binary image file not shown (6.8 KiB).

View File

@ -1,11 +1,19 @@
from BayesianOptimization.BOwithGym import BayesianOptimization
from ToyTask.MountainCarGym import Continuous_MountainCarEnv
import numpy as np
import matplotlib.pyplot as plt
# from ToyTask.MountainCarGym import Continuous_MountainCarEnv
from ToyTask.Pendulum import PendulumEnv
import warnings
from sklearn.exceptions import ConvergenceWarning
warnings.filterwarnings("ignore", category=ConvergenceWarning)
# BO parameters
env = Continuous_MountainCarEnv()
env = PendulumEnv()
nr_steps = 100
acquisition_fun = 'ei'
iteration_steps = 100
@ -17,6 +25,7 @@ finished_store = np.zeros((1, nr_runs))
best_policy = np.zeros((nr_steps, nr_runs))
reward_store = np.zeros((iteration_steps, nr_runs))
# post-processing
def post_processing(finished, policy, reward):
@ -31,6 +40,7 @@ def post_processing(finished, policy, reward):
return finish_mean, finish_std, policy_mean, policy_std, reward_mean, reward_std
# plot functions
def plot_policy(mean, std, fin_mean, fin_std):
x = np.linspace(0, mean.shape[0], mean.shape[0])
@ -53,6 +63,7 @@ def plot_policy(mean, std, fin_mean, fin_std):
plt.show()
def plot_reward(mean, std):
eps = np.linspace(0, mean.shape[0], mean.shape[0])
plt.plot(eps, mean)
@ -65,12 +76,14 @@ def plot_reward(mean, std):
)
plt.show()
# main
def main():
global finished_store, best_policy, reward_store
bo = BayesianOptimization(env, nr_steps, acq=acquisition_fun)
for i in range(nr_runs):
print('Iteration:', str(i))
bo.env_seed = int(np.random.randint(1, 2147483647, 1)[0])
bo.initialize()
for j in range(iteration_steps):
x_next = bo.next_observation()
@ -82,11 +95,14 @@ def main():
best_policy[:, i] = bo.policy_model.trajectory.T
reward_store[:, i] = bo.best_reward.T
print(reward_store[-1, i])
finish_mean, finish_std, policy_mean, policy_std, reward_mean, reward_std = post_processing(finished_store,
best_policy,
reward_store)
plot_policy(policy_mean, policy_std, finish_mean, finish_std)
plot_reward(reward_mean, reward_std)
if __name__ == '__main__':
main()
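Not part of the commit: a minimal sketch of the aggregate-and-plot pattern this script follows, with random placeholder data standing in for `reward_store` (one column per run). The mean/std reduction, axis labels, and the one-standard-deviation band are illustrative assumptions, since the body of `post_processing` is not shown in the diff:
```
import numpy as np
import matplotlib.pyplot as plt

rewards = np.random.rand(100, 10)                  # placeholder: (iteration_steps, nr_runs)
mean, std = rewards.mean(axis=1), rewards.std(axis=1)

eps = np.arange(mean.shape[0])
plt.plot(eps, mean)
plt.fill_between(eps, mean - std, mean + std, alpha=0.3)
plt.xlabel('BO iteration')
plt.ylabel('best reward')
plt.show()
```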