Adding Environment Wrapper and including index randomization for trajectory selection

This commit is contained in:
Vedant Dave 2023-04-13 18:41:15 +02:00
parent 233ca77aa4
commit 9aa07fed6a

View File

@ -1,9 +1,3 @@
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import os import os
import random import random
import numpy as np import numpy as np
@ -108,6 +102,49 @@ class FrameStack(gym.Wrapper):
return np.concatenate(list(self._frames), axis=0) return np.concatenate(list(self._frames), axis=0)
class ActionRepeat:
    """Environment wrapper that applies every action several times in a row.

    Attributes that this wrapper does not define itself are looked up on the
    wrapped environment.

    Args:
        env: Environment whose ``step(action)`` returns
            ``(obs, reward, done, info)``.
        amount: Maximum number of times an action is applied per ``step``
            call. Must be at least 1.

    Raises:
        ValueError: If ``amount`` is smaller than 1.
    """

    def __init__(self, env, amount):
        if amount < 1:
            # With zero repeats ``step`` would have nothing to return.
            raise ValueError(f"amount must be at least 1, got {amount!r}")
        self._env = env
        self._amount = amount

    def __getattr__(self, name):
        # Only reached when normal lookup fails. ``_env`` is read straight
        # from the instance dict: going through ``self._env`` would re-enter
        # this method and recurse forever while ``_env`` is not set yet
        # (e.g. on the blank instance created by ``copy`` / ``pickle``).
        try:
            env = self.__dict__["_env"]
        except KeyError:
            raise AttributeError(name) from None
        return getattr(env, name)

    def step(self, action):
        """Apply ``action`` up to ``amount`` times and accumulate the reward.

        Repetition stops early as soon as the wrapped environment reports
        ``done``.

        Returns:
            Tuple ``(obs, total_reward, done, info)``; ``obs``, ``done`` and
            ``info`` come from the last underlying step, ``total_reward`` is
            the sum of the rewards of all underlying steps taken.
        """
        done = False
        total_reward = 0
        current_step = 0
        while current_step < self._amount and not done:
            obs, reward, done, info = self._env.step(action)
            total_reward += reward
            current_step += 1
        return obs, total_reward, done, info
class NormalizeActions:
    """Environment wrapper that lets the agent act in a [-1, 1] action range.

    Dimensions of the wrapped action space whose lower and upper bounds are
    both finite are rescaled from [-1, 1] to ``[low, high]`` before being
    passed on; all other dimensions are forwarded unchanged.

    Attributes that this wrapper does not define itself are looked up on the
    wrapped environment.

    Args:
        env: Environment exposing ``action_space.low`` / ``action_space.high``
            and a ``step(action)`` method.
    """

    def __init__(self, env):
        self._env = env
        # True where both bounds are finite, i.e. where rescaling is possible.
        self._mask = np.logical_and(
            np.isfinite(env.action_space.low),
            np.isfinite(env.action_space.high))
        # Non-finite bounds are replaced by -1 / 1 so the arithmetic in
        # ``step`` never produces inf or nan.
        self._low = np.where(self._mask, env.action_space.low, -1)
        self._high = np.where(self._mask, env.action_space.high, 1)

    def __getattr__(self, name):
        # Only reached when normal lookup fails. ``_env`` is read straight
        # from the instance dict: going through ``self._env`` would re-enter
        # this method and recurse forever while ``_env`` is not set yet
        # (e.g. on the blank instance created by ``copy`` / ``pickle``).
        try:
            env = self.__dict__["_env"]
        except KeyError:
            raise AttributeError(name) from None
        return getattr(env, name)

    @property
    def action_space(self):
        """Action space seen by the agent.

        Finite dimensions are advertised as [-1, 1]; the remaining ones use
        the stored bounds, which ``__init__`` already set to -1 / 1, so the
        resulting box is [-1, 1] in every dimension.
        """
        low = np.where(self._mask, -np.ones_like(self._low), self._low)
        high = np.where(self._mask, np.ones_like(self._low), self._high)
        return gym.spaces.Box(low, high, dtype=np.float32)

    def step(self, action):
        """Map ``action`` to the wrapped environment's range and step it.

        Returns:
            Whatever the wrapped environment's ``step`` returns.
        """
        # Affine map [-1, 1] -> [low, high].
        original = (action + 1) / 2 * (self._high - self._low) + self._low
        # Dimensions without finite bounds keep the raw action value.
        original = np.where(self._mask, original, action)
        return self._env.step(original)
