initializing bo finished

This commit is contained in:
Niko Feith 2024-02-20 14:41:38 +01:00
parent 5c6750886c
commit d8dd0c4f4b
5 changed files with 141 additions and 103 deletions

View File

@ -10,7 +10,8 @@ find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME} rosidl_generate_interfaces(${PROJECT_NAME}
"srv/Query.srv" "srv/Query.srv"
"msg/Opt2Rep.msg" "srv/Task.srv"
"msg/Opt2Task.msg"
"msg/Opt2UI.msg" "msg/Opt2UI.msg"
"msg/Task2Opt.msg" "msg/Task2Opt.msg"
"msg/UI2Opt.msg" "msg/UI2Opt.msg"

View File

@ -0,0 +1,13 @@
# Return reward - True: for updating the model, False: for displaying the x_best to the user
bool reward_return
# Number of dimensions of the representation model
uint16 nr_dim
# Number of parameters per dimensions
uint16 nr_parameter
# Next parameters
float32[] x_next
---
# observed parameters
float32[] x_observed
# observed reward
float32 reward

View File

@ -4,69 +4,81 @@ from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup from rclpy.callback_groups import ReentrantCallbackGroup
from interaction_msgs.srv import Query from interaction_msgs.srv import Query
from interaction_msgs.msg import Task2Opt from interaction_msgs.srv import Task
from interaction_msgs.msg import UI2Opt from interaction_msgs.msg import UI2Opt
from interaction_msgs.msg import TaskOrder from interaction_msgs.msg import TaskOrder
from interaction_msgs.msg import Opt2UI from interaction_msgs.msg import Opt2UI
from interaction_msgs.msg import Opt2Rep
from .optimizers.bayesian_optimization import BayesianOptimization from .optimizers.bayesian_optimization import BayesianOptimization
from transitions.extensions.asyncio import AsyncMachine from transitions.extensions.asyncio import AsyncMachine
import asyncio import asyncio
import numpy as np
import threading import threading
import time import time
import os import os
import numpy as np
class BayesianOptimizationNode(Node): class BayesianOptimizationNode(Node):
def __init__(self): def __init__(self, event_loop):
super().__init__('bayesian_optimization_node') super().__init__('bayesian_optimization_node')
self.event_loop = event_loop
# region Parameters # region Parameters
self.kernel_type = self.declare_parameter('kernel_type', 'Matern').get_parameter_value().string_value """
self.acquisition_function_name = (self.declare_parameter('acquisition_function_name', 'EI') self.acquisition_function_name = (self.declare_parameter('acquisition_function_name', 'EI')
.get_parameter_value().string_value) .get_parameter_value().string_value)
self.nr_bo_steps = self.declare_parameter('bo_steps', 100).get_parameter_value().integer_value self.nr_bo_steps = self.declare_parameter('bo_steps', 100).get_parameter_value().integer_value
self.nr_dimensions = self.declare_parameter('nr_dimensions',1).get_parameter_value().integer_value self.nr_dimensions = self.declare_parameter('nr_dimensions', 1).get_parameter_value().integer_value
self.nr_policy_parameters = (self.declare_parameter('nr_policy_parameters', 100) self.nr_policy_parameters = (self.declare_parameter('nr_policy_parameters', 100)
.get_parameter_value().integer_value) .get_parameter_value().integer_value)
"""
self.kernel_type = self.declare_parameter('kernel_type', 'Matern').get_parameter_value().string_value
self.nr_init = self.declare_parameter('nr_init', 3).get_parameter_value().integer_value self.nr_init = self.declare_parameter('nr_init', 3).get_parameter_value().integer_value
self.task_request_timeout = self.declare_parameter('task_request_timeout', 10).get_parameter_value().integer_value
self.cycle_time = (self.declare_parameter('bo_cylce_time', 0.1)).get_parameter_value().double_value self.task_request_max_tries = self.declare_parameter('task_request_max_tries', 3).get_parameter_value().integer_value
# endregion # endregion
# Subscribers # Subscribers
self.reward_sub = self.create_subscription(Task2Opt, 'interaction/reward', self.reward_callback, 10)
self.ui_sub = self.create_subscription(UI2Opt, 'interaction/ui_response', self.ui_callback, 10) self.ui_sub = self.create_subscription(UI2Opt, 'interaction/ui_response', self.ui_callback, 10)
self.order_sub = self.create_subscription(Task2Opt, 'interaction/task_order', self.task_order_callback, 10) self.order_sub = self.create_subscription(TaskOrder, 'interaction/order', self.task_order_callback, 10)
# Publishers # Publishers
self.ui_pub = self.create_publisher(Opt2UI, 'interaction/ui_request', 10) self.ui_pub = self.create_publisher(Opt2UI, 'interaction/ui_request', 10)
self.rep_pub = self.create_publisher(Opt2Rep, 'interaction/next_x', 10)
# Service Clients # Service Clients
self.query_client = self.create_client(Query, 'interaction/user_query') self.query_client = self.create_client(Query, 'interaction/user_query')
self.task_client = self.create_client(Task, 'interaction/task_srv')
# Bayesian Optimization # Bayesian Optimization
self.bayesian_optimization = None self.bayesian_optimization = None
self.current_episodes = 0
self.nr_bo_steps = 0
self.nr_dimensions = 0
self.nr_policy_parameters = 0
self.acquisition_function_name = ''
self.fixed_dimensions = None
self.seed = None self.seed = None
self.lower_bound = None self.lower_bound = None
self.upper_bound = None self.upper_bound = None
# State Machine # State Machine
states = ['idle', states = [
'initialize_bo', {'name': 'idle'},
'user_query', {'name': 'initialize_bo', 'on_enter': 'initialize_bo_fun'},
'processing', {'name': 'user_query', 'on_enter': 'user_query_fun'},
'waiting_for_user_response', {'name': 'sample_x_next', 'on_enter': 'sample_x_next_fun'},
'waiting_for_task_response'] {'name': 'waiting_for_user_response', 'on_enter': 'waiting_for_user_response_fun'},
{'name': 'processing', 'on_enter': 'processing_fun'},
{'name': 'waiting_for_task_response', 'on_enter': 'waiting_for_task_response_fun'}
]
transitions = [ transitions = [
{'trigger': 'order_received', 'source': 'idle', 'dest': 'initialize_bo', 'after': 'after_receiving_order'}, {'trigger': 'order_received', 'source': 'idle', 'dest': 'initialize_bo'},
{'trigger': 'initialization_finished', 'source': 'initialize_bo', 'dest': 'user_query'}, {'trigger': 'initialization_finished', 'source': 'initialize_bo', 'dest': 'user_query'},
{'trigger': 'non_interactive', 'source': 'user_query', 'dest': 'sampling_x_next'}, {'trigger': 'non_interactive', 'source': 'user_query', 'dest': 'sampling_x_next'},
{'trigger': 'sampling_finished', 'source': 'sampling_x_next', 'dest': 'processing'}, {'trigger': 'sampling_finished', 'source': 'sampling_x_next', 'dest': 'processing'},
@ -84,7 +96,7 @@ class BayesianOptimizationNode(Node):
order_sub - receives order order_sub - receives order
reset_bo with the order parameters reset_bo with the order parameters
trigger: order_received() trigger: order_received()
for i=0:nr_episodes for i=0:nr_episodes
B) initializing_bo -> initialization_finished() -> user_query: B) initializing_bo -> initialization_finished() -> user_query:
for j = 0:nr_init for j = 0:nr_init
@ -93,7 +105,7 @@ class BayesianOptimizationNode(Node):
reward received from task node reward received from task node
fit model fit model
trigger: initialization_finished() trigger: initialization_finished()
C1.1) user_query -> non_interactive() -> sampling_x_next C1.1) user_query -> non_interactive() -> sampling_x_next
service call for user_query service call for user_query
if user_query is False: if user_query is False:
@ -101,7 +113,7 @@ class BayesianOptimizationNode(Node):
C1.2) sampling_x_next -> sampling_finished() -> processing C1.2) sampling_x_next -> sampling_finished() -> processing
sampling x_next with acqusition function sampling x_next with acqusition function
trigger: sampling_finished() trigger: sampling_finished()
C2.1) user_query -> interactive() -> waiting_for_user C2.1) user_query -> interactive() -> waiting_for_user
service call for user_query service call for user_query
if user_query is True: if user_query is True:
@ -113,11 +125,11 @@ class BayesianOptimizationNode(Node):
(UI sends x_next to task node) (UI sends x_next to task node)
receives x_next from UI receives x_next from UI
trigger: user_response_received() trigger: user_response_received()
D) processing -> processing_finished() -> waiting_for_task_response D) processing -> processing_finished() -> waiting_for_task_response
sends x_next to task node sends x_next to task node
trigger: processing_finished() trigger: processing_finished()
E) waiting_for_task_response -> task_response_received() -> user_query E) waiting_for_task_response -> task_response_received() -> user_query
if episode < nr_episodes: if episode < nr_episodes:
get results from reward_sub get results from reward_sub
@ -128,8 +140,8 @@ class BayesianOptimizationNode(Node):
if episode = nr_episodes: if episode = nr_episodes:
completion_pub with the completed results completion_pub with the completed results
trigger: order_completed() trigger: order_completed()
Additional Transitions: Additional Transitions:
Abort: * -> abort() -> idle Abort: * -> abort() -> idle
""" """
@ -149,100 +161,111 @@ class BayesianOptimizationNode(Node):
acquisition_function_name=self.acquisition_function_name, acquisition_function_name=self.acquisition_function_name,
kernel_type=self.kernel_type, **kwargs) kernel_type=self.kernel_type, **kwargs)
# region Callback functions # region Callback functions
def task_order_callback(self, msg):
self.current_episodes = 0
self.nr_bo_steps = msg.nr_bo_steps
self.nr_dimensions = msg.nr_dimensions
self.nr_policy_parameters = msg.nr_policy_parameters
self.seed = msg.seed
self.fixed_dimensions = msg.fixed_dimensions
self.acquisition_function_name = msg.acquisition_function_name
asyncio.run_coroutine_threadsafe(self.order_received(), self.event_loop)
def reward_callback(self, msg): def reward_callback(self, msg):
pass pass
def ui_callback(self, msg): def ui_callback(self, msg):
pass pass
def task_order_callback(self, msg): async def send_task_request_with_retry(self, request):
pass for attempt in range(self.task_request_max_tries):
self.get_logger().info(f"Attempt {attempt + 1}, send task request with retry!")
async def call_service_async(self, client, request): future = self.task_client.call_async(request)
""" try:
Asynchronously call a ROS 2 service and wait for its response. response = await asyncio.wait_for(future, self.task_request_timeout)
""" return response
# Use an asyncio Future to wait for the response except asyncio.TimeoutError:
future = asyncio.Future() self.get_logger().warning(f"Task request timed out, resending...")
def callback(response_future):
# Set the result of the asyncio Future when the ROS 2 service responds
rclpy.spin_once(self, timeout_sec=0) # Make sure the callback is processed
future.set_result(response_future.result())
client.call_async(request).add_done_callback(callback)
# Wait for the response
response = await future
return response
# endregion
# region Trigger Placeholers
async def order_received(self):
# Placeholder for trigger
pass
async def initialization_finished(self):
# Placeholder for trigger
pass
async def non_interactive(self):
# Placeholder for trigger
pass
async def interactive(self):
# Placeholder for trigger
pass
async def user_response_received(self):
# Placeholder for trigger
pass
async def processing_finished(self):
# Placeholder for trigger
pass
async def task_response_received(self):
# Placeholder for trigger
pass
async def order_completed(self):
# Placeholder for trigger
pass
async def abort(self):
# Placeholder for trigger
pass
self.get_logger().error(f"Max retries reached for task request: {self.task_request_max_tries}")
return None
# endregion # endregion
# State Methods # State Methods
async def after_receiving_order(self): async def initializing_bo_fun(self):
self.get_logger().info('Initializing bo...') self.get_logger().info('Initializing bo...')
async def user_query_state(self): self.reset_bo(fixed_dimensions=self.fixed_dimensions)
request = Task.Request()
request.reward_return = False
request.nr_dim = self.nr_dimensions
request.nr_parameter = self.nr_policy_parameters
for i in range(self.nr_init):
request.x_next = self.bayesian_optimization.sample_random_next_observation()
response = await self.send_task_request_with_retry(request)
if i < self.nr_init:
self.bayesian_optimization.add_observation(response.reward, response.x_observed, fit=False)
else:
self.bayesian_optimization.add_observation(response.reward, response.x_observed)
await self.initialization_finished()
async def user_query_fun(self):
self.get_logger().info('Deciding wheter to ask the user...') self.get_logger().info('Deciding wheter to ask the user...')
srv_msg = Query() srv_msg = Query()
response = await self.call_service_async(self.query_client, srv_msg) future = self.query_client.call_async(srv_msg)
if response.interaction: try:
await self.interactive() response = await asyncio.wait_for(future, timeout=2)
if response.interaction:
await self.interactive()
else: else:
await self.non_interactive() await self.non_interactive()
async def ask_user_state(self): except asyncio.TimeoutError:
self.get_logger().info("Timeout for Query Service...")
async def sample_x_next_fun(self):
pass
async def waiting_for_user_response_fun(self):
self.get_logger().info('Waiting for user response...') self.get_logger().info('Waiting for user response...')
async def processing_next_episode(self): async def processing_fun(self):
pass pass
async def waiting_for_task_response(self): async def waiting_for_task_response_fun(self):
pass pass
async def completing_order(self):
pass
async def abort(self): def run_asyncio_loop():
pass loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_forever()
return loop # Ensure you return the loop reference
def main():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
threading.Thread(target=loop.run_forever, daemon=True).start()
# Initialize ROS and pass the event loop to the node
rclpy.init()
node = BayesianOptimizationNode(loop)
# Now, the ROS 2 node has the correct event loop reference for asyncio operations
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
loop.call_soon_threadsafe(loop.stop)
if __name__ == '__main__':
main()

