Testing gripper forward controller
Added mapping routine for ps5 controller
This commit is contained in:
parent
5077372894
commit
c891c2c741
20
src/ur_robotiq_servo/bash/mapping_routine.sh
Normal file
20
src/ur_robotiq_servo/bash/mapping_routine.sh
Normal file
@ -0,0 +1,20 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Specify the input device (adjust the device path if needed)
|
||||
DEVICE="/dev/input/js0"
|
||||
|
||||
# Specify the exact path to the ROS 2 workspace
|
||||
ROS2_WS_PATH=~/PycharmProjects/UR_Robotiq
|
||||
|
||||
# Source the ROS 2 setup script
|
||||
source /opt/ros/$ROS_DISTRO/setup.bash
|
||||
source ~/ros2_ws/install/setup.bash
|
||||
|
||||
# Launch the joy_node in a new terminal with the specified input device
|
||||
gnome-terminal -- bash -c "source /opt/ros/$ROS_DISTRO/setup.bash; source $ROS2_WS_PATH/install/setup.bash; ros2 run joy joy_node --ros-args --param dev:=$DEVICE; exec bash"
|
||||
|
||||
# Wait for the joy_node to initialize
|
||||
sleep 2
|
||||
|
||||
# Launch the ps5_mapping_script in another new terminal
|
||||
gnome-terminal -- bash -c "source /opt/ros/$ROS_DISTRO/setup.bash; source $ROS2_WS_PATH/install/setup.bash; ros2 run ur_robotiq_servo ps5_mapping; exec bash"
|
20
src/ur_robotiq_servo/config/ps5-controller-mapping.yaml
Normal file
20
src/ur_robotiq_servo/config/ps5-controller-mapping.yaml
Normal file
@ -0,0 +1,20 @@
|
||||
axes:
|
||||
DPadHorizontal: 6
|
||||
DPadVertical: 7
|
||||
L2: 2
|
||||
LeftStickHorizontal: 0
|
||||
LeftStickVertical: 1
|
||||
R2: 5
|
||||
RightStickHorizontal: 3
|
||||
RightStickVertical: 4
|
||||
buttons:
|
||||
Circle: 1
|
||||
L1: 4
|
||||
L2: 6
|
||||
Options: 9
|
||||
R1: 5
|
||||
R2: 7
|
||||
Share: 8
|
||||
Square: 3
|
||||
Triangle: 2
|
||||
X: 0
|
@ -1,6 +1,7 @@
|
||||
ps5_servo_node:
|
||||
ros__parameters:
|
||||
linear_speed_multiplier: 0.5
|
||||
gripper_position_multiplier: 0.004
|
||||
gripper_position_multiplier: 0.006
|
||||
gripper_cycle_time: 0.01
|
||||
gripper_lower_limit: 0.0
|
||||
gripper_upper_limit: 0.14
|
@ -10,6 +10,8 @@ def generate_launch_description():
|
||||
'config')
|
||||
joy_params = os.path.join(config_directory, 'joy-params.yaml')
|
||||
ps5_params = os.path.join(config_directory, 'ps5-params.yaml')
|
||||
ps5_controller_mapping = os.path.join(config_directory, 'ps5-controller-mapping.yaml')
|
||||
print(ps5_controller_mapping)
|
||||
return LaunchDescription([
|
||||
Node(
|
||||
package='joy',
|
||||
@ -23,7 +25,8 @@ def generate_launch_description():
|
||||
executable='ps5_servo',
|
||||
name='ps5_servo_node',
|
||||
output='screen',
|
||||
parameters=[ps5_params],
|
||||
parameters=[ps5_params,
|
||||
{'yaml_file_path': ps5_controller_mapping}],
|
||||
arguments=[],
|
||||
),
|
||||
])
|
@ -24,7 +24,8 @@ setup(
|
||||
tests_require=['pytest'],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'ps5_servo = ur_robotiq_servo.ps5_control:main'
|
||||
'ps5_servo = ur_robotiq_servo.ps5_control:main',
|
||||
'ps5_mapping = ur_robotiq_servo.ps5_mapping_script:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
@ -5,15 +5,35 @@ from geometry_msgs.msg import TwistStamped
|
||||
from control_msgs.msg import JointJog
|
||||
from std_srvs.srv import Trigger
|
||||
from robotiq_2f_msgs.msg import ForwardCommand
|
||||
import yaml
|
||||
import os
|
||||
|
||||
def load_yaml(file_path):
|
||||
with open(file_path, 'r') as file:
|
||||
return yaml.safe_load(file)
|
||||
|
||||
class PS5ControllerNode(Node):
|
||||
def __init__(self):
|
||||
super().__init__('ps5_controller_node')
|
||||
|
||||
# Load the YAML file path from parameters
|
||||
self.yaml_file_path = self.declare_parameter('yaml_file_path', '').get_parameter_value().string_value
|
||||
|
||||
# Validate and load the YAML file
|
||||
if not self.yaml_file_path or not os.path.exists(self.yaml_file_path):
|
||||
self.get_logger().error(f"Invalid YAML file path: {self.yaml_file_path}")
|
||||
rclpy.shutdown()
|
||||
return
|
||||
|
||||
self.mappings = load_yaml(self.yaml_file_path)
|
||||
self.get_logger().info(f"Loaded mappings: {self.mappings}")
|
||||
|
||||
|
||||
# States
|
||||
self.mode = 'twist' # Initialize mode to 'twist'. Alternatives: 'twist', 'joint'
|
||||
self.last_button_state = 0 # Track the last state of the toggle button to detect presses
|
||||
self.last_finger_joint_angle = 0.0
|
||||
self.last_gripper_gap = 0.0
|
||||
self.gripper_cmd = 0.0
|
||||
|
||||
# Parameters
|
||||
self.linear_speed_multiplier = self.declare_parameter('linear_speed_multiplier', 1.0)
|
||||
@ -29,6 +49,11 @@ class PS5ControllerNode(Node):
|
||||
.get_parameter_value().double_value)
|
||||
self.get_logger().info(f"Gripper position multiplier: {self.gripper_position_multiplier}")
|
||||
|
||||
self.gripper_cycle_time = self.declare_parameter('gripper_cycle_time', 1.0)
|
||||
self.gripper_cycle_time = (self.get_parameter('gripper_cycle_time')
|
||||
.get_parameter_value().double_value)
|
||||
self.get_logger().info(f"Gripper cycle time: {self.gripper_cycle_time}")
|
||||
|
||||
self.gripper_lower_limit = self.declare_parameter('gripper_lower_limit', 0.0)
|
||||
self.gripper_lower_limit = (self.get_parameter('gripper_lower_limit')
|
||||
.get_parameter_value().double_value)
|
||||
@ -38,6 +63,7 @@ class PS5ControllerNode(Node):
|
||||
self.gripper_upper_limit = (self.get_parameter('gripper_upper_limit')
|
||||
.get_parameter_value().double_value)
|
||||
self.get_logger().info(f"Gripper upper limit: {self.gripper_upper_limit}")
|
||||
|
||||
# Subscriber and Publisher
|
||||
self.joint_state_sub = self.create_subscription(
|
||||
JointState,
|
||||
@ -69,12 +95,14 @@ class PS5ControllerNode(Node):
|
||||
# Services
|
||||
self.servo_client = self.create_client(Trigger, '/servo_node/start_servo')
|
||||
|
||||
srv_msg = Trigger.Request()
|
||||
while not self.servo_client.wait_for_service(timeout_sec=5.0):
|
||||
self.get_logger().info('/servo_node/start_servo service not available, waiting again...')
|
||||
|
||||
self.call_start_servo()
|
||||
|
||||
# Timer for publishing gripper command
|
||||
self.gripper_timer = self.create_timer(self.gripper_cycle_time, self.publish_gripper_command)
|
||||
|
||||
self.get_logger().info('ps5_servo_node started!')
|
||||
|
||||
def call_start_servo(self):
|
||||
@ -90,33 +118,52 @@ class PS5ControllerNode(Node):
|
||||
def joint_state_callback(self, msg):
|
||||
try:
|
||||
index = msg.name.index('gripper_gap')
|
||||
self.last_finger_joint_angle = msg.position[index]
|
||||
self.last_gripper_gap = msg.position[index]
|
||||
except ValueError:
|
||||
self.get_logger().error('finger_joint not found in /joint_states msg')
|
||||
self.get_logger().error('gripper_gap not found in /joint_states msg')
|
||||
|
||||
def publish_gripper_command(self):
|
||||
if self.gripper_cmd is not None:
|
||||
|
||||
new_gripper_position = self.gripper_cmd + self.last_gripper_gap
|
||||
if new_gripper_position >= self.gripper_upper_limit:
|
||||
new_gripper_position = self.gripper_upper_limit
|
||||
elif new_gripper_position <= self.gripper_lower_limit:
|
||||
new_gripper_position = self.gripper_lower_limit
|
||||
|
||||
|
||||
self.gripper_cmd = 0.0
|
||||
gripper_msg = ForwardCommand()
|
||||
gripper_msg.position = new_gripper_position
|
||||
gripper_msg.max_velocity = 0.1
|
||||
gripper_msg.max_effort = 235.0
|
||||
self.gripper_cmd_pub.publish(gripper_msg)
|
||||
else:
|
||||
self.get_logger().warn('Gripper command not available!')
|
||||
|
||||
def joy_callback(self, msg):
|
||||
# Check for button press to toggle mode
|
||||
# Assuming button 2 (e.g., Triangle on PS5) for toggling mode
|
||||
current_button_state = msg.buttons[8]
|
||||
toggle_button_index = self.mappings['buttons']['Options'] # Update to the correct button index for toggling
|
||||
current_button_state = msg.buttons[toggle_button_index]
|
||||
if current_button_state == 1 and self.last_button_state == 0:
|
||||
self.mode = 'joint' if self.mode == 'twist' else 'twist'
|
||||
self.get_logger().info(f'Mode switched to: {self.mode}')
|
||||
self.last_button_state = current_button_state
|
||||
|
||||
left_trigger = (msg.axes[2] - 1) / - 2.0
|
||||
right_trigger = (msg.axes[5] - 1) / - 2.0
|
||||
left_trigger = (msg.axes[self.mappings['axes']['L2']] - 1) / -2.0
|
||||
right_trigger = (msg.axes[self.mappings['axes']['R2']] - 1) / -2.0
|
||||
|
||||
# Process control input based on current mode
|
||||
if self.mode == 'twist':
|
||||
twist_msg = TwistStamped()
|
||||
twist_msg.header.frame_id = 'tool0'
|
||||
twist_msg.header.stamp = self.get_clock().now().to_msg()
|
||||
twist_msg.twist.linear.x = msg.axes[0] * self.linear_speed_multiplier
|
||||
twist_msg.twist.linear.y = -msg.axes[1] * self.linear_speed_multiplier
|
||||
twist_msg.twist.linear.x = msg.axes[self.mappings['axes']['LeftStickHorizontal']] * self.linear_speed_multiplier
|
||||
twist_msg.twist.linear.y = -msg.axes[self.mappings['axes']['LeftStickVertical']] * self.linear_speed_multiplier
|
||||
twist_msg.twist.linear.z = (left_trigger - right_trigger) * self.linear_speed_multiplier
|
||||
twist_msg.twist.angular.x = msg.axes[3]
|
||||
twist_msg.twist.angular.y = msg.axes[4]
|
||||
twist_msg.twist.angular.z = (msg.buttons[4] - msg.buttons[5]) * 1.0
|
||||
twist_msg.twist.angular.x = msg.axes[self.mappings['axes']['RightStickHorizontal']]
|
||||
twist_msg.twist.angular.y = msg.axes[self.mappings['axes']['RightStickVertical']]
|
||||
twist_msg.twist.angular.z = (msg.buttons[self.mappings['buttons']['L1']] - msg.buttons[self.mappings['buttons']['R1']]) * 1.0
|
||||
self.twist_pub.publish(twist_msg)
|
||||
elif self.mode == 'joint':
|
||||
joint_msg = JointJog()
|
||||
@ -130,26 +177,20 @@ class PS5ControllerNode(Node):
|
||||
"wrist_2_joint",
|
||||
"wrist_3_joint"
|
||||
]
|
||||
joint_msg.velocities = [msg.axes[0],
|
||||
msg.axes[1],
|
||||
msg.axes[3],
|
||||
msg.axes[4],
|
||||
msg.axes[6],
|
||||
msg.axes[7]]
|
||||
joint_msg.velocities = [
|
||||
msg.axes[self.mappings['axes']['LeftStickHorizontal']],
|
||||
msg.axes[self.mappings['axes']['LeftStickVertical']],
|
||||
msg.axes[self.mappings['axes']['RightStickHorizontal']],
|
||||
msg.axes[self.mappings['axes']['RightStickVertical']],
|
||||
msg.axes[self.mappings['axes']['DPadHorizontal']],
|
||||
msg.axes[self.mappings['axes']['DPadVertical']]
|
||||
]
|
||||
self.joint_pub.publish(joint_msg)
|
||||
|
||||
# Gripper controller
|
||||
new_finger_joint_angle = (self.last_finger_joint_angle +
|
||||
(msg.buttons[1] - msg.buttons[0]) * self.gripper_position_multiplier)
|
||||
if self.gripper_lower_limit > new_finger_joint_angle or self.gripper_upper_limit < new_finger_joint_angle:
|
||||
self.get_logger().debug(f"New finger joint angle out of bounds: {new_finger_joint_angle}")
|
||||
new_finger_joint_angle = self.last_finger_joint_angle
|
||||
gripper_msg = ForwardCommand()
|
||||
gripper_msg.position = new_finger_joint_angle
|
||||
gripper_msg.max_velocity = 0.15
|
||||
gripper_msg.max_effort = 235.0
|
||||
self.gripper_cmd_pub.publish(gripper_msg)
|
||||
|
||||
gripper_open_button = self.mappings['buttons']['Circle']
|
||||
gripper_close_button = self.mappings['buttons']['X']
|
||||
self.gripper_cmd = self.gripper_cmd + int((msg.buttons[gripper_open_button] - msg.buttons[gripper_close_button])) * self.gripper_position_multiplier
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
|
162
src/ur_robotiq_servo/ur_robotiq_servo/ps5_mapping_script.py
Executable file
162
src/ur_robotiq_servo/ur_robotiq_servo/ps5_mapping_script.py
Executable file
@ -0,0 +1,162 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from sensor_msgs.msg import Joy
|
||||
import yaml
|
||||
import os
|
||||
import time
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
import threading
|
||||
|
||||
class PS5ControllerMappingNode(Node):
|
||||
def __init__(self):
|
||||
super().__init__('ps5_controller_mapping_node')
|
||||
self.subscription = self.create_subscription(
|
||||
Joy,
|
||||
'joy',
|
||||
self.joy_callback,
|
||||
10)
|
||||
self.subscription # prevent unused variable warning
|
||||
|
||||
self.button_mapping = {}
|
||||
self.axis_mapping = {}
|
||||
self.joy_msg = None
|
||||
self.previous_buttons = []
|
||||
self.l2_axis_index = None
|
||||
self.r2_axis_index = None
|
||||
self.prompt_thread = threading.Thread(target=self.prompt_user)
|
||||
self.prompt_thread.start()
|
||||
|
||||
self.get_logger().info("Waiting for joystick messages...")
|
||||
|
||||
def joy_callback(self, msg):
|
||||
# Normalize L2 and R2 axis values if indices are known
|
||||
if self.l2_axis_index is not None and self.r2_axis_index is not None:
|
||||
msg.axes[self.l2_axis_index] -= 1.0
|
||||
msg.axes[self.r2_axis_index] -= 1.0
|
||||
|
||||
self.joy_msg = msg
|
||||
|
||||
if not self.previous_buttons:
|
||||
self.previous_buttons = list(msg.buttons)
|
||||
|
||||
for index, (prev, curr) in enumerate(zip(self.previous_buttons, msg.buttons)):
|
||||
if prev != curr and curr == 1:
|
||||
self.get_logger().info(f"Button with index {index} was pressed.")
|
||||
|
||||
self.previous_buttons = list(msg.buttons) # Update the previous_buttons list
|
||||
|
||||
def prompt_user(self):
|
||||
while self.joy_msg is None:
|
||||
self.get_logger().info("Waiting for joystick messages...")
|
||||
time.sleep(1) # Allow other ROS 2 operations to run
|
||||
|
||||
self.get_logger().info("Joystick messages received. Please press each button and move each axis one by one.")
|
||||
|
||||
self.prompt_button('X')
|
||||
self.prompt_button('Circle')
|
||||
self.prompt_button('Square')
|
||||
self.prompt_button('Triangle')
|
||||
self.prompt_button('L1')
|
||||
self.prompt_button('R1')
|
||||
self.prompt_button('L2')
|
||||
self.prompt_button('R2')
|
||||
self.prompt_button('Share')
|
||||
self.prompt_button('Options')
|
||||
|
||||
self.prompt_axis('LeftStickHorizontal')
|
||||
self.prompt_axis('LeftStickVertical')
|
||||
self.prompt_trigger_axis('L2')
|
||||
|
||||
self.prompt_axis('RightStickHorizontal')
|
||||
self.prompt_axis('RightStickVertical')
|
||||
self.prompt_trigger_axis('R2')
|
||||
|
||||
self.prompt_axis('DPadHorizontal')
|
||||
self.prompt_axis('DPadVertical')
|
||||
|
||||
self.save_mapping_to_yaml()
|
||||
|
||||
def prompt_button(self, button_name):
|
||||
input(f"Press the {button_name} button and then press Enter.")
|
||||
index = self.get_button_index()
|
||||
self.button_mapping[button_name] = index
|
||||
if button_name == 'L2':
|
||||
self.l2_axis_index = self.get_trigger_axis_index()
|
||||
elif button_name == 'R2':
|
||||
self.r2_axis_index = self.get_trigger_axis_index()
|
||||
time.sleep(1) # Allow other ROS 2 operations to run
|
||||
|
||||
def prompt_axis(self, axis_name):
|
||||
input(f"Move the {axis_name} and then press Enter.\n")
|
||||
self.get_logger().info(f"Current axis: {self.get_axis_index()} was moved.")
|
||||
self.axis_mapping[axis_name] = self.get_axis_index()
|
||||
time.sleep(1) # Allow other ROS 2 operations to run
|
||||
|
||||
def prompt_trigger_axis(self, axis_name):
|
||||
input(f"Move the {axis_name} and then press Enter.\n")
|
||||
self.axis_mapping[axis_name] = self.get_trigger_axis_index()
|
||||
self.get_logger().info(f"Current axis: {self.get_axis_index()} was moved.")
|
||||
time.sleep(1) # Allow other ROS 2 operations to run
|
||||
|
||||
def get_button_index(self):
|
||||
if self.joy_msg:
|
||||
return self.joy_msg.buttons.index(1)
|
||||
else:
|
||||
self.get_logger().warn("Joy message not received yet.")
|
||||
return -1
|
||||
|
||||
def get_axis_index(self):
|
||||
if self.joy_msg:
|
||||
max_val = max(self.joy_msg.axes, key=abs)
|
||||
return self.joy_msg.axes.index(max_val)
|
||||
else:
|
||||
self.get_logger().warn("Joy message not received yet.")
|
||||
return -1
|
||||
|
||||
def get_trigger_axis_index(self):
|
||||
if self.joy_msg:
|
||||
min_val = min(self.joy_msg.axes)
|
||||
return self.joy_msg.axes.index(min_val)
|
||||
else:
|
||||
self.get_logger().warn("Joy message not received yet.")
|
||||
return -1
|
||||
|
||||
def save_mapping_to_yaml(self):
|
||||
package_name = 'ur_robotiq_servo' # Replace with your package name
|
||||
share_path = get_package_share_directory(package_name)
|
||||
|
||||
# Navigate to the source config directory
|
||||
config_path = os.path.join(share_path, '..', '..', '..', '..', 'src', package_name, 'config')
|
||||
config_path = os.path.realpath(config_path)
|
||||
|
||||
if not os.path.exists(config_path):
|
||||
os.makedirs(config_path)
|
||||
|
||||
file_path = os.path.join(config_path, 'ps5-controller-mapping.yaml')
|
||||
|
||||
mapping = {
|
||||
'buttons': self.button_mapping,
|
||||
'axes': self.axis_mapping
|
||||
}
|
||||
|
||||
with open(file_path, 'w') as file:
|
||||
yaml.dump(mapping, file)
|
||||
|
||||
self.get_logger().info(f"Mapping saved to {file_path}")
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
|
||||
node = PS5ControllerMappingNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Reference in New Issue
Block a user