vision-ai-test/activity_analyzer.py

580 lines
23 KiB
Python

import cv2
import onnxruntime as ort
import torch
import torch.nn as nn
import numpy as np
from torchvision import transforms
from ultralytics import YOLO
import time
from collections import deque
import json
import urllib
from torchvision.transforms import Compose, Lambda
from torchvision.transforms._transforms_video import (
CenterCropVideo,
NormalizeVideo,
)
# Try to import PyTorchVideo for MobileNet3D
try:
import torch.hub
PYTORCHVIDEO_AVAILABLE = True
except ImportError:
PYTORCHVIDEO_AVAILABLE = False
print("PyTorchVideo not available, using simplified model")
class ActivityAnalyzer:
def __init__(self, yolo_model_path, pose_model_path, video_path):
# Load YOLO model for person detection
self.yolo_model = YOLO(yolo_model_path)
# Load YOLO pose estimation model
self.pose_model = YOLO(pose_model_path)
# Video path
self.video_path = video_path
# Transform for preprocessing
self.transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
# Activity labels
self.sop_activities = [
"speaking_with_customer",
"inputting_order",
"giving_item",
"cleaning_table"
]
self.non_compliant_activities = [
"using_mobile_phone",
"talking_with_colleagues",
"idle"
]
# Initialize activity labels first, then MobileNet3D
self.activity_labels = [
"speaking_with_customer",
"inputting_order",
"giving_item",
"cleaning_table",
"using_mobile_phone",
"talking_with_colleagues",
"idle",
"bringing_menu",
"bringing_food",
"opening_door",
]
# Initialize MobileNet3D for activity classification
self.activity_model = self._initialize_mobilenet3d()
# Person tracking
self.person_tracks = {}
self.next_person_id = 0
# Compliance tracking
self.compliance_log = []
def _initialize_mobilenet3d(self):
"""Initialize MobileNet3D model for activity classification"""
if not PYTORCHVIDEO_AVAILABLE:
print("PyTorchVideo not available, using simplified model")
# Create a simplified version for demonstration
try:
model = nn.Sequential(
nn.Conv3d(3, 32, kernel_size=(3, 3, 3), padding=(1, 1, 1)),
nn.ReLU(),
nn.AdaptiveAvgPool3d((1, 1, 1)),
nn.Flatten(),
nn.Linear(32, len(self.activity_labels))
)
return model
except Exception as e:
print(f"Warning: Could not initialize simplified MobileNet3D model: {e}")
print("Falling back to heuristic-based classification")
return None
# Try to load a real MobileNet3D model from PyTorchVideo
try:
# Load a pretrained MobileNet3D model
# Note: This is a placeholder - you would need to select an appropriate model
# For example: torch.hub.load('facebookresearch/pytorchvideo', 'slow_r50', pretrained=True)
# model = torch.hub.load('facebookresearch/pytorchvideo', 'slow_r50', pretrained=True)
model_name='slowfast_r50'
model_dir='C:/Users/suherdy.yacob/.cache/torch/hub/checkpoints/'
#model = torch.hub.load(repo_or_dir=model_dir,model = model_name, pretrained=True)
#model = torch.hub.load(model_dir, model_name, source='local', pretrained=True)
# 'slow_r50', 'x3d_m', x3d_l', 'slowfast_r50', 'slowfast_r101', 'c2d', 'i2d', 'mvit'
model = torch.hub.load('facebookresearch/pytorchvideo', 'slow_r50', pretrained=True)
# Modify the final layer to match our number of activity classes
# This is a simplified approach - in practice you might need a more complex adaptation
# json_url = "https://dl.fbaipublicfiles.com/pyslowfast/dataset/class_names/kinetics_classnames.json"
# json_filename = "kinetics_classnames.json"
# try: urllib.URLopener().retrieve(json_url, json_filename)
# except: urllib.request.urlretrieve(json_url, json_filename)
# with open(json_filename, "r") as f:
# kinetics_classnames = json.load(f)
# # Create an id to label name mapping
# self.activity_labels = []
# kinetics_id_to_classname = {}
# for k, v in kinetics_classnames.items():
# kinetics_id_to_classname[v] = str(k).replace('"', "")
# self.activity_labels.append(str(k).replace('"',""))
model.blocks[-1].proj = nn.Linear(model.blocks[-1].proj.in_features, len(self.activity_labels))
return model
except Exception as e:
print(f"Warning: Could not load pretrained MobileNet3D model: {e}")
print("Using simplified model instead")
try:
# Fallback to simplified model
model = nn.Sequential(
nn.Conv3d(3, 32, kernel_size=(3, 3, 3), padding=(1, 1, 1)),
nn.ReLU(),
nn.AdaptiveAvgPool3d((1, 1, 1)),
nn.Flatten(),
nn.Linear(32, len(self.activity_labels))
)
return model
except Exception as e:
print(f"Warning: Could not initialize simplified MobileNet3D model: {e}")
print("Falling back to heuristic-based classification")
return None
def extract_person_bboxes(self, results):
"""Extract person bounding boxes from YOLO results"""
persons = []
if results.boxes is not None:
for box in results.boxes:
# Assuming class 0 is 'person' in your model
if box.cls == 0 and box.conf > 0.5:
persons.append(box.xyxy[0].cpu().numpy())
return persons
def calculate_iou(self, box1, box2):
"""Calculate Intersection over Union between two bounding boxes"""
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
if x2 <= x1 or y2 <= y1:
return 0.0
intersection = (x2 - x1) * (y2 - y1)
area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
union = area1 + area2 - intersection
return intersection / union if union > 0 else 0
def track_persons(self, current_persons, frame=None):
"""Simple tracking based on IoU"""
if not self.person_tracks:
# Initialize tracks for first frame
for person in current_persons:
self.person_tracks[self.next_person_id] = {
'id': self.next_person_id,
'bbox': person,
'activity_history': deque(maxlen=30),
'frame_crops': deque(maxlen=16), # For MobileNet3D
'last_seen': 0
}
self.next_person_id += 1
return list(self.person_tracks.values())
# Match current persons with existing tracks
tracked_persons = []
used_tracks = set()
for person in current_persons:
best_match = None
best_iou = 0
for track_id, track in self.person_tracks.items():
if track_id in used_tracks:
continue
iou = self.calculate_iou(person, track['bbox'])
if iou > best_iou and iou > 0.3: # Threshold for matching
best_iou = iou
best_match = track_id
if best_match is not None:
# Update existing track
self.person_tracks[best_match]['bbox'] = person
self.person_tracks[best_match]['last_seen'] = 0
# Extract and store frame crop for MobileNet3D
if frame is not None:
x1, y1, x2, y2 = map(int, person)
# Add padding
pad = 20
x1 = max(0, x1 - pad)
y1 = max(0, y1 - pad)
x2 = min(frame.shape[1], x2 + pad)
y2 = min(frame.shape[0], y2 + pad)
if x2 > x1 and y2 > y1:
crop = frame[y1:y2, x1:x2]
self.person_tracks[best_match]['frame_crops'].append(crop)
used_tracks.add(best_match)
tracked_persons.append(self.person_tracks[best_match])
else:
# Create new track
new_track = {
'id': self.next_person_id,
'bbox': person,
'activity_history': deque(maxlen=30),
'frame_crops': deque(maxlen=16), # For MobileNet3D
'last_seen': 0
}
# Extract and store frame crop for MobileNet3D
if frame is not None:
x1, y1, x2, y2 = map(int, person)
# Add padding
pad = 20
x1 = max(0, x1 - pad)
y1 = max(0, y1 - pad)
x2 = min(frame.shape[1], x2 + pad)
y2 = min(frame.shape[0], y2 + pad)
if x2 > x1 and y2 > y1:
crop = frame[y1:y2, x1:x2]
new_track['frame_crops'].append(crop)
self.person_tracks[self.next_person_id] = new_track
used_tracks.add(self.next_person_id)
tracked_persons.append(new_track)
self.next_person_id += 1
# Increment last_seen for unused tracks
for track_id, track in self.person_tracks.items():
if track_id not in used_tracks:
self.person_tracks[track_id]['last_seen'] += 1
# Remove old tracks (not seen for 10 frames)
self.person_tracks = {k: v for k, v in self.person_tracks.items() if v['last_seen'] < 10}
return tracked_persons
def estimate_pose(self, frame, bbox):
"""Estimate pose for a person in bounding box"""
x1, y1, x2, y2 = map(int, bbox)
# Add some padding
pad = 20
x1 = max(0, x1 - pad)
y1 = max(0, y1 - pad)
x2 = min(frame.shape[1], x2 + pad)
y2 = min(frame.shape[0], y2 + pad)
roi = frame[y1:y2, x1:x2]
if roi.size == 0:
return None
# Use YOLO pose model
results = self.pose_model(roi)
return results
def _preprocess_for_mobilenet3d(self, frame_sequence):
"""Preprocess frame sequence for MobileNet3D input"""
if not frame_sequence:
return None
# Convert to tensor and normalize
processed_frames = []
for frame in frame_sequence:
# Resize frame to model input size
resized = cv2.resize(frame, (224, 224))
# Convert BGR to RGB
rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
# Normalize
normalized = rgb.astype(np.float32) / 255.0
processed_frames.append(normalized)
# Stack frames into tensor (C, D, H, W) format for 3D conv
if processed_frames:
frames_tensor = np.stack(processed_frames, axis=0) # (D, H, W, C)
frames_tensor = np.transpose(frames_tensor, (3, 0, 1, 2)) # (C, D, H, W)
return torch.tensor(frames_tensor, dtype=torch.float32).unsqueeze(0) # Add batch dimension
return None
def classify_activity(self, pose_results, track):
"""Classify activity using MobileNet3D"""
# Use MobileNet3D if available, otherwise fall back to heuristic
if self.activity_model is not None:
try:
# Extract person crop from track
if 'frame_crops' not in track:
track['frame_crops'] = deque(maxlen=16) # 16 frames for 3D processing
# Add current frame crop to sequence if available
# In a real implementation, you would extract the person crop from the current frame
# For this example, we'll simulate with a placeholder
# If we have enough frames for 3D processing
if len(track['frame_crops']) >= 8:
# Preprocess frames for MobileNet3D
input_tensor = self._preprocess_for_mobilenet3d(list(track['frame_crops']))
if input_tensor is not None:
# Run inference
with torch.no_grad():
outputs = self.activity_model(input_tensor)
probabilities = torch.softmax(outputs, dim=1)
confidence, predicted = torch.max(probabilities, 1)
# Get activity label
activity = self.activity_labels[predicted.item()]
return activity, confidence.item()
# If not enough frames or model fails, fall back to heuristic
pass
except Exception as e:
print(f"MobileNet3D classification failed: {e}")
# Fall back to heuristic classification
pass
# Heuristic-based classification (fallback)
if len(pose_results) == 0:
return "idle", 0.5
keypoints = pose_results[0].keypoints
if keypoints is None or len(keypoints) == 0:
return "idle", 0.5
# Check if keypoints has xy attribute and it's not empty
if not hasattr(keypoints, 'xy') or len(keypoints.xy) == 0:
return "idle", 0.5
# Extract keypoints
kpts = keypoints.xy[0].cpu().numpy() if len(keypoints.xy) > 0 else np.array([])
confs = keypoints.conf[0].cpu().numpy() if hasattr(keypoints, 'conf') and keypoints.conf is not None and len(keypoints.conf) > 0 else np.ones(len(kpts)) if len(kpts) > 0 else np.array([])
# If no keypoints detected, return idle
if len(kpts) == 0:
return "idle", 0.5
# Simple heuristic-based activity classification
# Check if person is using mobile phone (head tilted down, hand near face)
if len(kpts) > 10:
# Check if nose and eyes are visible
nose_visible = confs[0] > 0.5 if len(confs) > 0 else False
left_eye_visible = confs[1] > 0.5 if len(confs) > 1 else False
right_eye_visible = confs[2] > 0.5 if len(confs) > 2 else False
# Check if hands are near head (possible mobile phone use)
left_wrist_visible = confs[9] > 0.5 if len(confs) > 9 else False
right_wrist_visible = confs[10] > 0.5 if len(confs) > 10 else False
if nose_visible and (left_eye_visible or right_eye_visible) and (left_wrist_visible or right_wrist_visible):
# Simple check for hand near head
if len(kpts) > 10:
head_y = kpts[0][1] # nose
left_hand_y = kpts[9][1] if left_wrist_visible else None
right_hand_y = kpts[10][1] if right_wrist_visible else None
if left_hand_y is not None and abs(left_hand_y - head_y) < 100:
return "using_mobile_phone", 0.8
if right_hand_y is not None and abs(right_hand_y - head_y) < 100:
return "using_mobile_phone", 0.8
# Add activity to history
track['activity_history'].append({
'timestamp': time.time(),
'keypoints': kpts.tolist() if len(kpts) > 0 else [],
'confidences': confs.tolist() if len(confs) > 0 else []
})
# Default to idle if no specific activity detected
return "idle", 0.6
def check_sop_compliance(self, activity, confidence, track):
"""Check if activity complies with SOP"""
# Non-compliant activities
if activity in self.non_compliant_activities:
return False, f"Non-compliant activity: {activity}"
# Check for proper SOP sequence
if len(track['activity_history']) > 1:
recent_activities = [act for act in list(track['activity_history'])[-5:] if 'activity' in act]
if len(recent_activities) > 1:
# Simple sequence validation
pass # More complex logic would go here
# Compliant if it's an SOP activity or idle
if activity in self.sop_activities or activity == "idle":
return True, "Following SOP"
# Default to compliant with low confidence activities
if confidence < 0.7:
return True, "Uncertain activity"
return True, "Following SOP"
def log_compliance(self, track_id, activity, compliant, reason, timestamp):
"""Log compliance information"""
log_entry = {
'timestamp': timestamp,
'person_id': track_id,
'activity': activity,
'compliant': compliant,
'reason': reason
}
self.compliance_log.append(log_entry)
def analyze_video(self):
"""Main analysis function"""
cap = cv2.VideoCapture(self.video_path)
if not cap.isOpened():
print("Error opening video file")
return
# Get video properties for output video
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
# Initialize video writer
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output_video.mp4', fourcc, fps, (frame_width, frame_height))
frame_count = 0
start_time = time.time()
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame_count += 1
# Process every 3rd frame for performance
if frame_count % 3 != 0:
# Write unprocessed frame to output video
#out.write(frame)
continue
# Detect persons in frame
results = self.yolo_model(frame)
# Extract person bounding boxes
persons = []
for result in results:
persons.extend(self.extract_person_bboxes(result))
# Track persons across frames
tracked_persons = self.track_persons(persons, frame)
# Analyze each tracked person
for track in tracked_persons:
bbox = track['bbox']
x1, y1, x2, y2 = map(int, bbox)
# Estimate pose
pose_results = self.estimate_pose(frame, bbox)
# Classify activity
activity, confidence = self.classify_activity(pose_results, track)
# Check SOP compliance
compliant, reason = self.check_sop_compliance(activity, confidence, track)
# Log compliance
self.log_compliance(track['id'], activity, compliant, reason, time.time())
# Draw bounding box
color = (0, 255, 0) if compliant else (0, 0, 255)
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
# Add labels
label = f"ID:{track['id']} {activity}"
status = "Compliant" if compliant else "Non-compliant"
cv2.putText(frame, label, (x1, y1 - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
cv2.putText(frame, status, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
# Write processed frame to output video
out.write(frame)
# Display frame
cv2.imshow('SOP Compliance Analyzer', frame)
# Print progress
if frame_count % 30 == 0:
elapsed = time.time() - start_time
fps = frame_count / elapsed if elapsed > 0 else 0
print(f"Frame: {frame_count}, FPS: {fps:.2f}, Persons tracked: {len(tracked_persons)}")
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
out.release()
cv2.destroyAllWindows()
# Save compliance log
self.save_compliance_report()
# Print summary
self.print_summary()
print("Output video saved as 'output_video.mp4'")
def save_compliance_report(self):
"""Save compliance report to file"""
with open('compliance_report.json', 'w') as f:
json.dump(self.compliance_log, f, indent=2)
print("Compliance report saved to compliance_report.json")
def print_summary(self):
"""Print compliance summary"""
if not self.compliance_log:
print("No compliance data recorded")
return
total_checks = len(self.compliance_log)
compliant_checks = sum(1 for log in self.compliance_log if log['compliant'])
non_compliant_checks = total_checks - compliant_checks
print("\n=== SOP Compliance Summary ===")
print(f"Total activity checks: {total_checks}")
print(f"Compliant activities: {compliant_checks} ({compliant_checks/total_checks*100:.1f}%)")
print(f"Non-compliant activities: {non_compliant_checks} ({non_compliant_checks/total_checks*100:.1f}%)")
# Count violations by type
violation_types = {}
for log in self.compliance_log:
if not log['compliant']:
activity = log['activity']
violation_types[activity] = violation_types.get(activity, 0) + 1
if violation_types:
print("\nViolation types:")
for activity, count in sorted(violation_types.items(), key=lambda x: x[1], reverse=True):
print(f" {activity}: {count}")
def main():
# Initialize analyzer
analyzer = ActivityAnalyzer(
yolo_model_path='yolo11m-2_uniform.onnx',
pose_model_path='yolo11s-pose.pt',
video_path='Gayungsari_110825_2.mp4'
)
# Run analysis
print("Starting SOP compliance analysis...")
print("Press 'q' to quit the video display")
analyzer.analyze_video()
if __name__ == "__main__":
main()