# File-viewer header (extraction residue): 150 lines, 6.6 KiB, Python
import cv2
|
|
import os
|
|
from datetime import datetime
|
|
from ultralytics import YOLO
|
|
|
|
class YOLOv8CameraStream:
    """Run a YOLO model on a video source and annotate, log and save detections.

    Frames are read from an OpenCV ``VideoCapture``, passed through the model,
    annotated with bounding boxes and an on-screen help overlay, and shown in
    a window. Detections are appended to a text log (rotated into
    ``ARCHIVE_DIR`` once it exceeds ``MAX_LOG_BYTES``) and frames containing a
    detection above the save threshold are written to ``SAVED_FRAMES_DIR``.
    """

    # Directory that receives frames whose detection exceeds the save threshold.
    SAVED_FRAMES_DIR = "saved_frames"
    # Directory that receives rotated log files.
    ARCHIVE_DIR = "archive"
    # Log size (bytes) above which the log file is rotated: 5 MB.
    MAX_LOG_BYTES = 5 * 1024 * 1024
    # Amount added to / subtracted from a threshold per key press.
    THRESHOLD_STEP = 0.05
    # Title of the OpenCV preview window.
    WINDOW_TITLE = "YOLOv8 Camera Stream"

    def __init__(self, model_path: str, camera_source=0, confidence_threshold=0.5, save_threshold=0.9,
                 log_file="detections.log", logging_level="high"):
        """Load the model, open the video source and create the output directories.

        Args:
            model_path: Path handed to ``YOLO(...)``.
            camera_source: Value handed to ``cv2.VideoCapture(...)``.
            confidence_threshold: Detections must exceed this to be drawn and logged.
            save_threshold: Detections must exceed this for the frame to be saved.
            log_file: Path of the text file detections are appended to.
            logging_level: When ``"high"``, detections below ``save_threshold``
                are skipped entirely (see ``process_frame``).
        """
        self.model = YOLO(model_path)
        self.camera_source = camera_source

        # The initial values are remembered so the 'r' key can restore them.
        self.confidence_threshold = confidence_threshold
        self.default_confidence_threshold = confidence_threshold
        self.save_threshold = save_threshold
        self.default_save_threshold = save_threshold

        self.log_file = log_file
        self.logging_level = logging_level
        self.cap = cv2.VideoCapture(camera_source)

        # Create directories for saved frames and archived logs if they don't exist
        os.makedirs(self.SAVED_FRAMES_DIR, exist_ok=True)
        os.makedirs(self.ARCHIVE_DIR, exist_ok=True)

        # Initialize display settings
        self.show_thresholds = True

    def log_detection(self, label, confidence):
        """Append one timestamped detection line to the log file.

        The log is rotated first if it has grown past ``MAX_LOG_BYTES``.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.check_log_size()  # Check log size before logging
        # Explicit encoding so non-ASCII class names do not depend on the locale.
        with open(self.log_file, "a", encoding="utf-8") as log:
            log.write(f"{timestamp} - Detected: {label} with confidence {confidence:.2f}\n")

    def check_log_size(self):
        """Archive the log file if it exceeds ``MAX_LOG_BYTES`` and start a fresh one."""
        try:
            size = os.path.getsize(self.log_file)
        except OSError:
            # No log file yet (or it is not accessible): nothing to rotate.
            return
        if size <= self.MAX_LOG_BYTES:
            return

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Only the file name is used, so a log_file such as "logs/detections.log"
        # does not point into a non-existent sub-directory of the archive.
        archived_name = f"{os.path.basename(self.log_file)}_{timestamp}.log"
        archived_log_file = os.path.join(self.ARCHIVE_DIR, archived_name)

        os.makedirs(self.ARCHIVE_DIR, exist_ok=True)
        # os.replace overwrites an existing target on every platform, whereas
        # os.rename raises on Windows if the archive name is already taken.
        os.replace(self.log_file, archived_log_file)
        print(f"Archived old log file to {archived_log_file}")

        # Create a new, empty log file
        with open(self.log_file, "w", encoding="utf-8"):
            pass

    def save_frame(self, frame, label):
        """Write ``frame`` as a JPEG named after ``label`` and the current time.

        The timestamp has one-second resolution, so a later frame with the same
        label within the same second overwrites the earlier file.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{self.SAVED_FRAMES_DIR}/{label}_{timestamp}.jpg"
        # cv2.imwrite reports failure through its return value, not an exception.
        if cv2.imwrite(filename, frame):
            print(f"Frame saved: {filename}")
        else:
            print(f"Error: Failed to save frame: {filename}")

    def get_color_for_confidence(self, confidence):
        """Return the box colour tuple for a confidence value.

        Above 0.75 -> (0, 255, 0); above 0.5 -> (0, 255, 255); otherwise
        (0, 0, 255). The tuples are passed to OpenCV drawing calls, which
        interpret them as BGR (green / yellow / red).
        """
        if confidence > 0.75:
            return (0, 255, 0)
        if confidence > 0.5:
            return (0, 255, 255)
        return (0, 0, 255)

    def process_frame(self, frame):
        """Run the model on ``frame``, draw/log/save detections and overlays.

        The frame is annotated in place and also returned.
        """
        results = self.model.predict(frame)
        for result in results:
            for box in result.boxes:
                confidence = float(box.conf.item())

                # In "high" mode anything below the save threshold is ignored
                # completely: it is neither drawn, logged nor saved.
                if self.logging_level == "high" and confidence < self.save_threshold:
                    continue
                if confidence <= self.confidence_threshold:
                    continue

                x1, y1, x2, y2 = map(int, box.xyxy[0])
                class_name = result.names[int(box.cls.item())]
                color = self.get_color_for_confidence(confidence)

                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                # Keep the caption inside the image when the box touches the top edge.
                text_y = max(y1 - 10, 15)
                cv2.putText(frame, f"{class_name} {confidence:.2f}", (x1, text_y),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

                self.log_detection(class_name, confidence)

                if confidence > self.save_threshold:
                    self.save_frame(frame, class_name)

        if self.show_thresholds:
            self.display_thresholds(frame)
        # NOTE(review): the original indentation was lost; the keyboard help is
        # assumed to be drawn regardless of show_thresholds -- confirm.
        self.display_keyboard_options(frame)

        return frame

    def display_thresholds(self, frame):
        """Draw the current confidence and save thresholds at the top of ``frame``."""
        text = f"Confidence Threshold: {self.confidence_threshold:.2f} | Save Threshold: {self.save_threshold:.2f}"
        cv2.putText(frame, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

    def display_keyboard_options(self, frame):
        """Draw the keyboard help text on ``frame``, one line every 20 pixels."""
        options = [
            "Keyboard Controls:",
            "'c' - Increase Confidence Threshold | 'v' - Decrease Confidence Threshold",
            "'s' - Increase Save Threshold | 'x' - Decrease Save Threshold",
            "'t' - Toggle Threshold Display | 'r' - Reset Thresholds to Default",
            "'q' - Quit",
        ]
        for i, option in enumerate(options):
            cv2.putText(frame, option, (10, 50 + i * 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

    def adjust_thresholds(self):
        """Print the keyboard controls to the console.

        Despite its name this method changes nothing; the thresholds are
        adjusted by the key handling in ``start``.
        """
        print("\nPress 'c' to increase confidence threshold, 'v' to decrease it.")
        print("Press 's' to increase save threshold, 'x' to decrease it.")
        print("Press 't' to toggle threshold display, 'r' to reset thresholds to default.")
        print("Press 'q' to quit the program.\n")

    def _handle_key(self, key):
        """Apply the action bound to ``key``; return False when the loop should stop."""
        step = self.THRESHOLD_STEP
        if key == ord('q'):
            return False
        if key == ord('c'):
            self.confidence_threshold = min(self.confidence_threshold + step, 1.0)
            print(f"Confidence threshold increased to {self.confidence_threshold:.2f}")
        elif key == ord('v'):
            self.confidence_threshold = max(self.confidence_threshold - step, 0.0)
            print(f"Confidence threshold decreased to {self.confidence_threshold:.2f}")
        elif key == ord('s'):
            self.save_threshold = min(self.save_threshold + step, 1.0)
            print(f"Save threshold increased to {self.save_threshold:.2f}")
        elif key == ord('x'):
            self.save_threshold = max(self.save_threshold - step, 0.0)
            print(f"Save threshold decreased to {self.save_threshold:.2f}")
        elif key == ord('t'):
            self.show_thresholds = not self.show_thresholds
            print(f"Threshold display toggled to {'on' if self.show_thresholds else 'off'}")
        elif key == ord('r'):
            self.confidence_threshold = self.default_confidence_threshold
            self.save_threshold = self.default_save_threshold
            print("Thresholds reset to default values.")
        return True

    def start(self):
        """Read, process and display frames until 'q' is pressed or a read fails.

        The capture device and the preview window are released even if
        processing raises or the user interrupts the program.
        """
        if not self.cap.isOpened():
            print("Error: Could not open video source.")
            return

        self.adjust_thresholds()

        try:
            while True:
                ret, frame = self.cap.read()
                if not ret:
                    print("Error: Failed to capture frame.")
                    break

                processed_frame = self.process_frame(frame)
                cv2.imshow(self.WINDOW_TITLE, processed_frame)

                # Mask to the low byte so the code compares equal to ord(...).
                key = cv2.waitKey(1) & 0xFF
                if not self._handle_key(key):
                    break
        finally:
            # Always free the camera and close the window, including on errors.
            self.cap.release()
            cv2.destroyAllWindows()
|
|
|
|
# Script entry point: build the detection stream and run it until the user quits.
if __name__ == "__main__":
    YOLOv8CameraStream(
        model_path="models/yolov8m_seg_e300.pt",
        logging_level="high",
    ).start()
|