class ReplayBuffer: class ReplayBuffer:
def __init__(self, size, obs_shape, action_size, seq_len, batch_size, args): def __init__(self, size, obs_shape, action_size, seq_len, batch_size, args):
self.size = size self.size = size
@ -164,11 +201,11 @@ class ReplayBuffer:
non_zero_indices = np.nonzero(buffer.episode_count)[0] non_zero_indices = np.nonzero(buffer.episode_count)[0]
variable = variable[non_zero_indices] variable = variable[non_zero_indices]
if obs: if obs:
variable = variable.reshape(self.args.batch_size, self.args.episode_length, variable = variable.reshape(-1, self.args.episode_length,
self.args.frame_stack*self.args.channels, self.args.frame_stack*self.args.channels,
self.args.image_size,self.args.image_size).transpose(1, 0, 2, 3, 4) self.args.image_size,self.args.image_size).transpose(1, 0, 2, 3, 4)
else: else:
variable = variable.reshape(self.args.batch_size, self.args.episode_length, -1).transpose(1, 0, 2) variable = variable.reshape(variable.shape[0]//self.args.episode_length, self.args.episode_length, -1).transpose(1, 0, 2)
return variable return variable
def transform_grouped_steps(self, variable): def transform_grouped_steps(self, variable):
@ -177,6 +214,16 @@ class ReplayBuffer:
self.args.image_size,self.args.image_size) self.args.image_size,self.args.image_size)
return variable return variable
def sample_random_idx(self, buffer_length):
    """Pick ``args.batch_size`` distinct indices from ``[0, buffer_length)``.

    Args:
        buffer_length: Exclusive upper bound of the index range to draw from.

    Returns:
        List of unique integer indices in random order.

    Raises:
        ValueError: Propagated from ``random.sample`` when ``batch_size``
            exceeds ``buffer_length``.
    """
    candidates = range(0, buffer_length)
    return random.sample(candidates, self.args.batch_size)
def group_and_sample_random_batch(self, buffer, variable_name, device, random_indices, is_obs=True, offset=0):
    """Group a buffer variable, trim it along axis 0 and pick a random batch.

    Args:
        buffer: Passed through to ``self.group_steps``.
        variable_name: Passed through to ``self.group_steps``.
        device: Target device for the resulting tensor.
        random_indices: Indices selected along axis 1 of the grouped tensor
            (presumably the episode axis -- confirm against ``group_steps``).
        is_obs: If true the grouped tensor is indexed as 5-dimensional,
            otherwise as 3-dimensional.
        offset: ``0`` keeps the first ``args.episode_length - 1`` entries of
            axis 0; any other value drops the first ``offset`` entries.

    Returns:
        Float tensor on ``device`` restricted to ``random_indices``.
    """
    grouped = torch.tensor(self.group_steps(buffer, variable_name, is_obs)).float()
    if offset == 0:
        window = slice(None, self.args.episode_length - 1)
    else:
        window = slice(offset, None)
    on_device = grouped[window].to(device)
    if is_obs:
        return on_device[:, random_indices, :, :, :]
    return on_device[:, random_indices, :]
def make_env(args): def make_env(args):
# For making ground plane transparent, change rgba to (0, 0, 0, 0) in local_dm_control_suite/{domain_name}.xml, # For making ground plane transparent, change rgba to (0, 0, 0, 0) in local_dm_control_suite/{domain_name}.xml,