481 lines
18 KiB
Python
Executable File
481 lines
18 KiB
Python
Executable File
# Copyright 2019 The dm_control Authors.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
# ============================================================================
|
|
|
|
"""Quadruped Domain."""
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
|
|
import collections
|
|
|
|
from dm_control import mujoco
|
|
from dm_control.mujoco.wrapper import mjbindings
|
|
from dm_control.rl import control
|
|
from local_dm_control_suite import base
|
|
from local_dm_control_suite import common
|
|
from dm_control.utils import containers
|
|
from dm_control.utils import rewards
|
|
from dm_control.utils import xml_tools
|
|
|
|
from lxml import etree
|
|
import numpy as np
|
|
from scipy import ndimage
|
|
|
|
enums = mjbindings.enums
|
|
mjlib = mjbindings.mjlib
|
|
|
|
|
|
_DEFAULT_TIME_LIMIT = 20
|
|
_CONTROL_TIMESTEP = .02
|
|
|
|
# Horizontal speeds above which the move reward is 1.
|
|
_RUN_SPEED = 5
|
|
_WALK_SPEED = 0.5
|
|
|
|
# Constants related to terrain generation.
|
|
_HEIGHTFIELD_ID = 0
|
|
_TERRAIN_SMOOTHNESS = 0.15 # 0.0: maximally bumpy; 1.0: completely smooth.
|
|
_TERRAIN_BUMP_SCALE = 2 # Spatial scale of terrain bumps (in meters).
|
|
|
|
# Named model elements.
|
|
_TOES = ['toe_front_left', 'toe_back_left', 'toe_back_right', 'toe_front_right']
|
|
_WALLS = ['wall_px', 'wall_py', 'wall_nx', 'wall_ny']
|
|
|
|
SUITE = containers.TaggedTasks()
|
|
|
|
|
|
def make_model(floor_size=None, terrain=False, rangefinders=False,
|
|
walls_and_ball=False):
|
|
"""Returns the model XML string."""
|
|
xml_string = common.read_model('quadruped.xml')
|
|
parser = etree.XMLParser(remove_blank_text=True)
|
|
mjcf = etree.XML(xml_string, parser)
|
|
|
|
# Set floor size.
|
|
if floor_size is not None:
|
|
floor_geom = mjcf.find('.//geom[@name={!r}]'.format('floor'))
|
|
floor_geom.attrib['size'] = '{} {} .5'.format(floor_size, floor_size)
|
|
|
|
# Remove walls, ball and target.
|
|
if not walls_and_ball:
|
|
for wall in _WALLS:
|
|
wall_geom = xml_tools.find_element(mjcf, 'geom', wall)
|
|
wall_geom.getparent().remove(wall_geom)
|
|
|
|
# Remove ball.
|
|
ball_body = xml_tools.find_element(mjcf, 'body', 'ball')
|
|
ball_body.getparent().remove(ball_body)
|
|
|
|
# Remove target.
|
|
target_site = xml_tools.find_element(mjcf, 'site', 'target')
|
|
target_site.getparent().remove(target_site)
|
|
|
|
# Remove terrain.
|
|
if not terrain:
|
|
terrain_geom = xml_tools.find_element(mjcf, 'geom', 'terrain')
|
|
terrain_geom.getparent().remove(terrain_geom)
|
|
|
|
# Remove rangefinders if they're not used, as range computations can be
|
|
# expensive, especially in a scene with heightfields.
|
|
if not rangefinders:
|
|
rangefinder_sensors = mjcf.findall('.//rangefinder')
|
|
for rf in rangefinder_sensors:
|
|
rf.getparent().remove(rf)
|
|
|
|
return etree.tostring(mjcf, pretty_print=True)
|
|
|
|
|
|
@SUITE.add()
|
|
def walk(time_limit=_DEFAULT_TIME_LIMIT, random=None, environment_kwargs=None):
|
|
"""Returns the Walk task."""
|
|
xml_string = make_model(floor_size=_DEFAULT_TIME_LIMIT * _WALK_SPEED)
|
|
physics = Physics.from_xml_string(xml_string, common.ASSETS)
|
|
task = Move(desired_speed=_WALK_SPEED, random=random)
|
|
environment_kwargs = environment_kwargs or {}
|
|
return control.Environment(physics, task, time_limit=time_limit,
|
|
control_timestep=_CONTROL_TIMESTEP,
|
|
**environment_kwargs)
|
|
|
|
|
|
@SUITE.add()
|
|
def run(time_limit=_DEFAULT_TIME_LIMIT, random=None, environment_kwargs=None):
|
|
"""Returns the Run task."""
|
|
xml_string = make_model(floor_size=_DEFAULT_TIME_LIMIT * _RUN_SPEED)
|
|
physics = Physics.from_xml_string(xml_string, common.ASSETS)
|
|
task = Move(desired_speed=_RUN_SPEED, random=random)
|
|
environment_kwargs = environment_kwargs or {}
|
|
return control.Environment(physics, task, time_limit=time_limit,
|
|
control_timestep=_CONTROL_TIMESTEP,
|
|
**environment_kwargs)
|
|
|
|
|
|
@SUITE.add()
|
|
def escape(time_limit=_DEFAULT_TIME_LIMIT, random=None,
|
|
environment_kwargs=None):
|
|
"""Returns the Escape task."""
|
|
xml_string = make_model(floor_size=40, terrain=True, rangefinders=True)
|
|
physics = Physics.from_xml_string(xml_string, common.ASSETS)
|
|
task = Escape(random=random)
|
|
environment_kwargs = environment_kwargs or {}
|
|
return control.Environment(physics, task, time_limit=time_limit,
|
|
control_timestep=_CONTROL_TIMESTEP,
|
|
**environment_kwargs)
|
|
|
|
|
|
@SUITE.add()
|
|
def fetch(time_limit=_DEFAULT_TIME_LIMIT, random=None, environment_kwargs=None):
|
|
"""Returns the Fetch task."""
|
|
xml_string = make_model(walls_and_ball=True)
|
|
physics = Physics.from_xml_string(xml_string, common.ASSETS)
|
|
task = Fetch(random=random)
|
|
environment_kwargs = environment_kwargs or {}
|
|
return control.Environment(physics, task, time_limit=time_limit,
|
|
control_timestep=_CONTROL_TIMESTEP,
|
|
**environment_kwargs)
|
|
|
|
|
|
class Physics(mujoco.Physics):
|
|
"""Physics simulation with additional features for the Quadruped domain."""
|
|
|
|
def _reload_from_data(self, data):
|
|
super(Physics, self)._reload_from_data(data)
|
|
# Clear cached sensor names when the physics is reloaded.
|
|
self._sensor_types_to_names = {}
|
|
self._hinge_names = []
|
|
|
|
def _get_sensor_names(self, *sensor_types):
|
|
try:
|
|
sensor_names = self._sensor_types_to_names[sensor_types]
|
|
except KeyError:
|
|
[sensor_ids] = np.where(np.in1d(self.model.sensor_type, sensor_types))
|
|
sensor_names = [self.model.id2name(s_id, 'sensor') for s_id in sensor_ids]
|
|
self._sensor_types_to_names[sensor_types] = sensor_names
|
|
return sensor_names
|
|
|
|
def torso_upright(self):
|
|
"""Returns the dot-product of the torso z-axis and the global z-axis."""
|
|
return np.asarray(self.named.data.xmat['torso', 'zz'])
|
|
|
|
def torso_velocity(self):
|
|
"""Returns the velocity of the torso, in the local frame."""
|
|
return self.named.data.sensordata['velocimeter'].copy()
|
|
|
|
def egocentric_state(self):
|
|
"""Returns the state without global orientation or position."""
|
|
if not self._hinge_names:
|
|
[hinge_ids] = np.nonzero(self.model.jnt_type ==
|
|
enums.mjtJoint.mjJNT_HINGE)
|
|
self._hinge_names = [self.model.id2name(j_id, 'joint')
|
|
for j_id in hinge_ids]
|
|
return np.hstack((self.named.data.qpos[self._hinge_names],
|
|
self.named.data.qvel[self._hinge_names],
|
|
self.data.act))
|
|
|
|
def toe_positions(self):
|
|
"""Returns toe positions in egocentric frame."""
|
|
torso_frame = self.named.data.xmat['torso'].reshape(3, 3)
|
|
torso_pos = self.named.data.xpos['torso']
|
|
torso_to_toe = self.named.data.xpos[_TOES] - torso_pos
|
|
return torso_to_toe.dot(torso_frame)
|
|
|
|
def force_torque(self):
|
|
"""Returns scaled force/torque sensor readings at the toes."""
|
|
force_torque_sensors = self._get_sensor_names(enums.mjtSensor.mjSENS_FORCE,
|
|
enums.mjtSensor.mjSENS_TORQUE)
|
|
return np.arcsinh(self.named.data.sensordata[force_torque_sensors])
|
|
|
|
def imu(self):
|
|
"""Returns IMU-like sensor readings."""
|
|
imu_sensors = self._get_sensor_names(enums.mjtSensor.mjSENS_GYRO,
|
|
enums.mjtSensor.mjSENS_ACCELEROMETER)
|
|
return self.named.data.sensordata[imu_sensors]
|
|
|
|
def rangefinder(self):
|
|
"""Returns scaled rangefinder sensor readings."""
|
|
rf_sensors = self._get_sensor_names(enums.mjtSensor.mjSENS_RANGEFINDER)
|
|
rf_readings = self.named.data.sensordata[rf_sensors]
|
|
no_intersection = -1.0
|
|
return np.where(rf_readings == no_intersection, 1.0, np.tanh(rf_readings))
|
|
|
|
def origin_distance(self):
|
|
"""Returns the distance from the origin to the workspace."""
|
|
return np.asarray(np.linalg.norm(self.named.data.site_xpos['workspace']))
|
|
|
|
def origin(self):
|
|
"""Returns origin position in the torso frame."""
|
|
torso_frame = self.named.data.xmat['torso'].reshape(3, 3)
|
|
torso_pos = self.named.data.xpos['torso']
|
|
return -torso_pos.dot(torso_frame)
|
|
|
|
def ball_state(self):
|
|
"""Returns ball position and velocity relative to the torso frame."""
|
|
data = self.named.data
|
|
torso_frame = data.xmat['torso'].reshape(3, 3)
|
|
ball_rel_pos = data.xpos['ball'] - data.xpos['torso']
|
|
ball_rel_vel = data.qvel['ball_root'][:3] - data.qvel['root'][:3]
|
|
ball_rot_vel = data.qvel['ball_root'][3:]
|
|
ball_state = np.vstack((ball_rel_pos, ball_rel_vel, ball_rot_vel))
|
|
return ball_state.dot(torso_frame).ravel()
|
|
|
|
def target_position(self):
|
|
"""Returns target position in torso frame."""
|
|
torso_frame = self.named.data.xmat['torso'].reshape(3, 3)
|
|
torso_pos = self.named.data.xpos['torso']
|
|
torso_to_target = self.named.data.site_xpos['target'] - torso_pos
|
|
return torso_to_target.dot(torso_frame)
|
|
|
|
def ball_to_target_distance(self):
|
|
"""Returns horizontal distance from the ball to the target."""
|
|
ball_to_target = (self.named.data.site_xpos['target'] -
|
|
self.named.data.xpos['ball'])
|
|
return np.linalg.norm(ball_to_target[:2])
|
|
|
|
def self_to_ball_distance(self):
|
|
"""Returns horizontal distance from the quadruped workspace to the ball."""
|
|
self_to_ball = (self.named.data.site_xpos['workspace']
|
|
-self.named.data.xpos['ball'])
|
|
return np.linalg.norm(self_to_ball[:2])
|
|
|
|
|
|
def _find_non_contacting_height(physics, orientation, x_pos=0.0, y_pos=0.0):
|
|
"""Find a height with no contacts given a body orientation.
|
|
|
|
Args:
|
|
physics: An instance of `Physics`.
|
|
orientation: A quaternion.
|
|
x_pos: A float. Position along global x-axis.
|
|
y_pos: A float. Position along global y-axis.
|
|
Raises:
|
|
RuntimeError: If a non-contacting configuration has not been found after
|
|
10,000 attempts.
|
|
"""
|
|
z_pos = 0.0 # Start embedded in the floor.
|
|
num_contacts = 1
|
|
num_attempts = 0
|
|
# Move up in 1cm increments until no contacts.
|
|
while num_contacts > 0:
|
|
try:
|
|
with physics.reset_context():
|
|
physics.named.data.qpos['root'][:3] = x_pos, y_pos, z_pos
|
|
physics.named.data.qpos['root'][3:] = orientation
|
|
except control.PhysicsError:
|
|
# We may encounter a PhysicsError here due to filling the contact
|
|
# buffer, in which case we simply increment the height and continue.
|
|
pass
|
|
num_contacts = physics.data.ncon
|
|
z_pos += 0.01
|
|
num_attempts += 1
|
|
if num_attempts > 10000:
|
|
raise RuntimeError('Failed to find a non-contacting configuration.')
|
|
|
|
|
|
def _common_observations(physics):
|
|
"""Returns the observations common to all tasks."""
|
|
obs = collections.OrderedDict()
|
|
obs['egocentric_state'] = physics.egocentric_state()
|
|
obs['torso_velocity'] = physics.torso_velocity()
|
|
obs['torso_upright'] = physics.torso_upright()
|
|
obs['imu'] = physics.imu()
|
|
obs['force_torque'] = physics.force_torque()
|
|
return obs
|
|
|
|
|
|
def _upright_reward(physics, deviation_angle=0):
|
|
"""Returns a reward proportional to how upright the torso is.
|
|
|
|
Args:
|
|
physics: an instance of `Physics`.
|
|
deviation_angle: A float, in degrees. The reward is 0 when the torso is
|
|
exactly upside-down and 1 when the torso's z-axis is less than
|
|
`deviation_angle` away from the global z-axis.
|
|
"""
|
|
deviation = np.cos(np.deg2rad(deviation_angle))
|
|
return rewards.tolerance(
|
|
physics.torso_upright(),
|
|
bounds=(deviation, float('inf')),
|
|
sigmoid='linear',
|
|
margin=1 + deviation,
|
|
value_at_margin=0)
|
|
|
|
|
|
class Move(base.Task):
|
|
"""A quadruped task solved by moving forward at a designated speed."""
|
|
|
|
def __init__(self, desired_speed, random=None):
|
|
"""Initializes an instance of `Move`.
|
|
|
|
Args:
|
|
desired_speed: A float. If this value is zero, reward is given simply
|
|
for standing upright. Otherwise this specifies the horizontal velocity
|
|
at which the velocity-dependent reward component is maximized.
|
|
random: Optional, either a `numpy.random.RandomState` instance, an
|
|
integer seed for creating a new `RandomState`, or None to select a seed
|
|
automatically (default).
|
|
"""
|
|
self._desired_speed = desired_speed
|
|
super(Move, self).__init__(random=random)
|
|
|
|
def initialize_episode(self, physics):
|
|
"""Sets the state of the environment at the start of each episode.
|
|
|
|
Args:
|
|
physics: An instance of `Physics`.
|
|
|
|
"""
|
|
# Initial configuration.
|
|
orientation = self.random.randn(4)
|
|
orientation /= np.linalg.norm(orientation)
|
|
_find_non_contacting_height(physics, orientation)
|
|
super(Move, self).initialize_episode(physics)
|
|
|
|
def get_observation(self, physics):
|
|
"""Returns an observation to the agent."""
|
|
return _common_observations(physics)
|
|
|
|
def get_reward(self, physics):
|
|
"""Returns a reward to the agent."""
|
|
|
|
# Move reward term.
|
|
move_reward = rewards.tolerance(
|
|
physics.torso_velocity()[0],
|
|
bounds=(self._desired_speed, float('inf')),
|
|
margin=self._desired_speed,
|
|
value_at_margin=0.5,
|
|
sigmoid='linear')
|
|
|
|
return _upright_reward(physics) * move_reward
|
|
|
|
|
|
class Escape(base.Task):
|
|
"""A quadruped task solved by escaping a bowl-shaped terrain."""
|
|
|
|
def initialize_episode(self, physics):
|
|
"""Sets the state of the environment at the start of each episode.
|
|
|
|
Args:
|
|
physics: An instance of `Physics`.
|
|
|
|
"""
|
|
# Get heightfield resolution, assert that it is square.
|
|
res = physics.model.hfield_nrow[_HEIGHTFIELD_ID]
|
|
assert res == physics.model.hfield_ncol[_HEIGHTFIELD_ID]
|
|
# Sinusoidal bowl shape.
|
|
row_grid, col_grid = np.ogrid[-1:1:res*1j, -1:1:res*1j]
|
|
radius = np.clip(np.sqrt(col_grid**2 + row_grid**2), .04, 1)
|
|
bowl_shape = .5 - np.cos(2*np.pi*radius)/2
|
|
# Random smooth bumps.
|
|
terrain_size = 2 * physics.model.hfield_size[_HEIGHTFIELD_ID, 0]
|
|
bump_res = int(terrain_size / _TERRAIN_BUMP_SCALE)
|
|
bumps = self.random.uniform(_TERRAIN_SMOOTHNESS, 1, (bump_res, bump_res))
|
|
smooth_bumps = ndimage.zoom(bumps, res / float(bump_res))
|
|
# Terrain is elementwise product.
|
|
terrain = bowl_shape * smooth_bumps
|
|
start_idx = physics.model.hfield_adr[_HEIGHTFIELD_ID]
|
|
physics.model.hfield_data[start_idx:start_idx+res**2] = terrain.ravel()
|
|
super(Escape, self).initialize_episode(physics)
|
|
|
|
# If we have a rendering context, we need to re-upload the modified
|
|
# heightfield data.
|
|
if physics.contexts:
|
|
with physics.contexts.gl.make_current() as ctx:
|
|
ctx.call(mjlib.mjr_uploadHField,
|
|
physics.model.ptr,
|
|
physics.contexts.mujoco.ptr,
|
|
_HEIGHTFIELD_ID)
|
|
|
|
# Initial configuration.
|
|
orientation = self.random.randn(4)
|
|
orientation /= np.linalg.norm(orientation)
|
|
_find_non_contacting_height(physics, orientation)
|
|
|
|
def get_observation(self, physics):
|
|
"""Returns an observation to the agent."""
|
|
obs = _common_observations(physics)
|
|
obs['origin'] = physics.origin()
|
|
obs['rangefinder'] = physics.rangefinder()
|
|
return obs
|
|
|
|
def get_reward(self, physics):
|
|
"""Returns a reward to the agent."""
|
|
|
|
# Escape reward term.
|
|
terrain_size = physics.model.hfield_size[_HEIGHTFIELD_ID, 0]
|
|
escape_reward = rewards.tolerance(
|
|
physics.origin_distance(),
|
|
bounds=(terrain_size, float('inf')),
|
|
margin=terrain_size,
|
|
value_at_margin=0,
|
|
sigmoid='linear')
|
|
|
|
return _upright_reward(physics, deviation_angle=20) * escape_reward
|
|
|
|
|
|
class Fetch(base.Task):
|
|
"""A quadruped task solved by bringing a ball to the origin."""
|
|
|
|
def initialize_episode(self, physics):
|
|
"""Sets the state of the environment at the start of each episode.
|
|
|
|
Args:
|
|
physics: An instance of `Physics`.
|
|
|
|
"""
|
|
# Initial configuration, random azimuth and horizontal position.
|
|
azimuth = self.random.uniform(0, 2*np.pi)
|
|
orientation = np.array((np.cos(azimuth/2), 0, 0, np.sin(azimuth/2)))
|
|
spawn_radius = 0.9 * physics.named.model.geom_size['floor', 0]
|
|
x_pos, y_pos = self.random.uniform(-spawn_radius, spawn_radius, size=(2,))
|
|
_find_non_contacting_height(physics, orientation, x_pos, y_pos)
|
|
|
|
# Initial ball state.
|
|
physics.named.data.qpos['ball_root'][:2] = self.random.uniform(
|
|
-spawn_radius, spawn_radius, size=(2,))
|
|
physics.named.data.qpos['ball_root'][2] = 2
|
|
physics.named.data.qvel['ball_root'][:2] = 5*self.random.randn(2)
|
|
super(Fetch, self).initialize_episode(physics)
|
|
|
|
def get_observation(self, physics):
|
|
"""Returns an observation to the agent."""
|
|
obs = _common_observations(physics)
|
|
obs['ball_state'] = physics.ball_state()
|
|
obs['target_position'] = physics.target_position()
|
|
return obs
|
|
|
|
def get_reward(self, physics):
|
|
"""Returns a reward to the agent."""
|
|
|
|
# Reward for moving close to the ball.
|
|
arena_radius = physics.named.model.geom_size['floor', 0] * np.sqrt(2)
|
|
workspace_radius = physics.named.model.site_size['workspace', 0]
|
|
ball_radius = physics.named.model.geom_size['ball', 0]
|
|
reach_reward = rewards.tolerance(
|
|
physics.self_to_ball_distance(),
|
|
bounds=(0, workspace_radius+ball_radius),
|
|
sigmoid='linear',
|
|
margin=arena_radius, value_at_margin=0)
|
|
|
|
# Reward for bringing the ball to the target.
|
|
target_radius = physics.named.model.site_size['target', 0]
|
|
fetch_reward = rewards.tolerance(
|
|
physics.ball_to_target_distance(),
|
|
bounds=(0, target_radius),
|
|
sigmoid='linear',
|
|
margin=arena_radius, value_at_margin=0)
|
|
|
|
reach_then_fetch = reach_reward * (0.5 + 0.5*fetch_reward)
|
|
|
|
return _upright_reward(physics) * reach_then_fetch
|