Curiosity/icm cartpole.py

196 lines
6.2 KiB
Python
Raw Permalink Normal View History

2023-01-30 16:59:26 +00:00
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim
import collections
# Single CartPole-v1 environment shared by the model setup and the training loop below.
env = gym.make('CartPole-v1')
class Actor(nn.Module):
    """Policy network mapping an observation to a distribution over actions.

    Args:
        n_actions: Number of discrete actions (size of the output distribution).
        space_dims: Length of the observation vector fed to the network.
        hidden_dims: Width of the single hidden layer.
    """

    def __init__(self, n_actions: int, space_dims: int, hidden_dims: int) -> None:
        super().__init__()
        self.feature_extractor = nn.Sequential(
            nn.Linear(space_dims, hidden_dims),
            nn.ReLU(inplace=True),
        )
        self.actor = nn.Sequential(
            nn.Linear(hidden_dims, n_actions),
            nn.Softmax(dim=-1),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return action probabilities; the last dimension sums to 1."""
        features = self.feature_extractor(x)
        return self.actor(features)
class Critic(nn.Module):
    """State-value network: maps an observation to a single scalar estimate.

    Args:
        space_dims: Length of the observation vector fed to the network.
        hidden_dims: Width of the single hidden layer.
    """

    def __init__(self, space_dims: int, hidden_dims: int) -> None:
        super().__init__()
        self.feature_extractor = nn.Sequential(
            nn.Linear(space_dims, hidden_dims),
            nn.ReLU(inplace=True),
        )
        self.critic = nn.Linear(hidden_dims, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the value estimate with a trailing dimension of size 1."""
        features = self.feature_extractor(x)
        return self.critic(features)
class InverseModel(nn.Module):
    """ICM inverse model: scores each action given a pair of state features.

    Args:
        n_actions: Number of discrete actions (size of the output).
        hidden_dims: Feature size of ONE state; the linear layer consumes two
            such feature vectors concatenated (``2 * hidden_dims`` inputs).
    """

    def __init__(self, n_actions: int, hidden_dims: int) -> None:
        super().__init__()
        self.fc = nn.Linear(hidden_dims * 2, n_actions)

    def forward(self, features: torch.Tensor) -> torch.Tensor:
        """Return unnormalized action scores of shape ``(1, n_actions)``.

        ``features`` holds the stacked features of the current and next state,
        i.e. ``2 * hidden_dims`` elements in total.
        """
        # Flatten the stacked pair into one row: (2, hidden_dims) -> (1, 2*hidden_dims).
        # reshape (unlike view) also accepts non-contiguous input.
        pair = features.reshape(1, -1)
        return self.fc(pair)
class ForwardModel(nn.Module):
    """ICM forward model: predicts next-state features from (action, features).

    Args:
        n_actions: Number of discrete actions (length of the one-hot encoding).
        hidden_dims: Size of the state feature vector (input and output).
    """

    def __init__(self, n_actions: int, hidden_dims: int) -> None:
        super().__init__()
        self.fc = nn.Linear(hidden_dims + n_actions, hidden_dims)
        # One-hot lookup table. Registered as a buffer so that .to(device) /
        # .cuda() move it together with the weights; non-persistent so the
        # state_dict keys stay exactly as before (only fc.weight / fc.bias).
        self.register_buffer("eye", torch.eye(n_actions), persistent=False)

    def forward(self, action: torch.Tensor, features: torch.Tensor) -> torch.Tensor:
        """Return predicted next-state features.

        ``action`` is an integer index tensor; indexing ``eye`` with it yields
        the one-hot rows that are concatenated in front of ``features``.
        """
        one_hot = self.eye[action]
        x = torch.cat([one_hot, features], dim=-1)  # (..., n_actions + hidden_dims)
        return self.fc(x)  # (..., hidden_dims)
class FeatureExtractor(nn.Module):
    """ICM state encoder: one linear layer followed by tanh.

    Args:
        space_dims: Length of the observation vector.
        hidden_dims: Size of the produced feature vector.
    """

    def __init__(self, space_dims: int, hidden_dims: int) -> None:
        super().__init__()
        self.fc = nn.Linear(space_dims, hidden_dims)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return features in ``[-1, 1]`` with last dimension ``hidden_dims``."""
        return torch.tanh(self.fc(x))
class PGLoss(nn.Module):
    """Policy-gradient loss: ``-mean(log(action_prob + eps) * reward)``.

    Args:
        eps: Small constant added inside the log so that a probability of
            exactly zero does not produce ``-inf``. Defaults to ``1e-6``.
    """

    def __init__(self, eps: float = 1e-6) -> None:
        super().__init__()
        self.eps = eps

    def forward(self, action_prob: torch.Tensor, reward: torch.Tensor) -> torch.Tensor:
        """Return the scalar loss; the mean runs over every element."""
        weighted_log_prob = torch.log(action_prob + self.eps) * reward
        return -torch.mean(weighted_log_prob)
def select_action(policy):
    """Sample one action index according to the probabilities in ``policy``.

    Args:
        policy: 1-D sequence of non-negative action probabilities.

    Returns:
        The sampled index as a NumPy integer scalar.

    Raises:
        ValueError: Propagated from ``np.random.choice`` when ``policy`` is
            not a valid distribution (e.g. negative or all-zero entries).
    """
    # Renormalize in float64: a float32 softmax output may miss summing to 1
    # by rounding error, which np.random.choice can reject.
    probs = np.asarray(policy, dtype=np.float64)
    probs = probs / probs.sum()
    return np.random.choice(len(probs), 1, p=probs)[0]
def to_tensor(x, dtype=None) -> torch.Tensor:
    """Convert ``x`` to a tensor and prepend a dimension of size 1.

    Args:
        x: Anything ``torch.tensor`` accepts (scalar, list, NumPy array, ...).
        dtype: Optional torch dtype; ``None`` lets torch infer it from ``x``.

    Returns:
        A new tensor of shape ``(1, *shape_of_x)``.
    """
    return torch.tensor(x, dtype=dtype).unsqueeze(0)
class ConfigArgs:
    """Hyper-parameters read by the training script."""

    # Weight of the forward-model loss inside the ICM loss; the inverse-model
    # loss is weighted by (1 - beta).
    beta = 0.2
    # Weight of the actor-critic loss relative to the ICM loss in the total loss.
    lamda = 0.1
    eta = 100.0  # scale factor for intrinsic reward
    # Discount applied to the critic's next-state value when bootstrapping.
    discounted_factor = 0.99
    # Adam learning rates for the critic, the actor and the ICM networks.
    lr_critic = 0.005
    lr_actor = 0.001
    lr_icm = 0.001
    # Number of episodes to train for.
    max_eps = 1000
    # When True the environment reward is replaced by 0 on non-terminal steps.
    sparse_mode = True
args = ConfigArgs()

# Network sizes are read from the environment so that the actor/critic and the
# ICM feature extractor always agree on the observation width (previously the
# actor and critic hard-coded 4 while the feature extractor queried the env).
n_actions = env.action_space.n
obs_dims = env.observation_space.shape[0]
hidden_dims = 32

# Actor Critic
actor = Actor(n_actions=n_actions, space_dims=obs_dims, hidden_dims=hidden_dims)
critic = Critic(space_dims=obs_dims, hidden_dims=hidden_dims)

# ICM
feature_extractor = FeatureExtractor(obs_dims, hidden_dims)
forward_model = ForwardModel(n_actions, hidden_dims)
inverse_model = InverseModel(n_actions, hidden_dims)

# Actor Critic optimizers
a_optim = torch.optim.Adam(actor.parameters(), lr=args.lr_actor)
c_optim = torch.optim.Adam(critic.parameters(), lr=args.lr_critic)

# ICM: one optimizer over the encoder, forward model and inverse model
icm_params = (
    list(feature_extractor.parameters())
    + list(forward_model.parameters())
    + list(inverse_model.parameters())
)
icm_optim = torch.optim.Adam(icm_params, lr=args.lr_icm)

# Loss functions
pg_loss = PGLoss()
mse_loss = nn.MSELoss()
xe_loss = nn.CrossEntropyLoss()

# Training bookkeeping
global_step = 0
n_eps = 0
reward_lst = []        # per-episode score
mva_lst = []           # moving average of the score after each episode
mva = 0.
avg_ireward_lst = []   # per-episode mean intrinsic reward
def _reset_env(environment):
    """Reset the environment and return only the initial observation."""
    result = environment.reset()
    # gym >= 0.26 returns (observation, info); older versions return the
    # observation alone.
    if isinstance(result, tuple):
        return result[0]
    return result


def _step_env(environment, action):
    """Step the environment and return (observation, reward, done, info)."""
    result = environment.step(action)
    if len(result) == 5:
        # gym >= 0.26: (observation, reward, terminated, truncated, info)
        observation, reward, terminated, truncated, info = result
        return observation, reward, terminated or truncated, info
    return result


# Online training: one gradient update per environment step, args.max_eps episodes.
while n_eps < args.max_eps:
    n_eps += 1
    next_obs = to_tensor(_reset_env(env), dtype=torch.float)
    done = False
    score = 0
    ireward_lst = []

    while not done:
        env.render()
        obs = next_obs

        a_optim.zero_grad()
        c_optim.zero_grad()
        icm_optim.zero_grad()

        # estimate action with policy network
        policy = actor(obs)
        action = select_action(policy.detach().numpy()[0])

        # interaction with environment
        next_obs, reward, done, info = _step_env(env, action)
        next_obs = to_tensor(next_obs, dtype=torch.float)
        extrinsic_reward = to_tensor([0.], dtype=torch.float) if args.sparse_mode else to_tensor([reward], dtype=torch.float)
        t_action = to_tensor(action)

        v = critic(obs)[0]
        # The bootstrap value is only ever used as a constant target, so no
        # graph is needed for it.
        with torch.no_grad():
            next_v = critic(next_obs)[0]

        # ICM
        obs_cat = torch.cat([obs, next_obs], dim=0)
        features = feature_extractor(obs_cat)  # (2, hidden_dims)
        inverse_action_prob = inverse_model(features)  # (1, n_actions)
        est_next_features = forward_model(t_action, features[0:1])  # (1, hidden_dims)

        # Loss - ICM
        # features[1:2] keeps the leading dimension so prediction and target
        # have the same shape (no implicit broadcasting inside MSELoss).
        forward_loss = mse_loss(est_next_features, features[1:2])
        inverse_loss = xe_loss(inverse_action_prob, t_action.view(-1))
        icm_loss = (1-args.beta)*inverse_loss + args.beta*forward_loss

        # Reward
        intrinsic_reward = args.eta*forward_loss.detach()
        if done:
            # Terminal step: no bootstrapping; -100 penalty unless the
            # accumulated score has reached 499.
            total_reward = -100 + intrinsic_reward if score < 499 else intrinsic_reward
            c_target = total_reward
        else:
            total_reward = extrinsic_reward + intrinsic_reward
            c_target = total_reward + args.discounted_factor*next_v
        # Constant critic target, shaped like v so MSELoss sees equal shapes.
        c_target = c_target.detach().reshape(v.shape)

        # Advantage (target - value) for the taken action only; all other
        # entries stay zero.
        advantages = torch.zeros_like(policy)
        advantages[0, action] = (c_target - v.detach())[0]

        # Loss - Actor Critic
        actor_loss = pg_loss(policy, advantages)
        critic_loss = mse_loss(v, c_target)
        ac_loss = actor_loss + critic_loss

        # Update
        loss = args.lamda*ac_loss + icm_loss
        loss.backward()
        icm_optim.step()
        a_optim.step()
        c_optim.step()

        # The terminal step does not count towards the score.
        if not done:
            score += reward

        ireward_lst.append(intrinsic_reward.item())
        global_step += 1

    # Per-episode statistics.
    avg_intrinsic_reward = sum(ireward_lst) / len(ireward_lst)
    mva = 0.95*mva + 0.05*score
    reward_lst.append(score)
    avg_ireward_lst.append(avg_intrinsic_reward)
    mva_lst.append(mva)
    print('Episodes: {}, AVG Score: {:.3f}, Score: {}, AVG reward i: {:.6f}'.format(n_eps, mva, score, avg_intrinsic_reward))