diff --git a/src/interaction_msgs/CMakeLists.txt b/src/interaction_msgs/CMakeLists.txt
index d4f3d41..66f2e5b 100644
--- a/src/interaction_msgs/CMakeLists.txt
+++ b/src/interaction_msgs/CMakeLists.txt
@@ -10,7 +10,8 @@ find_package(rosidl_default_generators REQUIRED)
 
 rosidl_generate_interfaces(${PROJECT_NAME}
   "srv/Query.srv"
-  "msg/Opt2Rep.msg"
+  "srv/Task.srv"
+  "msg/Opt2Task.msg"
   "msg/Opt2UI.msg"
   "msg/Task2Opt.msg"
   "msg/UI2Opt.msg"
diff --git a/src/interaction_msgs/msg/Opt2Rep.msg b/src/interaction_msgs/msg/Opt2Task.msg
similarity index 100%
rename from src/interaction_msgs/msg/Opt2Rep.msg
rename to src/interaction_msgs/msg/Opt2Task.msg
diff --git a/src/interaction_msgs/srv/Task.srv b/src/interaction_msgs/srv/Task.srv
new file mode 100644
index 0000000..396a395
--- /dev/null
+++ b/src/interaction_msgs/srv/Task.srv
@@ -0,0 +1,13 @@
+# Return reward - True: for updating the model, False: for displaying the x_best to the user
+bool reward_return
+# Number of dimensions of the representation model
+uint16 nr_dim
+# Number of parameters per dimension
+uint16 nr_parameter
+# Next parameters
+float32[] x_next
+---
+# Observed parameters
+float32[] x_observed
+# Observed reward
+float32 reward
\ No newline at end of file
diff --git a/src/interaction_optimizers/interaction_optimizers/bayesian_optimization_node.py b/src/interaction_optimizers/interaction_optimizers/bayesian_optimization_node.py
index 5063640..e2f806c 100644
--- a/src/interaction_optimizers/interaction_optimizers/bayesian_optimization_node.py
+++ b/src/interaction_optimizers/interaction_optimizers/bayesian_optimization_node.py
@@ -4,69 +4,81 @@
 from rclpy.node import Node
 from rclpy.callback_groups import ReentrantCallbackGroup
 from interaction_msgs.srv import Query
-from interaction_msgs.msg import Task2Opt
+from interaction_msgs.srv import Task
 from interaction_msgs.msg import UI2Opt
 from interaction_msgs.msg import TaskOrder
 from interaction_msgs.msg import Opt2UI
-from interaction_msgs.msg import Opt2Rep
+
 from .optimizers.bayesian_optimization import BayesianOptimization
 from transitions.extensions.asyncio import AsyncMachine
 import asyncio
-import numpy as np
-
 import threading
 import time
 import os
+import numpy as np
+
 
 class BayesianOptimizationNode(Node):
-    def __init__(self):
+    def __init__(self, event_loop):
         super().__init__('bayesian_optimization_node')
+        self.event_loop = event_loop
 
         # region Parameters
-        self.kernel_type = self.declare_parameter('kernel_type', 'Matern').get_parameter_value().string_value
+        """
         self.acquisition_function_name = (self.declare_parameter('acquisition_function_name', 'EI')
                                           .get_parameter_value().string_value)
         self.nr_bo_steps = self.declare_parameter('bo_steps', 100).get_parameter_value().integer_value
-        self.nr_dimensions = self.declare_parameter('nr_dimensions',1).get_parameter_value().integer_value
+        self.nr_dimensions = self.declare_parameter('nr_dimensions', 1).get_parameter_value().integer_value
         self.nr_policy_parameters = (self.declare_parameter('nr_policy_parameters', 100)
                                      .get_parameter_value().integer_value)
+        """
+        self.kernel_type = self.declare_parameter('kernel_type', 'Matern').get_parameter_value().string_value
         self.nr_init = self.declare_parameter('nr_init', 3).get_parameter_value().integer_value
-
-        self.cycle_time = (self.declare_parameter('bo_cylce_time', 0.1)).get_parameter_value().double_value
+        self.task_request_timeout = self.declare_parameter('task_request_timeout', 10).get_parameter_value().integer_value
+        self.task_request_max_tries = self.declare_parameter('task_request_max_tries', 3).get_parameter_value().integer_value
         # endregion
 
         # Subscribers
-        self.reward_sub = self.create_subscription(Task2Opt, 'interaction/reward', self.reward_callback, 10)
         self.ui_sub = self.create_subscription(UI2Opt, 'interaction/ui_response', self.ui_callback, 10)
-        self.order_sub = self.create_subscription(Task2Opt, 'interaction/task_order', self.task_order_callback, 10)
+        self.order_sub = self.create_subscription(TaskOrder, 'interaction/order', self.task_order_callback, 10)
 
         # Publishers
         self.ui_pub = self.create_publisher(Opt2UI, 'interaction/ui_request', 10)
-        self.rep_pub = self.create_publisher(Opt2Rep, 'interaction/next_x', 10)
 
         # Service Clients
        self.query_client = self.create_client(Query, 'interaction/user_query')
+        self.task_client = self.create_client(Task, 'interaction/task_srv')
 
         # Bayesian Optimization
         self.bayesian_optimization = None
+        self.current_episodes = 0
+        self.nr_bo_steps = 0
+        self.nr_dimensions = 0
+        self.nr_policy_parameters = 0
+        self.acquisition_function_name = ''
+        self.fixed_dimensions = None
+        self.seed = None
         self.lower_bound = None
         self.upper_bound = None
 
         # State Machine
-        states = ['idle',
-                  'initialize_bo',
-                  'user_query',
-                  'processing',
-                  'waiting_for_user_response',
-                  'waiting_for_task_response']
+        states = [
+            {'name': 'idle'},
+            {'name': 'initialize_bo', 'on_enter': 'initialize_bo_fun'},
+            {'name': 'user_query', 'on_enter': 'user_query_fun'},
+            {'name': 'sampling_x_next', 'on_enter': 'sample_x_next_fun'},
+            {'name': 'waiting_for_user_response', 'on_enter': 'waiting_for_user_response_fun'},
+            {'name': 'processing', 'on_enter': 'processing_fun'},
+            {'name': 'waiting_for_task_response', 'on_enter': 'waiting_for_task_response_fun'}
+        ]
 
         transitions = [
-            {'trigger': 'order_received', 'source': 'idle', 'dest': 'initialize_bo', 'after': 'after_receiving_order'},
+            {'trigger': 'order_received', 'source': 'idle', 'dest': 'initialize_bo'},
             {'trigger': 'initialization_finished', 'source': 'initialize_bo', 'dest': 'user_query'},
             {'trigger': 'non_interactive', 'source': 'user_query', 'dest': 'sampling_x_next'},
             {'trigger': 'sampling_finished', 'source': 'sampling_x_next', 'dest': 'processing'},
@@ -84,7 +96,7 @@ class BayesianOptimizationNode(Node):
             order_sub - receives order
             reset_bo with the order parameters
             trigger: order_received()
-            
+
         for i=0:nr_episodes
         B) initializing_bo -> initialization_finished() -> user_query:
             for j = 0:nr_init
@@ -93,7 +105,7 @@ class BayesianOptimizationNode(Node):
                 reward received from task node
                 fit model
             trigger: initialization_finished()
-            
+
         C1.1) user_query -> non_interactive() -> sampling_x_next
             service call for user_query
             if user_query is False:
@@ -101,7 +113,7 @@ class BayesianOptimizationNode(Node):
         C1.2) sampling_x_next -> sampling_finished() -> processing
             sampling x_next with acqusition function
            trigger: sampling_finished()
-            
+
         C2.1) user_query -> interactive() -> waiting_for_user
             service call for user_query
             if user_query is True:
@@ -113,11 +125,11 @@ class BayesianOptimizationNode(Node):
             (UI sends x_next to task node)
             receives x_next from UI
             trigger: user_response_received()
-            
+
         D) processing -> processing_finished() -> waiting_for_task_response
             sends x_next to task node
             trigger: processing_finished()
-            
+
         E) waiting_for_task_response -> task_response_received() -> user_query
             if episode < nr_episodes:
                 get results from reward_sub
@@ -128,8 +140,8 @@ class BayesianOptimizationNode(Node):
             if episode = nr_episodes:
                 completion_pub with the completed results
                 trigger: order_completed()
-            
-            
+
+
         Additional Transitions:
             Abort: * -> abort() -> idle
         """
@@ -149,100 +161,111 @@ class BayesianOptimizationNode(Node):
             acquisition_function_name=self.acquisition_function_name,
             kernel_type=self.kernel_type,
             **kwargs)
 
-    # region Callback functions
+    def task_order_callback(self, msg):
+        self.current_episodes = 0
+        self.nr_bo_steps = msg.nr_bo_steps
+        self.nr_dimensions = msg.nr_dimensions
+        self.nr_policy_parameters = msg.nr_policy_parameters
+        self.seed = msg.seed
+        self.fixed_dimensions = msg.fixed_dimensions
+        self.acquisition_function_name = msg.acquisition_function_name
+
+        # Hand the state-machine trigger over to the asyncio loop running in its own thread
+        asyncio.run_coroutine_threadsafe(self.order_received(), self.event_loop)
+
     def reward_callback(self, msg):
         pass
 
     def ui_callback(self, msg):
         pass
 
-    def task_order_callback(self, msg):
-        pass
+    async def send_task_request_with_retry(self, request):
+        for attempt in range(self.task_request_max_tries):
+            self.get_logger().info(f"Sending task request (attempt {attempt + 1}/{self.task_request_max_tries})...")
 
-    async def call_service_async(self, client, request):
-        """
-        Asynchronously call a ROS 2 service and wait for its response.
-        """
-        # Use an asyncio Future to wait for the response
-        future = asyncio.Future()
-
-        def callback(response_future):
-            # Set the result of the asyncio Future when the ROS 2 service responds
-            rclpy.spin_once(self, timeout_sec=0)  # Make sure the callback is processed
-            future.set_result(response_future.result())
-
-        client.call_async(request).add_done_callback(callback)
-
-        # Wait for the response
-        response = await future
-        return response
-    # endregion
-
-    # region Trigger Placeholers
-    async def order_received(self):
-        # Placeholder for trigger
-        pass
-
-    async def initialization_finished(self):
-        # Placeholder for trigger
-        pass
-
-    async def non_interactive(self):
-        # Placeholder for trigger
-        pass
-
-    async def interactive(self):
-        # Placeholder for trigger
-        pass
-
-    async def user_response_received(self):
-        # Placeholder for trigger
-        pass
-
-    async def processing_finished(self):
-        # Placeholder for trigger
-        pass
-
-    async def task_response_received(self):
-        # Placeholder for trigger
-        pass
-
-    async def order_completed(self):
-        # Placeholder for trigger
-        pass
-
-    async def abort(self):
-        # Placeholder for trigger
-        pass
+            future = self.task_client.call_async(request)
+            try:
+                response = await asyncio.wait_for(future, self.task_request_timeout)
+                return response
+            except asyncio.TimeoutError:
+                self.get_logger().warning("Task request timed out, resending...")
+        self.get_logger().error(f"Max retries reached for task request: {self.task_request_max_tries}")
+        return None
     # endregion
 
     # State Methods
-    async def after_receiving_order(self):
+    async def initialize_bo_fun(self):
         self.get_logger().info('Initializing bo...')
 
-    async def user_query_state(self):
+        self.reset_bo(fixed_dimensions=self.fixed_dimensions)
+
+        request = Task.Request()
+        request.reward_return = False
+        request.nr_dim = self.nr_dimensions
+        request.nr_parameter = self.nr_policy_parameters
+        for i in range(self.nr_init):
+            request.x_next = self.bayesian_optimization.sample_random_next_observation()
+
+            response = await self.send_task_request_with_retry(request)
+            # Only fit the model on the last initial sample
+            if i < self.nr_init - 1:
+                self.bayesian_optimization.add_observation(response.reward, response.x_observed, fit=False)
+            else:
+                self.bayesian_optimization.add_observation(response.reward, response.x_observed)
+
+        await self.initialization_finished()
+
+    async def user_query_fun(self):
         self.get_logger().info('Deciding wheter to ask the user...')
         srv_msg = Query()
-        response = await self.call_service_async(self.query_client, srv_msg)
-        if response.interaction:
-            await self.interactive()
+        future = self.query_client.call_async(srv_msg)
+        try:
+            response = await asyncio.wait_for(future, timeout=2)
+            if response.interaction:
+                await self.interactive()
 
-        else:
-            await self.non_interactive()
+            else:
+                await self.non_interactive()
 
-    async def ask_user_state(self):
+        except asyncio.TimeoutError:
+            self.get_logger().info("Timeout for Query Service...")
+
+    async def sample_x_next_fun(self):
+        pass
+
+    async def waiting_for_user_response_fun(self):
         self.get_logger().info('Waiting for user response...')
 
-    async def processing_next_episode(self):
+    async def processing_fun(self):
         pass
 
-    async def waiting_for_task_response(self):
+    async def waiting_for_task_response_fun(self):
         pass
 
-    async def completing_order(self):
-        pass
 
-    async def abort(self):
-        pass
\ No newline at end of file
+
+def run_asyncio_loop(loop):
+    # Run the asyncio event loop in a dedicated thread so coroutines scheduled
+    # from ROS callbacks via run_coroutine_threadsafe() get executed
+    asyncio.set_event_loop(loop)
+    loop.run_forever()
+
+
+def main():
+    loop = asyncio.new_event_loop()
+    threading.Thread(target=run_asyncio_loop, args=(loop,), daemon=True).start()
+
+    # Initialize ROS and pass the event loop to the node
+    rclpy.init()
+    node = BayesianOptimizationNode(loop)
+
+    # Now, the ROS 2 node has the correct event loop reference for asyncio operations
+    rclpy.spin(node)
+
+    node.destroy_node()
+    rclpy.shutdown()
+    loop.call_soon_threadsafe(loop.stop)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/src/interaction_optimizers/interaction_optimizers/optimizers/bayesian_optimization.py b/src/interaction_optimizers/interaction_optimizers/optimizers/bayesian_optimization.py
index b58e90d..a76fac1 100644
--- a/src/interaction_optimizers/interaction_optimizers/optimizers/bayesian_optimization.py
+++ b/src/interaction_optimizers/interaction_optimizers/optimizers/bayesian_optimization.py
@@ -117,8 +117,8 @@ class BayesianOptimization:
         x_next = self.acquisition_function(self.gauss_process, self.x_observed, seed=self.seed)
         return x_next
 
-    def add_observation(self, y_new, x_new):
+    def add_observation(self, y_new, x_new, fit=True):
         if self.episode == 0:
             self.x_observed[0, :] = x_new
             self.y_observed[0] = y_new
             self.best_reward[0] = np.max(self.y_observed)
@@ -127,8 +127,9 @@
             self.y_observed = np.vstack((self.y_observed, y_new))
             self.best_reward = np.vstack((self.best_reward, np.max(self.y_observed)))
 
-        self.gauss_process.fit(self.x_observed, self.y_observed)
+        if fit:
+            self.gauss_process.fit(self.x_observed, self.y_observed)
         self.episode += 1
 
     def get_best_result(self):
         y_max = np.max(self.y_observed)
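For local testing, a minimal stand-in for the task node may help to exercise the new interaction/task_srv interface end to end. The sketch below is not part of this patch; the node name task_stub, the echoed x_observed, and the random reward are made up for illustration, and only the Task service added above is assumed.

# Hypothetical stand-in for the task node (not part of this patch): serves the
# new interaction/task_srv interface, echoes the proposed parameters back as
# x_observed, and returns a dummy reward so the optimizer's init loop can run.
import random

import rclpy
from rclpy.node import Node

from interaction_msgs.srv import Task


class TaskStub(Node):
    def __init__(self):
        super().__init__('task_stub')
        self.srv = self.create_service(Task, 'interaction/task_srv', self.task_callback)

    def task_callback(self, request, response):
        # Pretend the requested parameters were executed as-is
        response.x_observed = list(request.x_next)
        response.reward = random.random()
        return response


def main():
    rclpy.init()
    rclpy.spin(TaskStub())
    rclpy.shutdown()


if __name__ == '__main__':
    main()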