From a367ee2d98bb422f34ebe3819e64084cc7c446c6 Mon Sep 17 00:00:00 2001 From: Niko Date: Thu, 14 Sep 2023 22:28:01 +0200 Subject: [PATCH] bug fixing iml_experiment_node.py --- .../RepresentationModel/DMP.py | 569 ++++++++++++++++++ .../RepresentationModel/__init__.py | 0 franka_iml_experiment/iml_experiment_node.py | 143 +++-- setup.py | 5 +- 4 files changed, 658 insertions(+), 59 deletions(-) create mode 100644 franka_iml_experiment/RepresentationModel/DMP.py create mode 100644 franka_iml_experiment/RepresentationModel/__init__.py diff --git a/franka_iml_experiment/RepresentationModel/DMP.py b/franka_iml_experiment/RepresentationModel/DMP.py new file mode 100644 index 0000000..6e496eb --- /dev/null +++ b/franka_iml_experiment/RepresentationModel/DMP.py @@ -0,0 +1,569 @@ +import numpy as np +import matplotlib.pyplot as plt +from abc import ABC, abstractmethod +import scipy.interpolate as sciip +from sympy import Symbol, diff, exp + + +class CanonicalSystem: + def __init__(self, dt, dmp_type='discrete', **kwargs): + + self._dt = dt + self._x = 1.0 + + # get kwargs + self._a_x = kwargs.get('a_x', 4.0) + self._tau = kwargs.get('tau', 1.0) + + self._type = dmp_type + + if self._type == 'discrete': + self._step = self.discrete_time_step + self._run_time = 1.0 * self._tau + + elif self._type == 'rhythmic': + self._step = self.rhythmic_time_step + self._run_time = 2 * np.pi * self._tau + + else: + raise ValueError('Pattern has to be either discrete or rhythmic') + + self._time_steps = int(self._run_time / self._dt) + self._time = np.zeros(self.time_steps) + + self._x_trajectory = np.empty(self.time_steps) + self._error_coupling = kwargs.get('error_coupling', np.ones(self.time_steps)) + + self.roll_out() + + # region CanonicalSystem methods + def reset_state(self): + self._x = 1.0 + self._time = np.zeros(self.time_steps) + + def discrete_time_step(self, error_coupling=1.0, time_index=0): + self._x *= np.exp((-self._a_x * error_coupling / self._tau) * self._dt) + # self._x += (-self._a_x * self._x * error_coupling) / self._tau * self._dt + + if time_index == 0: + self._time[time_index] = 0 + elif time_index != 0: + self._time[time_index] = self.time[time_index - 1] + self._dt + return self.x + + def rhythmic_time_step(self, error_coupling=1.0, time_index=0): + self._x += self._dt * error_coupling / self._tau + + if time_index == 0: + self._time[time_index] = 0 + elif time_index != 0: + self._time[time_index] = self.time[time_index - 1] + self._dt + return self.x + + def roll_out(self): + self.reset_state() + + for i in range(self.time_steps): + self._x_trajectory[i] = self.x + self._step(self._error_coupling[i], i) + + return self._x_trajectory + + # endregion + + # region Getter Setter + @property + def x(self): + return self._x + + @property + def x_trajectory(self): + return self._x_trajectory + + @property + def time(self): + return self._time + + @property + def step(self): + return self._step + + @property + def time_steps(self): + return self._time_steps + + @property + def run_time(self): + return self._run_time + + @property + def a_x(self): + return self._a_x + # endregion + + +class DMP(ABC): + def __init__(self, nr_dmps, nr_bfs, dt=0.01, **kwargs): + self._nr_dmps = nr_dmps + self._nr_bfs = nr_bfs + self._dt = dt + + self._vector_size = (1, self.nr_dmps) + + # get start and goal points + self._y0 = kwargs.get('y0', np.zeros(self._vector_size)) + self._g = kwargs.get('goal', np.ones(self._vector_size)) + self._gdy = kwargs.get('goal_dy', np.zeros(self._vector_size)) + + # weights generation + self._w_gen = kwargs.get('w_gen', 'zeros') + self._w = kwargs.get('w', self.reset_weigth()) + + # get important params + self._a_z = kwargs.get('a_z', 25 * np.ones(self._vector_size)) + self._b_z = kwargs.get('b_z', self._a_z / 4) + self._tau = kwargs.get('tau', 1.0) + + # initialize canonical system + _a_x = float(self._a_z[:, 0] / 3) + self._cs = CanonicalSystem(dt=self._dt, a_x=_a_x, **kwargs) + self._time_steps = self.cs.time_steps + + # initialize state vectors of + self._y = self._y0.copy() + self._dy = np.zeros(self._vector_size) + self._ddy = np.zeros(self._vector_size) + + # check dimensions and offset + self._dimension_checker() + self._offset_checker() + + # imitation learning + self.y_des = None + + # region DMP methods + def _generate_start(self, y_des): + _start = np.zeros(self._vector_size) + for dim in range(self.nr_dmps): + _start[:, dim] = y_des[0, dim] + + return _start + + def reset_weigth(self): + self.random_gen = np.random.default_rng() + if self._w_gen == 'zeros': + _w = np.zeros((self.nr_dmps, self.nr_bfs)) + + elif self._w_gen == 'random': + _w = 200 * self.random_gen.random((self.nr_dmps, self.nr_bfs)) - 100 + else: + raise ValueError('weight generations can be zero or random') + + self._w = _w + + return self._w + + def reset_states(self): + self.cs.reset_state() + + self._y = self._y0.copy() + self._dy = np.zeros(self._vector_size) + self._ddy = np.zeros(self._vector_size) + + def _dimension_checker(self): + if self.y0.shape[1] != self._nr_dmps or self.y0.shape[0] != 1: + raise ValueError('y0 needs the shape [nr_dmps, 1]') + + if self.g.shape[1] != self._nr_dmps or self.y0.shape[0] != 1: + raise ValueError('g needs the shape [nr_dmps, 1]') + + def _offset_checker(self): + for i in range(self._nr_dmps): + if abs(self.y0[:, i] - self.g[:, i]) < 1e-4: + self.g[i] += 1e-4 + + def step(self, error=0.0, spatial_coupling=None, time_index=0): + # step in canonical system + _error_coupling = 1.0 / (1.0 + error) + _x = self.cs.step(error_coupling=_error_coupling, time_index=time_index) + + # initialise basis function + _psi, _sum_psi = self._generate_psi(_x) + for dim in range(self.nr_dmps): + f = self._generate_front_term(_x, dim) * (np.dot(_psi, self._w[dim, :])) + f /= _sum_psi + + # self._ddy[:, dim] = self._a_z[:, dim] * \ + # ((self._b_z[:, dim] * (self.g[:, dim] - self._y[:, dim])) - + # (-self.gdy[:, dim] + self._dy[:, dim])) + f + + self._ddy[:, dim] = self._a_z[:, dim] * \ + ((self._b_z[:, dim] * (self.g[:, dim] - self._y[:, dim])) - self._dy[:, dim]) + f + + if spatial_coupling is not None: + self._ddy[:, dim] += spatial_coupling[dim] + + self._dy[:, dim] += self._ddy[:, dim] / self._tau * self._dt * _error_coupling + self._y[:, dim] += self._dy[:, dim] * self._dt / self._tau * _error_coupling + + return self._y, self._dy, self._ddy + + def roll_out(self, **kwargs): + self.reset_states() + + _y_trajectory = np.zeros((self._time_steps, self.nr_dmps)) + _dy_trajectory = np.zeros((self._time_steps, self.nr_dmps)) + _ddy_trajectory = np.zeros((self._time_steps, self.nr_dmps)) + + for t in range(self.cs.time_steps): + _y_trajectory[t, :], _dy_trajectory[t, :], _ddy_trajectory[t, :] = self.step(time_index=t, **kwargs) + + return _y_trajectory, _dy_trajectory, _ddy_trajectory + + def _interpolate_path(self, y_des): + _path = np.zeros((self._time_steps, self.nr_dmps)) + _x = np.linspace(0, self.cs.run_time, y_des.shape[0]) + + for dim in range(self.nr_dmps): + _path_generation = sciip.interp1d(_x, y_des[:, dim]) + for t in range(self._time_steps): + _path[t, dim] = _path_generation(t * self._dt) + + return _path + + def imitation_learning(self, y_des): + if y_des.shape[1] != self.nr_dmps: + raise ValueError('y_des needs the shape [nr_dmps, selectable]!') + + if y_des.ndim == 1: + y_des = y_des.reshape(self._vector_size) + + self._y0 = self._generate_start(y_des) + self._g = self._generate_goal(y_des) + + _y_des = self._interpolate_path(y_des) + _dy_des = np.gradient(_y_des, axis=0) / self._dt + _ddy_des = np.gradient(_dy_des, axis=0) / self._dt + + self.y_des = _y_des.copy() + + _f_target = np.zeros((self._time_steps, self.nr_dmps)) + for dim in range(self.nr_dmps): + _f_target[:, dim] = self._tau ** 2 * _ddy_des[:, dim] - \ + self._a_z[:, dim] * (self._b_z[:, dim] * + (self.g[:, dim] - _y_des[:, dim]) - self._tau * _dy_des[:, dim]) + + self._generate_weights(_f_target) + + # endregion + + # region abstract methods + @abstractmethod + def _generate_front_term(self, x, dmp_index): + pass + + @abstractmethod + def _generate_goal(self, y_des): + pass + + @abstractmethod + def _generate_psi(self, x): + pass + + @abstractmethod + def _generate_weights(self, f_target): + pass + + # endregion + + # region Getter and Setter + @property + def y0(self): + return self._y0 + + @property + def g(self): + return self._g + + @property + def nr_bfs(self): + return self._nr_bfs + + @property + def nr_dmps(self): + return self._nr_dmps + + @property + def cs(self): + return self._cs + + @property + def w(self): + return self._w + + @property + def gdy(self): + return self._gdy + + @w.setter + def w(self, value): + if value.shape == (self.nr_dmps, self.nr_bfs): + self._w = value + + elif value.shape == (self.nr_dmps * self.nr_bfs, None) or (self.nr_dmps * self.nr_bfs, 1): + value = np.reshape(value, (self.nr_dmps, self.nr_bfs)) + self._w = value + else: + raise ValueError('w needs shape [self.nr_dmps, self.nr_bfs] or [self.nr_dmps * self.nr_bfs, None]') + # endregion + + +class DmpDiscrete(DMP): + def __init__(self, **kwargs): + super(DmpDiscrete, self).__init__(pattern='discrete', **kwargs) + + # discrete dmp initialization + self._c = np.zeros((self.nr_bfs, 1)) + self._h = np.zeros((self.nr_bfs, 1)) + self._generate_basis_function_parameters() + + # imitation learning + self._regression_type = kwargs.get('regression_type', 'Schaal') + self._imitation_type = kwargs.get('imitation_type', 'eye') + self._reg_lambda = kwargs.get('reg_lambda', 1e-12) + + # psi + self._psi, _ = self._generate_psi(self.cs.roll_out()) + + def _generate_basis_function_parameters(self): + for i in range(1, self.nr_bfs + 1): + _des_c = (i - 1) / (self.nr_bfs - 1) + self._c[i - 1] = np.exp(-self.cs.a_x * _des_c) + + for i in range(self.nr_bfs): + if i != self.nr_bfs - 1: + self._h[i] = (self.c[i + 1] - self.c[i]) ** (-2) + else: + self._h[i] = self._h[i - 1] + + def _generate_front_term(self, x, dmp_index=None): + if dmp_index is not None: + _s = x * (self.g[:, dmp_index] - self.y0[:, dmp_index]) + + else: + _s = np.zeros((self.cs.time_steps, self.nr_dmps)) + for dim in range(self.nr_dmps): + _s[:, dim] = x * (self.g[:, dim] - self.y0[:, dim]) + + return _s + + def _generate_goal(self, y_des): + _goal = np.ones(self._vector_size) + for dim in range(self.nr_dmps): + _goal[:, dim] = y_des[-1, dim] + + return _goal + + def _generate_psi(self, x): + _psi = (np.exp(-self.h * (x - self.c) ** 2)).T + + if x.shape == (): + _sum_psi = np.sum(_psi) + return _psi, _sum_psi + + elif x.shape[0] == self.cs.time_steps: + _sum_psi = np.sum(_psi, axis=1) + return _psi, _sum_psi + + def generate_psi_3rd_derivative(self, x): + _, _psi_sum = self._generate_psi(x) + + _x = Symbol('x') + _h = Symbol('h') + _c = Symbol('c') + _g = Symbol('g') + _y = Symbol('y') + + _psi = exp(-_h * (_x - _c) ** 2) + _s = _x * (_g - _y) + func = _psi * _s + + _psi_dev = np.zeros((self.cs.time_steps, self.nr_bfs, self.nr_dmps)) + + for dim in range(self.nr_dmps): + for bf in range(self.nr_bfs): + for idx, t in enumerate(x): + psi_eval = func.evalf(subs={_h: float(self.h[bf, :]), + _c: float(self._c[bf, :]), + _g: float(self.g[:, dim]), + _y: float(self.y0[:, dim])}) + + psi_diff = diff(psi_eval, _x, 3) + _psi_dev[idx, bf, dim] = psi_diff.evalf(subs={_x: t}) / _psi_sum[idx] + + return _psi_dev + + def _generate_weights(self, f_target): + _x_trajectory = self.cs.roll_out() + _psi, _sum_psi = self._generate_psi(_x_trajectory) + + _s = self._generate_front_term(_x_trajectory) + _sT = _s.T + + if self._regression_type == 'Schaal': + + for dim in range(self.nr_dmps): + _k = self.g[:, dim] - self.y0[:, dim] + for bf in range(self.nr_bfs): + self._w[dim, bf] = np.dot(np.dot(_sT[dim, :], np.diag(_psi[:, bf])), f_target[:, dim]) / \ + (np.dot(np.dot(_sT[dim, :], np.diag(_psi[:, bf])), _s[:, dim])) + + self._w = np.nan_to_num(self._w) + + elif self._regression_type == 'RidgeRegression': + _regression_matrix = np.zeros((self.nr_bfs, self.nr_bfs, self.nr_dmps)) + + if self._imitation_type == 'eye': + for dim in range(self.nr_dmps): + _regression_matrix[:, :, dim] = np.eye(self.nr_bfs) + + elif self._imitation_type == 'jerk': + _Gamma = self.generate_psi_3rd_derivative(_x_trajectory) + for dim in range(self.nr_dmps): + _regression_matrix[:, :, dim] = _Gamma[:, :, dim].T @ _Gamma[:, :, dim] + + else: + raise ValueError('Imitation_type can either be eye or jerk') + + _psi_new = np.zeros((_psi.shape[0], _psi.shape[1], self.nr_dmps)) + for dim in range(self.nr_dmps): + for bf in range(self.nr_bfs): + _psi_new[:, bf, dim] = _psi[:, bf] / _sum_psi * _s[:, dim] + + for dim in range(self.nr_dmps): + _matrix = np.linalg.inv(_psi_new[:, :, dim].T @ _psi_new[:, :, dim] + \ + self._reg_lambda * _regression_matrix[:, :, dim]) @ _psi_new[:, :, dim].T + self._w[dim, :] = np.dot(_matrix, f_target[:, dim]) + + self._w = np.nan_to_num(self._w) + + # region Getter and Setter + @property + def c(self): + return self._c + + @property + def h(self): + return self._h + + @property + def psi(self): + return self._psi + # endregion + + +if __name__ == '__main__': + # # test imitation of path run + # plt.figure(2, figsize=(6, 4)) + # n_bfs = [100] + # + # # a straight line to target + # path1 = np.sin(np.arange(0, 1, 0.01) * 5) + # # a strange path to target + # path2 = np.zeros(path1.shape) + # path2[int(len(path2) / 2.0):] = 0.5 + # + # for ii, bfs in enumerate(n_bfs): + # dmp = DmpDiscrete(nr_dmps=2, nr_bfs=bfs, dt=0.001, regression_type='RidgeRegression', reg_lambda=1e-5) + # + # dmp.imitation_learning(y_des=np.array([path1, path2]).T) + # # change the scale of the movement + # dmp.g[0, 0] = 3 + # dmp.g[0, 1] = 2 + # + # y_track, dy_track, ddy_track = dmp.roll_out() + # + # plt.figure(2) + # plt.subplot(211) + # plt.plot(y_track[:, 0], lw=2) + # plt.subplot(212) + # plt.plot(y_track[:, 1], lw=2) + # + # plt.subplot(211) + # a = plt.plot(dmp.y_des[:, 0] / path1[-1] * dmp.g[:, 0], "r--", lw=2) + # plt.title("DMP imitate path") + # plt.xlabel("time (ms)") + # plt.ylabel("system trajectory") + # plt.legend([a[0]], ["desired path"], loc="lower right") + # plt.subplot(212) + # b = plt.plot(dmp.y_des[:, 1] / path2[-1] * dmp.g[:, 1], "r--", lw=2) + # plt.title("DMP imitate path") + # plt.xlabel("time (ms)") + # plt.ylabel("system trajectory") + # plt.legend(["%i BFs" % i for i in n_bfs], loc="lower right") + # + # dmp.cs.roll_out() + # plt.figure(3) + # plt.subplot(211) + # for i in range(dmp.nr_bfs): + # plt.plot(dmp.cs.time, dmp.psi[:, i]) + # + # plt.subplot(212) + # plt.plot(dmp.cs.time, dmp.cs.x_trajectory) + # for i in range(dmp.nr_bfs): + # plt.axvline(x=dmp.c[i], color='b', label='axvline - full height') + # + # f1 = np.zeros((dmp.psi.shape[1], dmp.psi.shape[0])) + # f2 = np.zeros((dmp.psi.shape[1], dmp.psi.shape[0])) + # + # for i in range(dmp.nr_bfs): + # f1[i, :] = np.dot(dmp.psi[:, i], dmp.w[0, i]) + # f2[i, :] = np.dot(dmp.psi[:, i], dmp.w[1, i]) + # plt.figure(4) + # plt.subplot(211) + # for i in range(dmp.nr_bfs): + # plt.plot(dmp.cs.time, f1[i, :]) + # + # plt.subplot(212) + # for i in range(dmp.nr_bfs): + # plt.plot(dmp.cs.time, f2[i, :]) + # + # plt.tight_layout() + # plt.show() + + bfs = 100 + dmp = DmpDiscrete(nr_dmps=2, nr_bfs=bfs, dt=0.01, regression_type='RidgeRegression', reg_lambda=0.5 * 1e-5) + + # a straight line to target + path1 = np.sin(np.arange(0, 1, 0.01) * 5) + # a strange path to target + path2 = np.zeros(path1.shape) + path2[int(len(path2) / 2.0):] = 0.5 + + dmp.imitation_learning(y_des=np.array([path1, path2]).T) + # change the scale of the movement + dmp.g[0, 0] = 3 + dmp.g[0, 1] = 2 + + y_track, dy_track, ddy_track = dmp.roll_out() + + plt.figure(2) + plt.subplot(211) + plt.plot(y_track[:, 0], lw=2) + plt.subplot(212) + plt.plot(y_track[:, 1], lw=2) + + plt.subplot(211) + a = plt.plot(dmp.y_des[:, 0] / path1[-1] * dmp.g[:, 0], "r--", lw=2) + plt.title("DMP imitate path") + plt.xlabel("time (ms)") + plt.ylabel("system trajectory") + plt.legend([a[0]], ["desired path"], loc="lower right") + plt.subplot(212) + b = plt.plot(dmp.y_des[:, 1] / path2[-1] * dmp.g[:, 1], "r--", lw=2) + plt.title("DMP imitate path") + plt.xlabel("time (ms)") + plt.ylabel("system trajectory") + plt.legend(["%i BFs" % i for i in range(bfs)], loc="lower right") + + plt.show() \ No newline at end of file diff --git a/franka_iml_experiment/RepresentationModel/__init__.py b/franka_iml_experiment/RepresentationModel/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/franka_iml_experiment/iml_experiment_node.py b/franka_iml_experiment/iml_experiment_node.py index 7c3c5b8..4779792 100644 --- a/franka_iml_experiment/iml_experiment_node.py +++ b/franka_iml_experiment/iml_experiment_node.py @@ -10,8 +10,7 @@ from active_bo_msgs.msg import ActiveRLResponse from active_bo_msgs.msg import ActiveRLEvalRequest from active_bo_msgs.msg import ActiveRLEvalResponse -import pydmps -import pydmps.dmp_discrete +from franka_iml_experiment.RepresentationModel.DMP import * import numpy as np @@ -55,21 +54,30 @@ class IML_Experiment(Node): 1, callback_group=topic_callback_group) + # States + self.eval_pending = False + # Reward constants self.distance_pen = -0.1 self.accel_pen = -0.1 - self.time_pen = -1 - self.zone_pen = -20 - self.goal_rew = 10 + self.time_pen = -1. + self.zone_pen = -20. + self.goal_rew = 10. + + # Scene parameters + self.square_center = np.array([0.5, 0.0]) + self.square_half_side = 0.1 # half of the side length + self.circle_radius = 0.02 # BO parameters self.start = np.array([0.5, -0.3]) - self.end = np.array([0.5, -0.3]) + self.end = np.array([0.5, 0.3]) self.time = 10 self.nr_dim = 2 self.nr_bfs = None self.nr_steps = None self.weight_preference = None + self.weight_scaling = 1000 def dmp_callback(self, msg): start = self.start @@ -77,16 +85,18 @@ class IML_Experiment(Node): self.nr_bfs = msg.nr_bfs self.nr_steps = msg.nr_steps interactive_run = msg.interactive_run - weights = np.vstack((msg.p_x, msg.p_y)) + weights = np.vstack((msg.p_x, msg.p_y)) * self.weight_scaling - dmp = pydmps.dmp_discrete.DMPs_discrete(n_dmps=self.nr_dim, n_bfs=self.nr_bfs, w=weights, y0=start, goal=end, - dt=1 / self.nr_steps, ) - y_track, _, ddy_track = dmp.rollout(tau=self.time) + weights_zeros = np.zeros_like(weights) + + dmp = pydmps.dmp_discrete.DMPs_discrete(n_dmps=self.nr_dim, n_bfs=self.nr_bfs, w=weights_zeros, y0=start, goal=end) + y_track, _, ddy_track = dmp.rollout() if interactive_run != 0: - reward, not_perform = self.compute_reward(y_track, ddy_track, self.nr_steps) + self.get_logger().info(f'{reward}') + if interactive_run == 1 and not not_perform: pose_msg = PoseArray() for i in range(y_track.shape[0]): @@ -94,30 +104,30 @@ class IML_Experiment(Node): pose.position.x = y_track[i, 0] pose.position.y = y_track[i, 1] pose.position.z = 0.15 - pose.orientation.x = 1 - pose.orientation.y = 0 - pose.orientation.z = 0 - pose.orientation.w = 0 + pose.orientation.x = 1. + pose.orientation.y = 0. + pose.orientation.z = 0. + pose.orientation.w = 0. pose_msg.poses.append(pose) self.traj_pub.publish(pose_msg) # self.exec_pub.publish(Empty()) - rl_response = ActiveRLResponse() - rl_response.weights = msg.p_x + msg.p_y - rl_response.reward = reward - rl_response.final_step = self.nr_steps - if self.weight_preference is None: - weight_preference = [False] * len(self.nr_bfs * 2) - else: - weight_preference = self.weight_preference + rl_response = ActiveRLResponse() + rl_response.weights = msg.p_x + msg.p_y + rl_response.reward = reward + rl_response.final_step = self.nr_steps + if self.weight_preference is None: + weight_preference = [False] * self.nr_bfs * 2 + else: + weight_preference = self.weight_preference - rl_response.weight_preference = weight_preference - - self.active_rl_pub.publish(rl_response) + rl_response.weight_preference = weight_preference + self.active_rl_pub.publish(rl_response) else: + self.get_logger().info('Interactive Learning started!') eval_msg = ActiveRLEvalRequest() eval_msg.weights = msg.p_x + msg.p_y eval_msg.policy = y_track[:, 0].tolist() + y_track[:, 1] .tolist() @@ -125,46 +135,59 @@ class IML_Experiment(Node): eval_msg.nr_weights = self.nr_bfs self.eval_pub.publish(eval_msg) + self.eval_pending = True def rl_eval_callback(self, msg): - eval_weights = msg.weights.reshape((self.nr_bfs, self.nr_dim), order='F') - self.weight_preference = msg.weight_preference + if self.eval_pending: + self.eval_pending = False + eval_weights = np.array(msg.weights).reshape((self.nr_bfs, self.nr_dim), order='F').T * self.weight_scaling + self.weight_preference = msg.weight_preference - dmp = pydmps.dmp_discrete.DMPs_discrete(n_dmps=self.nr_dim, n_bfs=self.nr_bfs, w=eval_weights, y0=self.start, - goal=self.end, dt=1 / self.nr_steps, ) - y_track, _, ddy_track = dmp.rollout(tau=self.time) + dmp = pydmps.dmp_discrete.DMPs_discrete(n_dmps=self.nr_dim, n_bfs=self.nr_bfs, w=eval_weights, y0=self.start, + goal=self.end) + y_track, _, ddy_track = dmp.rollout() - self.get_logger().info('Active RL Eval: Responded!') + self.get_logger().info('Active RL Eval: Responded!') - reward, not_perform = self.compute_reward(y_track, ddy_track, self.nr_steps) + reward, not_perform = self.compute_reward(y_track, ddy_track, self.nr_steps) + self.get_logger().info(f'{reward}') - if not not_perform: - pose_msg = PoseArray() - for i in range(y_track.shape[0]): - pose = Pose() - pose.position.x = y_track[i, 0] - pose.position.y = y_track[i, 1] - pose.position.z = 0.15 - pose.orientation.x = 1 - pose.orientation.y = 0 - pose.orientation.z = 0 - pose.orientation.w = 0 - pose_msg.poses.append(pose) + if not not_perform: + pose_msg = PoseArray() + for i in range(y_track.shape[0]): + pose = Pose() + pose.position.x = y_track[i, 0] + pose.position.y = y_track[i, 1] + pose.position.z = 0.15 + pose.orientation.x = 1. + pose.orientation.y = 0. + pose.orientation.z = 0. + pose.orientation.w = 0. + pose_msg.poses.append(pose) - self.traj_pub.publish(pose_msg) + self.traj_pub.publish(pose_msg) + # self.exec_pub.publish(Empty()) + + rl_response = ActiveRLResponse() + rl_response.weights = msg.weights + rl_response.reward = reward + rl_response.final_step = self.nr_steps + if self.weight_preference is None: + weight_preference = [False] * self.nr_bfs * 2 + else: + weight_preference = self.weight_preference + + rl_response.weight_preference = weight_preference + + self.active_rl_pub.publish(rl_response) def compute_reward(self, y_track, ddy_track, time): - square_center = np.array([0.5, 0.0]) - square_half_side = 0.1 # half of the side length - circle_center = np.array([0.5, 0.5]) - circle_radius = 0.02 - # 1. Check if any (x,y) position is within the square within_square = np.any( - (y_track[:, 0] > square_center[0] - square_half_side) & - (y_track[:, 0] < square_center[0] + square_half_side) & - (y_track[:, 1] > square_center[1] - square_half_side) & - (y_track[:, 1] < square_center[1] + square_half_side) + (y_track[:, 0] > self.square_center[0] - self.square_half_side) & + (y_track[:, 0] < self.square_center[0] + self.square_half_side) & + (y_track[:, 1] > self.square_center[1] - self.square_half_side) & + (y_track[:, 1] < self.square_center[1] + self.square_half_side) ) # If the robot moves into the forbidden zone it won't perform trajectory and get penalized @@ -174,17 +197,21 @@ class IML_Experiment(Node): # 2. Check if any (x,y) position is within the circle t = 0 for i in range(y_track.shape[0]): - distance_to_circle_center = np.linalg.norm(y_track[i, :] - circle_center, axis=1) - if distance_to_circle_center < circle_radius: + self.get_logger().info(f'{y_track[i, :]}') + distance_to_circle_center = np.linalg.norm(y_track[i, :] - self.end) + if distance_to_circle_center < self.circle_radius: t = i + if t == 0: + t = y_track.shape[0] + y_track = y_track[:t, :] ddy_track = ddy_track[:t, :] penalty = t * self.time_pen # 3. Compute the Euclidean distance from each (x,y) position to the center of the square - distances_to_square_center = np.linalg.norm(y_track - square_center, axis=1) + distances_to_square_center = np.linalg.norm(y_track - self.square_center, axis=1) total_distance_norm = np.sum(distances_to_square_center) # 1. Compute the norm of each acceleration and sum them up diff --git a/setup.py b/setup.py index 0186ba8..320b381 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,9 @@ package_name = 'franka_iml_experiment' setup( name=package_name, version='0.0.0', - packages=find_packages(exclude=['test']), + packages=[package_name, + package_name + '/RepresentationModel', + ], data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), @@ -26,6 +28,7 @@ setup( 'dmp_node = franka_iml_experiment.dmp_node:main', 'dmp_test = franka_iml_experiment.dmp_test:main', 'move_square = franka_iml_experiment.move_square:main', + 'iml_experiment = franka_iml_experiment.iml_experiment_node:main', ], }, )