move_square tested

This commit is contained in:
Niko Feith 2024-01-26 17:26:11 +01:00
parent 3cb48df2d8
commit 4b6a5330e8
10 changed files with 323 additions and 0 deletions

View File

View File

@ -0,0 +1,100 @@
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from std_msgs.msg import String
from std_msgs.msg import Empty
from geometry_msgs.msg import Pose
from geometry_msgs.msg import PoseArray
from franka_msgs.action import Move
from franka_msgs.action import Grasp
import sqlite3
class LogisticsNode(Node):
def __init__(self):
super().__init__('logistics_node')
self.trajectory_pub = self.create_publisher(PoseArray,
'moveit_interface/task_space_trajectory',
10)
self.execute_pub = self.create_publisher(Empty,
'moveit_interface/execution', 10)
self.move_action_client = ActionClient(self, Move, 'franka_gripper/move')
self.grasp_action_client = ActionClient(self, Grasp, 'franka_gripper/grasp')
self.task_sub = self.create_subscription(String, 'logistics/task', self.logistics_callback,10)
# Connect to Database
self.conn = None
self.cursor = None
self.database_name = '/home/niko/sqlite_dbs/logistics_db'
self.init_db()
# Name of Special Poses
self.idle_pose = 'IDLE'
self.drop_pose = 'DROP'
def init_db(self):
# Connect to the sqlite db
self.conn = sqlite3.connect(self.database_name)
self.cursor = self.conn.cursor()
def logistics_callback(self, msg):
sequence_name = msg.data
self.get_logger().info(f'Received sequence name: {sequence_name}')
self.process_sequence(sequence_name)
def process_sequence(self, sequence_name):
# Query the database for the sequence
self.cursor.execute('''
SELECT p1.position_x, p1.position_y, p1.position_z,
p1.orientation_x, p1.orientation_y, p1.orientation_z, p1.orientation_w,
p2.position_x, p2.position_y, p2.position_z,
p2.orientation_x, p2.orientation_y, p2.orientation_z, p2.orientation_w,
sequence_table.width
FROM sequence_table
INNER JOIN pose_table AS p1 ON pose_sequence_table.pose_1 = p1.pose_id
INNER JOIN pose_table AS p2 ON pose_sequence_table.pose_2 = p2.pose_id
WHERE pose_sequence_table.sequence_name = ?
''', (sequence_name,))
result = self.cursor.fetchone()
if result:
pose_array = PoseArray()
# Pose 1
pose1 = Pose()
pose1.position.x, pose1.position.y, pose1.position.z = result[0:3]
pose1.orientation.x, pose1.orientation.y, pose1.orientation.z, pose1.orientation.w = result[3:7]
pose_array.poses.append(pose1)
# Pose 2
pose2 = Pose()
pose2.position.x, pose2.position.y, pose2.position.z = result[7:10]
pose2.orientation.x, pose2.orientation.y, pose2.orientation.z, pose2.orientation.w = result[10:14]
pose_array.poses.append(pose2)
# Width of the object
width = result[-1]
# Publish or process the PoseArray as needed
# ...
else:
self.get_logger().info('Sequence not found')
def main(args=None):
rclpy.init(args=args)
logistic_node = LogisticsNode()
rclpy.spin(logistic_node)
logistic_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,98 @@
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import String
import sqlite3
class PoseLearnNode(Node):
def __init__(self):
super().__init__('pose_learn_node')
self.current_pose = PoseStamped()
self.pose_sub = self.create_subscription(PoseStamped, 'moveit_interface/current_pose',
self.pose_callback, 10)
self.save_sub = self.create_subscription(String, '/logistic/save_pose', self.save_callback, 10)
self.seq_sub = self.create_subscription(String, '/logistic/save_seq', self.seq_callback, 10)
# Connect to Database
self.conn = None
self.cursor = None
self.database_name = '/home/niko/sqlite_dbs/logistics_db'
self.init_db()
def init_db(self):
# Connect to the pose db
self.conn = sqlite3.connect(self.database_name)
self.conn.execute("PRAGMA foreign_keys = ON")
self.cursor = self.conn.cursor()
self.cursor.execute(
'''CREATE TABLE IF NOT EXISTS pose_table (
pose_id INTEGER PRIMARY KEY AUTOINCREMENT,
pose_name TEXT UNIQUE NOT NULL,
position_x REAL NOT NULL,
position_y REAL NOT NULL,
position_z REAL NOT NULL,
orientation_x REAL NOT NULL,
orientation_y REAL NOT NULL,
orientation_z REAL NOT NULL,
orientation_w REAL NOT NULL)''')
self.cursor.execute('''
CREATE TABLE IF NOT EXISTS pose_sequence_table (
sequence_id INTEGER PRIMARY KEY AUTOINCREMENT,
sequence_name TEXT UNIQUE NOT NULL,
pose_1 INTEGER NOT NULL,
pose_2 INTEGER NOT NULL,
width REAL NOT NULL,
FOREIGN KEY (pose_1) REFERENCES pose_table (pose_id),
FOREIGN KEY (pose_2) REFERENCES pose_table (pose_id)
)
''')
self.conn.commit()
def pose_callback(self, msg):
self.current_pose = msg
def save_callback(self, msg):
pose_name = msg.data
position = self.current_pose.pose.position
orientation = self.current_pose.pose.orientation
try:
self.cursor.execute('''INSERT INTO pose_table
(pose_name, position_x, position_y, position_z,
orientation_x, orientation_y, orientation_z, orientation_w)
VALUES (?,?, ?, ?, ?, ?, ?, ?)''',
(pose_name, position.x, position.y, position.z,
orientation.x, orientation.y, orientation.z, orientation.w))
self.conn.commit()
except sqlite3.IntegrityError:
self.get_logger().info(f"Pose with name '{pose_name}' already exists.")
def save_seq(self, msg):
try:
self.cursor.execute('''INSERT INTO sequence_table
(sequence_name, pose_1, pose_2, width) VALUES (?, ?, ?, ?)''',
(msg.sequence_name, msg.pose_1, msg.pose_2, msg.width))
self.conn.commit()
except sqlite3.IntegrityError:
self.get_logger().info(f"Sequence with name '{msg.sequence_name}' already exists.")
def main(args=None):
rclpy.init(args=args)
pose_learn_node = PoseLearnNode()
rclpy.spin(pose_learn_node)
pose_learn_node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

23
package.xml Normal file
View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>franka_logistics</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="nikolaus.feith@unileoben.ac.at">niko</maintainer>
<license>TODO: License declaration</license>
<depend>franka_msgs</depend>
<depend>geometry_msgs</depend>
<depend>moveit_msgs</depend>
<depend>std_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

4
setup.cfg Normal file
View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/franka_logistics
[install]
install_scripts=$base/lib/franka_logistics

25
setup.py Normal file
View File

@ -0,0 +1,25 @@
from setuptools import find_packages, setup
package_name = 'franka_logistics'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='niko',
maintainer_email='nikolaus.feith@unileoben.ac.at',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
],
},
)

25
test/test_copyright.py Normal file
View File

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

25
test/test_flake8.py Normal file
View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

23
test/test_pep257.py Normal file
View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'