137 lines
5.3 KiB
Python
Executable File
137 lines
5.3 KiB
Python
Executable File
# Copyright 2018 The dm_control Authors.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ============================================================================
|
|
|
|
"""Tests for the action noise wrapper."""
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
|
|
# Internal dependencies.
|
|
from absl.testing import absltest
|
|
from absl.testing import parameterized
|
|
from dm_control.rl import control
|
|
from dm_control.suite.wrappers import action_noise
|
|
from dm_env import specs
|
|
import mock
|
|
import numpy as np
|
|
|
|
|
|
class ActionNoiseTest(parameterized.TestCase):
|
|
|
|
def make_action_spec(self, lower=(-1.,), upper=(1.,)):
|
|
lower, upper = np.broadcast_arrays(lower, upper)
|
|
return specs.BoundedArray(
|
|
shape=lower.shape, dtype=float, minimum=lower, maximum=upper)
|
|
|
|
def make_mock_env(self, action_spec=None):
|
|
action_spec = action_spec or self.make_action_spec()
|
|
env = mock.Mock(spec=control.Environment)
|
|
env.action_spec.return_value = action_spec
|
|
return env
|
|
|
|
def assertStepCalledOnceWithCorrectAction(self, env, expected_action):
|
|
# NB: `assert_called_once_with()` doesn't support numpy arrays.
|
|
env.step.assert_called_once()
|
|
actual_action = env.step.call_args_list[0][0][0]
|
|
np.testing.assert_array_equal(expected_action, actual_action)
|
|
|
|
@parameterized.parameters([
|
|
dict(lower=np.r_[-1., 0.], upper=np.r_[1., 2.], scale=0.05),
|
|
dict(lower=np.r_[-1., 0.], upper=np.r_[1., 2.], scale=0.),
|
|
dict(lower=np.r_[-1., 0.], upper=np.r_[-1., 0.], scale=0.05),
|
|
])
|
|
def test_step(self, lower, upper, scale):
|
|
seed = 0
|
|
std = scale * (upper - lower)
|
|
expected_noise = np.random.RandomState(seed).normal(scale=std)
|
|
action = np.random.RandomState(seed).uniform(lower, upper)
|
|
expected_noisy_action = np.clip(action + expected_noise, lower, upper)
|
|
task = mock.Mock(spec=control.Task)
|
|
task.random = np.random.RandomState(seed)
|
|
action_spec = self.make_action_spec(lower=lower, upper=upper)
|
|
env = self.make_mock_env(action_spec=action_spec)
|
|
env.task = task
|
|
wrapped_env = action_noise.Wrapper(env, scale=scale)
|
|
time_step = wrapped_env.step(action)
|
|
self.assertStepCalledOnceWithCorrectAction(env, expected_noisy_action)
|
|
self.assertIs(time_step, env.step(expected_noisy_action))
|
|
|
|
@parameterized.named_parameters([
|
|
dict(testcase_name='within_bounds', action=np.r_[-1.], noise=np.r_[0.1]),
|
|
dict(testcase_name='below_lower', action=np.r_[-1.], noise=np.r_[-0.1]),
|
|
dict(testcase_name='above_upper', action=np.r_[1.], noise=np.r_[0.1]),
|
|
])
|
|
def test_action_clipping(self, action, noise):
|
|
lower = -1.
|
|
upper = 1.
|
|
expected_noisy_action = np.clip(action + noise, lower, upper)
|
|
task = mock.Mock(spec=control.Task)
|
|
task.random = mock.Mock(spec=np.random.RandomState)
|
|
task.random.normal.return_value = noise
|
|
action_spec = self.make_action_spec(lower=lower, upper=upper)
|
|
env = self.make_mock_env(action_spec=action_spec)
|
|
env.task = task
|
|
wrapped_env = action_noise.Wrapper(env)
|
|
time_step = wrapped_env.step(action)
|
|
self.assertStepCalledOnceWithCorrectAction(env, expected_noisy_action)
|
|
self.assertIs(time_step, env.step(expected_noisy_action))
|
|
|
|
@parameterized.parameters([
|
|
dict(lower=np.r_[-1., 0.], upper=np.r_[1., np.inf]),
|
|
dict(lower=np.r_[np.nan, 0.], upper=np.r_[1., 2.]),
|
|
])
|
|
def test_error_if_action_bounds_non_finite(self, lower, upper):
|
|
action_spec = self.make_action_spec(lower=lower, upper=upper)
|
|
env = self.make_mock_env(action_spec=action_spec)
|
|
with self.assertRaisesWithLiteralMatch(
|
|
ValueError,
|
|
action_noise._BOUNDS_MUST_BE_FINITE.format(action_spec=action_spec)):
|
|
_ = action_noise.Wrapper(env)
|
|
|
|
def test_reset(self):
|
|
env = self.make_mock_env()
|
|
wrapped_env = action_noise.Wrapper(env)
|
|
time_step = wrapped_env.reset()
|
|
env.reset.assert_called_once_with()
|
|
self.assertIs(time_step, env.reset())
|
|
|
|
def test_observation_spec(self):
|
|
env = self.make_mock_env()
|
|
wrapped_env = action_noise.Wrapper(env)
|
|
observation_spec = wrapped_env.observation_spec()
|
|
env.observation_spec.assert_called_once_with()
|
|
self.assertIs(observation_spec, env.observation_spec())
|
|
|
|
def test_action_spec(self):
|
|
env = self.make_mock_env()
|
|
wrapped_env = action_noise.Wrapper(env)
|
|
# `env.action_spec()` is called in `Wrapper.__init__()`
|
|
env.action_spec.reset_mock()
|
|
action_spec = wrapped_env.action_spec()
|
|
env.action_spec.assert_called_once_with()
|
|
self.assertIs(action_spec, env.action_spec())
|
|
|
|
@parameterized.parameters(['task', 'physics', 'control_timestep'])
|
|
def test_getattr(self, attribute_name):
|
|
env = self.make_mock_env()
|
|
wrapped_env = action_noise.Wrapper(env)
|
|
attr = getattr(wrapped_env, attribute_name)
|
|
self.assertIs(attr, getattr(env, attribute_name))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
absltest.main()
|