franka_logistics/franka_logistics/logistics_node.py
2024-01-30 16:06:31 +01:00

101 lines
3.4 KiB
Python
Executable File

import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from std_msgs.msg import String
from std_msgs.msg import Empty
from geometry_msgs.msg import Pose
from geometry_msgs.msg import PoseArray
from franka_msgs.action import Move
from franka_msgs.action import Grasp
import sqlite3
class LogisticsNode(Node):
def __init__(self):
super().__init__('logistics_node')
self.trajectory_pub = self.create_publisher(PoseArray,
'moveit_interface/task_space_trajectory',
10)
self.execute_pub = self.create_publisher(Empty,
'moveit_interface/execution', 10)
self.move_action_client = ActionClient(self, Move, 'franka_gripper/move')
self.grasp_action_client = ActionClient(self, Grasp, 'franka_gripper/grasp')
self.task_sub = self.create_subscription(String, 'logistics/task', self.logistics_callback,10)
# Connect to Database
self.conn = None
self.cursor = None
self.database_name = '/home/niko/sqlite_dbs/logistics_db'
self.init_db()
# Name of Special Poses
self.idle_pose = 'IDLE'
self.drop_pose = 'DROP'
def init_db(self):
# Connect to the sqlite db
self.conn = sqlite3.connect(self.database_name)
self.cursor = self.conn.cursor()
def logistics_callback(self, msg):
sequence_name = msg.data
self.get_logger().info(f'Received sequence name: {sequence_name}')
self.process_sequence(sequence_name)
def process_sequence(self, sequence_name):
# Query the database for the sequence
self.cursor.execute('''
SELECT p1.position_x, p1.position_y, p1.position_z,
p1.orientation_x, p1.orientation_y, p1.orientation_z, p1.orientation_w,
p2.position_x, p2.position_y, p2.position_z,
p2.orientation_x, p2.orientation_y, p2.orientation_z, p2.orientation_w,
sequence_table.width
FROM sequence_table
INNER JOIN pose_table AS p1 ON pose_sequence_table.pose_1 = p1.pose_id
INNER JOIN pose_table AS p2 ON pose_sequence_table.pose_2 = p2.pose_id
WHERE pose_sequence_table.sequence_name = ?
''', (sequence_name,))
result = self.cursor.fetchone()
if result:
pose_array = PoseArray()
# Pose 1
pose1 = Pose()
pose1.position.x, pose1.position.y, pose1.position.z = result[0:3]
pose1.orientation.x, pose1.orientation.y, pose1.orientation.z, pose1.orientation.w = result[3:7]
pose_array.poses.append(pose1)
# Pose 2
pose2 = Pose()
pose2.position.x, pose2.position.y, pose2.position.z = result[7:10]
pose2.orientation.x, pose2.orientation.y, pose2.orientation.z, pose2.orientation.w = result[10:14]
pose_array.poses.append(pose2)
# Width of the object
width = result[-1]
# Publish or process the PoseArray as needed
# ...
else:
self.get_logger().info('Sequence not found')
def main(args=None):
rclpy.init(args=args)
logistic_node = LogisticsNode()
rclpy.spin(logistic_node)
logistic_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()