import numpy as np
from collections import deque
import gym
import gym_super_mario_bros
from nes_py.wrappers import JoypadSpace
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT, COMPLEX_MOVEMENT, RIGHT_ONLY
SIMPLE_MOVEMENT = SIMPLE_MOVEMENT[1:]  # drop the leading NOOP action from the default button map
from gym import spaces
from PIL import Image
import cv2

# Custom 13-action button palette used with JoypadSpace (no NOOP action).
PALETTE_ACTIONS = [
    ['up'],
    ['down'],
    ['left'],
    ['left', 'A'],
    ['left', 'B'],
    ['left', 'A', 'B'],
    ['right'],
    ['right', 'A'],
    ['right', 'B'],
    ['right', 'A', 'B'],
    ['A'],
    ['B'],
    ['A', 'B'],
]


def _process_frame_mario(frame):
    """Convert a raw 240x256 RGB frame to a 1x84x84 grayscale array scaled to [0, 1]."""
    if frame is not None:    # None frames tolerated for future meta implementation
        img = np.reshape(frame, [240, 256, 3]).astype(np.float32)
        # Luminance-weighted grayscale conversion
        img = img[:, :, 0] * 0.299 + img[:, :, 1] * 0.587 + img[:, :, 2] * 0.114
        x_t = cv2.resize(img, (84, 84))
        x_t = np.reshape(x_t, [1, 84, 84]) / 255.0
    else:
        x_t = np.zeros((1, 84, 84))
    return x_t


class ProcessFrameMario(gym.Wrapper):
    def __init__(self, env=None, reward_type=None):
        super(ProcessFrameMario, self).__init__(env)
        # _process_frame_mario emits 1x84x84 floats in [0, 1], so declare the space accordingly
        self.observation_space = gym.spaces.Box(low=0.0, high=1.0, shape=(1, 84, 84), dtype=np.float32)
        self.prev_time = 400
        self.prev_stat = 0
        self.prev_score = 0
        self.prev_dist = 40
        self.reward_type = reward_type
        # Sparse-reward milestones: every 150 units of x_pos up to 3000
        self.milestones = list(range(150, 3150, 150))
        self.counter = 0

    def step(self, action):
        '''
        Custom rewards.
        Dense:
            Distance = change in x_pos (minus 0.05), clipped to [-2, +2]
            Time     = -0.1 per in-game clock tick
            Status   = +/- 5 on power-up / power-down
            Score    = 0.025 x [increase in score]
            Done     = +500 [level completed] or -50 [level failed]
        Sparse:
            +10 per 150-unit x_pos milestone,
            +50 on completing the level, -10 otherwise
        The combined reward is divided by 10 before being returned.
        '''
        obs, _, done, info = self.env.step(action)

        if self.reward_type == 'sparse':
            reward = 0
            if (self.counter < len(self.milestones)) and (info['x_pos'] > self.milestones[self.counter]):
                reward = 10
                self.counter += 1

            if done:
                if info['flag_get']:
                    reward = 50
                else:
                    reward = -10

        elif self.reward_type == 'dense':
            # Progress along the level, clipped to [-2, 2]
            reward = max(min(info['x_pos'] - self.prev_dist - 0.05, 2), -2)
            self.prev_dist = info['x_pos']

            # Penalty for elapsed in-game time
            reward += (self.prev_time - info['time']) * -0.1
            self.prev_time = info['time']

            # Bonus/penalty for gaining/losing powered-up status
            reward += (int(info['status'] != 'small') - self.prev_stat) * 5
            self.prev_stat = int(info['status'] != 'small')

            # Scaled increase in the in-game score
            reward += (info['score'] - self.prev_score) * 0.025
            self.prev_score = info['score']

            if done:
                if info['flag_get']:
                    reward += 500
                else:
                    reward -= 50

        else:
            raise ValueError("reward_type must be 'sparse' or 'dense', got {!r}".format(self.reward_type))

        return _process_frame_mario(obs), reward / 10, done, info

    def reset(self):
        # Reset the reward bookkeeping along with the environment
        self.prev_time = 400
        self.prev_stat = 0
        self.prev_score = 0
        self.prev_dist = 40
        self.counter = 0
        return _process_frame_mario(self.env.reset())

    def change_level(self, level):
        self.env.change_level(level)


class BufferSkipFrames(gym.Wrapper):
    def __init__(self, env=None, skip=4, shape=(84, 84)):
        super(BufferSkipFrames, self).__init__(env)
        self.counter = 0
        # Observations are stacks of the last `skip` normalized frames (unbounded floats)
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(4, 84, 84), dtype=np.float32)
        self.skip = skip
        self.buffer = deque(maxlen=self.skip)

    def step(self, action):
        # Repeat the chosen action for `skip` frames and accumulate the reward
        obs, reward, done, info = self.env.step(action)
        total_reward = reward
        self.buffer.append(obs)

        for _ in range(self.skip - 1):
            if not done:
                obs, reward, done, info = self.env.step(action)
                total_reward += reward
                self.buffer.append(obs)
            else:
                # Pad the stack with the terminal frame once the episode has ended
                self.buffer.append(obs)

        frame = np.stack(self.buffer, axis=0)
        frame = np.reshape(frame, (4, 84, 84))
        return frame, total_reward, done, info

    def reset(self):
        # Fill the buffer with copies of the initial frame
        self.buffer.clear()
        obs = self.env.reset()
        for _ in range(self.skip):
            self.buffer.append(obs)

        frame = np.stack(self.buffer, axis=0)
        frame = np.reshape(frame, (4, 84, 84))
        return frame

    def change_level(self, level):
        self.env.change_level(level)


class NormalizedEnv(gym.ObservationWrapper):
    def __init__(self, env=None):
        super(NormalizedEnv, self).__init__(env)
        # Running (exponentially weighted) estimates of the observation mean and std
        self.state_mean = 0
        self.state_std = 0
        self.alpha = 0.9999
        self.num_steps = 0

    def observation(self, observation):
        if observation is not None:    # None observations tolerated for future meta implementation
            self.num_steps += 1
            # Exponential moving averages of the per-frame mean and std
            self.state_mean = self.state_mean * self.alpha + \
                observation.mean() * (1 - self.alpha)
            self.state_std = self.state_std * self.alpha + \
                observation.std() * (1 - self.alpha)

            # Correct for the zero-initialization bias of the running averages
            unbiased_mean = self.state_mean / (1 - pow(self.alpha, self.num_steps))
            unbiased_std = self.state_std / (1 - pow(self.alpha, self.num_steps))

            return (observation - unbiased_mean) / (unbiased_std + 1e-8)
        else:
            return observation

    def change_level(self, level):
        self.env.change_level(level)


def wrap_mario(env, reward_type):
    # assert 'SuperMarioBros' in env.spec.id
    env = ProcessFrameMario(env, reward_type)
    env = NormalizedEnv(env)
    env = BufferSkipFrames(env)
    return env


def create_mario_env(env_id, reward_type):
    env = gym_super_mario_bros.make(env_id)
    env = JoypadSpace(env, PALETTE_ACTIONS)
    env = wrap_mario(env, reward_type)
    return env
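

# Example usage (a minimal sketch, not part of the original module): assumes the
# standard gym_super_mario_bros environment id 'SuperMarioBros-1-1-v0' and the
# old gym 4-tuple step API used throughout this file.
if __name__ == '__main__':
    env = create_mario_env('SuperMarioBros-1-1-v0', reward_type='dense')
    state = env.reset()                      # stacked frames, shape (4, 84, 84)
    for _ in range(100):
        action = env.action_space.sample()   # random index into PALETTE_ACTIONS
        state, reward, done, info = env.step(action)
        if done:
            state = env.reset()
    env.close()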