View File

@ -117,8 +117,8 @@ class BayesianOptimization:
x_next = self.acquisition_function(self.gauss_process, self.x_observed, seed=self.seed) x_next = self.acquisition_function(self.gauss_process, self.x_observed, seed=self.seed)
return x_next return x_next
def add_observation(self, y_new, x_new): def add_observation(self, y_new, x_new, fit=True):
if self.episode == 0: if self.x_observed.shape[0] == 1:
self.x_observed[0, :] = x_new self.x_observed[0, :] = x_new
self.y_observed[0] = y_new self.y_observed[0] = y_new
self.best_reward[0] = np.max(self.y_observed) self.best_reward[0] = np.max(self.y_observed)
@ -127,8 +127,9 @@ class BayesianOptimization:
self.y_observed = np.vstack((self.y_observed, y_new)) self.y_observed = np.vstack((self.y_observed, y_new))
self.best_reward = np.vstack((self.best_reward, np.max(self.y_observed))) self.best_reward = np.vstack((self.best_reward, np.max(self.y_observed)))
self.gauss_process.fit(self.x_observed, self.y_observed) if fit:
self.episode += 1 self.gauss_process.fit(self.x_observed, self.y_observed)
self.episode += 1
def get_best_result(self): def get_best_result(self):
y_max = np.max(self.y_observed) y_max = np.max(self.y_observed)