bug fixing iml_experiment_node.py

Niko Feith 2023-09-14 22:28:01 +02:00
parent a6945ecf92
commit a367ee2d98
4 changed files with 658 additions and 59 deletions

@@ -0,0 +1,569 @@
import numpy as np
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
import scipy.interpolate as sciip
from sympy import Symbol, diff, exp
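# Dynamic Movement Primitive (DMP) module: a CanonicalSystem provides the shared phase
# variable x, the abstract DMP base class implements the transformation system and
# imitation learning, and DmpDiscrete adds Gaussian basis functions and weight regression.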
class CanonicalSystem:
def __init__(self, dt, dmp_type='discrete', **kwargs):
self._dt = dt
self._x = 1.0
# get kwargs
self._a_x = kwargs.get('a_x', 4.0)
self._tau = kwargs.get('tau', 1.0)
self._type = dmp_type
if self._type == 'discrete':
self._step = self.discrete_time_step
self._run_time = 1.0 * self._tau
elif self._type == 'rhythmic':
self._step = self.rhythmic_time_step
self._run_time = 2 * np.pi * self._tau
else:
raise ValueError('Pattern has to be either discrete or rhythmic')
self._time_steps = int(self._run_time / self._dt)
self._time = np.zeros(self.time_steps)
self._x_trajectory = np.empty(self.time_steps)
self._error_coupling = kwargs.get('error_coupling', np.ones(self.time_steps))
self.roll_out()
# region CanonicalSystem methods
def reset_state(self):
self._x = 1.0
self._time = np.zeros(self.time_steps)
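# Discrete phase dynamics dx/dt = -a_x * x * error_coupling / tau; the update below uses
# the exact one-step solution x *= exp(-a_x * error_coupling / tau * dt) instead of the
# explicit Euler step kept in the comment for reference.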
def discrete_time_step(self, error_coupling=1.0, time_index=0):
self._x *= np.exp((-self._a_x * error_coupling / self._tau) * self._dt)
# self._x += (-self._a_x * self._x * error_coupling) / self._tau * self._dt
if time_index == 0:
self._time[time_index] = 0
else:
self._time[time_index] = self.time[time_index - 1] + self._dt
return self.x
def rhythmic_time_step(self, error_coupling=1.0, time_index=0):
self._x += self._dt * error_coupling / self._tau
if time_index == 0:
self._time[time_index] = 0
else:
self._time[time_index] = self.time[time_index - 1] + self._dt
return self.x
def roll_out(self):
self.reset_state()
for i in range(self.time_steps):
self._x_trajectory[i] = self.x
self._step(self._error_coupling[i], i)
return self._x_trajectory
# endregion
# region Getter Setter
@property
def x(self):
return self._x
@property
def x_trajectory(self):
return self._x_trajectory
@property
def time(self):
return self._time
@property
def step(self):
return self._step
@property
def time_steps(self):
return self._time_steps
@property
def run_time(self):
return self._run_time
@property
def a_x(self):
return self._a_x
# endregion
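# Base class for a set of DMPs driven by one shared canonical system. step() integrates,
# per dimension, the transformation system
#   ddy = a_z * (b_z * (g - y) - dy) + f(x)
# with Euler updates dy += ddy * dt / tau and y += dy * dt / tau (both scaled by the
# error coupling), where the forcing term f(x) is built from the weighted basis
# functions of the concrete subclass.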
class DMP(ABC):
def __init__(self, nr_dmps, nr_bfs, dt=0.01, **kwargs):
self._nr_dmps = nr_dmps
self._nr_bfs = nr_bfs
self._dt = dt
self._vector_size = (1, self.nr_dmps)
# get start and goal points
self._y0 = kwargs.get('y0', np.zeros(self._vector_size))
self._g = kwargs.get('goal', np.ones(self._vector_size))
self._gdy = kwargs.get('goal_dy', np.zeros(self._vector_size))
# weights generation
self._w_gen = kwargs.get('w_gen', 'zeros')
self._w = kwargs.get('w', self.reset_weights())
# get important params
self._a_z = kwargs.get('a_z', 25 * np.ones(self._vector_size))
self._b_z = kwargs.get('b_z', self._a_z / 4)
self._tau = kwargs.get('tau', 1.0)
# initialize canonical system
_a_x = float(self._a_z[:, 0] / 3)
self._cs = CanonicalSystem(dt=self._dt, a_x=_a_x, **kwargs)
self._time_steps = self.cs.time_steps
# initialize state vectors
self._y = self._y0.copy()
self._dy = np.zeros(self._vector_size)
self._ddy = np.zeros(self._vector_size)
# check dimensions and offset
self._dimension_checker()
self._offset_checker()
# imitation learning
self.y_des = None
# region DMP methods
def _generate_start(self, y_des):
_start = np.zeros(self._vector_size)
for dim in range(self.nr_dmps):
_start[:, dim] = y_des[0, dim]
return _start
def reset_weights(self):
self.random_gen = np.random.default_rng()
if self._w_gen == 'zeros':
_w = np.zeros((self.nr_dmps, self.nr_bfs))
elif self._w_gen == 'random':
_w = 200 * self.random_gen.random((self.nr_dmps, self.nr_bfs)) - 100
else:
raise ValueError("w_gen must be either 'zeros' or 'random'")
self._w = _w
return self._w
def reset_states(self):
self.cs.reset_state()
self._y = self._y0.copy()
self._dy = np.zeros(self._vector_size)
self._ddy = np.zeros(self._vector_size)
def _dimension_checker(self):
if self.y0.shape[0] != 1 or self.y0.shape[1] != self._nr_dmps:
raise ValueError('y0 needs the shape [1, nr_dmps]')
if self.g.shape[0] != 1 or self.g.shape[1] != self._nr_dmps:
raise ValueError('g needs the shape [1, nr_dmps]')
def _offset_checker(self):
# make sure goal and start differ, otherwise the front-term scaling (g - y0) vanishes
for i in range(self._nr_dmps):
if abs(self.y0[:, i] - self.g[:, i]) < 1e-4:
self._g[:, i] += 1e-4
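# error > 0 slows both the phase and the state integration via
# error_coupling = 1 / (1 + error); spatial_coupling, if given, is added to the
# acceleration of each dimension.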
def step(self, error=0.0, spatial_coupling=None, time_index=0):
# step in canonical system
_error_coupling = 1.0 / (1.0 + error)
_x = self.cs.step(error_coupling=_error_coupling, time_index=time_index)
# evaluate the basis functions at the current phase x
_psi, _sum_psi = self._generate_psi(_x)
for dim in range(self.nr_dmps):
f = self._generate_front_term(_x, dim) * (np.dot(_psi, self._w[dim, :]))
f /= _sum_psi
# self._ddy[:, dim] = self._a_z[:, dim] * \
# ((self._b_z[:, dim] * (self.g[:, dim] - self._y[:, dim])) -
# (-self.gdy[:, dim] + self._dy[:, dim])) + f
self._ddy[:, dim] = self._a_z[:, dim] * \
((self._b_z[:, dim] * (self.g[:, dim] - self._y[:, dim])) - self._dy[:, dim]) + f
if spatial_coupling is not None:
self._ddy[:, dim] += spatial_coupling[dim]
self._dy[:, dim] += self._ddy[:, dim] / self._tau * self._dt * _error_coupling
self._y[:, dim] += self._dy[:, dim] * self._dt / self._tau * _error_coupling
return self._y, self._dy, self._ddy
def roll_out(self, **kwargs):
self.reset_states()
_y_trajectory = np.zeros((self._time_steps, self.nr_dmps))
_dy_trajectory = np.zeros((self._time_steps, self.nr_dmps))
_ddy_trajectory = np.zeros((self._time_steps, self.nr_dmps))
for t in range(self.cs.time_steps):
_y_trajectory[t, :], _dy_trajectory[t, :], _ddy_trajectory[t, :] = self.step(time_index=t, **kwargs)
return _y_trajectory, _dy_trajectory, _ddy_trajectory
def _interpolate_path(self, y_des):
_path = np.zeros((self._time_steps, self.nr_dmps))
_x = np.linspace(0, self.cs.run_time, y_des.shape[0])
for dim in range(self.nr_dmps):
_path_generation = sciip.interp1d(_x, y_des[:, dim])
for t in range(self._time_steps):
_path[t, dim] = _path_generation(t * self._dt)
return _path
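# Imitation learning: the demonstration y_des (shape [nr_samples, nr_dmps]) is resampled
# onto the canonical time grid, differentiated numerically, and converted into the
# forcing-term target
#   f_target = tau**2 * ddy_des - a_z * (b_z * (g - y_des) - tau * dy_des)
# i.e. the transformation system solved for f; the subclass then fits the weights to it.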
def imitation_learning(self, y_des):
if y_des.ndim == 1:
y_des = y_des.reshape((-1, 1))
if y_des.shape[1] != self.nr_dmps:
raise ValueError('y_des needs the shape [nr_samples, nr_dmps]!')
self._y0 = self._generate_start(y_des)
self._g = self._generate_goal(y_des)
_y_des = self._interpolate_path(y_des)
_dy_des = np.gradient(_y_des, axis=0) / self._dt
_ddy_des = np.gradient(_dy_des, axis=0) / self._dt
self.y_des = _y_des.copy()
_f_target = np.zeros((self._time_steps, self.nr_dmps))
for dim in range(self.nr_dmps):
_f_target[:, dim] = self._tau ** 2 * _ddy_des[:, dim] - \
self._a_z[:, dim] * (self._b_z[:, dim] *
(self.g[:, dim] - _y_des[:, dim]) - self._tau * _dy_des[:, dim])
self._generate_weights(_f_target)
# endregion
# region abstract methods
@abstractmethod
def _generate_front_term(self, x, dmp_index):
pass
@abstractmethod
def _generate_goal(self, y_des):
pass
@abstractmethod
def _generate_psi(self, x):
pass
@abstractmethod
def _generate_weights(self, f_target):
pass
# endregion
# region Getter and Setter
@property
def y0(self):
return self._y0
@property
def g(self):
return self._g
@property
def nr_bfs(self):
return self._nr_bfs
@property
def nr_dmps(self):
return self._nr_dmps
@property
def cs(self):
return self._cs
@property
def w(self):
return self._w
@property
def gdy(self):
return self._gdy
@w.setter
def w(self, value):
value = np.asarray(value)
if value.shape == (self.nr_dmps, self.nr_bfs):
self._w = value
elif value.shape in ((self.nr_dmps * self.nr_bfs,), (self.nr_dmps * self.nr_bfs, 1)):
self._w = np.reshape(value, (self.nr_dmps, self.nr_bfs))
else:
raise ValueError('w needs shape [nr_dmps, nr_bfs], [nr_dmps * nr_bfs] or [nr_dmps * nr_bfs, 1]')
# endregion
class DmpDiscrete(DMP):
def __init__(self, **kwargs):
super(DmpDiscrete, self).__init__(dmp_type='discrete', **kwargs)
# discrete dmp initialization
self._c = np.zeros((self.nr_bfs, 1))
self._h = np.zeros((self.nr_bfs, 1))
self._generate_basis_function_parameters()
# imitation learning
self._regression_type = kwargs.get('regression_type', 'Schaal')
self._imitation_type = kwargs.get('imitation_type', 'eye')
self._reg_lambda = kwargs.get('reg_lambda', 1e-12)
# psi
self._psi, _ = self._generate_psi(self.cs.roll_out())
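# Basis function centres are spaced evenly in normalised time and mapped through the
# canonical system, c_i = exp(-a_x * i / (nr_bfs - 1)); the widths follow from the
# spacing of neighbouring centres, h_i = 1 / (c_{i+1} - c_i)**2, with the last width
# copied from its neighbour.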
def _generate_basis_function_parameters(self):
for i in range(1, self.nr_bfs + 1):
_des_c = (i - 1) / (self.nr_bfs - 1)
self._c[i - 1] = np.exp(-self.cs.a_x * _des_c)
for i in range(self.nr_bfs):
if i != self.nr_bfs - 1:
self._h[i] = (self.c[i + 1] - self.c[i]) ** (-2)
else:
self._h[i] = self._h[i - 1]
def _generate_front_term(self, x, dmp_index=None):
if dmp_index is not None:
_s = x * (self.g[:, dmp_index] - self.y0[:, dmp_index])
else:
_s = np.zeros((self.cs.time_steps, self.nr_dmps))
for dim in range(self.nr_dmps):
_s[:, dim] = x * (self.g[:, dim] - self.y0[:, dim])
return _s
def _generate_goal(self, y_des):
_goal = np.ones(self._vector_size)
for dim in range(self.nr_dmps):
_goal[:, dim] = y_des[-1, dim]
return _goal
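# Gaussian basis functions psi_i(x) = exp(-h_i * (x - c_i)**2), evaluated either at a
# single phase value (inside step()) or along the whole canonical trajectory (learning).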
def _generate_psi(self, x):
x = np.asarray(x)
_psi = (np.exp(-self.h * (x - self.c) ** 2)).T
if x.shape == ():
_sum_psi = np.sum(_psi)
elif x.shape[0] == self.cs.time_steps:
_sum_psi = np.sum(_psi, axis=1)
else:
raise ValueError('x has to be a scalar phase value or the full canonical trajectory')
return _psi, _sum_psi
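# Builds the matrix of third derivatives d^3/dx^3 [psi_i(x) * x * (g - y0)] / sum_j psi_j(x)
# with sympy; it serves as the Gamma matrix of the jerk regulariser used by
# _generate_weights when imitation_type == 'jerk'.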
def generate_psi_3rd_derivative(self, x):
_, _psi_sum = self._generate_psi(x)
_x = Symbol('x')
_h = Symbol('h')
_c = Symbol('c')
_g = Symbol('g')
_y = Symbol('y')
_psi = exp(-_h * (_x - _c) ** 2)
_s = _x * (_g - _y)
func = _psi * _s
_psi_dev = np.zeros((self.cs.time_steps, self.nr_bfs, self.nr_dmps))
for dim in range(self.nr_dmps):
for bf in range(self.nr_bfs):
for idx, t in enumerate(x):
psi_eval = func.evalf(subs={_h: float(self.h[bf, :]),
_c: float(self._c[bf, :]),
_g: float(self.g[:, dim]),
_y: float(self.y0[:, dim])})
psi_diff = diff(psi_eval, _x, 3)
_psi_dev[idx, bf, dim] = psi_diff.evalf(subs={_x: t}) / _psi_sum[idx]
return _psi_dev
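# Weight fitting. 'Schaal' performs locally weighted regression per basis function,
#   w_b = (s^T Psi_b f_target) / (s^T Psi_b s),   Psi_b = diag(psi_b(x_0), ..., psi_b(x_T)),
# while 'RidgeRegression' solves the regularised least-squares problem
#   w = (A^T A + lambda * R)^-1 A^T f_target
# where A holds the normalised, front-term-scaled basis activations and R is either the
# identity ('eye') or Gamma^T Gamma from the jerk regulariser ('jerk').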
def _generate_weights(self, f_target):
_x_trajectory = self.cs.roll_out()
_psi, _sum_psi = self._generate_psi(_x_trajectory)
_s = self._generate_front_term(_x_trajectory)
_sT = _s.T
if self._regression_type == 'Schaal':
for dim in range(self.nr_dmps):
_k = self.g[:, dim] - self.y0[:, dim]
for bf in range(self.nr_bfs):
self._w[dim, bf] = np.dot(np.dot(_sT[dim, :], np.diag(_psi[:, bf])), f_target[:, dim]) / \
(np.dot(np.dot(_sT[dim, :], np.diag(_psi[:, bf])), _s[:, dim]))
self._w = np.nan_to_num(self._w)
elif self._regression_type == 'RidgeRegression':
_regression_matrix = np.zeros((self.nr_bfs, self.nr_bfs, self.nr_dmps))
if self._imitation_type == 'eye':
for dim in range(self.nr_dmps):
_regression_matrix[:, :, dim] = np.eye(self.nr_bfs)
elif self._imitation_type == 'jerk':
_Gamma = self.generate_psi_3rd_derivative(_x_trajectory)
for dim in range(self.nr_dmps):
_regression_matrix[:, :, dim] = _Gamma[:, :, dim].T @ _Gamma[:, :, dim]
else:
raise ValueError("imitation_type must be either 'eye' or 'jerk'")
_psi_new = np.zeros((_psi.shape[0], _psi.shape[1], self.nr_dmps))
for dim in range(self.nr_dmps):
for bf in range(self.nr_bfs):
_psi_new[:, bf, dim] = _psi[:, bf] / _sum_psi * _s[:, dim]
for dim in range(self.nr_dmps):
_matrix = np.linalg.inv(_psi_new[:, :, dim].T @ _psi_new[:, :, dim] + \
self._reg_lambda * _regression_matrix[:, :, dim]) @ _psi_new[:, :, dim].T
self._w[dim, :] = np.dot(_matrix, f_target[:, dim])
self._w = np.nan_to_num(self._w)
# region Getter and Setter
@property
def c(self):
return self._c
@property
def h(self):
return self._h
@property
def psi(self):
return self._psi
# endregion
if __name__ == '__main__':
# # test imitation of path run
# plt.figure(2, figsize=(6, 4))
# n_bfs = [100]
#
# # a straight line to target
# path1 = np.sin(np.arange(0, 1, 0.01) * 5)
# # a strange path to target
# path2 = np.zeros(path1.shape)
# path2[int(len(path2) / 2.0):] = 0.5
#
# for ii, bfs in enumerate(n_bfs):
# dmp = DmpDiscrete(nr_dmps=2, nr_bfs=bfs, dt=0.001, regression_type='RidgeRegression', reg_lambda=1e-5)
#
# dmp.imitation_learning(y_des=np.array([path1, path2]).T)
# # change the scale of the movement
# dmp.g[0, 0] = 3
# dmp.g[0, 1] = 2
#
# y_track, dy_track, ddy_track = dmp.roll_out()
#
# plt.figure(2)
# plt.subplot(211)
# plt.plot(y_track[:, 0], lw=2)
# plt.subplot(212)
# plt.plot(y_track[:, 1], lw=2)
#
# plt.subplot(211)
# a = plt.plot(dmp.y_des[:, 0] / path1[-1] * dmp.g[:, 0], "r--", lw=2)
# plt.title("DMP imitate path")
# plt.xlabel("time (ms)")
# plt.ylabel("system trajectory")
# plt.legend([a[0]], ["desired path"], loc="lower right")
# plt.subplot(212)
# b = plt.plot(dmp.y_des[:, 1] / path2[-1] * dmp.g[:, 1], "r--", lw=2)
# plt.title("DMP imitate path")
# plt.xlabel("time (ms)")
# plt.ylabel("system trajectory")
# plt.legend(["%i BFs" % i for i in n_bfs], loc="lower right")
#
# dmp.cs.roll_out()
# plt.figure(3)
# plt.subplot(211)
# for i in range(dmp.nr_bfs):
# plt.plot(dmp.cs.time, dmp.psi[:, i])
#
# plt.subplot(212)
# plt.plot(dmp.cs.time, dmp.cs.x_trajectory)
# for i in range(dmp.nr_bfs):
# plt.axvline(x=dmp.c[i], color='b', label='axvline - full height')
#
# f1 = np.zeros((dmp.psi.shape[1], dmp.psi.shape[0]))
# f2 = np.zeros((dmp.psi.shape[1], dmp.psi.shape[0]))
#
# for i in range(dmp.nr_bfs):
# f1[i, :] = np.dot(dmp.psi[:, i], dmp.w[0, i])
# f2[i, :] = np.dot(dmp.psi[:, i], dmp.w[1, i])
# plt.figure(4)
# plt.subplot(211)
# for i in range(dmp.nr_bfs):
# plt.plot(dmp.cs.time, f1[i, :])
#
# plt.subplot(212)
# for i in range(dmp.nr_bfs):
# plt.plot(dmp.cs.time, f2[i, :])
#
# plt.tight_layout()
# plt.show()
bfs = 100
dmp = DmpDiscrete(nr_dmps=2, nr_bfs=bfs, dt=0.01, regression_type='RidgeRegression', reg_lambda=0.5 * 1e-5)
# a sine curve towards the target
path1 = np.sin(np.arange(0, 1, 0.01) * 5)
# a strange path to target
path2 = np.zeros(path1.shape)
path2[int(len(path2) / 2.0):] = 0.5
dmp.imitation_learning(y_des=np.array([path1, path2]).T)
# change the scale of the movement
dmp.g[0, 0] = 3
dmp.g[0, 1] = 2
y_track, dy_track, ddy_track = dmp.roll_out()
plt.figure(2)
plt.subplot(211)
plt.plot(y_track[:, 0], lw=2)
plt.subplot(212)
plt.plot(y_track[:, 1], lw=2)
plt.subplot(211)
a = plt.plot(dmp.y_des[:, 0] / path1[-1] * dmp.g[:, 0], "r--", lw=2)
plt.title("DMP imitate path")
plt.xlabel("time (ms)")
plt.ylabel("system trajectory")
plt.legend([a[0]], ["desired path"], loc="lower right")
plt.subplot(212)
b = plt.plot(dmp.y_des[:, 1] / path2[-1] * dmp.g[:, 1], "r--", lw=2)
plt.title("DMP imitate path")
plt.xlabel("time (ms)")
plt.ylabel("system trajectory")
plt.legend(["%i BFs" % i for i in range(bfs)], loc="lower right")
plt.show()

@@ -10,8 +10,7 @@ from active_bo_msgs.msg import ActiveRLResponse
from active_bo_msgs.msg import ActiveRLEvalRequest
from active_bo_msgs.msg import ActiveRLEvalResponse
import pydmps
import pydmps.dmp_discrete
from franka_iml_experiment.RepresentationModel.DMP import *
import numpy as np
@@ -55,21 +54,30 @@ class IML_Experiment(Node):
1,
callback_group=topic_callback_group)
# States
self.eval_pending = False
# Reward constants
self.distance_pen = -0.1
self.accel_pen = -0.1
self.time_pen = -1
self.zone_pen = -20
self.goal_rew = 10
self.time_pen = -1.
self.zone_pen = -20.
self.goal_rew = 10.
# Scene parameters
self.square_center = np.array([0.5, 0.0])
self.square_half_side = 0.1 # half of the side length
self.circle_radius = 0.02
# BO parameters
self.start = np.array([0.5, -0.3])
self.end = np.array([0.5, -0.3])
self.end = np.array([0.5, 0.3])
self.time = 10
self.nr_dim = 2
self.nr_bfs = None
self.nr_steps = None
self.weight_preference = None
self.weight_scaling = 1000
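# Weights arriving in the BO/RL messages are multiplied by weight_scaling before a DMP
# is built from them (see dmp_callback and rl_eval_callback below).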
def dmp_callback(self, msg):
start = self.start
@@ -77,16 +85,18 @@ class IML_Experiment(Node):
self.nr_bfs = msg.nr_bfs
self.nr_steps = msg.nr_steps
interactive_run = msg.interactive_run
weights = np.vstack((msg.p_x, msg.p_y))
weights = np.vstack((msg.p_x, msg.p_y)) * self.weight_scaling
dmp = pydmps.dmp_discrete.DMPs_discrete(n_dmps=self.nr_dim, n_bfs=self.nr_bfs, w=weights, y0=start, goal=end,
dt=1 / self.nr_steps, )
y_track, _, ddy_track = dmp.rollout(tau=self.time)
weights_zeros = np.zeros_like(weights)
dmp = pydmps.dmp_discrete.DMPs_discrete(n_dmps=self.nr_dim, n_bfs=self.nr_bfs, w=weights_zeros, y0=start, goal=end)
y_track, _, ddy_track = dmp.rollout()
if interactive_run != 0:
reward, not_perform = self.compute_reward(y_track, ddy_track, self.nr_steps)
self.get_logger().info(f'{reward}')
if interactive_run == 1 and not not_perform:
pose_msg = PoseArray()
for i in range(y_track.shape[0]):
@@ -94,30 +104,30 @@ class IML_Experiment(Node):
pose.position.x = y_track[i, 0]
pose.position.y = y_track[i, 1]
pose.position.z = 0.15
pose.orientation.x = 1
pose.orientation.y = 0
pose.orientation.z = 0
pose.orientation.w = 0
pose.orientation.x = 1.
pose.orientation.y = 0.
pose.orientation.z = 0.
pose.orientation.w = 0.
pose_msg.poses.append(pose)
self.traj_pub.publish(pose_msg)
# self.exec_pub.publish(Empty())
rl_response = ActiveRLResponse()
rl_response.weights = msg.p_x + msg.p_y
rl_response.reward = reward
rl_response.final_step = self.nr_steps
if self.weight_preference is None:
weight_preference = [False] * len(self.nr_bfs * 2)
else:
weight_preference = self.weight_preference
rl_response = ActiveRLResponse()
rl_response.weights = msg.p_x + msg.p_y
rl_response.reward = reward
rl_response.final_step = self.nr_steps
if self.weight_preference is None:
weight_preference = [False] * self.nr_bfs * 2
else:
weight_preference = self.weight_preference
rl_response.weight_preference = weight_preference
self.active_rl_pub.publish(rl_response)
rl_response.weight_preference = weight_preference
self.active_rl_pub.publish(rl_response)
else:
self.get_logger().info('Interactive Learning started!')
eval_msg = ActiveRLEvalRequest()
eval_msg.weights = msg.p_x + msg.p_y
eval_msg.policy = y_track[:, 0].tolist() + y_track[:, 1].tolist()
@@ -125,46 +135,59 @@ class IML_Experiment(Node):
eval_msg.nr_weights = self.nr_bfs
self.eval_pub.publish(eval_msg)
self.eval_pending = True
def rl_eval_callback(self, msg):
eval_weights = msg.weights.reshape((self.nr_bfs, self.nr_dim), order='F')
self.weight_preference = msg.weight_preference
if self.eval_pending:
self.eval_pending = False
eval_weights = np.array(msg.weights).reshape((self.nr_bfs, self.nr_dim), order='F').T * self.weight_scaling
self.weight_preference = msg.weight_preference
dmp = pydmps.dmp_discrete.DMPs_discrete(n_dmps=self.nr_dim, n_bfs=self.nr_bfs, w=eval_weights, y0=self.start,
goal=self.end, dt=1 / self.nr_steps, )
y_track, _, ddy_track = dmp.rollout(tau=self.time)
dmp = pydmps.dmp_discrete.DMPs_discrete(n_dmps=self.nr_dim, n_bfs=self.nr_bfs, w=eval_weights, y0=self.start,
goal=self.end)
y_track, _, ddy_track = dmp.rollout()
self.get_logger().info('Active RL Eval: Responded!')
self.get_logger().info('Active RL Eval: Responded!')
reward, not_perform = self.compute_reward(y_track, ddy_track, self.nr_steps)
reward, not_perform = self.compute_reward(y_track, ddy_track, self.nr_steps)
self.get_logger().info(f'{reward}')
if not not_perform:
pose_msg = PoseArray()
for i in range(y_track.shape[0]):
pose = Pose()
pose.position.x = y_track[i, 0]
pose.position.y = y_track[i, 1]
pose.position.z = 0.15
pose.orientation.x = 1
pose.orientation.y = 0
pose.orientation.z = 0
pose.orientation.w = 0
pose_msg.poses.append(pose)
if not not_perform:
pose_msg = PoseArray()
for i in range(y_track.shape[0]):
pose = Pose()
pose.position.x = y_track[i, 0]
pose.position.y = y_track[i, 1]
pose.position.z = 0.15
pose.orientation.x = 1.
pose.orientation.y = 0.
pose.orientation.z = 0.
pose.orientation.w = 0.
pose_msg.poses.append(pose)
self.traj_pub.publish(pose_msg)
self.traj_pub.publish(pose_msg)
# self.exec_pub.publish(Empty())
rl_response = ActiveRLResponse()
rl_response.weights = msg.weights
rl_response.reward = reward
rl_response.final_step = self.nr_steps
if self.weight_preference is None:
weight_preference = [False] * self.nr_bfs * 2
else:
weight_preference = self.weight_preference
rl_response.weight_preference = weight_preference
self.active_rl_pub.publish(rl_response)
def compute_reward(self, y_track, ddy_track, time):
square_center = np.array([0.5, 0.0])
square_half_side = 0.1 # half of the side length
circle_center = np.array([0.5, 0.5])
circle_radius = 0.02
# 1. Check if any (x,y) position is within the square
within_square = np.any(
(y_track[:, 0] > square_center[0] - square_half_side) &
(y_track[:, 0] < square_center[0] + square_half_side) &
(y_track[:, 1] > square_center[1] - square_half_side) &
(y_track[:, 1] < square_center[1] + square_half_side)
(y_track[:, 0] > self.square_center[0] - self.square_half_side) &
(y_track[:, 0] < self.square_center[0] + self.square_half_side) &
(y_track[:, 1] > self.square_center[1] - self.square_half_side) &
(y_track[:, 1] < self.square_center[1] + self.square_half_side)
)
# If the robot moves into the forbidden zone, the trajectory is not executed and a penalty is applied
@@ -174,17 +197,21 @@ class IML_Experiment(Node):
# 2. Check if any (x,y) position is within the circle
t = 0
for i in range(y_track.shape[0]):
distance_to_circle_center = np.linalg.norm(y_track[i, :] - circle_center, axis=1)
if distance_to_circle_center < circle_radius:
self.get_logger().info(f'{y_track[i, :]}')
distance_to_circle_center = np.linalg.norm(y_track[i, :] - self.end)
if distance_to_circle_center < self.circle_radius:
t = i
if t == 0:
t = y_track.shape[0]
y_track = y_track[:t, :]
ddy_track = ddy_track[:t, :]
penalty = t * self.time_pen
# 3. Compute the Euclidean distance from each (x,y) position to the center of the square
distances_to_square_center = np.linalg.norm(y_track - square_center, axis=1)
distances_to_square_center = np.linalg.norm(y_track - self.square_center, axis=1)
total_distance_norm = np.sum(distances_to_square_center)
# 1. Compute the norm of each acceleration and sum them up

@@ -7,7 +7,9 @@ package_name = 'franka_iml_experiment'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
packages=[package_name,
package_name + '/RepresentationModel',
],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
@@ -26,6 +28,7 @@ setup(
'dmp_node = franka_iml_experiment.dmp_node:main',
'dmp_test = franka_iml_experiment.dmp_test:main',
'move_square = franka_iml_experiment.move_square:main',
'iml_experiment = franka_iml_experiment.iml_experiment_node:main',
],
},
)