import cv2 import os from datetime import datetime from ultralytics import YOLO class YOLOv8CameraStream: def __init__(self, model_path: str, camera_source=0, confidence_threshold=0.5, save_threshold=0.9, log_file="detections.log", logging_level="high"): self.model = YOLO(model_path) self.camera_source = camera_source self.confidence_threshold = confidence_threshold self.default_confidence_threshold = confidence_threshold self.save_threshold = save_threshold self.default_save_threshold = save_threshold self.log_file = log_file self.logging_level = logging_level self.cap = cv2.VideoCapture(camera_source) # Create directories for saved frames and archived logs if they don't exist os.makedirs("saved_frames", exist_ok=True) os.makedirs("archive", exist_ok=True) # Initialize display settings self.show_thresholds = True def log_detection(self, label, confidence): timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.check_log_size() # Check log size before logging with open(self.log_file, "a") as log: log.write(f"{timestamp} - Detected: {label} with confidence {confidence:.2f}\n") def check_log_size(self): """Checks the size of the log file and archives it if it exceeds 5 MB.""" if os.path.exists(self.log_file) and os.path.getsize(self.log_file) > 5 * 1024 * 1024: # 5 MB timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") archived_log_file = f"archive/{self.log_file}_{timestamp}.log" os.rename(self.log_file, archived_log_file) # Rename the old log file print(f"Archived old log file to {archived_log_file}") # Create a new log file open(self.log_file, 'w').close() # Create an empty log file def save_frame(self, frame, label): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"saved_frames/{label}_{timestamp}.jpg" cv2.imwrite(filename, frame) print(f"Frame saved: {filename}") def get_color_for_confidence(self, confidence): if confidence > 0.75: return (0, 255, 0) elif confidence > 0.5: return (0, 255, 255) else: return (0, 0, 255) def process_frame(self, frame): results = self.model.predict(frame) for result in results: for box in result.boxes: confidence = float(box.conf.item()) if self.logging_level == "high" and confidence < self.save_threshold: continue if confidence > self.confidence_threshold: x1, y1, x2, y2 = map(int, box.xyxy[0]) class_id = int(box.cls.item()) label = f"{result.names[class_id]} {confidence:.2f}" color = self.get_color_for_confidence(confidence) cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) self.log_detection(result.names[class_id], confidence) if confidence > self.save_threshold: self.save_frame(frame, result.names[class_id]) if self.show_thresholds: self.display_thresholds(frame) self.display_keyboard_options(frame) return frame def display_thresholds(self, frame): text = f"Confidence Threshold: {self.confidence_threshold:.2f} | Save Threshold: {self.save_threshold:.2f}" cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) def display_keyboard_options(self, frame): options = [ "Keyboard Controls:", "'c' - Increase Confidence Threshold | 'v' - Decrease Confidence Threshold", "'s' - Increase Save Threshold | 'x' - Decrease Save Threshold", "'t' - Toggle Threshold Display | 'r' - Reset Thresholds to Default", "'q' - Quit" ] for i, option in enumerate(options): cv2.putText(frame, option, (10, 50 + i * 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) def adjust_thresholds(self): print("\nPress 'c' to increase confidence threshold, 'v' to decrease it.") print("Press 's' to increase save threshold, 'x' to decrease it.") print("Press 't' to toggle threshold display, 'r' to reset thresholds to default.") print("Press 'q' to quit the program.\n") def start(self): if not self.cap.isOpened(): print("Error: Could not open video source.") return self.adjust_thresholds() while True: ret, frame = self.cap.read() if not ret: print("Error: Failed to capture frame.") break processed_frame = self.process_frame(frame) cv2.imshow("YOLOv8 Camera Stream", processed_frame) key = cv2.waitKey(1) & 0xFF if key == ord('q'): break elif key == ord('c'): self.confidence_threshold = min(self.confidence_threshold + 0.05, 1.0) print(f"Confidence threshold increased to {self.confidence_threshold:.2f}") elif key == ord('v'): self.confidence_threshold = max(self.confidence_threshold - 0.05, 0.0) print(f"Confidence threshold decreased to {self.confidence_threshold:.2f}") elif key == ord('s'): self.save_threshold = min(self.save_threshold + 0.05, 1.0) print(f"Save threshold increased to {self.save_threshold:.2f}") elif key == ord('x'): self.save_threshold = max(self.save_threshold - 0.05, 0.0) print(f"Save threshold decreased to {self.save_threshold:.2f}") elif key == ord('t'): self.show_thresholds = not self.show_thresholds print(f"Threshold display toggled to {'on' if self.show_thresholds else 'off'}") elif key == ord('r'): self.confidence_threshold = self.default_confidence_threshold self.save_threshold = self.default_save_threshold print("Thresholds reset to default values.") self.cap.release() cv2.destroyAllWindows() # Run the object detection stream if __name__ == "__main__": yolo_stream = YOLOv8CameraStream(model_path="models/yolov8m_seg_e300.pt", logging_level="high") yolo_stream.start()