tested async functions
This commit is contained in:
parent
220d590c05
commit
5c6750886c
@ -6,3 +6,4 @@ empy~=3.3.4
|
||||
lark~=1.1.1
|
||||
scipy~=1.12.0
|
||||
scikit-learn~=1.4.0
|
||||
transitions~=0.9.0
|
@ -14,6 +14,7 @@ rosidl_generate_interfaces(${PROJECT_NAME}
|
||||
"msg/Opt2UI.msg"
|
||||
"msg/Task2Opt.msg"
|
||||
"msg/UI2Opt.msg"
|
||||
"msg/TaskOrder.msg"
|
||||
DEPENDENCIES
|
||||
)
|
||||
|
||||
|
@ -1,3 +1,5 @@
|
||||
# Return reward - True: for updating the model, False: for displaying the x_best to the user
|
||||
bool reward_return
|
||||
# Number of dimensions of the representation model
|
||||
uint16 nr_dim
|
||||
# Number of parameters per dimensions
|
||||
|
1
src/interaction_msgs/msg/TaskOrder.msg
Normal file
1
src/interaction_msgs/msg/TaskOrder.msg
Normal file
@ -0,0 +1 @@
|
||||
uint16 bla
|
@ -0,0 +1,78 @@
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from transitions.extensions.asyncio import AsyncMachine
|
||||
from std_msgs.msg import String
|
||||
import asyncio
|
||||
import threading
|
||||
import time
|
||||
|
||||
|
||||
class StatefulNode(Node):
|
||||
def __init__(self, event_loop):
|
||||
super().__init__('stateful_node')
|
||||
self.event_loop = event_loop
|
||||
self.subscription = self.create_subscription(String, 'my_topic', self.listener_callback, 10)
|
||||
|
||||
states = [
|
||||
{'name': 'state1', 'on_enter': 'on_enter_state1'},
|
||||
{'name': 'state2', 'on_enter': 'on_enter_state2'},
|
||||
{'name': 'state3', 'on_enter': 'on_enter_state3'}
|
||||
]
|
||||
transitions = [
|
||||
{'trigger': 'go_to_state1', 'source': '*', 'dest': 'state1'},
|
||||
{'trigger': 'go_to_state2', 'source': '*', 'dest': 'state2'},
|
||||
{'trigger': 'go_to_state3', 'source': '*', 'dest': 'state3'},
|
||||
]
|
||||
|
||||
self.machine = AsyncMachine(model=self, states=states, transitions=transitions, initial='state1')
|
||||
|
||||
async def on_enter_state1(self):
|
||||
self.get_logger().info(f"Entering State 1 - {time.time()}")
|
||||
|
||||
async def on_enter_state2(self):
|
||||
self.get_logger().info(f"Entering State 2 - {time.time()}")
|
||||
|
||||
async def on_enter_state3(self):
|
||||
self.get_logger().info(f"Entering State 3 - {time.time()}")
|
||||
|
||||
def listener_callback(self, msg):
|
||||
try:
|
||||
self.get_logger().info(f'Received message: "{msg.data}"')
|
||||
if msg.data == 'trigger1':
|
||||
self.get_logger().info("Attempting to go to state2")
|
||||
asyncio.run_coroutine_threadsafe(self.go_to_state1(), self.event_loop)
|
||||
elif msg.data == 'trigger2':
|
||||
self.get_logger().info("Attempting to go to state1")
|
||||
asyncio.run_coroutine_threadsafe(self.go_to_state2(), self.event_loop)
|
||||
elif msg.data == 'trigger3':
|
||||
self.get_logger().info("Attempting to go to state1")
|
||||
asyncio.run_coroutine_threadsafe(self.go_to_state3(), self.event_loop)
|
||||
except Exception as e:
|
||||
self.get_logger().error(f"Error in listener_callback: {str(e)}")
|
||||
|
||||
|
||||
def run_asyncio_loop():
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_forever()
|
||||
return loop # Ensure you return the loop reference
|
||||
|
||||
|
||||
def main():
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
threading.Thread(target=loop.run_forever, daemon=True).start()
|
||||
|
||||
# Initialize ROS and pass the event loop to the node
|
||||
rclpy.init()
|
||||
node = StatefulNode(loop)
|
||||
|
||||
# Now, the ROS 2 node has the correct event loop reference for asyncio operations
|
||||
rclpy.spin(node)
|
||||
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
loop.call_soon_threadsafe(loop.stop)
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -20,6 +20,7 @@ setup(
|
||||
tests_require=['pytest'],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'async_node = interaction_objective_function.async_testing:main'
|
||||
],
|
||||
},
|
||||
)
|
||||
|
@ -1,14 +1,23 @@
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
|
||||
from rclpy.callback_groups import ReentrantCallbackGroup
|
||||
|
||||
from interaction_msgs.srv import Query
|
||||
from interaction_msgs.msg import Task2Opt
|
||||
from interaction_msgs.msg import UI2Opt
|
||||
from interaction_msgs.msg import TaskOrder
|
||||
from interaction_msgs.msg import Opt2UI
|
||||
from interaction_msgs.msg import Opt2Rep
|
||||
|
||||
from .optimizers.bayesian_optimization import BayesianOptimization
|
||||
|
||||
from transitions.extensions.asyncio import AsyncMachine
|
||||
import asyncio
|
||||
|
||||
import numpy as np
|
||||
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
|
||||
@ -17,29 +26,118 @@ class BayesianOptimizationNode(Node):
|
||||
def __init__(self):
|
||||
super().__init__('bayesian_optimization_node')
|
||||
|
||||
# reading parameters
|
||||
# region Parameters
|
||||
self.kernel_type = self.declare_parameter('kernel_type', 'Matern').get_parameter_value().string_value
|
||||
self.acquisition_function_name = (self.declare_parameter('acquisition_function_name', 'EI')
|
||||
.get_parameter_value().string_value)
|
||||
.get_parameter_value().string_value)
|
||||
self.nr_bo_steps = self.declare_parameter('bo_steps', 100).get_parameter_value().integer_value
|
||||
self.nr_dimensions = self.declare_parameter('nr_dimensions',1).get_parameter_value().integer_value
|
||||
self.nr_policy_parameters = (self.declare_parameter('nr_policy_parameters', 100)
|
||||
.get_parameter_value().integer_value)
|
||||
self.nr_init = self.declare_parameter('nr_init', 3).get_parameter_value().integer_value
|
||||
|
||||
self.cycle_time = (self.declare_parameter('bo_cylce_time', 0.1)).get_parameter_value().double_value
|
||||
# endregion
|
||||
|
||||
# Subscribers
|
||||
self.reward_sub = self.create_subscription(Task2Opt, 'interaction/reward', self.reward_callback, 10)
|
||||
self.ui_sub = self.create_subscription(UI2Opt, 'interaction/ui_response', self.ui_callback, 10)
|
||||
self.order_sub = self.create_subscription(Task2Opt, 'interaction/task_order', self.task_order_callback, 10)
|
||||
|
||||
# Publishers
|
||||
self.ui_pub = self.create_publisher(Opt2UI, 'interaction/ui_request', 10)
|
||||
self.rep_pub = self.create_publisher(Opt2Rep, 'interaction/next_x', 10)
|
||||
|
||||
# Service Clients
|
||||
self.query_client = self.create_client(Query, 'user_query')
|
||||
self.query_client = self.create_client(Query, 'interaction/user_query')
|
||||
|
||||
# Bayesian Optimization
|
||||
self.bayesian_optimization = None
|
||||
self.nr_bo_steps = 100
|
||||
self.nr_dimensions = 1
|
||||
self.nr_policy_parameters = 10
|
||||
|
||||
self.nr_init = 3
|
||||
self.seed = None
|
||||
self.lower_bound = None
|
||||
self.upper_bound = None
|
||||
|
||||
# State Machine
|
||||
states = ['idle',
|
||||
'initialize_bo',
|
||||
'user_query',
|
||||
'processing',
|
||||
'waiting_for_user_response',
|
||||
'waiting_for_task_response']
|
||||
transitions = [
|
||||
{'trigger': 'order_received', 'source': 'idle', 'dest': 'initialize_bo', 'after': 'after_receiving_order'},
|
||||
{'trigger': 'initialization_finished', 'source': 'initialize_bo', 'dest': 'user_query'},
|
||||
{'trigger': 'non_interactive', 'source': 'user_query', 'dest': 'sampling_x_next'},
|
||||
{'trigger': 'sampling_finished', 'source': 'sampling_x_next', 'dest': 'processing'},
|
||||
{'trigger': 'interactive', 'source': 'user_query', 'dest': 'waiting_for_user_response'},
|
||||
{'trigger': 'user_response_received', 'source': 'waiting_for_user_response', 'dest': 'processing'},
|
||||
{'trigger': 'processing_finished', 'source': 'processing', 'dest': 'waiting_for_task_response'},
|
||||
{'trigger': 'task_response_received', 'source': 'waiting_for_task_response', 'dest': 'user_query'},
|
||||
{'trigger': 'order_completed', 'source': 'waiting_for_task_response', 'dest': 'idle'},
|
||||
{'trigger': 'abort', 'source': '*', 'dest': 'idle'}
|
||||
]
|
||||
|
||||
"""
|
||||
Algo:
|
||||
A) idle -> order_received() -> initializing_bo:
|
||||
order_sub - receives order
|
||||
reset_bo with the order parameters
|
||||
trigger: order_received()
|
||||
|
||||
for i=0:nr_episodes
|
||||
B) initializing_bo -> initialization_finished() -> user_query:
|
||||
for j = 0:nr_init
|
||||
x_next <- initial sampling
|
||||
send x_next to task node
|
||||
reward received from task node
|
||||
fit model
|
||||
trigger: initialization_finished()
|
||||
|
||||
C1.1) user_query -> non_interactive() -> sampling_x_next
|
||||
service call for user_query
|
||||
if user_query is False:
|
||||
trigger: non_interactive()
|
||||
C1.2) sampling_x_next -> sampling_finished() -> processing
|
||||
sampling x_next with acqusition function
|
||||
trigger: sampling_finished()
|
||||
|
||||
C2.1) user_query -> interactive() -> waiting_for_user
|
||||
service call for user_query
|
||||
if user_query is True:
|
||||
trigger: interactive()
|
||||
C2.2) waiting_for_user_response -> user_response_received() -> processing
|
||||
send x_best to task node
|
||||
send x_best to UI
|
||||
user adapts x_best
|
||||
(UI sends x_next to task node)
|
||||
receives x_next from UI
|
||||
trigger: user_response_received()
|
||||
|
||||
D) processing -> processing_finished() -> waiting_for_task_response
|
||||
sends x_next to task node
|
||||
trigger: processing_finished()
|
||||
|
||||
E) waiting_for_task_response -> task_response_received() -> user_query
|
||||
if episode < nr_episodes:
|
||||
get results from reward_sub
|
||||
model fit with additional observation
|
||||
trigger: task_response_received()
|
||||
|
||||
F) waiting_for_task_response -> order_completed() -> idle
|
||||
if episode = nr_episodes:
|
||||
completion_pub with the completed results
|
||||
trigger: order_completed()
|
||||
|
||||
|
||||
Additional Transitions:
|
||||
Abort: * -> abort() -> idle
|
||||
"""
|
||||
|
||||
self.machine = AsyncMachine(model=self, states=states,
|
||||
transitions=transitions, initial='idle',
|
||||
ignore_invalid_triggers=True)
|
||||
|
||||
def reset_bo(self, fixed_dimensions=None, **kwargs):
|
||||
self.bayesian_optimization = BayesianOptimization(self.nr_bo_steps,
|
||||
self.nr_dimensions,
|
||||
@ -52,5 +150,99 @@ class BayesianOptimizationNode(Node):
|
||||
kernel_type=self.kernel_type, **kwargs)
|
||||
|
||||
|
||||
# region Callback functions
|
||||
def reward_callback(self, msg):
|
||||
pass
|
||||
|
||||
def ui_callback(self, msg):
|
||||
pass
|
||||
|
||||
def task_order_callback(self, msg):
|
||||
pass
|
||||
|
||||
async def call_service_async(self, client, request):
|
||||
"""
|
||||
Asynchronously call a ROS 2 service and wait for its response.
|
||||
"""
|
||||
# Use an asyncio Future to wait for the response
|
||||
future = asyncio.Future()
|
||||
|
||||
def callback(response_future):
|
||||
# Set the result of the asyncio Future when the ROS 2 service responds
|
||||
rclpy.spin_once(self, timeout_sec=0) # Make sure the callback is processed
|
||||
future.set_result(response_future.result())
|
||||
|
||||
client.call_async(request).add_done_callback(callback)
|
||||
|
||||
# Wait for the response
|
||||
response = await future
|
||||
return response
|
||||
# endregion
|
||||
|
||||
# region Trigger Placeholers
|
||||
async def order_received(self):
|
||||
# Placeholder for trigger
|
||||
pass
|
||||
|
||||
async def initialization_finished(self):
|
||||
# Placeholder for trigger
|
||||
pass
|
||||
|
||||
async def non_interactive(self):
|
||||
# Placeholder for trigger
|
||||
pass
|
||||
|
||||
async def interactive(self):
|
||||
# Placeholder for trigger
|
||||
pass
|
||||
|
||||
async def user_response_received(self):
|
||||
# Placeholder for trigger
|
||||
pass
|
||||
|
||||
async def processing_finished(self):
|
||||
# Placeholder for trigger
|
||||
pass
|
||||
|
||||
async def task_response_received(self):
|
||||
# Placeholder for trigger
|
||||
pass
|
||||
|
||||
async def order_completed(self):
|
||||
# Placeholder for trigger
|
||||
pass
|
||||
|
||||
async def abort(self):
|
||||
# Placeholder for trigger
|
||||
pass
|
||||
|
||||
# endregion
|
||||
|
||||
# State Methods
|
||||
async def after_receiving_order(self):
|
||||
self.get_logger().info('Initializing bo...')
|
||||
|
||||
async def user_query_state(self):
|
||||
self.get_logger().info('Deciding wheter to ask the user...')
|
||||
srv_msg = Query()
|
||||
response = await self.call_service_async(self.query_client, srv_msg)
|
||||
if response.interaction:
|
||||
await self.interactive()
|
||||
|
||||
else:
|
||||
await self.non_interactive()
|
||||
|
||||
async def ask_user_state(self):
|
||||
self.get_logger().info('Waiting for user response...')
|
||||
|
||||
async def processing_next_episode(self):
|
||||
pass
|
||||
|
||||
async def waiting_for_task_response(self):
|
||||
pass
|
||||
|
||||
async def completing_order(self):
|
||||
pass
|
||||
|
||||
async def abort(self):
|
||||
pass
|
Loading…
Reference in New Issue
Block a user