Bo node completely implemented but not tested

This commit is contained in:
Niko Feith 2024-02-20 15:50:44 +01:00
parent d8dd0c4f4b
commit 7bcba1132a
4 changed files with 117 additions and 47 deletions

View File

@ -11,11 +11,12 @@ find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME} rosidl_generate_interfaces(${PROJECT_NAME}
"srv/Query.srv" "srv/Query.srv"
"srv/Task.srv" "srv/Task.srv"
"msg/Opt2Task.msg" "msg/OptimizerState.msg"
"msg/Opt2UI.msg" "msg/Opt2UI.msg"
"msg/Task2Opt.msg"
"msg/UI2Opt.msg" "msg/UI2Opt.msg"
"msg/TaskOrder.msg" "msg/TaskOrder.msg"
# "msg/Opt2Task.msg"
# "msg/Task2Opt.msg"
DEPENDENCIES DEPENDENCIES
) )

View File

@ -0,0 +1,9 @@
# Current State
string current_state
# Current BO Episode
int32 current_episode
# Best result so far
float32 reward_best
float32[] x_best

View File

@ -8,6 +8,7 @@ from interaction_msgs.srv import Task
from interaction_msgs.msg import UI2Opt from interaction_msgs.msg import UI2Opt
from interaction_msgs.msg import TaskOrder from interaction_msgs.msg import TaskOrder
from interaction_msgs.msg import Opt2UI from interaction_msgs.msg import Opt2UI
from interaction_msgs.msg import OptimizerState
from .optimizers.bayesian_optimization import BayesianOptimization from .optimizers.bayesian_optimization import BayesianOptimization
@ -42,50 +43,23 @@ class BayesianOptimizationNode(Node):
self.task_request_max_tries = self.declare_parameter('task_request_max_tries', 3).get_parameter_value().integer_value self.task_request_max_tries = self.declare_parameter('task_request_max_tries', 3).get_parameter_value().integer_value
# endregion # endregion
# Subscribers
self.ui_sub = self.create_subscription(UI2Opt, 'interaction/ui_response', self.ui_callback, 10)
self.order_sub = self.create_subscription(TaskOrder, 'interaction/order', self.task_order_callback, 10)
# Publishers
self.ui_pub = self.create_publisher(Opt2UI, 'interaction/ui_request', 10)
# Service Clients
self.query_client = self.create_client(Query, 'interaction/user_query')
self.task_client = self.create_client(Task, 'interaction/task_srv')
# Bayesian Optimization
self.bayesian_optimization = None
self.current_episodes = 0
self.nr_bo_steps = 0
self.nr_dimensions = 0
self.nr_policy_parameters = 0
self.acquisition_function_name = ''
self.fixed_dimensions = None
self.seed = None
self.lower_bound = None
self.upper_bound = None
# State Machine # State Machine
states = [ states = [
{'name': 'idle'}, {'name': 'idle'},
{'name': 'initialize_bo', 'on_enter': 'initialize_bo_fun'}, {'name': 'initialize_bo', 'on_enter': 'initialize_bo_fun'},
{'name': 'user_query', 'on_enter': 'user_query_fun'}, {'name': 'user_query', 'on_enter': 'user_query_fun'},
{'name': 'sample_x_next', 'on_enter': 'sample_x_next_fun'}, {'name': 'non_interactive_mode', 'on_enter': 'non_interactive_mode_fun'},
{'name': 'waiting_for_user_response', 'on_enter': 'waiting_for_user_response_fun'}, {'name': 'waiting_for_user_response', 'on_enter': 'waiting_for_user_response_fun'},
{'name': 'processing', 'on_enter': 'processing_fun'}, {'name': 'processing_user_proposal_fun', 'on_enter': 'processing_fun'},
{'name': 'waiting_for_task_response', 'on_enter': 'waiting_for_task_response_fun'}
] ]
transitions = [ transitions = [
{'trigger': 'order_received', 'source': 'idle', 'dest': 'initialize_bo'}, {'trigger': 'order_received', 'source': 'idle', 'dest': 'initialize_bo'},
{'trigger': 'initialization_finished', 'source': 'initialize_bo', 'dest': 'user_query'}, {'trigger': 'initialization_finished', 'source': 'initialize_bo', 'dest': 'user_query'},
{'trigger': 'non_interactive', 'source': 'user_query', 'dest': 'sampling_x_next'}, {'trigger': 'non_interactive', 'source': 'user_query', 'dest': 'non_interactive_mode'},
{'trigger': 'sampling_finished', 'source': 'sampling_x_next', 'dest': 'processing'}, {'trigger': 'non_interactive_finished', 'source': 'non_interactive_mode', 'dest': 'user_query'},
{'trigger': 'interactive', 'source': 'user_query', 'dest': 'waiting_for_user_response'}, {'trigger': 'interactive', 'source': 'user_query', 'dest': 'waiting_for_user_response'},
{'trigger': 'user_response_received', 'source': 'waiting_for_user_response', 'dest': 'processing'}, {'trigger': 'user_response_received', 'source': 'waiting_for_user_response', 'dest': 'processing_user_proposal_fun'},
{'trigger': 'processing_finished', 'source': 'processing', 'dest': 'waiting_for_task_response'}, {'trigger': 'processing_finished', 'source': 'processing_user_proposal_fun', 'dest': 'user_query'},
{'trigger': 'task_response_received', 'source': 'waiting_for_task_response', 'dest': 'user_query'},
{'trigger': 'order_completed', 'source': 'waiting_for_task_response', 'dest': 'idle'}, {'trigger': 'order_completed', 'source': 'waiting_for_task_response', 'dest': 'idle'},
{'trigger': 'abort', 'source': '*', 'dest': 'idle'} {'trigger': 'abort', 'source': '*', 'dest': 'idle'}
] ]
@ -150,6 +124,40 @@ class BayesianOptimizationNode(Node):
transitions=transitions, initial='idle', transitions=transitions, initial='idle',
ignore_invalid_triggers=True) ignore_invalid_triggers=True)
# Subscribers
self.ui_sub = self.create_subscription(UI2Opt, 'interaction/ui_response', self.ui_callback, 10)
self.order_sub = self.create_subscription(TaskOrder, 'interaction/order', self.task_order_callback, 10)
# Publishers
self.ui_pub = self.create_publisher(Opt2UI, 'interaction/ui_request', 10)
self.state_pub = self.create_publisher(OptimizerState, 'interaction/optimizer/state', 10)
# Service Clients
self.query_client = self.create_client(Query, 'interaction/user_query_srv')
self.task_client = self.create_client(Task, 'interaction/task_srv')
# Timer Objects
self.state_timer = self.create_timer(0.1, self.state_callback)
# Bayesian Optimization
self.bayesian_optimization = None
self.current_episodes = 0
self.nr_bo_steps = 0
self.nr_dimensions = 0
self.nr_policy_parameters = 0
self.acquisition_function_name = ''
self.fixed_dimensions = None
self.seed = None
self.lower_bound = None
self.upper_bound = None
self.x_temp = None # temp x_next
self.get_logger().info(f"Bayesian optimization Node started up!")
def reset_bo(self, fixed_dimensions=None, **kwargs): def reset_bo(self, fixed_dimensions=None, **kwargs):
self.bayesian_optimization = BayesianOptimization(self.nr_bo_steps, self.bayesian_optimization = BayesianOptimization(self.nr_bo_steps,
self.nr_dimensions, self.nr_dimensions,
@ -161,7 +169,7 @@ class BayesianOptimizationNode(Node):
acquisition_function_name=self.acquisition_function_name, acquisition_function_name=self.acquisition_function_name,
kernel_type=self.kernel_type, **kwargs) kernel_type=self.kernel_type, **kwargs)
# region Callback functions # region Callback & Async Send functions
def task_order_callback(self, msg): def task_order_callback(self, msg):
self.current_episodes = 0 self.current_episodes = 0
self.nr_bo_steps = msg.nr_bo_steps self.nr_bo_steps = msg.nr_bo_steps
@ -173,11 +181,11 @@ class BayesianOptimizationNode(Node):
asyncio.run_coroutine_threadsafe(self.order_received(), self.event_loop) asyncio.run_coroutine_threadsafe(self.order_received(), self.event_loop)
def reward_callback(self, msg):
pass
def ui_callback(self, msg): def ui_callback(self, msg):
pass self.fixed_dimensions = msg.fixed_dimensions
self.x_temp = msg.set_parameter_values
asyncio.run_coroutine_threadsafe(self.user_response_received)
async def send_task_request_with_retry(self, request): async def send_task_request_with_retry(self, request):
for attempt in range(self.task_request_max_tries): for attempt in range(self.task_request_max_tries):
@ -192,6 +200,15 @@ class BayesianOptimizationNode(Node):
self.get_logger().error(f"Max retries reached for task request: {self.task_request_max_tries}") self.get_logger().error(f"Max retries reached for task request: {self.task_request_max_tries}")
return None return None
def state_callback(self):
msg = OptimizerState()
msg.current_state = self.state
if self.state != 'idle':
msg.current_episode = self.bayesian_optimization.episode
msg.reward_best, msg.x_best, _ = self.bayesian_optimization.get_best_reward()
self.state_pub.publish(msg)
# endregion # endregion
# State Methods # State Methods
@ -201,7 +218,7 @@ class BayesianOptimizationNode(Node):
self.reset_bo(fixed_dimensions=self.fixed_dimensions) self.reset_bo(fixed_dimensions=self.fixed_dimensions)
request = Task.Request() request = Task.Request()
request.reward_return = False request.reward_return = True
request.nr_dim = self.nr_dimensions request.nr_dim = self.nr_dimensions
request.nr_parameter = self.nr_policy_parameters request.nr_parameter = self.nr_policy_parameters
for i in range(self.nr_init): for i in range(self.nr_init):
@ -230,17 +247,59 @@ class BayesianOptimizationNode(Node):
except asyncio.TimeoutError: except asyncio.TimeoutError:
self.get_logger().info("Timeout for Query Service...") self.get_logger().info("Timeout for Query Service...")
async def sample_x_next_fun(self): async def non_interactive_mode_fun(self):
pass request = Task.Request()
request.reward_return = True
request.nr_dim = self.nr_dimensions
request.nr_parameter = self.nr_policy_parameters
request.x_next = self.bayesian_optimization.next_observation()
response = await self.send_task_request_with_retry(request)
self.bayesian_optimization.add_observation(response.reward, response.x_observed)
if self.bayesian_optimization.episode == self.nr_bo_steps:
await self.order_completed()
await self.non_interactive_finished()
async def waiting_for_user_response_fun(self): async def waiting_for_user_response_fun(self):
self.get_logger().info('Waiting for user response...') self.get_logger().info('Waiting for user response...')
async def processing_fun(self): # sending the best result so far to display it to the user
pass _, x_max, _ = self.bayesian_optimization.get_best_result()
async def waiting_for_task_response_fun(self): ui_msg = Opt2UI()
pass ui_msg.x_best = x_max
ui_msg.fixed_parameters = self.fixed_dimensions
self.ui_pub.publish(ui_msg)
# send it to the task node to display the movement
request = Task.Request()
request.reward_return = False
request.nr_dim = self.nr_dimensions
request.nr_parameter = self.nr_policy_parameters
request.x_next = self.bayesian_optimization.next_observation()
_ = await self.send_task_request_with_retry(request)
self.get_logger().info('User best solution displayed completed.')
async def processing_user_proposal_fun(self):
request = Task.Request()
request.reward_return = True
request.nr_dim = self.nr_dimensions
request.nr_parameter = self.nr_policy_parameters
request.x_next = self.x_temp
self.x_temp = None
response = await self.send_task_request_with_retry(request)
self.bayesian_optimization.add_observation(response.reward, response.x_observed)
if self.bayesian_optimization.episode == self.nr_bo_steps:
await self.order_completed()
await self.non_interactive_finished()
def run_asyncio_loop(): def run_asyncio_loop():

View File

@ -20,6 +20,7 @@ setup(
tests_require=['pytest'], tests_require=['pytest'],
entry_points={ entry_points={
'console_scripts': [ 'console_scripts': [
'bo_node = interaction_optimizers.bayesian_optimization_node:main'
], ],
}, },
) )