cps_loki/run_localization.py

72 lines
2.0 KiB
Python
Raw Normal View History

2023-06-16 09:21:11 +00:00
import subprocess
import signal
import os
import time
2023-06-19 06:38:52 +00:00
import threading
2023-06-16 09:21:11 +00:00
processes = []
current_directory = os.getcwd()
def source_setup_files():
subprocess.Popen("source /opt/ros/humble/setup.bash", shell=True, executable="/bin/bash")
subprocess.Popen("source ./install/setup.bash", shell=True, executable="/bin/bash")
print("Setup files sourced")
def start_process(command):
process = subprocess.Popen(command, shell=True, executable="/bin/bash", preexec_fn=os.setsid)
processes.append(process)
print(f"Started process with command: {command}")
def stop_processes():
for process in processes:
os.killpg(os.getpgid(process.pid), signal.SIGTERM)
print("Stopped all processes")
def handle_interrupt(signal, frame):
print("Keyboard interrupt detected")
stop_processes()
exit(0)
# Define the base command
base_command = "ros2 launch bot_mini_bringup"
# Define the commands to launch
launch_commands = [
2023-06-16 10:01:14 +00:00
"rsp.launch.py",
2023-06-16 09:21:11 +00:00
"robot_controller.launch.py",
"robot_joy_teleop.launch.py",
"robot_twist_mux.launch.py",
2023-06-19 06:38:52 +00:00
"robot_lidar.launch.py"
2023-06-16 09:21:11 +00:00
]
2023-06-16 10:01:14 +00:00
# Create the commands by joining the base command with each launch command
commands = [f"{base_command} {command}" for command in launch_commands]
2023-06-16 09:21:11 +00:00
# Add the path to the map file
2023-06-19 06:38:52 +00:00
map_file = os.path.join(current_directory, "src", "bot_mini_bringup", "maps", "default_save_map.yaml")
2023-06-16 09:21:11 +00:00
map_command = f"ros2 launch nav2_bringup localization_launch.py map:={map_file}"
commands.append(map_command)
# Register the signal handler
signal.signal(signal.SIGINT, handle_interrupt)
# Source the setup files
source_setup_files()
2023-06-19 06:38:52 +00:00
# Create and start threads for each command
threads = []
2023-06-16 09:21:11 +00:00
for command in commands:
2023-06-19 06:38:52 +00:00
thread = threading.Thread(target=start_process, args=(command,))
thread.start()
threads.append(thread)
2023-06-16 09:21:11 +00:00
# Wait for user input to stop the processes
input("Press Enter to stop all processes...")
# Stop all processes
stop_processes()
2023-06-19 06:38:52 +00:00
# Wait for all threads to complete
for thread in threads:
thread.join()