Compare commits


24 Commits

Author SHA1 Message Date
ffca9d2bc0 updating the task node and added TaskEvaluation.action 2024-03-26 15:38:54 +01:00
d8bada4917 updating gitignore 2024-03-18 18:38:38 +01:00
9ad94fd1ce fixed TaskEvaluation.action 2024-03-18 18:33:59 +01:00
bf49376d3c Adding Multithreading (as Callback Group) to have the task_execute_callback next to the State Machine 2024-03-18 18:25:58 +01:00
f875ee34c1 Changed TaskEvaluation.srv to service
added Action server to task node
2024-03-18 18:00:12 +01:00
2422693b42 Changed TaskEvaluation.srv to service
Adapted cmaes_optimization_node.py for the action
2024-03-18 16:38:21 +01:00
bffe826a74 task node basic structure 2024-03-18 15:26:49 +01:00
e0874c3d9d cmaes node TODO:
error state handling
finished state handling
parameters forwarded to the query node
2024-03-13 15:04:15 +01:00
aa04e0b833 continue with optimizer node 2024-03-12 14:48:40 +01:00
7c3ad608f7 added utils 2024-02-29 15:31:43 +01:00
a23e5be64c cma-es node in dev 2024-02-29 14:37:36 +01:00
acf51f01c6 dual quaternion policy model added 2024-02-26 12:08:47 +01:00
25c54417eb dual quaternion policy model added 2024-02-23 17:36:44 +01:00
ddb98b62b3 dual quaternion policy model added 2024-02-23 15:24:16 +01:00
3b7ec52502 Init 2024-02-23 10:35:10 +01:00
7056a6da07 Merge remote-tracking branch 'master/master' 2024-02-23 10:00:52 +01:00
7fe46bfa25 dual quaternion policy model added 2024-02-23 10:00:43 +01:00
5e42128f06 dual quaternion policy model added 2024-02-21 14:02:21 +01:00
b83c490464 setup for implementation of representation models 2024-02-20 18:01:09 +01:00
7bcba1132a Bo node completely implemented but not tested 2024-02-20 15:50:44 +01:00
d8dd0c4f4b initializing bo finished 2024-02-20 14:41:38 +01:00
5c6750886c tested async functions 2024-02-16 16:01:22 +01:00
220d590c05 started with bo node 2024-02-15 18:24:31 +01:00
4b7dd8621a interaction_query finished 2024-02-15 17:32:20 +01:00
102 changed files with 1669 additions and 8 deletions

.gitignore vendored (4 changes)
View File

@ -1,4 +1,6 @@
 .idea
 /build/
 /install/
-/log/
+/log/
+/src/build/
+/src/install/

View File

@ -3,12 +3,13 @@ FROM osrf/ros:humble-desktop-full-jammy
 # Update and install dependencies
 RUN apt-get update && apt-get install -y \
-    python3-colcon-common-extensions python3-pip \
-    && rm -rf /var/lib/apt/lists/*
+    python3-colcon-common-extensions python3-pip
 COPY requirements.txt ./
 RUN pip install --no-cache-dir -r requirements.txt
+RUN rm -rf /var/lib/apt/lists/*

 # Create a workspace
 WORKDIR /ros2_ws

@ -21,6 +22,7 @@ RUN . /opt/ros/humble/setup.sh && \
 RUN echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc
+ENV PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/ros/humble/bin"

 # Source the workspace
 CMD ["/bin/bash"]

View File

@ -27,6 +27,25 @@ docker build -t interactive-robot-learning-framework .
docker-compose up -d
```
## Setting up the PyCharm Environment
Add a new remote interpreter based on the docker-compose file:
click the interpreter shown in the lower-right corner of the PyCharm window and select "Interpreter Settings".
Within the settings, go to "Show All..." in the list of Python interpreters.
There you can add a new interpreter with the "+" button: choose "Docker Compose" and select the "docker-compose.yaml" file from this repository.
The last step is to add the ROS2 paths inside your Docker container to the interpreter paths (one of the buttons next to the "+").
The following paths are necessary for developing in the Docker container; without them, Python cannot find rclpy and the custom msg packages:
```bash
/opt/ros/humble/local/lib/python3.10/dist-packages
/opt/ros/humble/lib/python3.10/site-packages
/ros2_ws/install/interaction_msgs/local/lib/python3.10/dist-packages
```
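If the paths are registered correctly, a quick sanity check is to run a minimal import script with the newly configured interpreter (a sketch; `OptimizerState` is one of the custom messages added in this diff):
```python
# Run with the docker-compose interpreter configured above; both imports
# resolve only if the ROS2 and workspace paths were added to the interpreter.
import rclpy
from interaction_msgs.msg import OptimizerState

print("Interpreter paths are configured correctly.")
```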
If there is a
## Framework Structure
The Interactive Robot Learning Framework consists of several key ROS2 packages, each responsible for different aspects of robot learning and interaction. Below is an overview of each package:

View File

@ -5,4 +5,8 @@ pydot~=1.4.2
 empy~=3.3.4
 lark~=1.1.1
 scipy~=1.12.0
-scikit-learn~=1.4.0
+scikit-learn~=1.4.0
+transitions==0.9.0
+movement-primitives[all]~=0.7.0
+cma~=3.3.0
+PyYAML~=5.4.1

View File

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>interaction_benchmark</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="nikolaus.feith@unileoben.ac.at">niko</maintainer>
<license>TODO: License declaration</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/interaction_benchmark
[install]
install_scripts=$base/lib/interaction_benchmark

View File

@ -0,0 +1,25 @@
from setuptools import find_packages, setup
package_name = 'interaction_benchmark'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='niko',
maintainer_email='nikolaus.feith@unileoben.ac.at',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
],
},
)

View File

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'

View File

@ -9,7 +9,18 @@ find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"action/TaskEvaluation.action"
"srv/Query.srv"
"srv/Task.srv"
"srv/UserInterface.srv"
"srv/ParameterChange.srv"
# "srv/TaskEvaluation.srv"
"msg/OptimizerState.msg"
"msg/Opt2UI.msg"
"msg/UI2Opt.msg"
"msg/TaskOrder.msg"
# "msg/Opt2Task.msg"
# "msg/Task2Opt.msg"
DEPENDENCIES
)
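Once interaction_msgs is built, the generated interfaces are imported in Python exactly as the nodes later in this diff do:
```python
from interaction_msgs.action import TaskEvaluation
from interaction_msgs.srv import Query, Task, UserInterface, ParameterChange
from interaction_msgs.msg import OptimizerState, Opt2UI, UI2Opt, TaskOrder
```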

View File

@ -0,0 +1,26 @@
# Goal
bool user_input
uint16 number_of_population
float32 duration
uint16 number_of_time_steps
# case if user_input is true
float32[] user_parameters # Length: number_of_dimensions * number_of_parameters_per_dimension
float32[] user_covariance_diag # Length: number_of_dimensions * number_of_parameters_per_dimension
float32[] current_optimizer_mean # Length: number_of_dimensions * number_of_parameters_per_dimension
float32[] conditional_points # Length: (number_of_dimensions + time_stamp[0,1]) * number_of_conditional_points
float32[] weight_parameter # weighted-average factor: 0 = ignore the user, 1 = fully trust the user (set by the user or decayed over time; experiments on this are still pending)
# case if user_input is false
uint16 number_of_dimensions # this is the number of ProMPs * 2 (Position and Velocity)
uint16 number_of_parameters_per_dimensions
float32[] parameter_array # Length: number_of_population * number_of_dimensions * number_of_parameters_per_dimension
---
# Feedback
string current_state
uint16 processed_trajectories
---
# Result
float32[] new_means # Length: number_of_population * number_of_dimensions * number_of_parameters_per_dimension; needed because with user input the parameters aren't known in advance
float32[] score # Length: number_of_population
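The Goal and Result arrays above are flattened (population, dimensions, parameters) tensors. The serialization helpers used later in this diff (`flatten_population`/`unflatten_population` from `interaction_utils.serialization`) are not themselves shown; a minimal sketch of what they presumably look like:
```python
import numpy as np

def flatten_population(population):
    # Flatten an arbitrarily shaped parameter tensor into a flat float32 list
    # suitable for a float32[] message field.
    return np.asarray(population, dtype=np.float32).ravel().tolist()

def unflatten_population(flat_array, number_of_population, number_of_dimensions,
                         number_of_parameters_per_dimension):
    # Inverse operation: rebuild the (population, dimensions, parameters) tensor.
    arr = np.asarray(flat_array, dtype=np.float32)
    return arr.reshape(number_of_population, number_of_dimensions,
                       number_of_parameters_per_dimension)
```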

View File

@ -0,0 +1,8 @@
# Return reward - True: for updating the model, False: for displaying the x_best to the user
bool reward_return
# Number of dimensions of the representation model
uint16 nr_dim
# Number of parameters per dimensions
uint16 nr_parameter
# Next parameters
float32[] x_next

View File

@ -0,0 +1,5 @@
# Best parameter set so far
float32[] x_best
# parameters which were fixed by the user
bool[] fixed_parameters

View File

@ -0,0 +1,9 @@
# Current State
string current_state
# Current BO Episode
int32 current_episode
# Best result so far
float32 reward_best
float32[] x_best

View File

View File

@ -0,0 +1,4 @@
# observed parameters
float32[] x_observed
# observed reward
float32 reward

View File

@ -0,0 +1 @@
uint16 bla

View File

@ -0,0 +1,7 @@
# parameters which were fixed by the user
bool[] fixed_parameters
# parameters set by the user
float32[] set_parameter_values
# proposed new poses and their time steps
float32[] new_poses
float32[] new_poses_time_stamps

View File

@ -0,0 +1,6 @@
string[] parameter_name
string[] parameter_type # One of 'float', 'string', 'bool', potentially others
string[] parameter_value
---
bool success
string message # For error reporting
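A hedged sketch of calling this service from another node (the service name `interaction/cmaes_parameter_srv` appears later in cmaes_optimization_node.py; the parameter name used here is purely illustrative):
```python
import rclpy
from rclpy.node import Node
from interaction_msgs.srv import ParameterChange

rclpy.init()
node = Node('parameter_change_client')
client = node.create_client(ParameterChange, 'interaction/cmaes_parameter_srv')
client.wait_for_service(timeout_sec=5.0)

request = ParameterChange.Request()
request.parameter_name = ['heartbeat_timeout']   # hypothetical parameter name
request.parameter_type = ['float32']             # one of 'float32', 'string', 'bool'
request.parameter_value = ['60.0']               # values travel as strings

future = client.call_async(request)
rclpy.spin_until_future_complete(node, future)
print(future.result().success, future.result().message)
node.destroy_node()
rclpy.shutdown()
```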

View File

@ -0,0 +1,18 @@
# Number of dimensions of the representation model
uint16 nr_dim
# Number of parameters per dimensions
uint16 nr_parameter
# Next parameters
float32[] x_next
# time array
float32[] time_steps_array
# did the user update the trajectory in the UI
bool user_update
# if the user updated: pose + its timestamp (multiple poses possible)
float32[] update_pose
float32[] update_timestamps
---
# observed parameters
float32[] x_observed
# observed reward
float32 reward

View File

@ -0,0 +1,20 @@
bool user_input
uint16 number_of_population
float32 duration
uint16 number_of_time_steps
# case if user_input is true
float32[] user_parameters # Length: number_of_dimensions * number_of_parameters_per_dimension
float32[] user_covariance_diag # Length: number_of_dimensions * number_of_parameters_per_dimension
float32[] current_cma_mean # Length: number_of_dimensions * number_of_parameters_per_dimension
float32[] conditional_points # Length: (number_of_dimensions + time_stamp[0,1]) * number_of_conditional_points
float32[] weight_parameter # weighted-average factor: 0 = ignore the user, 1 = fully trust the user (set by the user or decayed over time; experiments on this are still pending)
# case if user_input is false
uint16 number_of_dimensions # this is the number of ProMPs * 2 (Position and Velocity)
uint16 number_of_parameters_per_dimensions
float32[] parameter_array # Length: number_of_population * number_of_dimensions * number_of_parameters_per_dimension
---
# response
float32[] parameter_array # needed because with user input the parameters aren't known in advance
float32[] score

View File

@ -0,0 +1,8 @@
float32[] current_cma_mean # Length: number_of_dimensions * number_of_parameters_per_dimension
float32[] best_parameters # Length: number_of_dimensions * number_of_parameters_per_dimension
float32[] current_user_covariance_diag # Length: number_of_dimensions * number_of_parameters_per_dimension
---
float32[] user_parameters # Length: number_of_dimensions * number_of_parameters_per_dimension
float32[] user_covariance_diag # Length: number_of_dimensions * number_of_parameters_per_dimension
float32[] conditional_points # Length: (number_of_dimensions + time_stamp[0,1]) * number_of_conditional_points
float32 weight_parameter # weighted-average factor: 0 = ignore the user, 1 = fully trust the user (set by the user or decayed over time; experiments on this are still pending)

View File

@ -0,0 +1,78 @@
import rclpy
from rclpy.node import Node
from transitions.extensions.asyncio import AsyncMachine
from std_msgs.msg import String
import asyncio
import threading
import time
class StatefulNode(Node):
def __init__(self, event_loop):
super().__init__('stateful_node')
self.event_loop = event_loop
self.subscription = self.create_subscription(String, 'my_topic', self.listener_callback, 10)
states = [
{'name': 'state1', 'on_enter': 'on_enter_state1'},
{'name': 'state2', 'on_enter': 'on_enter_state2'},
{'name': 'state3', 'on_enter': 'on_enter_state3'}
]
transitions = [
{'trigger': 'go_to_state1', 'source': '*', 'dest': 'state1'},
{'trigger': 'go_to_state2', 'source': '*', 'dest': 'state2'},
{'trigger': 'go_to_state3', 'source': '*', 'dest': 'state3'},
]
self.machine = AsyncMachine(model=self, states=states, transitions=transitions, initial='state1')
async def on_enter_state1(self):
self.get_logger().info(f"Entering State 1 - {time.time()}")
async def on_enter_state2(self):
self.get_logger().info(f"Entering State 2 - {time.time()}")
async def on_enter_state3(self):
self.get_logger().info(f"Entering State 3 - {time.time()}")
def listener_callback(self, msg):
try:
self.get_logger().info(f'Received message: "{msg.data}"')
            if msg.data == 'trigger1':
                self.get_logger().info("Attempting to go to state1")
                asyncio.run_coroutine_threadsafe(self.go_to_state1(), self.event_loop)
            elif msg.data == 'trigger2':
                self.get_logger().info("Attempting to go to state2")
                asyncio.run_coroutine_threadsafe(self.go_to_state2(), self.event_loop)
            elif msg.data == 'trigger3':
                self.get_logger().info("Attempting to go to state3")
                asyncio.run_coroutine_threadsafe(self.go_to_state3(), self.event_loop)
except Exception as e:
self.get_logger().error(f"Error in listener_callback: {str(e)}")
def run_asyncio_loop():
    # Unused helper: run_forever() blocks, so the return statement below is never reached.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_forever()
    return loop
def main():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
threading.Thread(target=loop.run_forever, daemon=True).start()
# Initialize ROS and pass the event loop to the node
rclpy.init()
node = StatefulNode(loop)
# Now, the ROS 2 node has the correct event loop reference for asyncio operations
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
loop.call_soon_threadsafe(loop.stop)
if __name__ == '__main__':
main()
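To drive the state machine above, publish one of the trigger strings on `my_topic` (equivalently, `ros2 topic pub --once /my_topic std_msgs/msg/String "{data: trigger2}"` from a shell). A minimal sketch:
```python
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

rclpy.init()
node = Node('trigger_publisher')
publisher = node.create_publisher(String, 'my_topic', 10)

msg = String()
msg.data = 'trigger2'          # one of 'trigger1' / 'trigger2' / 'trigger3'
publisher.publish(msg)

rclpy.spin_once(node, timeout_sec=0.5)  # give the middleware a moment to deliver
node.destroy_node()
rclpy.shutdown()
```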

View File

@ -20,6 +20,7 @@ setup(
     tests_require=['pytest'],
     entry_points={
         'console_scripts': [
+            'async_node = interaction_objective_function.async_testing:main'
         ],
     },
 )

View File

@ -0,0 +1,4 @@
initial_mean_centre: 0.0
initial_mean_std_dev: 0.2
initial_variance: 0.3
random_seed: ''

View File

@ -1 +1,326 @@
import rclpy
from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup
from interaction_msgs.srv import Query
from interaction_msgs.srv import Task
from interaction_msgs.msg import UI2Opt
from interaction_msgs.msg import TaskOrder
from interaction_msgs.msg import Opt2UI
from interaction_msgs.msg import OptimizerState
from .optimizers.bayesian_optimization import BayesianOptimization
from transitions.extensions.asyncio import AsyncMachine
import asyncio
import threading
import time
import os
import numpy as np
class BayesianOptimizationNode(Node):
def __init__(self, event_loop):
super().__init__('bayesian_optimization_node')
self.event_loop = event_loop
# region Parameters
"""
self.acquisition_function_name = (self.declare_parameter('acquisition_function_name', 'EI')
.get_parameter_value().string_value)
self.nr_bo_steps = self.declare_parameter('bo_steps', 100).get_parameter_value().integer_value
self.nr_dimensions = self.declare_parameter('nr_dimensions', 1).get_parameter_value().integer_value
self.nr_policy_parameters = (self.declare_parameter('nr_policy_parameters', 100)
.get_parameter_value().integer_value)
"""
self.kernel_type = self.declare_parameter('kernel_type', 'Matern').get_parameter_value().string_value
self.nr_init = self.declare_parameter('nr_init', 3).get_parameter_value().integer_value
self.task_request_timeout = self.declare_parameter('task_request_timeout', 10).get_parameter_value().integer_value
self.task_request_max_tries = self.declare_parameter('task_request_max_tries', 3).get_parameter_value().integer_value
# endregion
# State Machine
states = [
{'name': 'idle'},
{'name': 'initialize_bo', 'on_enter': 'initialize_bo_fun'},
{'name': 'user_query', 'on_enter': 'user_query_fun'},
{'name': 'non_interactive_mode', 'on_enter': 'non_interactive_mode_fun'},
{'name': 'waiting_for_user_response', 'on_enter': 'waiting_for_user_response_fun'},
            {'name': 'processing_user_proposal', 'on_enter': 'processing_user_proposal_fun'},
]
transitions = [
{'trigger': 'order_received', 'source': 'idle', 'dest': 'initialize_bo'},
{'trigger': 'initialization_finished', 'source': 'initialize_bo', 'dest': 'user_query'},
{'trigger': 'non_interactive', 'source': 'user_query', 'dest': 'non_interactive_mode'},
{'trigger': 'non_interactive_finished', 'source': 'non_interactive_mode', 'dest': 'user_query'},
{'trigger': 'interactive', 'source': 'user_query', 'dest': 'waiting_for_user_response'},
            {'trigger': 'user_response_received', 'source': 'waiting_for_user_response', 'dest': 'processing_user_proposal'},
            {'trigger': 'processing_finished', 'source': 'processing_user_proposal', 'dest': 'user_query'},
            {'trigger': 'order_completed', 'source': ['non_interactive_mode', 'processing_user_proposal'], 'dest': 'idle'},
{'trigger': 'abort', 'source': '*', 'dest': 'idle'}
]
"""
Algo:
A) idle -> order_received() -> initializing_bo:
order_sub - receives order
reset_bo with the order parameters
trigger: order_received()
for i=0:nr_episodes
B) initializing_bo -> initialization_finished() -> user_query:
for j = 0:nr_init
x_next <- initial sampling
send x_next to task node
reward received from task node
fit model
trigger: initialization_finished()
C1.1) user_query -> non_interactive() -> sampling_x_next
service call for user_query
if user_query is False:
trigger: non_interactive()
C1.2) sampling_x_next -> sampling_finished() -> processing
        sampling x_next with acquisition function
trigger: sampling_finished()
C2.1) user_query -> interactive() -> waiting_for_user
service call for user_query
if user_query is True:
trigger: interactive()
C2.2) waiting_for_user_response -> user_response_received() -> processing
send x_best to task node
send x_best to UI
user adapts x_best
(UI sends x_next to task node)
receives x_next from UI
trigger: user_response_received()
D) processing -> processing_finished() -> waiting_for_task_response
sends x_next to task node
trigger: processing_finished()
E) waiting_for_task_response -> task_response_received() -> user_query
if episode < nr_episodes:
get results from reward_sub
model fit with additional observation
trigger: task_response_received()
F) waiting_for_task_response -> order_completed() -> idle
if episode = nr_episodes:
completion_pub with the completed results
trigger: order_completed()
Additional Transitions:
Abort: * -> abort() -> idle
"""
self.machine = AsyncMachine(model=self, states=states,
transitions=transitions, initial='idle',
ignore_invalid_triggers=True)
# Subscribers
self.ui_sub = self.create_subscription(UI2Opt, 'interaction/ui_response', self.ui_callback, 10)
self.order_sub = self.create_subscription(TaskOrder, 'interaction/order', self.task_order_callback, 10)
# Publishers
self.ui_pub = self.create_publisher(Opt2UI, 'interaction/ui_request', 10)
self.state_pub = self.create_publisher(OptimizerState, 'interaction/optimizer/state', 10)
# Service Clients
self.query_client = self.create_client(Query, 'interaction/user_query_srv')
self.task_client = self.create_client(Task, 'interaction/task_srv')
# Timer Objects
self.state_timer = self.create_timer(0.1, self.state_callback)
# Bayesian Optimization
self.bayesian_optimization = None
self.current_episodes = 0
self.nr_bo_steps = 0
self.nr_dimensions = 0
self.nr_policy_parameters = 0
self.acquisition_function_name = ''
self.fixed_dimensions = None
self.seed = None
self.lower_bound = None
self.upper_bound = None
self.x_temp = None # temp x_next
self.get_logger().info(f"Bayesian optimization Node started up!")
def reset_bo(self, fixed_dimensions=None, **kwargs):
self.bayesian_optimization = BayesianOptimization(self.nr_bo_steps,
self.nr_dimensions,
self.nr_policy_parameters,
seed=self.seed,
fixed_dimensions=fixed_dimensions,
lower_bound=self.lower_bound,
upper_bound=self.upper_bound,
acquisition_function_name=self.acquisition_function_name,
kernel_type=self.kernel_type, **kwargs)
# region Callback & Async Send functions
def task_order_callback(self, msg):
self.current_episodes = 0
self.nr_bo_steps = msg.nr_bo_steps
self.nr_dimensions = msg.nr_dimensions
self.nr_policy_parameters = msg.nr_policy_parameters
self.seed = msg.seed
self.fixed_dimensions = msg.fixed_dimensions
self.acquisition_function_name = msg.acquisition_function_name
asyncio.run_coroutine_threadsafe(self.order_received(), self.event_loop)
def ui_callback(self, msg):
self.fixed_dimensions = msg.fixed_dimensions
self.x_temp = msg.set_parameter_values
        asyncio.run_coroutine_threadsafe(self.user_response_received(), self.event_loop)
async def send_task_request_with_retry(self, request):
for attempt in range(self.task_request_max_tries):
self.get_logger().info(f"Attempt {attempt + 1}, send task request with retry!")
future = self.task_client.call_async(request)
try:
response = await asyncio.wait_for(future, self.task_request_timeout)
return response
except asyncio.TimeoutError:
self.get_logger().warning(f"Task request timed out, resending...")
self.get_logger().error(f"Max retries reached for task request: {self.task_request_max_tries}")
return None
def state_callback(self):
msg = OptimizerState()
msg.current_state = self.state
if self.state != 'idle':
msg.current_episode = self.bayesian_optimization.episode
            msg.reward_best, msg.x_best, _ = self.bayesian_optimization.get_best_result()
self.state_pub.publish(msg)
# endregion
# State Methods
    async def initialize_bo_fun(self):
        self.get_logger().info('Initializing BO...')
self.reset_bo(fixed_dimensions=self.fixed_dimensions)
request = Task.Request()
request.nr_dim = self.nr_dimensions
request.nr_parameter = self.nr_policy_parameters
for i in range(self.nr_init):
request.x_next = self.bayesian_optimization.sample_random_next_observation()
response = await self.send_task_request_with_retry(request)
            if i < self.nr_init - 1:
                self.bayesian_optimization.add_observation(response.reward, response.x_observed, fit=False)
            else:
                # fit the GP only once, after the last initial sample
                self.bayesian_optimization.add_observation(response.reward, response.x_observed)
await self.initialization_finished()
async def user_query_fun(self):
        self.get_logger().info('Deciding whether to ask the user...')
        srv_msg = Query.Request()
future = self.query_client.call_async(srv_msg)
try:
response = await asyncio.wait_for(future, timeout=2)
if response.interaction:
await self.interactive()
else:
await self.non_interactive()
except asyncio.TimeoutError:
self.get_logger().info("Timeout for Query Service...")
async def non_interactive_mode_fun(self):
request = Task.Request()
request.nr_dim = self.nr_dimensions
request.nr_parameter = self.nr_policy_parameters
request.x_next = self.bayesian_optimization.next_observation()
response = await self.send_task_request_with_retry(request)
self.bayesian_optimization.add_observation(response.reward, response.x_observed)
        if self.bayesian_optimization.episode == self.nr_bo_steps:
            await self.order_completed()
        else:
            await self.non_interactive_finished()
async def waiting_for_user_response_fun(self):
self.get_logger().info('Waiting for user response...')
# sending the best result so far to display it to the user
_, x_max, _ = self.bayesian_optimization.get_best_result()
ui_msg = Opt2UI()
ui_msg.x_best = x_max
ui_msg.fixed_parameters = self.fixed_dimensions
self.ui_pub.publish(ui_msg)
# send it to the task node to display the movement
request = Task.Request()
request.nr_dim = self.nr_dimensions
request.nr_parameter = self.nr_policy_parameters
        request.x_next = x_max  # display the best solution found so far
_ = await self.send_task_request_with_retry(request)
self.get_logger().info('User best solution displayed completed.')
async def processing_user_proposal_fun(self):
request = Task.Request()
request.nr_dim = self.nr_dimensions
request.nr_parameter = self.nr_policy_parameters
request.x_next = self.x_temp
self.x_temp = None
response = await self.send_task_request_with_retry(request)
self.bayesian_optimization.add_observation(response.reward, response.x_observed)
        if self.bayesian_optimization.episode == self.nr_bo_steps:
            await self.order_completed()
        else:
            await self.processing_finished()
def run_asyncio_loop():
    # Unused helper: run_forever() blocks, so the return statement below is never reached.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_forever()
    return loop
def main():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
threading.Thread(target=loop.run_forever, daemon=True).start()
# Initialize ROS and pass the event loop to the node
rclpy.init()
node = BayesianOptimizationNode(loop)
# Now, the ROS 2 node has the correct event loop reference for asyncio operations
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
loop.call_soon_threadsafe(loop.stop)
if __name__ == '__main__':
main()
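The `BayesianOptimization` class used above is only partially diffed further below. For orientation, a self-contained sketch of the fit-then-acquire cycle it implements, assuming a scikit-learn GP with a Matern kernel and an Expected Improvement acquisition over random candidates (these details are assumptions, not the class's actual code):
```python
import numpy as np
from scipy.stats import norm
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import Matern

rng = np.random.default_rng(0)
x_observed = rng.uniform(-1.0, 1.0, size=(5, 2))        # 5 initial samples in 2D
y_observed = -np.sum(x_observed ** 2, axis=1)           # toy reward to maximize

gp = GaussianProcessRegressor(kernel=Matern(nu=2.5)).fit(x_observed, y_observed)

candidates = rng.uniform(-1.0, 1.0, size=(256, 2))
mu, sigma = gp.predict(candidates, return_std=True)
y_best = y_observed.max()
z = (mu - y_best) / np.maximum(sigma, 1e-9)
ei = (mu - y_best) * norm.cdf(z) + sigma * norm.pdf(z)  # Expected Improvement
x_next = candidates[np.argmax(ei)]                      # next query point
```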

View File

@ -0,0 +1,348 @@
import os
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.action import ActionClient
from transitions import Machine
import cma
import yaml
import numpy as np
from src.interaction_utils.serialization import flatten_population, unflatten_population
# Msg/Srv/Action
from interaction_msgs.srv import Query
from interaction_msgs.action import TaskEvaluation
from interaction_msgs.srv import ParameterChange
from interaction_msgs.srv import UserInterface
from std_msgs.msg import Bool
class CMAESOptimizationNode(Node):
def __init__(self):
super().__init__('cmaes_optimization_node')
# CMA-ES Attributes
self.cmaes = None
self.number_of_initial_episodes = self.declare_parameter('number_of_initial_episodes', 5)
self.initial_user_covariance = self.declare_parameter('initial_user_covariance', 1.0)
self.episode = 0
self.number_of_dimensions = 3
self.number_of_parameters_per_dimensions = 10
# the number of weights is double the number of dims * params per dim since its Position and Velocity
self.number_of_weights = 2 * self.number_of_dimensions * self.number_of_parameters_per_dimensions
self.user_covariance = np.ones((self.number_of_weights, 1)) * self.initial_user_covariance.value
self.random_seed = None
# Query Attributes
self.query_metric = 'random'
self.query_parameters = {}
        self.best_parameters_per_iteration = []
self.best_reward_per_iteration = []
# ROS2 Interfaces
self.__future = None
self.__send_future = None
self.__response_future = None
self.__task_response = None
self.__user_response = None
# Heartbeat Topics - to make sure that there is no deadlock
self.heartbeat_timeout = 30 # secs
self.heartbeat_timer = self.create_timer(1.0, self.check_heartbeats)
# Heartbeat
self.last_mr_heartbeat_time = None
        self.mr_heartbeat_sub = self.create_subscription(Bool, 'interaction/mr_heartbeat', self.mr_heartbeat_callback, 10)
# Topic
# Service
self.parameter_srv = self.create_service(ParameterChange, 'interaction/cmaes_parameter_srv', self.parameter_callback)
self.query_srv = self.create_client(Query, 'interaction/query_srv')
self.user_interface_srv = self.create_client(UserInterface, 'interaction/user_interface_srv')
# Action
self._task_action = ActionClient(self, TaskEvaluation, 'interaction/task_action')
# State Machine
# States
self.states = [
'idle',
'initialization',
'query_decision',
'non_interactive_mode',
'wait_for_evaluation',
'interactive_mode',
'wait_for_user_response',
'prepare_user_data_for_evaluation',
'wait_for_user_evaluation',
'update_optimizer',
'check_termination',
'complete',
'error_recovery',
]
# Initialize state machine
self.machine = Machine(self, states=self.states, initial='idle')
# region Transitions
self.machine.add_transition(trigger='order_received', source='idle', dest='initialization')
self.machine.add_transition(trigger='initialization_complete', source='initialization', dest='query_decision')
self.machine.add_transition(trigger='no_interaction', source='query_decision', dest='non_interactive_mode')
self.machine.add_transition(trigger='data_send_for_evaluation', source='non_interactive_mode', dest='wait_for_evaluation')
self.machine.add_transition(trigger='evaluation_response_received', source='wait_for_evaluation', dest='update_optimizer')
self.machine.add_transition(trigger='interaction', source='query_decision', dest='interactive_mode')
self.machine.add_transition(trigger='data_send_to_user', source='interactive_mode', dest='wait_for_user_response')
self.machine.add_transition(trigger='user_response_received', source='wait_for_user_response', dest='prepare_user_data_for_evaluation')
self.machine.add_transition(trigger='send_user_data_to_evaluation', source='prepare_user_data_for_evaluation', dest='wait_for_user_evaluation')
self.machine.add_transition(trigger='received_user_data_for_evaluation', source='wait_for_user_evaluation', dest='update_optimizer')
self.machine.add_transition(trigger='optimizer_updated', source='update_optimizer', dest='check_termination')
self.machine.add_transition(trigger='next_optimizer_step', source='check_termination', dest='query_decision')
self.machine.add_transition(trigger='finished', source='check_termination', dest='complete')
self.machine.add_transition(trigger='error_trigger', source='*', dest='error_recovery')
self.machine.add_transition(trigger='recovery_complete', source='error_recovery', dest='idle')
# endregion
# region State Functions
def on_enter_initialization(self):
config_file_path = self.get_parameter('cmaes_config_file_path').get_parameter_value().string_value
# Load YAML
with open(config_file_path, 'r') as file:
config = yaml.safe_load(file)
if config['random_seed'] == '':
self.random_seed = None
else:
self.random_seed = int(config['random_seed'])
config['seed'] = self.random_seed
mean_centre = config['initial_mean_centre']
mean_std_dev = config['initial_mean_std_dev']
random_gen = np.random.default_rng(seed=self.random_seed)
initial_mean = random_gen.normal(mean_centre, mean_std_dev, size=(self.number_of_weights, 1))
initial_variance = config['initial_variance']
self.cmaes = cma.CMAEvolutionStrategy(initial_mean, initial_variance, inopts=config)
# Trigger transition
self.initialization_complete()
def on_enter_query_decision(self):
request = Query.Request()
if self.query_metric == 'random':
pass #TODO
elif self.query_metric == 'regular':
pass #TODO
elif self.query_metric == 'improvement':
pass #TODO
self.query_srv.wait_for_service(10)
self.__future = self.query_srv.call_async(request)
self.__future.add_done_callback(self.handle_query_response)
def on_enter_non_interactive_mode(self):
goal = TaskEvaluation.Goal()
goal.user_input = False
goal.number_of_population = self.cmaes.popsize
goal.number_of_dimensions = self.number_of_dimensions * 2
goal.number_of_parameters_per_dimensions = self.number_of_parameters_per_dimensions
population = self.cmaes.ask()
flat_population = flatten_population(population)
goal.parameter_array = flat_population
self._task_action.wait_for_server(timeout_sec=10.0)
        self.__send_future = self._task_action.send_goal_async(goal, feedback_callback=self._task_feedback_callback)
        self.__send_future.add_done_callback(self._task_goal_callback)
self.data_send_for_evaluation()
def on_enter_interactive_mode(self):
# Reset Mixed Reality heartbeat to check if the other node crashed
self.last_mr_heartbeat_time = None
request = UserInterface.Request()
request.current_cma_mean = flatten_population(self.cmaes.mean)
        request.best_parameters = flatten_population(self.best_parameters_per_iteration[-1])
        request.current_user_covariance_diag = np.asarray(self.user_covariance, dtype=np.float32).ravel().tolist()
self.__future = self.user_interface_srv.call_async(request)
self.__future.add_done_callback(self.handle_user_response)
self.data_send_to_user()
def on_enter_prepare_user_data_for_evaluation(self):
# Update the user_covariance
self.user_covariance = self.__user_response.user_covariance_diag
        goal = TaskEvaluation.Goal()
        goal.user_input = True
        goal.number_of_population = self.cmaes.popsize
        goal.number_of_dimensions = self.number_of_dimensions * 2
        goal.number_of_parameters_per_dimensions = self.number_of_parameters_per_dimensions
        goal.user_parameters = self.__user_response.user_parameters
        goal.user_covariance_diag = self.user_covariance
        goal.current_optimizer_mean = flatten_population(self.cmaes.mean)
        goal.conditional_points = self.__user_response.conditional_points
        goal.weight_parameter = [self.__user_response.weight_parameter]  # action field is float32[]
        self.__send_future = self._task_action.send_goal_async(goal, feedback_callback=self._task_feedback_callback)
        self.__send_future.add_done_callback(self._task_goal_callback)
        self.send_user_data_to_evaluation()
def on_enter_update_optimizer(self):
        population = unflatten_population(self.__task_response.new_means,
                                          self.cmaes.popsize,
                                          self.number_of_dimensions,
                                          self.number_of_parameters_per_dimensions)
        scores = self.__task_response.score
self.cmaes.tell(population, scores)
# save best results
        best_idx = int(np.argmax(scores))
        self.best_reward_per_iteration.append(scores[best_idx])
        self.best_parameters_per_iteration.append(population[best_idx, :, :].copy())
self.__task_response = None
self.episode += 1
self.optimizer_updated()
def on_enter_check_termination(self):
max_episodes = self.get_parameter('max_episode_count').get_parameter_value().integer_value
if self.episode >= max_episodes:
self.finished()
else:
self.next_optimizer_step()
def on_enter_complete(self):
pass
def on_enter_error_recovery(self):
pass
# endregion
# region Callback Functions
def parameter_callback(self, request, response):
param_names = request.parameter_name
param_types = request.parameter_type
param_values_str = request.parameter_value
if len(param_names) != len(param_types) or len(param_types) != len(param_values_str):
response.success = False
response.message = 'Lists must have the same size'
return response
try:
# Parameter update loop
all_params = []
for i in range(len(param_names)):
param_name = param_names[i]
param_type = param_types[i]
param_value_str = param_values_str[i]
# Input Validation (adjust as needed)
if param_type not in ['float32', 'string', 'bool']:
response.success = False
response.message = 'Unsupported parameter type'
return response
# Value Conversion (based on param_type)
if param_type == 'float32':
param_value = float(param_value_str)
elif param_type == 'bool':
param_value = (param_value_str.lower() == 'true')
else: # 'string'
param_value = param_value_str
# Compose the all parameter list
param = Parameter(param_name, Parameter.Type.from_parameter_value(param_value), param_value)
all_params.append(param)
# Attempt to set the parameter
set_params_result = self.set_parameters(all_params)
response.success = True # Assume success unless set_parameters fails
for result in set_params_result:
if not result.successful:
response.success = False
response.message = result.reason
break
return response
except Exception as e:
self.get_logger().error(f'Parameter update failed: {str(e)}')
response.success = False
response.message = 'Parameter update failed, please check logs'
return response
def check_heartbeats(self):
if self.last_mr_heartbeat_time:
current_time = self.get_clock().now().nanoseconds
if (current_time - self.last_mr_heartbeat_time) > (self.heartbeat_timeout * 1e9):
self.get_logger().error("MR Interface heartbeat timed out!")
self.error_trigger()
else:
pass
def mr_heartbeat_callback(self, _):
self.last_mr_heartbeat_time = self.get_clock().now().nanoseconds
def handle_query_response(self, future):
try:
response = future.result()
            if self.episode < self.number_of_initial_episodes.value:
                self.no_interaction()
            elif response.interaction:
                self.interaction()
            else:
                self.no_interaction()
except Exception as e:
self.get_logger().error(f'Query service call failed: {e}')
self.error_trigger()
    def _task_goal_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().error(f'Task Goal rejected: {goal_handle}')
            self.error_trigger()
            return
        self.__response_future = goal_handle.get_result_async()
        self.__response_future.add_done_callback(self._task_result_callback)
    def _task_feedback_callback(self, msg):
        feedback = msg.feedback
        self.get_logger().info(f'Received Feedback: state={feedback.current_state}, processed={feedback.processed_trajectories}')
    def _task_result_callback(self, future):
        self.__task_response = future.result().result
        # Two wait states can receive this result; fire the matching trigger.
        if self.state == 'wait_for_user_evaluation':
            self.received_user_data_for_evaluation()
        else:
            self.evaluation_response_received()
def handle_user_response(self, future):
try:
self.__user_response = future.result()
self.user_response_received()
except Exception as e:
            self.get_logger().error(f'User interface service call failed: {e}')
self.error_trigger()
# endregion
def main(args=None):
rclpy.init(args=args)
node = CMAESOptimizationNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()
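One caveat worth noting for `on_enter_update_optimizer` above: `cma.CMAEvolutionStrategy.tell()` minimizes its fitness values, so reward-like scores have to be negated before being passed in. A standalone sketch of the ask/tell loop with a toy objective and that assumed sign convention:
```python
import numpy as np
import cma

es = cma.CMAEvolutionStrategy(np.zeros(6), 0.3, inopts={'seed': 42, 'verbose': -9})
while not es.stop():
    population = es.ask()                                   # candidate parameter vectors
    rewards = [-np.sum(np.square(x)) for x in population]   # toy reward (maximum at 0)
    es.tell(population, [-r for r in rewards])              # negate: cma minimizes
print(es.result.xbest)
```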

View File

@ -117,8 +117,8 @@ class BayesianOptimization:
         x_next = self.acquisition_function(self.gauss_process, self.x_observed, seed=self.seed)
         return x_next

-    def add_observation(self, y_new, x_new):
-        if self.episode == 0:
+    def add_observation(self, y_new, x_new, fit=True):
+        if self.x_observed.shape[0] == 1:
             self.x_observed[0, :] = x_new
             self.y_observed[0] = y_new
             self.best_reward[0] = np.max(self.y_observed)
@ -127,8 +127,9 @@
             self.y_observed = np.vstack((self.y_observed, y_new))
             self.best_reward = np.vstack((self.best_reward, np.max(self.y_observed)))

-        self.gauss_process.fit(self.x_observed, self.y_observed)
-        self.episode += 1
+        if fit:
+            self.gauss_process.fit(self.x_observed, self.y_observed)
+        self.episode += 1

     def get_best_result(self):
         y_max = np.max(self.y_observed)

View File

@ -7,6 +7,9 @@
   <maintainer email="nikolaus.feith@unileoben.ac.at">niko</maintainer>
   <license>TODO: License declaration</license>

+  <exec_depend>interaction_msgs</exec_depend>
+  <exec_depend>rclpy</exec_depend>
+
   <test_depend>ament_copyright</test_depend>
   <test_depend>ament_flake8</test_depend>
   <test_depend>ament_pep257</test_depend>

View File

@ -1,4 +1,6 @@
 from setuptools import find_packages, setup
+import os
+from glob import glob

 package_name = 'interaction_optimizers'
@ -10,6 +12,7 @@ setup(
         ('share/ament_index/resource_index/packages',
             ['resource/' + package_name]),
         ('share/' + package_name, ['package.xml']),
+        (os.path.join('share', package_name, 'config'), glob('config/*.yaml')),
     ],
     install_requires=['setuptools'],
     zip_safe=True,
@ -20,6 +23,7 @@ setup(
     tests_require=['pytest'],
     entry_points={
         'console_scripts': [
+            'bo_node = interaction_optimizers.bayesian_optimization_node:main'
         ],
     },
 )

View File

@ -0,0 +1,43 @@
from dual_quaternions import DualQuaternion
import numpy as np
class DualQuaternionSplines:
def __init__(self, pose_vector, nr_interpolation_steps=10):
self.pose_vector = pose_vector
self.nr_interpolation_steps = nr_interpolation_steps
self.dual_quaternions = self.parse_input_vector()
def generate_trajectory(self):
"""Generate an interpolated trajectory from the list of dual quaternions."""
interpolated_trajectory = []
for i in range(len(self.dual_quaternions) - 1):
interpolated_trajectory.extend(self.interpolate_dual_quaternions(self.dual_quaternions[i],
self.dual_quaternions[i+1],
self.nr_interpolation_steps))
return interpolated_trajectory
def parse_input_vector(self):
"""Parse the input vector into dual quaternions."""
dual_quats = []
for i in range(0, len(self.pose_vector), 7):
pose = self.pose_vector[i:i+7]
dq = self.quaternion_to_dual_quaternion(pose)
dual_quats.append(dq)
return dual_quats
@staticmethod
def quaternion_to_dual_quaternion(pose):
"""Convert position and quaternion to a dual quaternion.
:param pose: [q_rw, q_rx, q_ry, q_rz, q_tx, q_ty, q_tz]"""
return DualQuaternion.from_quat_pose_array(pose)
@staticmethod
def interpolate_dual_quaternions(dq1, dq2, steps=10):
"""Interpolate between two dual quaternions."""
return [DualQuaternion.sclerp(dq1, dq2, t) for t in np.linspace(0, 1, steps)]
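A short usage sketch for the class above (pose layout per waypoint as parsed in `parse_input_vector`; the exact quaternion ordering follows `DualQuaternion.from_quat_pose_array`):
```python
# Two waypoints: identity rotation at the origin, then a pure translation in x.
poses = [1., 0., 0., 0., 0., 0., 0.,    # [q_w, q_x, q_y, q_z, t_x, t_y, t_z]
         1., 0., 0., 0., 1., 0., 0.]

spline = DualQuaternionSplines(poses, nr_interpolation_steps=5)
trajectory = spline.generate_trajectory()   # 5 interpolated DualQuaternion poses
print(trajectory[0].translation(), trajectory[-1].translation())
```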

View File

@ -0,0 +1,93 @@
from movement_primitives.promp import ProMP
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from interaction_msgs.srv import Task
from interaction_msgs.action import TaskExecution
import numpy as np
import asyncio
import threading
class PrompNode(Node):
def __init__(self, event_loop):
super().__init__('policy_promp_node')
self.event_loop = event_loop
self.task_srv = self.create_service(Task, 'interaction/task_srv', self.task_srv_callback)
self.execution_action = ActionClient(self, TaskExecution, 'interaction/task_execution')
# Parameters
self.timeout_buffer = self.declare_parameter('timeout_buffer', 5.0).get_parameter_value().double_value
def task_srv_callback(self, request, response):
coroutine = self.handle_request_async(request, response)
future = asyncio.run_coroutine_threadsafe(coroutine, self.event_loop)
try:
            response = future.result(timeout=request.time_steps_array[-1] + self.timeout_buffer)
except asyncio.TimeoutError:
self.get_logger().error('Task timed out!')
return response
async def handle_request_async(self, request, response):
        if len(request.x_next) != request.nr_dim * request.nr_parameter:
            self.get_logger().error('x_next size does not match nr_dim * nr_parameter')
            return response
        time_array = np.array(request.time_steps_array, dtype=np.float32)
weights = np.array(request.x_next, dtype=np.float32).reshape(request.nr_dim, request.nr_parameter)
promp = ProMP(n_dims=request.nr_dim, n_weights_per_dim=request.nr_parameter)
trajectory = promp.trajectory_from_weights(time_array, weights)
goal = TaskExecution.Goal()
goal.trajectory = trajectory
await self.execution_action.wait_for_server()
send_goal_future = self.execution_action.send_goal_async(goal)
goal_handle = await send_goal_future
if not goal_handle.accepted:
self.get_logger().error('Goal rejected!')
return response
self.get_logger().info('Goal accepted!')
result_handle = await goal_handle.get_result_async()
response.x_observed = request.x_next
response.reward = result_handle.result.reward
return response
def run_asyncio_loop():
    # Unused helper: run_forever() blocks, so the return statement below is never reached.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_forever()
    return loop
def main():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
threading.Thread(target=loop.run_forever, daemon=True).start()
# Initialize ROS and pass the event loop to the node
rclpy.init()
node = PrompNode(loop)
# Now, the ROS 2 node has the correct event loop reference for asyncio operations
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
loop.call_soon_threadsafe(loop.stop)
if __name__ == '__main__':
main()
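For reference, the `trajectory_from_weights` call above maps a weight vector to a trajectory. A minimal standalone sketch with the movement_primitives API (shapes here are assumptions consistent with the node's reshape):
```python
import numpy as np
from movement_primitives.promp import ProMP

n_dims, n_weights = 2, 10
promp = ProMP(n_dims=n_dims, n_weights_per_dim=n_weights)

T = np.linspace(0.0, 1.0, 101)                  # time steps
weights = np.random.randn(n_dims * n_weights)   # flat weight vector
trajectory = promp.trajectory_from_weights(T, weights)
print(trajectory.shape)                         # (101, 2): one row per time step
```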

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>interaction_policy_representation</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="nikolaus.feith@unileoben.ac.at">niko</maintainer>
<license>TODO: License declaration</license>
<depend>geometry_msgs</depend>
<depend>interaction_msgs</depend>
<depend>control_msgs</depend>
<depend>moveit_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/interaction_policy_representation
[install]
install_scripts=$base/lib/interaction_policy_representation

View File

@ -0,0 +1,25 @@
from setuptools import find_packages, setup
package_name = 'interaction_policy_representation'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='niko',
maintainer_email='nikolaus.feith@unileoben.ac.at',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
],
},
)

View File

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@ -0,0 +1,29 @@
import unittest
from dual_quaternions import DualQuaternion
from src.interaction_policy_representation.interaction_policy_representation.models.dual_quaternion_splines import DualQuaternionSplines
class TestDualQuaternionSplines(unittest.TestCase):
def setUp(self):
# Example input vector (position + quaternion for 2 poses)
self.input_vector = [1., 0., 0., 1., 0., 0., 0., 0., 1., 0., 0., 0., 1., 0.]
# Expected dual quaternion for the first pose, adapted based on actual class functionality
self.expected_first_dq = DualQuaternion.from_quat_pose_array(self.input_vector[:7])
def test_parse_input_vector(self):
trajectory = DualQuaternionSplines(self.input_vector)
# Check if the first dual quaternion matches the expected one
# This assumes the implementation details about how dual quaternions are stored
self.assertAlmostEqual(trajectory.dual_quaternions[0].q_r.w, self.expected_first_dq.q_r.w, places=5)
self.assertAlmostEqual(trajectory.dual_quaternions[0].q_r.x, self.expected_first_dq.q_r.x, places=5)
# Add more assertions as needed for y, z, dual part
def test_generate_trajectory(self):
trajectory = DualQuaternionSplines(self.input_vector)
interpolated_trajectory = trajectory.generate_trajectory()
# Ensure the interpolated trajectory is not empty
self.assertTrue(len(interpolated_trajectory) > 0)
# Add more specific tests, e.g., comparing specific interpolated values
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'

View File

@ -0,0 +1,202 @@
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.action import ActionServer
from rclpy.action import GoalResponse, CancelResponse
from rclpy.callback_groups import ReentrantCallbackGroup
from transitions import Machine
import time
import yaml
import numpy as np
from movement_primitives.promp import ProMP
from src.interaction_utils.serialization import flatten_population, unflatten_population
from interaction_msgs.action import TaskEvaluation
class TaskNode(Node):
def __init__(self):
super().__init__('task_node')
# Task Attributes
self.number_of_processed_trajectories = 0
self.goal_dict = {}
self.eval_strategy = 'robot'
self.seed = self.declare_parameter('seed', None)
# ROS2 Interfaces
self.task_callback_group = ReentrantCallbackGroup()
# Topic
# Service
# Action
self._goal = None
self._task_action = ActionServer(self,
TaskEvaluation,
'interaction/task_action',
goal_callback=self._task_goal_callback,
cancel_callback=self._task_cancel_callback,
execute_callback=self._task_execute_callback,
callback_group=self.task_callback_group)
# State Machine
self.state = None
# States
self.states = [
'waiting_for_task_specs',
'processing_non_interactive_input',
'processing_interactive_input',
'waiting_for_robot_response',
'waiting_for_objective_function_response',
            'sending_request',
            'error_recovery'
]
# initialize state machine
self.machine = Machine(self, states=self.states, initial='waiting_for_task_specs')
self.machine.add_transition(trigger='non_interactive_specs_received', source='waiting_for_task_specs', dest='processing_non_interactive_input')
self.machine.add_transition(trigger='interactive_specs_received', source='waiting_for_task_specs', dest='processing_interactive_input')
self.machine.add_transition(trigger='non_interactive_to_robot', source='processing_non_interactive_input', dest='waiting_for_robot_response')
self.machine.add_transition(trigger='non_interactive_to_obj_fun', source='processing_non_interactive_input', dest='waiting_for_objective_function_response')
self.machine.add_transition(trigger='interactive_to_robot', source='processing_interactive_input', dest='waiting_for_robot_response')
self.machine.add_transition(trigger='interactive_to_obj_fun', source='processing_interactive_input', dest='waiting_for_objective_function_response')
self.machine.add_transition(trigger='sending_robot_results', source='waiting_for_robot_response', dest='sending_request')
        self.machine.add_transition(trigger='sending_obj_fun_results', source='waiting_for_objective_function_response', dest='sending_request')
self.machine.add_transition(trigger='sending_back', source='sending_request', dest='waiting_for_task_specs')
self.machine.add_transition(trigger='error_trigger', source='*', dest='error_recovery')
self.machine.add_transition(trigger='recovery_complete', source='error_recovery', dest='waiting_for_task_specs')
def destroy(self):
self._task_action.destroy()
super().destroy_node()
# Helper function
def compute_weighted_mean(self, user_parameters, optimizer_mean, weight_parameter):
# Check if the weight_parameter is between 0 and 1
if not 0 <= weight_parameter <= 1:
self.get_logger().error(f'Invalid weight parameter for weighted average: {weight_parameter}')
self.error_trigger()
# Compute the weighted average
weighted_mean = (weight_parameter * user_parameters) + ((1 - weight_parameter) * optimizer_mean)
return weighted_mean
def compute_promp_trajectory(self, parameter_set,
number_of_dimensions, number_of_parameters_per_dimensions,
duration, number_of_time_steps):
time = np.linspace(0, duration, number_of_time_steps)
        promp = ProMP(int(number_of_dimensions / 2), n_weights_per_dim=number_of_parameters_per_dimensions)
trajectory = promp.trajectory_from_weights(time, parameter_set)
return trajectory
# State Methods
def on_enter_processing_non_interactive_input(self):
# Unflatten parameter array to get the parameter sets
parameter_sets = unflatten_population(self._goal.parameter_array,
self._goal.number_of_population,
self._goal.number_of_dimensions,
self._goal.number_of_parameters_per_dimensions)
# Compute trajectories for each set
        trajectories = [self.compute_promp_trajectory(parameter_sets[i, :, :],
                                                      self._goal.number_of_dimensions,
                                                      self._goal.number_of_parameters_per_dimensions,
                                                      self._goal.duration,
                                                      self._goal.number_of_time_steps
                                                      ) for i in range(parameter_sets.shape[0])]
# send the trajectories to the robot or objective function
if self.eval_strategy == 'robot':
# TODO: Implement Action Interface for Robot Evaluation
self.non_interactive_to_robot()
elif self.eval_strategy == 'obj_fun':
# TODO: Implement Action Interface for Objective Function Evaluation
self.non_interactive_to_obj_fun()
else:
self.get_logger().error(f"Unknown evaluation strategy: '{self.eval_strategy}'")
self.error_trigger()
def on_enter_processing_interactive_input(self):
        weighted_average = self.compute_weighted_mean(self._goal.user_parameters,
                                                      self._goal.current_optimizer_mean,
                                                      self._goal.weight_parameter)
        promp = ProMP(int(self._goal.number_of_dimensions / 2),
                      n_weights_per_dim=self._goal.number_of_parameters_per_dimensions)
        promp.from_weight_distribution(weighted_average,
                                       self._goal.user_covariance_diag)
        if len(self._goal.conditional_points) > 0:
# TODO: Fix the Action to add condititional cov
pass
time = np.linspace(0, self._goal.duration, self._goal.number_of_time_steps)
random_state = np.random.RandomState(self.seed.value)
trajectories = promp.sample_trajectories(time, self._goal.number_of_population, random_state)
# send the trajectories to the robot or objective function
if self.eval_strategy == 'robot':
# TODO: Implement Action Interface for Robot Evaluation
self.interactive_to_robot()
elif self.eval_strategy == 'obj_fun':
# TODO: Implement Action Interface for Objective Function Evaluation
self.interactive_to_obj_fun()
else:
self.get_logger().error(f"Unknown evaluation strategy: '{self.eval_strategy}'")
self.error_trigger()
def on_enter_waiting_for_robot_response(self):
pass
def on_enter_waiting_for_objective_function_response(self):
pass
# Callback functions
def _task_goal_callback(self, goal):
self._goal = goal
if goal.user_input:
self.interactive_specs_received()
else:
self.non_interactive_specs_received()
return GoalResponse.ACCEPT
def _task_cancel_callback(self, _):
self.error_trigger()
return CancelResponse.ACCEPT
async def _task_execute_callback(self, goal_handle):
feedback_msg = TaskEvaluation.Feedback()
result_msg = TaskEvaluation.Result()
        # Feedback loop (throttled so it does not busy-wait)
        while not goal_handle.is_cancel_requested:
            # Send feedback msg
            feedback_msg.current_state = self.state
            feedback_msg.processed_trajectories = self.number_of_processed_trajectories
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(0.1)
            if self.state == 'sending_request':
                if not self.goal_dict:
                    self.error_trigger()
                    break
                result_msg.score = self.goal_dict['score']
                result_msg.new_means = self.goal_dict['new_means']
                break
        if goal_handle.is_cancel_requested:
            goal_handle.canceled()
            result_msg.score = [-1.0]
            return result_msg
self.sending_back()
goal_handle.succeed()
return result_msg
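For a quick numeric check of `compute_weighted_mean` above (a toy example, not part of the node):
```python
import numpy as np

user_parameters = np.array([1.0, 2.0])
optimizer_mean = np.array([3.0, 4.0])
weight = 0.25  # 0 = ignore the user, 1 = fully trust the user

weighted_mean = weight * user_parameters + (1 - weight) * optimizer_mean
print(weighted_mean)  # [2.5 3.5]
```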

View File

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>interaction_tasks</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="nikolaus.feith@unileoben.ac.at">niko</maintainer>
<license>TODO: License declaration</license>
<exec_depend>interaction_msgs</exec_depend>
<exec_depend>rclpy</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/interaction_tasks
[install]
install_scripts=$base/lib/interaction_tasks

View File

@ -0,0 +1,25 @@
from setuptools import find_packages, setup
package_name = 'interaction_tasks'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='niko',
maintainer_email='nikolaus.feith@unileoben.ac.at',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
],
},
)

View File

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'

Some files were not shown because too many files have changed in this diff.