diff --git a/franka_iml_experiment/dmp_node.py b/franka_iml_experiment/dmp_node.py new file mode 100644 index 0000000..ea2f1c0 --- /dev/null +++ b/franka_iml_experiment/dmp_node.py @@ -0,0 +1,69 @@ + +import rclpy +from rclpy.node import Node + +from geometry_msgs.msg import PoseArray +from geometry_msgs.msg import Pose +from active_bo_msgs.msg import DMP + +import pydmps +import pydmps.dmp_discrete + +import numpy as np + + +class DMPNode(Node): + + def __init__(self): + super().__init__('dmp_node') + self.subscription = self.create_subscription( + DMP, + 'franka_iml_experiment/dmps', + self.dmp_callback, + 10) + self.subscription # prevent unused variable warning + + self.traj_publisher = self.create_publisher(PoseArray, 'moveit_interface/task_space_trajectory', 10) + + def dmp_callback(self, msg): + start = msg.start + self.get_logger().info(f"{start}") + end = np.array(msg.end_point) + time = msg.time + nr_bfs = msg.nr_bfs + + # weights = np.vstack((msg.p_x, msg.p_y, msg.p_z, msg.o_x, msg.o_y, msg.o_z, msg.o_w)) + weights = np.vstack((msg.p_x, msg.p_y)) + + dmp = pydmps.dmp_discrete.DMPs_discrete(n_dmps=2, n_bfs=nr_bfs, w=weights, y0=start, goal=end) + y_track, _, _ = dmp.rollout(tau=time) + + pose_msg = PoseArray() + for i in range(y_track.shape[0]): + pose = Pose() + pose.position.x = y_track[i, 0] + pose.position.y = y_track[i, 1] + # pose.position.z = y_track[i, 2] + # pose.orientation.x = y_track[i, 3] + # pose.orientation.y = y_track[i, 4] + # pose.orientation.z = y_track[i, 5] + # pose.orientation.w = y_track[i, 6] + pose_msg.poses.append(pose) + + self.traj_publisher.publish(pose_msg) + + + +def main(args=None): + rclpy.init(args=args) + + dmp_node = DMPNode() + + rclpy.spin(dmp_node) + + dmp_node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/franka_iml_experiment/dmp_test.py b/franka_iml_experiment/dmp_test.py new file mode 100644 index 0000000..472d6cd --- /dev/null +++ b/franka_iml_experiment/dmp_test.py @@ -0,0 +1,47 @@ +import rclpy +from rclpy.node import Node +from active_bo_msgs.msg import DMP +from geometry_msgs.msg import Pose +import numpy as np + + +class DMPPublisher(Node): + + def __init__(self): + super().__init__('dmp_test_publisher') + self.publisher_ = self.create_publisher(DMP, 'franka_iml_experiment/dmps', 10) + timer_period = 10 # seconds + self.timer = self.create_timer(timer_period, self.timer_callback) + + def timer_callback(self): + msg = DMP() + + end = [0.5, 0.5, 0., 0., 0., 0., 0.] + msg.end_point = end + msg.time = 10.0 + msg.nr_bfs = 10 + + # Assuming you want to send zero weights for the test. + # Modify if needed. + msg.p_x = list(np.random.rand(10)) + msg.p_y = list(np.random.rand(10)) + # msg.p_z = [0.0] * 10 + # msg.o_x = [0.0] * 10 + # msg.o_y = [0.0] * 10 + # msg.o_z = [0.0] * 10 + # msg.o_w = [0.0] * 10 + + self.publisher_.publish(msg) + self.get_logger().info('Publishing DMP message for testing') + + +def main(args=None): + rclpy.init(args=args) + publisher = DMPPublisher() + rclpy.spin(publisher) + publisher.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/franka_iml_experiment/ik_moveit.py b/franka_iml_experiment/ik_moveit.py deleted file mode 100644 index 39651d6..0000000 --- a/franka_iml_experiment/ik_moveit.py +++ /dev/null @@ -1 +0,0 @@ -import moveit.planning \ No newline at end of file diff --git a/launch/dmp.launch.py b/launch/dmp.launch.py new file mode 100755 index 0000000..dc8778f --- /dev/null +++ b/launch/dmp.launch.py @@ -0,0 +1,19 @@ +from launch import LaunchDescription +from launch_ros.actions import Node + + +def generate_launch_description(): + return LaunchDescription([ + Node( + package='franka_iml_experiment', + executable='dmp_node', + name='dmp_node', + output='screen' + ), + Node( + package='franka_iml_experiment', + executable='dmp_test', + name='dmp_test', + output='screen' + ), + ]) diff --git a/launch/moveit_panda.launch.py b/launch/moveit_panda.launch.py deleted file mode 100755 index 4d114f8..0000000 --- a/launch/moveit_panda.launch.py +++ /dev/null @@ -1,238 +0,0 @@ -# Copyright (c) 2021 Franka Emika GmbH -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This file is an adapted version of -# https://github.com/ros-planning/moveit_resources/blob/ca3f7930c630581b5504f3b22c40b4f82ee6369d/panda_moveit_config/launch/demo.launch.py - -import os - -from ament_index_python.packages import get_package_share_directory -from launch import LaunchDescription -from launch.actions import (DeclareLaunchArgument, ExecuteProcess, IncludeLaunchDescription, - Shutdown) -from launch.conditions import UnlessCondition -from launch.launch_description_sources import PythonLaunchDescriptionSource -from launch.substitutions import Command, FindExecutable, LaunchConfiguration, PathJoinSubstitution -from launch_ros.actions import Node -from launch_ros.substitutions import FindPackageShare -import yaml - - -def load_yaml(package_name, file_path): - package_path = get_package_share_directory(package_name) - absolute_file_path = os.path.join(package_path, file_path) - - try: - with open(absolute_file_path, 'r') as file: - return yaml.safe_load(file) - except EnvironmentError: # parent of IOError, OSError *and* WindowsError where available - return None - - -def generate_launch_description(): - robot_ip_parameter_name = 'robot_ip' - use_fake_hardware_parameter_name = 'use_fake_hardware' - fake_sensor_commands_parameter_name = 'fake_sensor_commands' - - robot_ip = LaunchConfiguration(robot_ip_parameter_name) - use_fake_hardware = LaunchConfiguration(use_fake_hardware_parameter_name) - fake_sensor_commands = LaunchConfiguration(fake_sensor_commands_parameter_name) - - # Command-line arguments - - db_arg = DeclareLaunchArgument( - 'db', default_value='False', description='Database flag' - ) - log_level_arg = DeclareLaunchArgument(name='log_level', default_value='info') - - # planning_context - franka_xacro_file = os.path.join(get_package_share_directory('franka_description'), 'robots', - 'panda_arm.urdf.xacro') - robot_description_config = Command( - [FindExecutable(name='xacro'), ' ', franka_xacro_file, ' hand:=false', - ' robot_ip:=', robot_ip, ' use_fake_hardware:=', use_fake_hardware, - ' fake_sensor_commands:=', fake_sensor_commands]) - - robot_description = {'robot_description': robot_description_config} - - franka_semantic_xacro_file = os.path.join(get_package_share_directory('franka_moveit_config'), - 'srdf', - 'panda_arm.srdf.xacro') - robot_description_semantic_config = Command( - [FindExecutable(name='xacro'), ' ', franka_semantic_xacro_file, ' hand:=false'] - ) - robot_description_semantic = { - 'robot_description_semantic': robot_description_semantic_config - } - - kinematics_yaml = load_yaml( - 'franka_moveit_config', 'config/kinematics.yaml' - ) - - # Planning Functionality - ompl_planning_pipeline_config = { - 'move_group': { - 'planning_plugin': 'ompl_interface/OMPLPlanner', - 'request_adapters': 'default_planner_request_adapters/AddTimeOptimalParameterization ' - 'default_planner_request_adapters/ResolveConstraintFrames ' - 'default_planner_request_adapters/FixWorkspaceBounds ' - 'default_planner_request_adapters/FixStartStateBounds ' - 'default_planner_request_adapters/FixStartStateCollision ' - 'default_planner_request_adapters/FixStartStatePathConstraints', - 'start_state_max_bounds_error': 0.1, - } - } - ompl_planning_yaml = load_yaml( - 'franka_moveit_config', 'config/ompl_planning.yaml' - ) - ompl_planning_pipeline_config['move_group'].update(ompl_planning_yaml) - - # Trajectory Execution Functionality - moveit_simple_controllers_yaml = load_yaml( - 'franka_moveit_config', 'config/panda_controllers.yaml' - ) - moveit_controllers = { - 'moveit_simple_controller_manager': moveit_simple_controllers_yaml, - 'moveit_controller_manager': 'moveit_simple_controller_manager' - '/MoveItSimpleControllerManager', - } - - trajectory_execution = { - 'moveit_manage_controllers': True, - 'trajectory_execution.allowed_execution_duration_scaling': 1.2, - 'trajectory_execution.allowed_goal_duration_margin': 0.5, - 'trajectory_execution.allowed_start_tolerance': 0.01, - } - - planning_scene_monitor_parameters = { - 'publish_planning_scene': True, - 'publish_geometry_updates': True, - 'publish_state_updates': True, - 'publish_transforms_updates': True, - } - - # Start the actual move_group node/action server - run_move_group_node = Node( - package='moveit_ros_move_group', - executable='move_group', - output='screen', - parameters=[ - robot_description, - robot_description_semantic, - kinematics_yaml, - ompl_planning_pipeline_config, - trajectory_execution, - moveit_controllers, - planning_scene_monitor_parameters, - ], - ) - - # RViz - rviz_base = os.path.join(get_package_share_directory('franka_moveit_config'), 'rviz') - rviz_full_config = os.path.join(rviz_base, 'moveit.rviz') - - rviz_node = Node( - package='rviz2', - executable='rviz2', - name='rviz2', - output='log', - arguments=['-d', rviz_full_config], - parameters=[ - robot_description, - robot_description_semantic, - ompl_planning_pipeline_config, - kinematics_yaml, - ], - ) - - # Publish TF - robot_state_publisher = Node( - package='robot_state_publisher', - executable='robot_state_publisher', - name='robot_state_publisher', - output='both', - parameters=[robot_description], - ) - - ros2_controllers_path = os.path.join( - get_package_share_directory('franka_moveit_config'), - 'config', - 'panda_ros_controllers.yaml', - ) - ros2_control_node = Node( - package='controller_manager', - executable='ros2_control_node', - parameters=[robot_description, ros2_controllers_path], - remappings=[('joint_states', 'franka/joint_states')], - output={ - 'stdout': 'screen', - 'stderr': 'screen', - }, - on_exit=Shutdown(), - ) - - # Load controllers - load_controllers = [] - for controller in ['panda_arm_controller', 'joint_state_broadcaster']: - load_controllers += [ - ExecuteProcess( - cmd=['ros2 run controller_manager spawner {}'.format(controller)], - shell=True, - output='screen', - ) - ] - - joint_state_publisher = Node( - package='joint_state_publisher', - executable='joint_state_publisher', - name='joint_state_publisher', - parameters=[ - {'source_list': ['franka/joint_states'], 'rate': 30}], - ) - - franka_robot_state_broadcaster = Node( - package='controller_manager', - executable='spawner', - arguments=['franka_robot_state_broadcaster'], - output='screen', - condition=UnlessCondition(use_fake_hardware), - ) - - robot_arg = DeclareLaunchArgument( - robot_ip_parameter_name, - description='Hostname or IP address of the robot.') - - use_fake_hardware_arg = DeclareLaunchArgument( - use_fake_hardware_parameter_name, - default_value='true', - description='Use fake hardware') - fake_sensor_commands_arg = DeclareLaunchArgument( - fake_sensor_commands_parameter_name, - default_value='false', - description="Fake sensor commands. Only valid when '{}' is true".format( - use_fake_hardware_parameter_name)) - return LaunchDescription( - [robot_arg, - use_fake_hardware_arg, - fake_sensor_commands_arg, - db_arg, - rviz_node, - robot_state_publisher, - run_move_group_node, - ros2_control_node, - joint_state_publisher, - franka_robot_state_broadcaster - ] - + load_controllers - ) diff --git a/setup.py b/setup.py index 4b31a29..db1b3de 100644 --- a/setup.py +++ b/setup.py @@ -12,8 +12,7 @@ setup( ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), - ('share/' + package_name + '/config', ['config/kinematics.yaml']), - (os.path.join('share', package_name), glob('launch/*.launch.py')) + (os.path.join('share', package_name), glob('launch/*.launch.py')), ], install_requires=['setuptools'], zip_safe=True, @@ -24,7 +23,8 @@ setup( tests_require=['pytest'], entry_points={ 'console_scripts': [ - 'yaml_t = franka_iml_experiment.yaml_loader:main' + 'dmp_node = franka_iml_experiment.dmp_node:main', + 'dmp_test = franka_iml_experiment.dmp_test:main', ], }, )