"""ROS 2 node for an interactive RL/BO experiment: rolls out DMP policies,
scores them with a shaped reward, and forwards accepted trajectories to a
MoveIt interface for execution."""

import rclpy
from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import MultiThreadedExecutor

from std_msgs.msg import Empty
from geometry_msgs.msg import PoseArray
from geometry_msgs.msg import Pose

from active_bo_msgs.msg import DMP
from active_bo_msgs.msg import ActiveRLResponse
from active_bo_msgs.msg import ActiveRLEvalRequest
from active_bo_msgs.msg import ActiveRLEvalResponse

import pydmps
import pydmps.dmp_discrete

import numpy as np


class IML_Experiment(Node):
    def __init__(self):
        super().__init__('iml_node')

        rl_callback_group = ReentrantCallbackGroup()
        topic_callback_group = ReentrantCallbackGroup()

        self.traj_pub = self.create_publisher(PoseArray,
                                              'moveit_interface/task_space_trajectory',
                                              10,
                                              callback_group=rl_callback_group)
        self.exec_pub = self.create_publisher(Empty,
                                              'moveit_interface/execute',
                                              10)
        self.rl_subscription = self.create_subscription(DMP,
                                                        'active_rl_request',
                                                        self.dmp_callback,
                                                        10,
                                                        callback_group=rl_callback_group)
        self.active_rl_pub = self.create_publisher(ActiveRLResponse,
                                                   'active_rl_response',
                                                   1,
                                                   callback_group=rl_callback_group)

        # interactive part
        self.eval_pub = self.create_publisher(ActiveRLEvalRequest,
                                              'active_rl_eval_request',
                                              1,
                                              callback_group=topic_callback_group)
        self.eval_sub = self.create_subscription(ActiveRLEvalResponse,
                                                 'active_rl_eval_response',
                                                 self.rl_eval_callback,
                                                 1,
                                                 callback_group=topic_callback_group)

        # Reward constants
        self.distance_pen = -0.1
        self.accel_pen = -0.1
        self.time_pen = -1
        self.zone_pen = -20
        self.goal_rew = 10

        # BO parameters
        self.start = np.array([0.5, -0.3])
        self.end = np.array([0.5, -0.3])
        self.time = 10
        self.nr_dim = 2
        self.nr_bfs = None
        self.nr_steps = None
        self.weight_preference = None

    def dmp_callback(self, msg):
        start = self.start
        end = np.array([0.5, -0.3])

        self.nr_bfs = msg.nr_bfs
        self.nr_steps = msg.nr_steps
        interactive_run = msg.interactive_run

        # Stack the per-dimension weight vectors into the (n_dmps, n_bfs)
        # shape pydmps expects: row 0 -> x weights, row 1 -> y weights.
        weights = np.vstack((msg.p_x, msg.p_y))

        dmp = pydmps.dmp_discrete.DMPs_discrete(n_dmps=self.nr_dim,
                                                n_bfs=self.nr_bfs,
                                                w=weights,
                                                y0=start,
                                                goal=end,
                                                dt=1 / self.nr_steps)

        y_track, _, ddy_track = dmp.rollout(tau=self.time)

        if interactive_run != 0:
            reward, not_perform = self.compute_reward(y_track, ddy_track, self.nr_steps)

            if interactive_run == 1 and not not_perform:
                pose_msg = PoseArray()
                for i in range(y_track.shape[0]):
                    pose = Pose()
                    pose.position.x = y_track[i, 0]
                    pose.position.y = y_track[i, 1]
                    pose.position.z = 0.15
                    # Message float fields reject ints, so use float literals.
                    pose.orientation.x = 1.0
                    pose.orientation.y = 0.0
                    pose.orientation.z = 0.0
                    pose.orientation.w = 0.0
                    pose_msg.poses.append(pose)

                self.traj_pub.publish(pose_msg)
                # self.exec_pub.publish(Empty())

            rl_response = ActiveRLResponse()
            rl_response.weights = list(msg.p_x) + list(msg.p_y)
            rl_response.reward = float(reward)
            rl_response.final_step = self.nr_steps

            if self.weight_preference is None:
                weight_preference = [False] * (self.nr_bfs * 2)
            else:
                weight_preference = self.weight_preference

            rl_response.weight_preference = weight_preference

            self.active_rl_pub.publish(rl_response)
        else:
            eval_msg = ActiveRLEvalRequest()
            eval_msg.weights = list(msg.p_x) + list(msg.p_y)
            eval_msg.policy = y_track[:, 0].tolist() + y_track[:, 1].tolist()
            eval_msg.nr_steps = self.nr_steps
            eval_msg.nr_weights = self.nr_bfs
            self.eval_pub.publish(eval_msg)

    def rl_eval_callback(self, msg):
        # msg.weights arrives as a flat sequence [x weights..., y weights...];
        # rebuild the same (n_dmps, n_bfs) matrix used in dmp_callback.
        eval_weights = np.asarray(msg.weights).reshape((self.nr_dim, self.nr_bfs))
        self.weight_preference = list(msg.weight_preference)

        dmp = pydmps.dmp_discrete.DMPs_discrete(n_dmps=self.nr_dim,
                                                n_bfs=self.nr_bfs,
                                                w=eval_weights,
                                                y0=self.start,
                                                goal=self.end,
                                                dt=1 / self.nr_steps)

        y_track, _, ddy_track = dmp.rollout(tau=self.time)
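        # As in dmp_callback above, the rollout is only forwarded to the robot
        # when compute_reward reports that it stays clear of the forbidden
        # zone; note that this evaluation path publishes the trajectory only
        # and sends no ActiveRLResponse.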
        self.get_logger().info('Active RL Eval: Responded!')

        reward, not_perform = self.compute_reward(y_track, ddy_track, self.nr_steps)

        if not not_perform:
            pose_msg = PoseArray()
            for i in range(y_track.shape[0]):
                pose = Pose()
                pose.position.x = y_track[i, 0]
                pose.position.y = y_track[i, 1]
                pose.position.z = 0.15
                pose.orientation.x = 1.0
                pose.orientation.y = 0.0
                pose.orientation.z = 0.0
                pose.orientation.w = 0.0
                pose_msg.poses.append(pose)

            self.traj_pub.publish(pose_msg)

    def compute_reward(self, y_track, ddy_track, time):
        square_center = np.array([0.5, 0.0])
        square_half_side = 0.1  # half of the side length
        circle_center = np.array([0.5, 0.5])
        circle_radius = 0.02

        # 1. Check whether any (x, y) position lies within the square
        within_square = np.any(
            (y_track[:, 0] > square_center[0] - square_half_side) &
            (y_track[:, 0] < square_center[0] + square_half_side) &
            (y_track[:, 1] > square_center[1] - square_half_side) &
            (y_track[:, 1] < square_center[1] + square_half_side)
        )

        # If the robot would move into the forbidden zone, the trajectory is
        # not performed and a flat penalty is returned instead.
        if within_square:
            return self.zone_pen + time * self.time_pen, within_square

        # 2. Find the first step at which the (x, y) position enters the
        # circle and truncate the trajectory there; if the circle is never
        # reached, keep the full trajectory.
        t = y_track.shape[0]
        for i in range(y_track.shape[0]):
            distance_to_circle_center = np.linalg.norm(y_track[i, :] - circle_center)
            if distance_to_circle_center < circle_radius:
                t = i
                break

        y_track = y_track[:t, :]
        ddy_track = ddy_track[:t, :]

        penalty = t * self.time_pen

        # 3. Compute the Euclidean distance from each (x, y) position to the
        # center of the square and sum the distances
        distances_to_square_center = np.linalg.norm(y_track - square_center, axis=1)
        total_distance_norm = np.sum(distances_to_square_center)

        # 4. Compute the norm of each acceleration and sum the norms
        accel_norms = np.linalg.norm(ddy_track, axis=1)
        total_accel_norm = np.sum(accel_norms)

        penalty += total_distance_norm * self.distance_pen + total_accel_norm * self.accel_pen

        return penalty, within_square


def main(args=None):
    rclpy.init(args=args)
    dmp_node = IML_Experiment()
    # Reentrant callback groups only allow concurrent callbacks under a
    # multi-threaded executor; the default single-threaded spin would
    # serialize everything.
    executor = MultiThreadedExecutor()
    rclpy.spin(dmp_node, executor=executor)
    dmp_node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
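

# ---------------------------------------------------------------------------
# Offline sketch (not part of the node): roll out a DMP with hypothetical
# zero weights and score it with the same reward shaping as above, without a
# running ROS 2 graph. The weight values, step count, and tau below are
# made-up illustration values, not taken from the experiment. Call
# _offline_reward_demo() manually (e.g., from a REPL) to try it.
def _offline_reward_demo():
    nr_bfs = 5
    nr_steps = 100
    weights = np.zeros((2, nr_bfs))  # hypothetical flat policy (no forcing term)
    dmp = pydmps.dmp_discrete.DMPs_discrete(n_dmps=2,
                                            n_bfs=nr_bfs,
                                            w=weights,
                                            y0=np.array([0.5, -0.3]),
                                            goal=np.array([0.5, -0.3]),
                                            dt=1 / nr_steps)
    y_track, _, ddy_track = dmp.rollout(tau=10)

    # Reuse compute_reward without ROS by bypassing Node.__init__ and setting
    # only the penalty constants the method actually reads.
    node = IML_Experiment.__new__(IML_Experiment)
    node.distance_pen, node.accel_pen = -0.1, -0.1
    node.time_pen, node.zone_pen = -1, -20
    reward, blocked = node.compute_reward(y_track, ddy_track, nr_steps)
    print(f'steps={y_track.shape[0]} reward={reward:.3f} forbidden_zone={blocked}')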