tia/Dreamer/local_dm_control_suite/lqr.py

273 lines
9.7 KiB
Python
Raw Normal View History

2023-07-17 08:48:01 +00:00
# Copyright 2017 The dm_control Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Procedurally generated LQR domain."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import collections
import os
from dm_control import mujoco
from dm_control.rl import control
from local_dm_control_suite import base
from local_dm_control_suite import common
from dm_control.utils import containers
from dm_control.utils import xml_tools
from lxml import etree
import numpy as np
from six.moves import range
from dm_control.utils import io as resources
_DEFAULT_TIME_LIMIT = float('inf')
_CONTROL_COST_COEF = 0.1
SUITE = containers.TaggedTasks()
def get_model_and_assets(n_bodies, n_actuators, random):
"""Returns the model description as an XML string and a dict of assets.
Args:
n_bodies: An int, number of bodies of the LQR.
n_actuators: An int, number of actuated bodies of the LQR. `n_actuators`
should be less or equal than `n_bodies`.
random: A `numpy.random.RandomState` instance.
Returns:
A tuple `(model_xml_string, assets)`, where `assets` is a dict consisting of
`{filename: contents_string}` pairs.
"""
return _make_model(n_bodies, n_actuators, random), common.ASSETS
@SUITE.add()
def lqr_2_1(time_limit=_DEFAULT_TIME_LIMIT, random=None,
environment_kwargs=None):
"""Returns an LQR environment with 2 bodies of which the first is actuated."""
return _make_lqr(n_bodies=2,
n_actuators=1,
control_cost_coef=_CONTROL_COST_COEF,
time_limit=time_limit,
random=random,
environment_kwargs=environment_kwargs)
@SUITE.add()
def lqr_6_2(time_limit=_DEFAULT_TIME_LIMIT, random=None,
environment_kwargs=None):
"""Returns an LQR environment with 6 bodies of which first 2 are actuated."""
return _make_lqr(n_bodies=6,
n_actuators=2,
control_cost_coef=_CONTROL_COST_COEF,
time_limit=time_limit,
random=random,
environment_kwargs=environment_kwargs)
def _make_lqr(n_bodies, n_actuators, control_cost_coef, time_limit, random,
environment_kwargs):
"""Returns a LQR environment.
Args:
n_bodies: An int, number of bodies of the LQR.
n_actuators: An int, number of actuated bodies of the LQR. `n_actuators`
should be less or equal than `n_bodies`.
control_cost_coef: A number, the coefficient of the control cost.
time_limit: An int, maximum time for each episode in seconds.
random: Either an existing `numpy.random.RandomState` instance, an
integer seed for creating a new `RandomState`, or None to select a seed
automatically.
environment_kwargs: A `dict` specifying keyword arguments for the
environment, or None.
Returns:
A LQR environment with `n_bodies` bodies of which first `n_actuators` are
actuated.
"""
if not isinstance(random, np.random.RandomState):
random = np.random.RandomState(random)
model_string, assets = get_model_and_assets(n_bodies, n_actuators,
random=random)
physics = Physics.from_xml_string(model_string, assets=assets)
task = LQRLevel(control_cost_coef, random=random)
environment_kwargs = environment_kwargs or {}
return control.Environment(physics, task, time_limit=time_limit,
**environment_kwargs)
def _make_body(body_id, stiffness_range, damping_range, random):
"""Returns an `etree.Element` defining a body.
Args:
body_id: Id of the created body.
stiffness_range: A tuple of (stiffness_lower_bound, stiffness_uppder_bound).
The stiffness of the joint is drawn uniformly from this range.
damping_range: A tuple of (damping_lower_bound, damping_upper_bound). The
damping of the joint is drawn uniformly from this range.
random: A `numpy.random.RandomState` instance.
Returns:
A new instance of `etree.Element`. A body element with two children: joint
and geom.
"""
body_name = 'body_{}'.format(body_id)
joint_name = 'joint_{}'.format(body_id)
geom_name = 'geom_{}'.format(body_id)
body = etree.Element('body', name=body_name)
body.set('pos', '.25 0 0')
joint = etree.SubElement(body, 'joint', name=joint_name)
body.append(etree.Element('geom', name=geom_name))
joint.set('stiffness',
str(random.uniform(stiffness_range[0], stiffness_range[1])))
joint.set('damping',
str(random.uniform(damping_range[0], damping_range[1])))
return body
def _make_model(n_bodies,
n_actuators,
random,
stiffness_range=(15, 25),
damping_range=(0, 0)):
"""Returns an MJCF XML string defining a model of springs and dampers.
Args:
n_bodies: An integer, the number of bodies (DoFs) in the system.
n_actuators: An integer, the number of actuated bodies.
random: A `numpy.random.RandomState` instance.
stiffness_range: A tuple containing minimum and maximum stiffness. Each
joint's stiffness is sampled uniformly from this interval.
damping_range: A tuple containing minimum and maximum damping. Each joint's
damping is sampled uniformly from this interval.
Returns:
An MJCF string describing the linear system.
Raises:
ValueError: If the number of bodies or actuators is erronous.
"""
if n_bodies < 1 or n_actuators < 1:
raise ValueError('At least 1 body and 1 actuator required.')
if n_actuators > n_bodies:
raise ValueError('At most 1 actuator per body.')
file_path = os.path.join(os.path.dirname(__file__), 'lqr.xml')
with resources.GetResourceAsFile(file_path) as xml_file:
mjcf = xml_tools.parse(xml_file)
parent = mjcf.find('./worldbody')
actuator = etree.SubElement(mjcf.getroot(), 'actuator')
tendon = etree.SubElement(mjcf.getroot(), 'tendon')
for body in range(n_bodies):
# Inserting body.
child = _make_body(body, stiffness_range, damping_range, random)
site_name = 'site_{}'.format(body)
child.append(etree.Element('site', name=site_name))
if body == 0:
child.set('pos', '.25 0 .1')
# Add actuators to the first n_actuators bodies.
if body < n_actuators:
# Adding actuator.
joint_name = 'joint_{}'.format(body)
motor_name = 'motor_{}'.format(body)
child.find('joint').set('name', joint_name)
actuator.append(etree.Element('motor', name=motor_name, joint=joint_name))
# Add a tendon between consecutive bodies (for visualisation purposes only).
if body < n_bodies - 1:
child_site_name = 'site_{}'.format(body + 1)
tendon_name = 'tendon_{}'.format(body)
spatial = etree.SubElement(tendon, 'spatial', name=tendon_name)
spatial.append(etree.Element('site', site=site_name))
spatial.append(etree.Element('site', site=child_site_name))
parent.append(child)
parent = child
return etree.tostring(mjcf, pretty_print=True)
class Physics(mujoco.Physics):
"""Physics simulation with additional features for the LQR domain."""
def state_norm(self):
"""Returns the norm of the physics state."""
return np.linalg.norm(self.state())
class LQRLevel(base.Task):
"""A Linear Quadratic Regulator `Task`."""
_TERMINAL_TOL = 1e-6
def __init__(self, control_cost_coef, random=None):
"""Initializes an LQR level with cost = sum(states^2) + c*sum(controls^2).
Args:
control_cost_coef: The coefficient of the control cost.
random: Optional, either a `numpy.random.RandomState` instance, an
integer seed for creating a new `RandomState`, or None to select a seed
automatically (default).
Raises:
ValueError: If the control cost coefficient is not positive.
"""
if control_cost_coef <= 0:
raise ValueError('control_cost_coef must be positive.')
self._control_cost_coef = control_cost_coef
super(LQRLevel, self).__init__(random=random)
@property
def control_cost_coef(self):
return self._control_cost_coef
def initialize_episode(self, physics):
"""Random state sampled from a unit sphere."""
ndof = physics.model.nq
unit = self.random.randn(ndof)
physics.data.qpos[:] = np.sqrt(2) * unit / np.linalg.norm(unit)
super(LQRLevel, self).initialize_episode(physics)
def get_observation(self, physics):
"""Returns an observation of the state."""
obs = collections.OrderedDict()
obs['position'] = physics.position()
obs['velocity'] = physics.velocity()
return obs
def get_reward(self, physics):
"""Returns a quadratic state and control reward."""
position = physics.position()
state_cost = 0.5 * np.dot(position, position)
control_signal = physics.control()
control_l2_norm = 0.5 * np.dot(control_signal, control_signal)
return 1 - (state_cost + control_l2_norm * self._control_cost_coef)
def get_evaluation(self, physics):
"""Returns a sparse evaluation reward that is not used for learning."""
return float(physics.state_norm() <= 0.01)
def get_termination(self, physics):
"""Terminates when the state norm is smaller than epsilon."""
if physics.state_norm() < self._TERMINAL_TOL:
return 0.0