# GeislingerProject/lib/CameraStream.py
# 2024-11-23 19:42:46 +01:00
#
# 150 lines
# 6.6 KiB
# Python

import cv2
import os
from datetime import datetime
from ultralytics import YOLO
class YOLOv8CameraStream:
    """Run a YOLO model on a video source and show the annotated frames.

    Each accepted detection is drawn onto the frame and appended to a text
    log; detections above a separate save threshold also cause the frame to
    be written to disk. Both thresholds can be tuned from the keyboard while
    the stream is running.
    """

    # Directory that receives frames written by save_frame().
    FRAME_DIR = "saved_frames"
    # Directory that receives rotated log files.
    ARCHIVE_DIR = "archive"
    # The log file is rotated once it grows beyond this many bytes (5 MB).
    MAX_LOG_BYTES = 5 * 1024 * 1024
    # Amount added or subtracted per key press when tuning a threshold.
    THRESHOLD_STEP = 0.05
    # Title of the OpenCV preview window.
    WINDOW_TITLE = "YOLOv8 Camera Stream"

    def __init__(self, model_path: str, camera_source=0, confidence_threshold=0.5, save_threshold=0.9,
                 log_file="detections.log", logging_level="high"):
        """Load the model, open the video source and prepare output folders.

        Args:
            model_path: Path handed to ``ultralytics.YOLO``.
            camera_source: Value handed to ``cv2.VideoCapture``.
            confidence_threshold: Detections must be strictly above this
                value to be drawn and logged.
            save_threshold: Detections strictly above this value also
                trigger ``save_frame``.
            log_file: Path of the text file detections are appended to.
            logging_level: When ``"high"``, detections below
                ``save_threshold`` are skipped entirely; any other value
                disables that extra filter.
        """
        self.model = YOLO(model_path)
        self.camera_source = camera_source
        self.confidence_threshold = confidence_threshold
        self.save_threshold = save_threshold
        # Remember the starting values so the 'r' key can restore them.
        self.default_confidence_threshold = confidence_threshold
        self.default_save_threshold = save_threshold
        self.log_file = log_file
        self.logging_level = logging_level
        self.cap = cv2.VideoCapture(camera_source)
        # Create directories for saved frames and archived logs if they don't exist
        os.makedirs(self.FRAME_DIR, exist_ok=True)
        os.makedirs(self.ARCHIVE_DIR, exist_ok=True)
        # Initialize display settings
        self.show_thresholds = True

    def log_detection(self, label, confidence):
        """Append one timestamped detection line to the log file.

        The log is rotated first (see ``check_log_size``) so a single file
        does not grow without bound.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.check_log_size()  # Check log size before logging
        with open(self.log_file, "a", encoding="utf-8") as log:
            log.write(f"{timestamp} - Detected: {label} with confidence {confidence:.2f}\n")

    def check_log_size(self):
        """Archive the log file if it exceeds ``MAX_LOG_BYTES`` (5 MB).

        The old log is moved to ``ARCHIVE_DIR`` under
        ``<log file name>_<timestamp>.log`` and a fresh, empty log file is
        created in its place. Does nothing when the log does not exist yet.
        """
        try:
            oversized = os.path.getsize(self.log_file) > self.MAX_LOG_BYTES
        except OSError:
            # No log file yet (or it cannot be inspected): nothing to rotate.
            return
        if not oversized:
            return

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Use only the file name: log_file may contain directories that do
        # not exist below the archive folder, which would make the move fail.
        archive_name = f"{os.path.basename(self.log_file)}_{timestamp}.log"
        archived_log_file = f"{self.ARCHIVE_DIR}/{archive_name}"
        # The working directory may differ from the one used at construction.
        os.makedirs(self.ARCHIVE_DIR, exist_ok=True)
        os.rename(self.log_file, archived_log_file)  # Move the old log file away
        print(f"Archived old log file to {archived_log_file}")
        # Create a new, empty log file
        with open(self.log_file, "w", encoding="utf-8"):
            pass

    def save_frame(self, frame, label):
        """Write ``frame`` as a JPEG named after ``label`` and the current time.

        The timestamp has one-second resolution, so frames with the same
        label within the same second share a file name.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{self.FRAME_DIR}/{label}_{timestamp}.jpg"
        os.makedirs(self.FRAME_DIR, exist_ok=True)
        # cv2.imwrite reports failure through its return value, not an exception.
        if cv2.imwrite(filename, frame):
            print(f"Frame saved: {filename}")
        else:
            print(f"Error: Failed to save frame: {filename}")

    def get_color_for_confidence(self, confidence):
        """Return the drawing color (OpenCV BGR tuple) for a confidence value.

        Above 0.75 -> green, above 0.5 -> yellow, otherwise red.
        """
        if confidence > 0.75:
            return (0, 255, 0)
        if confidence > 0.5:
            return (0, 255, 255)
        return (0, 0, 255)

    def process_frame(self, frame):
        """Run detection on ``frame``, annotate it in place and return it.

        Every accepted detection is drawn, logged and, if it is above the
        save threshold, the frame is written to disk. The threshold line
        (when enabled) and the keyboard help are drawn afterwards.
        """
        results = self.model.predict(frame)
        for result in results:
            for box in result.boxes:
                confidence = float(box.conf.item())
                # In "high" mode only detections at or above the save threshold count.
                if self.logging_level == "high" and confidence < self.save_threshold:
                    continue
                if confidence <= self.confidence_threshold:
                    continue

                x1, y1, x2, y2 = map(int, box.xyxy[0])
                class_name = result.names[int(box.cls.item())]
                color = self.get_color_for_confidence(confidence)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                # Keep the caption inside the image for boxes touching the top edge.
                caption_y = max(y1 - 10, 10)
                cv2.putText(frame, f"{class_name} {confidence:.2f}", (x1, caption_y),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
                self.log_detection(class_name, confidence)
                if confidence > self.save_threshold:
                    self.save_frame(frame, class_name)

        if self.show_thresholds:
            self.display_thresholds(frame)
        self.display_keyboard_options(frame)
        return frame

    def display_thresholds(self, frame):
        """Draw the current confidence and save thresholds onto ``frame``."""
        text = f"Confidence Threshold: {self.confidence_threshold:.2f} | Save Threshold: {self.save_threshold:.2f}"
        cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

    def display_keyboard_options(self, frame):
        """Draw the keyboard help, one line every 20 pixels, onto ``frame``."""
        options = [
            "Keyboard Controls:",
            "'c' - Increase Confidence Threshold | 'v' - Decrease Confidence Threshold",
            "'s' - Increase Save Threshold | 'x' - Decrease Save Threshold",
            "'t' - Toggle Threshold Display | 'r' - Reset Thresholds to Default",
            "'q' - Quit"
        ]
        for i, option in enumerate(options):
            cv2.putText(frame, option, (10, 50 + i * 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    def adjust_thresholds(self):
        """Print the keyboard controls to the console (changes no state)."""
        print("\nPress 'c' to increase confidence threshold, 'v' to decrease it.")
        print("Press 's' to increase save threshold, 'x' to decrease it.")
        print("Press 't' to toggle threshold display, 'r' to reset thresholds to default.")
        print("Press 'q' to quit the program.\n")

    @staticmethod
    def _step_threshold(value, delta):
        """Return ``value + delta`` clamped to [0.0, 1.0].

        The result is rounded to 10 decimals so repeated 0.05 steps do not
        accumulate binary floating-point drift.
        """
        return round(min(max(value + delta, 0.0), 1.0), 10)

    def _handle_key(self, key):
        """Apply one key press; return False when the stream should stop."""
        if key == ord('q'):
            return False
        if key == ord('c'):
            self.confidence_threshold = self._step_threshold(self.confidence_threshold, self.THRESHOLD_STEP)
            print(f"Confidence threshold increased to {self.confidence_threshold:.2f}")
        elif key == ord('v'):
            self.confidence_threshold = self._step_threshold(self.confidence_threshold, -self.THRESHOLD_STEP)
            print(f"Confidence threshold decreased to {self.confidence_threshold:.2f}")
        elif key == ord('s'):
            self.save_threshold = self._step_threshold(self.save_threshold, self.THRESHOLD_STEP)
            print(f"Save threshold increased to {self.save_threshold:.2f}")
        elif key == ord('x'):
            self.save_threshold = self._step_threshold(self.save_threshold, -self.THRESHOLD_STEP)
            print(f"Save threshold decreased to {self.save_threshold:.2f}")
        elif key == ord('t'):
            self.show_thresholds = not self.show_thresholds
            print(f"Threshold display toggled to {'on' if self.show_thresholds else 'off'}")
        elif key == ord('r'):
            self.confidence_threshold = self.default_confidence_threshold
            self.save_threshold = self.default_save_threshold
            print("Thresholds reset to default values.")
        return True

    def start(self):
        """Read, process and display frames until 'q' is pressed or a read fails.

        The capture device and the preview window are released even if
        processing raises.
        """
        if not self.cap.isOpened():
            print("Error: Could not open video source.")
            return
        self.adjust_thresholds()
        try:
            while True:
                ret, frame = self.cap.read()
                if not ret:
                    print("Error: Failed to capture frame.")
                    break
                cv2.imshow(self.WINDOW_TITLE, self.process_frame(frame))
                # Mask to the low byte so the value compares equal to ord(...).
                if not self._handle_key(cv2.waitKey(1) & 0xFF):
                    break
        finally:
            self.cap.release()
            cv2.destroyAllWindows()
# Run the object detection stream
def main():
    """Build the stream with the project's model weights and run it.

    Uses the constructor defaults for everything except the model path and
    the logging level.
    """
    yolo_stream = YOLOv8CameraStream(model_path="models/yolov8m_seg_e300.pt", logging_level="high")
    yolo_stream.start()


if __name__ == "__main__":
    main()