diff --git a/src/active_bo_msgs/CMakeLists.txt b/src/active_bo_msgs/CMakeLists.txt
index 60fb753..414e6be 100644
--- a/src/active_bo_msgs/CMakeLists.txt
+++ b/src/active_bo_msgs/CMakeLists.txt
@@ -25,8 +25,10 @@ rosidl_generate_interfaces(${PROJECT_NAME}
   "srv/BO.srv"
   "srv/ActiveBO.srv"
   "srv/ActiveRL.srv"
-  "srv/ActiveRLEval.srv"
-  "msg/ActiveRLEval.msg"
+  "msg/ActiveBORequest.msg"
+  "msg/ActiveBOResponse.msg"
+  "msg/ActiveRLResponse.msg"
+  "msg/ActiveRL.msg"
   "msg/ImageFeedback.msg"
 )
 
diff --git a/src/active_bo_msgs/msg/ActiveBORequest.msg b/src/active_bo_msgs/msg/ActiveBORequest.msg
new file mode 100644
index 0000000..7e63345
--- /dev/null
+++ b/src/active_bo_msgs/msg/ActiveBORequest.msg
@@ -0,0 +1,6 @@
+uint16 nr_weights
+uint16 max_steps
+uint16 nr_episodes
+uint16 nr_runs
+string acquisition_function
+float32 epsilon
\ No newline at end of file
diff --git a/src/active_bo_msgs/msg/ActiveBOResponse.msg b/src/active_bo_msgs/msg/ActiveBOResponse.msg
new file mode 100644
index 0000000..8cae313
--- /dev/null
+++ b/src/active_bo_msgs/msg/ActiveBOResponse.msg
@@ -0,0 +1,4 @@
+float32[] best_policy
+float32[] best_weights
+float32[] reward_mean
+float32[] reward_std
\ No newline at end of file
diff --git a/src/active_bo_msgs/msg/ActiveRLEval.msg b/src/active_bo_msgs/msg/ActiveRL.msg
similarity index 100%
rename from src/active_bo_msgs/msg/ActiveRLEval.msg
rename to src/active_bo_msgs/msg/ActiveRL.msg
diff --git a/src/active_bo_msgs/msg/ActiveRLResponse.msg b/src/active_bo_msgs/msg/ActiveRLResponse.msg
new file mode 100644
index 0000000..c81eda8
--- /dev/null
+++ b/src/active_bo_msgs/msg/ActiveRLResponse.msg
@@ -0,0 +1,3 @@
+float32[] weights
+uint16 final_step
+float32 reward
\ No newline at end of file
diff --git a/src/active_bo_ros/active_bo_ros/active_bo_topic.py b/src/active_bo_ros/active_bo_ros/active_bo_topic.py
new file mode 100644
index 0000000..5f93749
--- /dev/null
+++ b/src/active_bo_ros/active_bo_ros/active_bo_topic.py
@@ -0,0 +1,194 @@
+from active_bo_msgs.msg import ActiveBORequest
+from active_bo_msgs.msg import ActiveBOResponse
+from active_bo_msgs.msg import ActiveRL
+from active_bo_msgs.msg import ActiveRLResponse
+
+import rclpy
+from rclpy.node import Node
+
+from rclpy.callback_groups import ReentrantCallbackGroup
+
+from active_bo_ros.BayesianOptimization.BayesianOptimization import BayesianOptimization
+from active_bo_ros.ReinforcementLearning.ContinuousMountainCar import Continuous_MountainCarEnv
+
+import numpy as np
+import time
+
+
+class ActiveBOTopic(Node):
+    def __init__(self):
+        super().__init__('active_bo_topic')
+
+        bo_callback_group = ReentrantCallbackGroup()
+        rl_callback_group = ReentrantCallbackGroup()
+        mainloop_callback_group = ReentrantCallbackGroup()
+
+        # Active Bayesian Optimization Publisher, Subscriber and Message attributes
+        self.active_bo_pub = self.create_publisher(ActiveBOResponse,
+                                                   'active_bo_response',
+                                                   1, callback_group=bo_callback_group)
+
+        self.active_bo_sub = self.create_subscription(ActiveBORequest,
+                                                      'active_bo_request',
+                                                      self.active_bo_callback,
+                                                      1, callback_group=bo_callback_group)
+
+        self.active_bo_pending = False
+        self.bo_nr_weights = None
+        self.bo_steps = None
+        self.bo_episodes = None
+        self.bo_runs = None
+        self.bo_acq_fcn = None
+        self.bo_epsilon = None
+        self.current_run = 0
+        self.current_episode = 0
+
+        # Active Reinforcement Learning Publisher, Subscriber and Message attributes
+        self.active_rl_pub = self.create_publisher(ActiveRL,
+                                                   'active_rl_request',
+                                                   1, callback_group=rl_callback_group)
+        self.active_rl_sub = self.create_subscription(ActiveRLResponse,
+                                                      'active_rl_response',
+                                                      self.active_rl_callback,
+                                                      1, callback_group=rl_callback_group)
+
+        self.active_rl_pending = False
+        self.rl_weights = None
+        self.rl_final_step = None
+        self.rl_reward = None
+
+        # RL Environments and BO
+        self.env = Continuous_MountainCarEnv()
+        self.distance_penalty = 0
+
+        self.BO = None
+        self.nr_init = 3
+        self.reward = None
+        self.best_pol_reward = None
+        self.best_policy = None
+        self.best_weights = None
+
+        # Main loop timer object
+        self.mainloop_timer_period = 0.1
+        self.mainloop = self.create_timer(self.mainloop_timer_period,
+                                          self.mainloop_callback,
+                                          callback_group=mainloop_callback_group)
+
+    def reset_bo_request(self):
+        self.bo_nr_weights = None
+        self.bo_steps = None
+        self.bo_episodes = None
+        self.bo_runs = None
+        self.bo_acq_fcn = None
+        self.bo_epsilon = None
+        self.current_run = 0
+        self.current_episode = 0
+
+    def active_bo_callback(self, msg):
+        self.get_logger().info('Active Bayesian Optimization request pending!')
+        self.active_bo_pending = True
+        self.bo_nr_weights = msg.nr_weights
+        self.bo_steps = msg.max_steps
+        self.bo_episodes = msg.nr_episodes
+        self.bo_runs = msg.nr_runs
+        self.bo_acq_fcn = msg.acquisition_function
+        self.bo_epsilon = msg.epsilon
+
+    def reset_rl_response(self):
+        self.rl_weights = None
+        self.rl_final_step = None
+        self.rl_reward = None
+
+    def active_rl_callback(self, msg):
+        self.get_logger().info('Active Reinforcement Learning response received!')
+        self.active_rl_pending = False
+        # store the response so the main loop can add it as a new observation
+        self.rl_weights = msg.weights
+        self.rl_final_step = msg.final_step
+        self.rl_reward = msg.reward
+
+    def mainloop_callback(self):
+        if self.active_bo_pending:
+            if self.BO is None:
+                self.BO = BayesianOptimization(self.env,
+                                               self.bo_steps,
+                                               nr_init=self.nr_init,
+                                               acq=self.bo_acq_fcn,
+                                               nr_weights=self.bo_nr_weights)
+
+                self.reward = np.zeros((self.bo_episodes, self.bo_runs))
+                self.best_pol_reward = np.zeros((1, self.bo_runs))
+                self.best_policy = np.zeros((self.bo_steps, self.bo_runs))
+                self.best_weights = np.zeros((self.bo_nr_weights, self.bo_runs))
+
+                self.BO.initialize()
+
+            if self.current_run >= self.bo_runs:
+                bo_response = ActiveBOResponse()
+
+                best_policy_idx = np.argmax(self.best_pol_reward)
+                bo_response.best_policy = self.best_policy[:, best_policy_idx].tolist()
+                bo_response.best_weights = self.best_weights[:, best_policy_idx].tolist()
+
+                bo_response.reward_mean = np.mean(self.reward, axis=1).tolist()
+                bo_response.reward_std = np.std(self.reward, axis=1).tolist()
+
+                self.active_bo_pub.publish(bo_response)
+                self.reset_bo_request()
+                self.active_bo_pending = False
+
+            else:
+                if self.active_rl_pending:
+                    pass
+                elif self.rl_weights is not None and not self.active_rl_pending:
+                    try:
+                        self.BO.add_new_observation(self.rl_reward, self.rl_weights)
+                        self.reset_rl_response()
+                    except Exception as e:
+                        self.get_logger().error(f'Active Reinforcement Learning failed to add new observation: {e}')
+                else:
+                    if self.current_episode < self.bo_episodes:
+                        if np.random.uniform(0.0, 1.0, 1) < self.bo_epsilon:
+                            active_rl_request = ActiveRL()
+                            old_policy, _, old_weights = self.BO.get_best_result()
+
+                            active_rl_request.policy = old_policy.tolist()
+                            active_rl_request.weights = old_weights.tolist()
+
+                            self.get_logger().info('Calling: Active RL')
+                            self.active_rl_pub.publish(active_rl_request)
+                            self.active_rl_pending = True
+
+                        else:
+                            x_next = self.BO.next_observation()
+                            self.BO.eval_new_observation(x_next)
+
+                        self.best_policy[:, self.current_run], \
+                            self.best_pol_reward[:, self.current_run], \
+                            self.best_weights[:, self.current_run] = self.BO.get_best_result()
+
+                        self.reward[:, self.current_run] = self.BO.best_reward.T
+
+                        self.current_episode += 1
+                    else:
+                        self.current_episode = 0
+                        self.current_run += 1
+
+
+def main(args=None):
+    rclpy.init(args=args)
+
+    active_bo_topic = ActiveBOTopic()
+
+    try:
+        rclpy.spin(active_bo_topic)
+    except KeyboardInterrupt:
+        pass
+
+    active_bo_topic.destroy_node()
+    rclpy.shutdown()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/src/active_bo_ros/active_bo_ros/active_rl_service.py b/src/active_bo_ros/active_bo_ros/active_rl_service.py
index 6c3e8ca..c784f12 100644
--- a/src/active_bo_ros/active_bo_ros/active_rl_service.py
+++ b/src/active_bo_ros/active_bo_ros/active_rl_service.py
@@ -1,6 +1,6 @@
 from active_bo_msgs.srv import ActiveRL
 from active_bo_msgs.msg import ImageFeedback
-from active_bo_msgs.msg import ActiveRLEval
+from active_bo_msgs.msg import ActiveRL as ActiveRLEval
 
 import rclpy
 from rclpy.node import Node
@@ -36,7 +36,7 @@ class ActiveRLService(Node):
                                                  self.active_rl_eval_callback,
                                                  1,
                                                  callback_group=topic_callback_group)
-
+        # active_rl_eval_response
         self.eval_response_received = False
         self.eval_response = None
 
diff --git a/src/active_bo_ros/active_bo_ros/active_rl_topic.py b/src/active_bo_ros/active_bo_ros/active_rl_topic.py
new file mode 100644
index 0000000..3153539
--- /dev/null
+++ b/src/active_bo_ros/active_bo_ros/active_rl_topic.py
@@ -0,0 +1,179 @@
+from active_bo_msgs.msg import ActiveRL
+from active_bo_msgs.msg import ActiveRLResponse
+
+from active_bo_msgs.msg import ImageFeedback
+
+import rclpy
+from rclpy.node import Node
+
+from rclpy.callback_groups import ReentrantCallbackGroup
+
+from active_bo_ros.ReinforcementLearning.ContinuousMountainCar import Continuous_MountainCarEnv
+
+import numpy as np
+import time
+import copy
+
+
+class ActiveRLService(Node):
+    def __init__(self):
+        super().__init__('active_rl_service')
+        rl_callback_group = ReentrantCallbackGroup()
+        topic_callback_group = ReentrantCallbackGroup()
+        mainloop_callback_group = ReentrantCallbackGroup()
+
+        # Active Reinforcement Learning Publisher, Subscriber and Message attributes
+        self.active_rl_pub = self.create_publisher(ActiveRLResponse,
+                                                   'active_rl_response',
+                                                   1, callback_group=rl_callback_group)
+        self.active_rl_sub = self.create_subscription(ActiveRL,
+                                                      'active_rl_request',
+                                                      self.active_rl_callback,
+                                                      1, callback_group=rl_callback_group)
+
+        self.active_rl_pending = False
+        self.rl_policy = None
+        self.rl_weights = None
+        self.rl_reward = 0.0
+        self.rl_step = 0
+
+        # Image publisher to publish the rgb array from the gym environment
+        self.image_pub = self.create_publisher(ImageFeedback,
+                                               'rl_feedback',
+                                               1, callback_group=topic_callback_group)
+
+        # Active RL Evaluation Publisher, Subscriber and Message attributes
+        self.eval_pub = self.create_publisher(ActiveRL,
+                                              'active_rl_eval_request',
+                                              1,
+                                              callback_group=topic_callback_group)
+        self.eval_sub = self.create_subscription(ActiveRL,
+                                                 'active_rl_eval_response',
+                                                 self.active_rl_eval_callback,
+                                                 1,
+                                                 callback_group=topic_callback_group)
+
+        self.eval_response_received = False
+        self.eval_policy = None
+        self.eval_weights = None
+
+        # RL Environments
+        self.env = Continuous_MountainCarEnv(render_mode='rgb_array')
+        self.distance_penalty = 0
+        self.best_pol_shown = False
+
+        # Main loop timer object
+        self.mainloop_timer_period = 0.1
+        self.mainloop = self.create_timer(self.mainloop_timer_period,
+                                          self.mainloop_callback,
+                                          callback_group=mainloop_callback_group)
+
+    def reset_rl_request(self):
+        self.rl_policy = None
+        self.rl_weights = None
+
+    def active_rl_callback(self, msg):
+        self.rl_policy = msg.policy
+        self.rl_weights = msg.weights
+
+        self.get_logger().info('Active RL: Called!')
+        self.active_rl_pending = True
+
+    def reset_eval_request(self):
+        self.eval_policy = None
+        self.eval_weights = None
+
+    def active_rl_eval_callback(self, msg):
+        self.eval_policy = msg.policy
+        self.eval_weights = msg.weights
+
+        self.get_logger().info('Active RL Eval: Responded!')
+        self.eval_response_received = True
+
+    def next_image(self, policy):
+        action = policy[self.rl_step]
+        output = self.env.step(action)
+
+        self.rl_reward += output[1]
+        done = output[2]
+        self.rl_step += 1
+
+        rgb_array = self.env.render()
+        rgb_shape = rgb_array.shape
+
+        red = rgb_array[:, :, 0].flatten().tolist()
+        green = rgb_array[:, :, 1].flatten().tolist()
+        blue = rgb_array[:, :, 2].flatten().tolist()
+
+        feedback_msg = ImageFeedback()
+
+        feedback_msg.height = rgb_shape[0]
+        feedback_msg.width = rgb_shape[1]
+        feedback_msg.red = red
+        feedback_msg.green = green
+        feedback_msg.blue = blue
+
+        self.image_pub.publish(feedback_msg)
+
+        if not done and self.rl_step == len(policy):
+            distance = -(self.env.goal_position - output[0][0])
+            self.rl_reward += distance * self.distance_penalty
+            # policy exhausted without reaching the goal: end the rollout
+            done = True
+
+        return done
+
+    def mainloop_callback(self):
+        if self.active_rl_pending:
+            if not self.best_pol_shown:
+                done = self.next_image(self.rl_policy)
+
+                if done:
+                    self.rl_step = 0
+                    self.rl_reward = 0.0
+
+                    eval_request = ActiveRL()
+                    eval_request.policy = self.rl_policy
+                    eval_request.weights = self.rl_weights
+
+                    self.eval_pub.publish(eval_request)
+                    self.get_logger().info('Active RL: Requesting Eval!')
+                    self.best_pol_shown = True
+
+            elif self.best_pol_shown:
+                if not self.eval_response_received:
+                    self.get_logger().info('Active RL: Waiting for Eval!')
+
+                if self.eval_response_received:
+                    done = self.next_image(self.eval_policy)
+
+                    if done:
+                        rl_response = ActiveRLResponse()
+                        rl_response.weights = self.rl_weights
+                        rl_response.reward = self.rl_reward
+                        rl_response.final_step = self.rl_step
+
+                        self.active_rl_pub.publish(rl_response)
+
+                        # reset flags and attributes
+                        self.reset_eval_request()
+                        self.reset_rl_request()
+
+                        self.rl_step = 0
+                        self.rl_reward = 0.0
+
+                        self.best_pol_shown = False
+                        self.eval_response_received = False
+                        self.active_rl_pending = False
+
+
+def main(args=None):
+    rclpy.init(args=args)
+
+    active_rl_service = ActiveRLService()
+
+    rclpy.spin(active_rl_service)
+
+    rclpy.shutdown()
+
+
+if __name__ == '__main__':
+    main()
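
For reference, the sketch below shows a minimal client for the topic interface introduced in this change set: it publishes a single ActiveBORequest on 'active_bo_request' and logs the ActiveBOResponse received on 'active_bo_response'. Only the topic names and message fields are taken from the diff above; the node name, queue sizes, example parameter values, and the acquisition-function string (which must match whatever the BayesianOptimization class expects) are illustrative assumptions, not part of the change set.

# Hypothetical client sketch, not part of the diff. Topic names and message
# fields come from the change set above; the node name and request values are
# assumptions for illustration only.
from active_bo_msgs.msg import ActiveBORequest
from active_bo_msgs.msg import ActiveBOResponse

import rclpy
from rclpy.node import Node


class ActiveBOClientSketch(Node):
    def __init__(self):
        super().__init__('active_bo_client_sketch')

        self.request_pub = self.create_publisher(ActiveBORequest, 'active_bo_request', 1)
        self.response_sub = self.create_subscription(ActiveBOResponse,
                                                     'active_bo_response',
                                                     self.response_callback,
                                                     1)

        # Publish one request shortly after startup so the subscriber on the
        # other side has had time to connect.
        self.request_sent = False
        self.timer = self.create_timer(1.0, self.send_request)

    def send_request(self):
        if self.request_sent:
            return

        request = ActiveBORequest()
        request.nr_weights = 8      # example values only
        request.max_steps = 100
        request.nr_episodes = 20
        request.nr_runs = 5
        request.acquisition_function = 'Expected Improvement'  # must match the BO implementation
        request.epsilon = 0.1

        self.request_pub.publish(request)
        self.request_sent = True
        self.get_logger().info('ActiveBORequest published')

    def response_callback(self, msg):
        self.get_logger().info(f'best weights: {list(msg.best_weights)}')
        self.get_logger().info(f'mean reward per episode: {list(msg.reward_mean)}')


def main(args=None):
    rclpy.init(args=args)
    node = ActiveBOClientSketch()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()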