import cv2
import onnxruntime as ort  # backs ONNX inference for the exported YOLO model
import torch
import torch.nn as nn
import numpy as np
from torchvision import transforms
from torchvision.transforms import Compose, Lambda
from torchvision.transforms._transforms_video import (
    CenterCropVideo,
    NormalizeVideo,
)
from ultralytics import YOLO
import time
from collections import deque
import json

# Check whether PyTorchVideo is installed; torch.hub.load() of the
# facebookresearch/pytorchvideo repo requires it.
try:
    import pytorchvideo  # noqa: F401
    PYTORCHVIDEO_AVAILABLE = True
except ImportError:
    PYTORCHVIDEO_AVAILABLE = False
    print("PyTorchVideo not available, using simplified model")


class ActivityAnalyzer:
    def __init__(self, yolo_model_path, pose_model_path, video_path):
        # Load YOLO model for person detection
        self.yolo_model = YOLO(yolo_model_path)

        # Load YOLO pose estimation model
        self.pose_model = YOLO(pose_model_path)

        # Video path
        self.video_path = video_path

        # Transform for single-frame preprocessing (currently unused; kept
        # for experiments with 2D backbones)
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])

        # Activity labels grouped by compliance status
        self.sop_activities = [
            "speaking_with_customer",
            "inputting_order",
            "giving_item",
            "cleaning_table",
        ]
        self.non_compliant_activities = [
            "using_mobile_phone",
            "talking_with_colleagues",
            "idle",
        ]

        # Initialize the full label set first; the classifier head size
        # depends on it
        self.activity_labels = [
            "speaking_with_customer",
            "inputting_order",
            "giving_item",
            "cleaning_table",
            "using_mobile_phone",
            "talking_with_colleagues",
            "idle",
            "bringing_menu",
            "bringing_food",
            "opening_door",
        ]

        # Initialize MobileNet3D for activity classification
        self.activity_model = self._initialize_mobilenet3d()

        # Person tracking state
        self.person_tracks = {}
        self.next_person_id = 0

        # Compliance tracking
        self.compliance_log = []
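    # Illustrative sanity check (not part of the original pipeline): run the
    # loaded backbone on a dummy clip to confirm the adapted head produces one
    # logit per activity label. An input of 8 frames at 224x224 is assumed as
    # a safe probe size for slow_r50, which takes (B, C, T, H, W) tensors.
    def check_activity_model(self):
        """Return True if the activity model yields one logit per label."""
        if self.activity_model is None:
            return False
        dummy_clip = torch.zeros(1, 3, 8, 224, 224)
        with torch.no_grad():
            logits = self.activity_model(dummy_clip)
        return tuple(logits.shape) == (1, len(self.activity_labels))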
    def _build_simplified_model(self):
        """Build a small Conv3d network used when no pretrained backbone is
        available; shared by both fallback paths."""
        try:
            return nn.Sequential(
                nn.Conv3d(3, 32, kernel_size=(3, 3, 3), padding=(1, 1, 1)),
                nn.ReLU(),
                nn.AdaptiveAvgPool3d((1, 1, 1)),
                nn.Flatten(),
                nn.Linear(32, len(self.activity_labels)),
            )
        except Exception as e:
            print(f"Warning: Could not initialize simplified MobileNet3D model: {e}")
            print("Falling back to heuristic-based classification")
            return None

    def _initialize_mobilenet3d(self):
        """Initialize a 3D CNN for activity classification."""
        if not PYTORCHVIDEO_AVAILABLE:
            print("PyTorchVideo not available, using simplified model")
            return self._build_simplified_model()

        # Try to load a pretrained video model from PyTorchVideo.
        # Note: slow_r50 is a placeholder backbone, not an actual MobileNet3D;
        # other hub options include 'x3d_m', 'x3d_l', 'slowfast_r50',
        # 'slowfast_r101', 'c2d', 'i3d', 'mvit'. A locally cached checkpoint
        # could also be loaded, e.g.:
        #   model_dir = 'C:/Users/suherdy.yacob/.cache/torch/hub/checkpoints/'
        #   model = torch.hub.load(model_dir, 'slowfast_r50', source='local',
        #                          pretrained=True)
        try:
            model = torch.hub.load('facebookresearch/pytorchvideo',
                                   'slow_r50', pretrained=True)

            # Optionally, the Kinetics-400 class names could be downloaded to
            # keep the pretrained head instead of replacing it:
            #   import urllib.request
            #   json_url = ("https://dl.fbaipublicfiles.com/pyslowfast/"
            #               "dataset/class_names/kinetics_classnames.json")
            #   urllib.request.urlretrieve(json_url, "kinetics_classnames.json")
            #   with open("kinetics_classnames.json", "r") as f:
            #       kinetics_classnames = json.load(f)
            #   self.activity_labels = [str(k).replace('"', "")
            #                           for k in kinetics_classnames]

            # Replace the final projection layer so the head matches our
            # activity classes. This is a simplified adaptation; in practice
            # the new head needs fine-tuning on labeled clips.
            model.blocks[-1].proj = nn.Linear(model.blocks[-1].proj.in_features,
                                              len(self.activity_labels))
            return model
        except Exception as e:
            print(f"Warning: Could not load pretrained MobileNet3D model: {e}")
            print("Using simplified model instead")
            return self._build_simplified_model()

    def extract_person_bboxes(self, result):
        """Extract person bounding boxes from a single YOLO result."""
        persons = []
        if result.boxes is not None:
            for box in result.boxes:
                # Class 0 is 'person' in COCO-style models
                if int(box.cls) == 0 and float(box.conf) > 0.5:
                    persons.append(box.xyxy[0].cpu().numpy())
        return persons

    def calculate_iou(self, box1, box2):
        """Calculate Intersection over Union between two bounding boxes."""
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])

        if x2 <= x1 or y2 <= y1:
            return 0.0

        intersection = (x2 - x1) * (y2 - y1)
        area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
        area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
        union = area1 + area2 - intersection
        return intersection / union if union > 0 else 0.0

    def _crop_person(self, frame, bbox, pad=20):
        """Return a padded person crop from the frame, or None if empty."""
        x1, y1, x2, y2 = map(int, bbox)
        x1 = max(0, x1 - pad)
        y1 = max(0, y1 - pad)
        x2 = min(frame.shape[1], x2 + pad)
        y2 = min(frame.shape[0], y2 + pad)
        if x2 > x1 and y2 > y1:
            return frame[y1:y2, x1:x2]
        return None

    def track_persons(self, current_persons, frame=None):
        """Simple IoU-based tracking across frames."""
        if not self.person_tracks:
            # Initialize tracks on the first frame
            for person in current_persons:
                self.person_tracks[self.next_person_id] = {
                    'id': self.next_person_id,
                    'bbox': person,
                    'activity_history': deque(maxlen=30),
                    'frame_crops': deque(maxlen=16),  # clip buffer for MobileNet3D
                    'last_seen': 0,
                }
                self.next_person_id += 1
            return list(self.person_tracks.values())

        # Match current detections with existing tracks
        tracked_persons = []
        used_tracks = set()

        for person in current_persons:
            best_match = None
            best_iou = 0
            for track_id, track in self.person_tracks.items():
                if track_id in used_tracks:
                    continue
                iou = self.calculate_iou(person, track['bbox'])
                if iou > best_iou and iou > 0.3:  # matching threshold
                    best_iou = iou
                    best_match = track_id

            if best_match is not None:
                # Update the existing track
                self.person_tracks[best_match]['bbox'] = person
                self.person_tracks[best_match]['last_seen'] = 0
                # Extract and store a frame crop for MobileNet3D
                if frame is not None:
                    crop = self._crop_person(frame, person)
                    if crop is not None:
                        self.person_tracks[best_match]['frame_crops'].append(crop)
                used_tracks.add(best_match)
                tracked_persons.append(self.person_tracks[best_match])
            else:
                # Create a new track
                new_track = {
                    'id': self.next_person_id,
                    'bbox': person,
                    'activity_history': deque(maxlen=30),
                    'frame_crops': deque(maxlen=16),  # clip buffer for MobileNet3D
                    'last_seen': 0,
                }
                if frame is not None:
                    crop = self._crop_person(frame, person)
                    if crop is not None:
                        new_track['frame_crops'].append(crop)
                self.person_tracks[self.next_person_id] = new_track
                used_tracks.add(self.next_person_id)
                tracked_persons.append(new_track)
                self.next_person_id += 1

        # Age out tracks that were not matched this frame
        for track_id, track in self.person_tracks.items():
            if track_id not in used_tracks:
                track['last_seen'] += 1

        # Remove tracks not seen for 10 processed frames
        self.person_tracks = {k: v for k, v in self.person_tracks.items()
                              if v['last_seen'] < 10}

        return tracked_persons
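    # Alternative to the IoU matcher above: Ultralytics ships built-in
    # multi-object tracking (ByteTrack by default), which handles ID
    # assignment and re-association internally. A minimal sketch of how
    # person IDs could come straight from the detector (left as a comment so
    # the simple IoU tracker remains the active implementation):
    #
    #     results = self.yolo_model.track(frame, persist=True, classes=[0])
    #     for box in results[0].boxes:
    #         if box.id is not None:
    #             track_id = int(box.id)
    #             bbox = box.xyxy[0].cpu().numpy()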
    def estimate_pose(self, frame, bbox):
        """Estimate pose for a person inside a bounding box."""
        roi = self._crop_person(frame, bbox)
        if roi is None or roi.size == 0:
            return None
        # Run the YOLO pose model on the padded crop
        return self.pose_model(roi)

    def _preprocess_for_mobilenet3d(self, frame_sequence):
        """Preprocess a frame sequence into a (B, C, T, H, W) tensor."""
        if not frame_sequence:
            return None

        processed_frames = []
        for frame in frame_sequence:
            # Resize to the model input size
            resized = cv2.resize(frame, (224, 224))
            # Convert BGR (OpenCV) to RGB
            rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
            # Scale to [0, 1]. Note: pretrained slow_r50 weights expect
            # Kinetics normalization (roughly mean 0.45, std 0.225 per
            # channel) on top of this; without it accuracy will degrade.
            normalized = rgb.astype(np.float32) / 255.0
            processed_frames.append(normalized)

        # Stack into (C, T, H, W) for 3D convolution, then add a batch dim
        frames_tensor = np.stack(processed_frames, axis=0)         # (T, H, W, C)
        frames_tensor = np.transpose(frames_tensor, (3, 0, 1, 2))  # (C, T, H, W)
        return torch.tensor(frames_tensor, dtype=torch.float32).unsqueeze(0)
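    # Sketch of an alternative preprocessing path using the torchvision video
    # transforms imported at the top of this file (a deprecated, private
    # module, but it matches the classic PyTorchVideo tutorials). The
    # Kinetics-400 statistics below are what pretrained slow_r50 expects;
    # treat the exact values as an assumption if the backbone is swapped.
    def _preprocess_with_video_transforms(self, frame_sequence):
        """Build a normalized, center-cropped (B, C, T, H, W) clip tensor."""
        if not frame_sequence:
            return None
        # Resize slightly larger than the crop so CenterCropVideo has margin
        frames = [cv2.cvtColor(cv2.resize(f, (256, 256)), cv2.COLOR_BGR2RGB)
                  for f in frame_sequence]
        clip = torch.from_numpy(np.stack(frames)).permute(3, 0, 1, 2).float()  # (C, T, H, W)
        transform = Compose([
            Lambda(lambda x: x / 255.0),
            NormalizeVideo(mean=[0.45, 0.45, 0.45], std=[0.225, 0.225, 0.225]),
            CenterCropVideo(224),
        ])
        return transform(clip).unsqueeze(0)  # (B, C, T, H, W)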
    def classify_activity(self, pose_results, track):
        """Classify activity with MobileNet3D when possible, otherwise with a
        pose-based heuristic."""
        if self.activity_model is not None:
            try:
                # Ensure the track has a clip buffer (filled by track_persons)
                if 'frame_crops' not in track:
                    track['frame_crops'] = deque(maxlen=16)

                # Run the 3D model once enough frames have accumulated
                if len(track['frame_crops']) >= 8:
                    input_tensor = self._preprocess_for_mobilenet3d(
                        list(track['frame_crops']))
                    if input_tensor is not None:
                        with torch.no_grad():
                            outputs = self.activity_model(input_tensor)
                            probabilities = torch.softmax(outputs, dim=1)
                            confidence, predicted = torch.max(probabilities, 1)
                        activity = self.activity_labels[predicted.item()]
                        return activity, confidence.item()
                # Not enough frames yet: fall through to the heuristic
            except Exception as e:
                print(f"MobileNet3D classification failed: {e}")
                # Fall through to the heuristic

        # Heuristic-based classification (fallback)
        if pose_results is None or len(pose_results) == 0:
            return "idle", 0.5

        keypoints = pose_results[0].keypoints
        if keypoints is None or len(keypoints) == 0:
            return "idle", 0.5

        if not hasattr(keypoints, 'xy') or len(keypoints.xy) == 0:
            return "idle", 0.5

        # Extract keypoint coordinates and confidences (COCO ordering:
        # 0 nose, 1 left eye, 2 right eye, ..., 9 left wrist, 10 right wrist)
        kpts = keypoints.xy[0].cpu().numpy()
        if keypoints.conf is not None and len(keypoints.conf) > 0:
            confs = keypoints.conf[0].cpu().numpy()
        else:
            confs = np.ones(len(kpts))

        # If no keypoints were detected, return idle
        if len(kpts) == 0:
            return "idle", 0.5

        # Heuristic: mobile-phone use looks like a wrist held near the head
        if len(kpts) > 10:
            nose_visible = confs[0] > 0.5
            left_eye_visible = confs[1] > 0.5
            right_eye_visible = confs[2] > 0.5
            left_wrist_visible = confs[9] > 0.5
            right_wrist_visible = confs[10] > 0.5

            if (nose_visible and (left_eye_visible or right_eye_visible)
                    and (left_wrist_visible or right_wrist_visible)):
                head_y = kpts[0][1]  # nose y-coordinate
                left_hand_y = kpts[9][1] if left_wrist_visible else None
                right_hand_y = kpts[10][1] if right_wrist_visible else None

                if left_hand_y is not None and abs(left_hand_y - head_y) < 100:
                    return "using_mobile_phone", 0.8
                if right_hand_y is not None and abs(right_hand_y - head_y) < 100:
                    return "using_mobile_phone", 0.8

        # Record the pose observation for later sequence analysis
        track['activity_history'].append({
            'timestamp': time.time(),
            'keypoints': kpts.tolist(),
            'confidences': confs.tolist(),
        })

        # Default to idle if no specific activity was detected
        return "idle", 0.6

    def check_sop_compliance(self, activity, confidence, track):
        """Check whether an activity complies with the SOP."""
        # Explicitly non-compliant activities
        if activity in self.non_compliant_activities:
            return False, f"Non-compliant activity: {activity}"

        # Placeholder for SOP sequence validation; see the
        # _matches_sop_sequence sketch below for one possible implementation.
        # Note: history entries currently store keypoints, not labels, so
        # this filter matches nothing yet.
        if len(track['activity_history']) > 1:
            recent_activities = [act for act in list(track['activity_history'])[-5:]
                                 if 'activity' in act]
            if len(recent_activities) > 1:
                pass  # more complex sequence logic would go here

        # Compliant if it is a recognized SOP activity or idle.
        # NOTE: "idle" is also listed in non_compliant_activities, so it is
        # caught above; this clause only matters if that list changes.
        if activity in self.sop_activities or activity == "idle":
            return True, "Following SOP"

        # Treat low-confidence classifications as compliant-but-uncertain
        if confidence < 0.7:
            return True, "Uncertain activity"

        return True, "Following SOP"

    def log_compliance(self, track_id, activity, compliant, reason, timestamp):
        """Append a compliance record to the in-memory log."""
        self.compliance_log.append({
            'timestamp': timestamp,
            'person_id': track_id,
            'activity': activity,
            'compliant': compliant,
            'reason': reason,
        })
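    # Sketch of how the sequence-validation placeholder in
    # check_sop_compliance() could be fleshed out. It assumes the activity
    # history records label strings under an 'activity' key (today the
    # heuristic stores keypoints only, so this would require appending the
    # final label in analyze_video). The expected order below is illustrative,
    # not an established SOP.
    EXPECTED_SOP_ORDER = [
        "speaking_with_customer",
        "inputting_order",
        "giving_item",
        "cleaning_table",
    ]

    def _matches_sop_sequence(self, recent_entries):
        """Return True if logged SOP activities appear in the expected order."""
        labels = [e['activity'] for e in recent_entries if 'activity' in e]
        indices = [self.EXPECTED_SOP_ORDER.index(a) for a in labels
                   if a in self.EXPECTED_SOP_ORDER]
        return all(i <= j for i, j in zip(indices, indices[1:]))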
    def analyze_video(self):
        """Main analysis loop."""
        cap = cv2.VideoCapture(self.video_path)
        if not cap.isOpened():
            print("Error opening video file")
            return

        # Get video properties for the output video
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))

        # Initialize the video writer
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter('output_video.mp4', fourcc, fps,
                              (frame_width, frame_height))

        frame_count = 0
        start_time = time.time()

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1

            # Process every 3rd frame for performance. Skipped frames are not
            # written, so the output plays faster than real time; uncomment
            # the write below to keep the original duration.
            if frame_count % 3 != 0:
                # out.write(frame)
                continue

            # Detect persons in the frame
            results = self.yolo_model(frame)
            persons = []
            for result in results:
                persons.extend(self.extract_person_bboxes(result))

            # Track persons across frames
            tracked_persons = self.track_persons(persons, frame)

            # Analyze each tracked person
            for track in tracked_persons:
                bbox = track['bbox']
                x1, y1, x2, y2 = map(int, bbox)

                # Estimate pose
                pose_results = self.estimate_pose(frame, bbox)

                # Classify activity
                activity, confidence = self.classify_activity(pose_results, track)

                # Check SOP compliance
                compliant, reason = self.check_sop_compliance(activity, confidence, track)

                # Log compliance
                self.log_compliance(track['id'], activity, compliant, reason, time.time())

                # Draw the bounding box: green = compliant, red = non-compliant
                color = (0, 255, 0) if compliant else (0, 0, 255)
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

                # Add labels
                label = f"ID:{track['id']} {activity}"
                status = "Compliant" if compliant else "Non-compliant"
                cv2.putText(frame, label, (x1, y1 - 30),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
                cv2.putText(frame, status, (x1, y1 - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            # Write the processed frame to the output video
            out.write(frame)

            # Display the frame
            cv2.imshow('SOP Compliance Analyzer', frame)

            # Print progress (separate name so the writer fps is not clobbered)
            if frame_count % 30 == 0:
                elapsed = time.time() - start_time
                processing_fps = frame_count / elapsed if elapsed > 0 else 0
                print(f"Frame: {frame_count}, FPS: {processing_fps:.2f}, "
                      f"Persons tracked: {len(tracked_persons)}")

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break

        cap.release()
        out.release()
        cv2.destroyAllWindows()

        # Save the compliance log and print a summary
        self.save_compliance_report()
        self.print_summary()
        print("Output video saved as 'output_video.mp4'")

    def save_compliance_report(self):
        """Save the compliance report to a JSON file."""
        with open('compliance_report.json', 'w') as f:
            json.dump(self.compliance_log, f, indent=2)
        print("Compliance report saved to compliance_report.json")

    def print_summary(self):
        """Print a compliance summary."""
        if not self.compliance_log:
            print("No compliance data recorded")
            return

        total_checks = len(self.compliance_log)
        compliant_checks = sum(1 for log in self.compliance_log if log['compliant'])
        non_compliant_checks = total_checks - compliant_checks

        print("\n=== SOP Compliance Summary ===")
        print(f"Total activity checks: {total_checks}")
        print(f"Compliant activities: {compliant_checks} "
              f"({compliant_checks / total_checks * 100:.1f}%)")
        print(f"Non-compliant activities: {non_compliant_checks} "
              f"({non_compliant_checks / total_checks * 100:.1f}%)")

        # Count violations by type
        violation_types = {}
        for log in self.compliance_log:
            if not log['compliant']:
                activity = log['activity']
                violation_types[activity] = violation_types.get(activity, 0) + 1

        if violation_types:
            print("\nViolation types:")
            for activity, count in sorted(violation_types.items(),
                                          key=lambda x: x[1], reverse=True):
                print(f"  {activity}: {count}")


def main():
    # Initialize the analyzer with model and video paths
    analyzer = ActivityAnalyzer(
        yolo_model_path='yolo11m-2_uniform.onnx',
        pose_model_path='yolo11s-pose.pt',
        video_path='Gayungsari_110825_2.mp4',
    )

    # Run the analysis
    print("Starting SOP compliance analysis...")
    print("Press 'q' to quit the video display")
    analyzer.analyze_video()


if __name__ == "__main__":
    main()
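# Rough dependency set assumed by this script (install names only; versions
# are not pinned here):
#   pip install ultralytics opencv-python torch torchvision pytorchvideo onnxruntime
# onnxruntime is needed for the exported .onnx detector; pytorchvideo is
# needed for the torch.hub backbone load.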