From 5c6750886cfdee9b3d0045b5239283efe1e38074 Mon Sep 17 00:00:00 2001 From: Niko Date: Fri, 16 Feb 2024 16:01:22 +0100 Subject: [PATCH] tested async functions --- requirements.txt | 3 +- src/interaction_msgs/CMakeLists.txt | 1 + src/interaction_msgs/msg/Opt2Rep.msg | 2 + src/interaction_msgs/msg/TaskOrder.msg | 1 + .../async_testing.py | 78 +++++++ src/interaction_objective_function/setup.py | 1 + .../bayesian_optimization_node.py | 208 +++++++++++++++++- 7 files changed, 285 insertions(+), 9 deletions(-) create mode 100644 src/interaction_msgs/msg/TaskOrder.msg create mode 100644 src/interaction_objective_function/interaction_objective_function/async_testing.py diff --git a/requirements.txt b/requirements.txt index 7cf115b..7fd41ab 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,5 @@ pydot~=1.4.2 empy~=3.3.4 lark~=1.1.1 scipy~=1.12.0 -scikit-learn~=1.4.0 \ No newline at end of file +scikit-learn~=1.4.0 +transitions~=0.9.0 \ No newline at end of file diff --git a/src/interaction_msgs/CMakeLists.txt b/src/interaction_msgs/CMakeLists.txt index b23126b..d4f3d41 100644 --- a/src/interaction_msgs/CMakeLists.txt +++ b/src/interaction_msgs/CMakeLists.txt @@ -14,6 +14,7 @@ rosidl_generate_interfaces(${PROJECT_NAME} "msg/Opt2UI.msg" "msg/Task2Opt.msg" "msg/UI2Opt.msg" + "msg/TaskOrder.msg" DEPENDENCIES ) diff --git a/src/interaction_msgs/msg/Opt2Rep.msg b/src/interaction_msgs/msg/Opt2Rep.msg index ecc174f..c6daae0 100644 --- a/src/interaction_msgs/msg/Opt2Rep.msg +++ b/src/interaction_msgs/msg/Opt2Rep.msg @@ -1,3 +1,5 @@ +# Return reward - True: for updating the model, False: for displaying the x_best to the user +bool reward_return # Number of dimensions of the representation model uint16 nr_dim # Number of parameters per dimensions diff --git a/src/interaction_msgs/msg/TaskOrder.msg b/src/interaction_msgs/msg/TaskOrder.msg new file mode 100644 index 0000000..46faacc --- /dev/null +++ b/src/interaction_msgs/msg/TaskOrder.msg @@ -0,0 +1 @@ +uint16 bla \ No newline at end of file diff --git a/src/interaction_objective_function/interaction_objective_function/async_testing.py b/src/interaction_objective_function/interaction_objective_function/async_testing.py new file mode 100644 index 0000000..8a716b8 --- /dev/null +++ b/src/interaction_objective_function/interaction_objective_function/async_testing.py @@ -0,0 +1,78 @@ +import rclpy +from rclpy.node import Node +from transitions.extensions.asyncio import AsyncMachine +from std_msgs.msg import String +import asyncio +import threading +import time + + +class StatefulNode(Node): + def __init__(self, event_loop): + super().__init__('stateful_node') + self.event_loop = event_loop + self.subscription = self.create_subscription(String, 'my_topic', self.listener_callback, 10) + + states = [ + {'name': 'state1', 'on_enter': 'on_enter_state1'}, + {'name': 'state2', 'on_enter': 'on_enter_state2'}, + {'name': 'state3', 'on_enter': 'on_enter_state3'} + ] + transitions = [ + {'trigger': 'go_to_state1', 'source': '*', 'dest': 'state1'}, + {'trigger': 'go_to_state2', 'source': '*', 'dest': 'state2'}, + {'trigger': 'go_to_state3', 'source': '*', 'dest': 'state3'}, + ] + + self.machine = AsyncMachine(model=self, states=states, transitions=transitions, initial='state1') + + async def on_enter_state1(self): + self.get_logger().info(f"Entering State 1 - {time.time()}") + + async def on_enter_state2(self): + self.get_logger().info(f"Entering State 2 - {time.time()}") + + async def on_enter_state3(self): + self.get_logger().info(f"Entering State 3 - {time.time()}") + + def listener_callback(self, msg): + try: + self.get_logger().info(f'Received message: "{msg.data}"') + if msg.data == 'trigger1': + self.get_logger().info("Attempting to go to state2") + asyncio.run_coroutine_threadsafe(self.go_to_state1(), self.event_loop) + elif msg.data == 'trigger2': + self.get_logger().info("Attempting to go to state1") + asyncio.run_coroutine_threadsafe(self.go_to_state2(), self.event_loop) + elif msg.data == 'trigger3': + self.get_logger().info("Attempting to go to state1") + asyncio.run_coroutine_threadsafe(self.go_to_state3(), self.event_loop) + except Exception as e: + self.get_logger().error(f"Error in listener_callback: {str(e)}") + + +def run_asyncio_loop(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_forever() + return loop # Ensure you return the loop reference + + +def main(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + threading.Thread(target=loop.run_forever, daemon=True).start() + + # Initialize ROS and pass the event loop to the node + rclpy.init() + node = StatefulNode(loop) + + # Now, the ROS 2 node has the correct event loop reference for asyncio operations + rclpy.spin(node) + + node.destroy_node() + rclpy.shutdown() + loop.call_soon_threadsafe(loop.stop) + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/src/interaction_objective_function/setup.py b/src/interaction_objective_function/setup.py index 27798b8..06e8b5f 100644 --- a/src/interaction_objective_function/setup.py +++ b/src/interaction_objective_function/setup.py @@ -20,6 +20,7 @@ setup( tests_require=['pytest'], entry_points={ 'console_scripts': [ + 'async_node = interaction_objective_function.async_testing:main' ], }, ) diff --git a/src/interaction_optimizers/interaction_optimizers/bayesian_optimization_node.py b/src/interaction_optimizers/interaction_optimizers/bayesian_optimization_node.py index 6f7e4db..5063640 100644 --- a/src/interaction_optimizers/interaction_optimizers/bayesian_optimization_node.py +++ b/src/interaction_optimizers/interaction_optimizers/bayesian_optimization_node.py @@ -1,14 +1,23 @@ - import rclpy from rclpy.node import Node from rclpy.callback_groups import ReentrantCallbackGroup from interaction_msgs.srv import Query +from interaction_msgs.msg import Task2Opt +from interaction_msgs.msg import UI2Opt +from interaction_msgs.msg import TaskOrder +from interaction_msgs.msg import Opt2UI +from interaction_msgs.msg import Opt2Rep from .optimizers.bayesian_optimization import BayesianOptimization +from transitions.extensions.asyncio import AsyncMachine +import asyncio + import numpy as np + +import threading import time import os @@ -17,29 +26,118 @@ class BayesianOptimizationNode(Node): def __init__(self): super().__init__('bayesian_optimization_node') - # reading parameters + # region Parameters self.kernel_type = self.declare_parameter('kernel_type', 'Matern').get_parameter_value().string_value self.acquisition_function_name = (self.declare_parameter('acquisition_function_name', 'EI') - .get_parameter_value().string_value) + .get_parameter_value().string_value) + self.nr_bo_steps = self.declare_parameter('bo_steps', 100).get_parameter_value().integer_value + self.nr_dimensions = self.declare_parameter('nr_dimensions',1).get_parameter_value().integer_value + self.nr_policy_parameters = (self.declare_parameter('nr_policy_parameters', 100) + .get_parameter_value().integer_value) + self.nr_init = self.declare_parameter('nr_init', 3).get_parameter_value().integer_value + + self.cycle_time = (self.declare_parameter('bo_cylce_time', 0.1)).get_parameter_value().double_value + # endregion # Subscribers + self.reward_sub = self.create_subscription(Task2Opt, 'interaction/reward', self.reward_callback, 10) + self.ui_sub = self.create_subscription(UI2Opt, 'interaction/ui_response', self.ui_callback, 10) + self.order_sub = self.create_subscription(Task2Opt, 'interaction/task_order', self.task_order_callback, 10) # Publishers + self.ui_pub = self.create_publisher(Opt2UI, 'interaction/ui_request', 10) + self.rep_pub = self.create_publisher(Opt2Rep, 'interaction/next_x', 10) # Service Clients - self.query_client = self.create_client(Query, 'user_query') + self.query_client = self.create_client(Query, 'interaction/user_query') # Bayesian Optimization self.bayesian_optimization = None - self.nr_bo_steps = 100 - self.nr_dimensions = 1 - self.nr_policy_parameters = 10 - self.nr_init = 3 self.seed = None self.lower_bound = None self.upper_bound = None + # State Machine + states = ['idle', + 'initialize_bo', + 'user_query', + 'processing', + 'waiting_for_user_response', + 'waiting_for_task_response'] + transitions = [ + {'trigger': 'order_received', 'source': 'idle', 'dest': 'initialize_bo', 'after': 'after_receiving_order'}, + {'trigger': 'initialization_finished', 'source': 'initialize_bo', 'dest': 'user_query'}, + {'trigger': 'non_interactive', 'source': 'user_query', 'dest': 'sampling_x_next'}, + {'trigger': 'sampling_finished', 'source': 'sampling_x_next', 'dest': 'processing'}, + {'trigger': 'interactive', 'source': 'user_query', 'dest': 'waiting_for_user_response'}, + {'trigger': 'user_response_received', 'source': 'waiting_for_user_response', 'dest': 'processing'}, + {'trigger': 'processing_finished', 'source': 'processing', 'dest': 'waiting_for_task_response'}, + {'trigger': 'task_response_received', 'source': 'waiting_for_task_response', 'dest': 'user_query'}, + {'trigger': 'order_completed', 'source': 'waiting_for_task_response', 'dest': 'idle'}, + {'trigger': 'abort', 'source': '*', 'dest': 'idle'} + ] + + """ + Algo: + A) idle -> order_received() -> initializing_bo: + order_sub - receives order + reset_bo with the order parameters + trigger: order_received() + + for i=0:nr_episodes + B) initializing_bo -> initialization_finished() -> user_query: + for j = 0:nr_init + x_next <- initial sampling + send x_next to task node + reward received from task node + fit model + trigger: initialization_finished() + + C1.1) user_query -> non_interactive() -> sampling_x_next + service call for user_query + if user_query is False: + trigger: non_interactive() + C1.2) sampling_x_next -> sampling_finished() -> processing + sampling x_next with acqusition function + trigger: sampling_finished() + + C2.1) user_query -> interactive() -> waiting_for_user + service call for user_query + if user_query is True: + trigger: interactive() + C2.2) waiting_for_user_response -> user_response_received() -> processing + send x_best to task node + send x_best to UI + user adapts x_best + (UI sends x_next to task node) + receives x_next from UI + trigger: user_response_received() + + D) processing -> processing_finished() -> waiting_for_task_response + sends x_next to task node + trigger: processing_finished() + + E) waiting_for_task_response -> task_response_received() -> user_query + if episode < nr_episodes: + get results from reward_sub + model fit with additional observation + trigger: task_response_received() + + F) waiting_for_task_response -> order_completed() -> idle + if episode = nr_episodes: + completion_pub with the completed results + trigger: order_completed() + + + Additional Transitions: + Abort: * -> abort() -> idle + """ + + self.machine = AsyncMachine(model=self, states=states, + transitions=transitions, initial='idle', + ignore_invalid_triggers=True) + def reset_bo(self, fixed_dimensions=None, **kwargs): self.bayesian_optimization = BayesianOptimization(self.nr_bo_steps, self.nr_dimensions, @@ -52,5 +150,99 @@ class BayesianOptimizationNode(Node): kernel_type=self.kernel_type, **kwargs) + # region Callback functions + def reward_callback(self, msg): + pass + def ui_callback(self, msg): + pass + def task_order_callback(self, msg): + pass + + async def call_service_async(self, client, request): + """ + Asynchronously call a ROS 2 service and wait for its response. + """ + # Use an asyncio Future to wait for the response + future = asyncio.Future() + + def callback(response_future): + # Set the result of the asyncio Future when the ROS 2 service responds + rclpy.spin_once(self, timeout_sec=0) # Make sure the callback is processed + future.set_result(response_future.result()) + + client.call_async(request).add_done_callback(callback) + + # Wait for the response + response = await future + return response + # endregion + + # region Trigger Placeholers + async def order_received(self): + # Placeholder for trigger + pass + + async def initialization_finished(self): + # Placeholder for trigger + pass + + async def non_interactive(self): + # Placeholder for trigger + pass + + async def interactive(self): + # Placeholder for trigger + pass + + async def user_response_received(self): + # Placeholder for trigger + pass + + async def processing_finished(self): + # Placeholder for trigger + pass + + async def task_response_received(self): + # Placeholder for trigger + pass + + async def order_completed(self): + # Placeholder for trigger + pass + + async def abort(self): + # Placeholder for trigger + pass + + # endregion + + # State Methods + async def after_receiving_order(self): + self.get_logger().info('Initializing bo...') + + async def user_query_state(self): + self.get_logger().info('Deciding wheter to ask the user...') + srv_msg = Query() + response = await self.call_service_async(self.query_client, srv_msg) + if response.interaction: + await self.interactive() + + else: + await self.non_interactive() + + async def ask_user_state(self): + self.get_logger().info('Waiting for user response...') + + async def processing_next_episode(self): + pass + + async def waiting_for_task_response(self): + pass + + async def completing_order(self): + pass + + async def abort(self): + pass \ No newline at end of file