add Human Activity Recognition using SlowFast

This commit is contained in:
admin.suherdy 2025-08-11 16:59:34 +07:00
parent 6fd7cbf07e
commit 903b82e4db
12 changed files with 3470 additions and 40 deletions

1
.gitignore vendored
View File

@ -26,6 +26,7 @@ share/python-wheels/
.installed.cfg .installed.cfg
*.egg *.egg
MANIFEST MANIFEST
*.mp4
# PyInstaller # PyInstaller
# Usually these files are written by a python script from a template # Usually these files are written by a python script from a template

View File

@ -1,3 +1,91 @@
# vision-ai-test # SOP Compliance Analyzer
Belajar vision AI This program analyzes human activity in videos to check if workers are following Standard Operating Procedures (SOPs). It uses computer vision techniques to detect people, estimate their poses, classify activities, and evaluate compliance with predefined SOPs.
## Features
- Person detection using YOLO model
- Pose estimation for activity analysis
- Activity classification (SOP compliance checking)
- Real-time video analysis with visual feedback
- Compliance reporting and logging
- Person tracking across video frames
## Requirements
- Python 3.7+
- OpenCV
- PyTorch
- ONNX Runtime
- Ultralytics YOLO
- NumPy
- PyTorchVideo (for MobileNet3D support)
## Installation
1. Install the required packages:
```
pip install -r requirements.txt
```
## Usage
1. Prepare your video file and models:
- Place your video file in the project directory
- Ensure you have the YOLO models (`yolo11m-2_uniform.onnx` and `yolo11s-pose.pt`)
2. Run the analyzer:
```
python activity_analyzer.py
```
3. View the results:
- Real-time video display with compliance indicators
- Compliance report saved to `compliance_report.json`
- Summary statistics printed to console
## How It Works
1. **Person Detection**: Uses YOLO to detect people in each video frame
2. **Person Tracking**: Tracks individuals across frames using IoU-based matching
3. **Pose Estimation**: Estimates body pose for each detected person
4. **Activity Classification**: Classifies activities based on pose patterns
5. **SOP Compliance Check**: Evaluates if activities comply with predefined SOPs
6. **Visualization**: Displays results with color-coded bounding boxes
7. **Reporting**: Generates detailed compliance reports
## SOP Activities
The program recognizes the following SOP-compliant activities:
- Speaking with customer
- Inputting order
- Giving item to customer
- Cleaning table
Non-compliant activities detected:
- Using mobile phone
- Talking with colleagues
- Idle behavior
## Customization
You can customize the program by modifying:
- Activity definitions in the `sop_activities` and `non_compliant_activities` lists
- Confidence thresholds for detection and classification
- Tracking parameters
- Compliance rules in the `check_sop_compliance` method
## Output
The program generates:
1. Real-time video display with bounding boxes (green= compliant, red=non-compliant)
2. `compliance_report.json` with detailed activity logs
3. Console summary with compliance statistics
## Troubleshooting
If you encounter issues:
1. Ensure all required models are in the correct location
2. Check that your video file is accessible
3. Verify all dependencies are installed correctly
4. Adjust confidence thresholds if detection is inaccurate

578
activity_analyzer.py Normal file
View File

@ -0,0 +1,578 @@
import cv2
import onnxruntime as ort
import torch
import torch.nn as nn
import numpy as np
from torchvision import transforms
from ultralytics import YOLO
import time
from collections import deque
import json
import urllib
from torchvision.transforms import Compose, Lambda
from torchvision.transforms._transforms_video import (
CenterCropVideo,
NormalizeVideo,
)
# Try to import PyTorchVideo for MobileNet3D
try:
import torch.hub
PYTORCHVIDEO_AVAILABLE = True
except ImportError:
PYTORCHVIDEO_AVAILABLE = False
print("PyTorchVideo not available, using simplified model")
class ActivityAnalyzer:
def __init__(self, yolo_model_path, pose_model_path, video_path):
# Load YOLO model for person detection
self.yolo_model = YOLO(yolo_model_path)
# Load YOLO pose estimation model
self.pose_model = YOLO(pose_model_path)
# Video path
self.video_path = video_path
# Transform for preprocessing
self.transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
# Activity labels
self.sop_activities = [
"speaking_with_customer",
"inputting_order",
"giving_item",
"cleaning_table"
]
self.non_compliant_activities = [
"using_mobile_phone",
"talking_with_colleagues",
"idle"
]
# Initialize activity labels first, then MobileNet3D
self.activity_labels = [
"speaking_with_customer",
"inputting_order",
"giving_item",
"cleaning_table",
"using_mobile_phone",
"talking_with_colleagues",
"idle",
"bringing_menu",
"bringing_food",
"opening_door",
]
# Initialize MobileNet3D for activity classification
self.activity_model = self._initialize_mobilenet3d()
# Person tracking
self.person_tracks = {}
self.next_person_id = 0
# Compliance tracking
self.compliance_log = []
def _initialize_mobilenet3d(self):
"""Initialize MobileNet3D model for activity classification"""
if not PYTORCHVIDEO_AVAILABLE:
print("PyTorchVideo not available, using simplified model")
# Create a simplified version for demonstration
try:
model = nn.Sequential(
nn.Conv3d(3, 32, kernel_size=(3, 3, 3), padding=(1, 1, 1)),
nn.ReLU(),
nn.AdaptiveAvgPool3d((1, 1, 1)),
nn.Flatten(),
nn.Linear(32, len(self.activity_labels))
)
return model
except Exception as e:
print(f"Warning: Could not initialize simplified MobileNet3D model: {e}")
print("Falling back to heuristic-based classification")
return None
# Try to load a real MobileNet3D model from PyTorchVideo
try:
# Load a pretrained MobileNet3D model
# Note: This is a placeholder - you would need to select an appropriate model
# For example: torch.hub.load('facebookresearch/pytorchvideo', 'slow_r50', pretrained=True)
# model = torch.hub.load('facebookresearch/pytorchvideo', 'slow_r50', pretrained=True)
model_name='X3D_L'
model_dir='C:/Users/suherdy.yacob/.cache/torch/hub/checkpoints/'
#model = torch.hub.load(repo_or_dir=model_dir,model = model_name, pretrained=True)
#model = torch.hub.load(model_dir, model_name, source='local', pretrained=True)
model = torch.hub.load('facebookresearch/pytorchvideo', 'slow_r50', pretrained=True)
# Modify the final layer to match our number of activity classes
# This is a simplified approach - in practice you might need a more complex adaptation
# json_url = "https://dl.fbaipublicfiles.com/pyslowfast/dataset/class_names/kinetics_classnames.json"
# json_filename = "kinetics_classnames.json"
# try: urllib.URLopener().retrieve(json_url, json_filename)
# except: urllib.request.urlretrieve(json_url, json_filename)
# with open(json_filename, "r") as f:
# kinetics_classnames = json.load(f)
# # Create an id to label name mapping
# self.activity_labels = []
# kinetics_id_to_classname = {}
# for k, v in kinetics_classnames.items():
# kinetics_id_to_classname[v] = str(k).replace('"', "")
# self.activity_labels.append(str(k).replace('"',""))
model.blocks[-1].proj = nn.Linear(model.blocks[-1].proj.in_features, len(self.activity_labels))
return model
except Exception as e:
print(f"Warning: Could not load pretrained MobileNet3D model: {e}")
print("Using simplified model instead")
try:
# Fallback to simplified model
model = nn.Sequential(
nn.Conv3d(3, 32, kernel_size=(3, 3, 3), padding=(1, 1, 1)),
nn.ReLU(),
nn.AdaptiveAvgPool3d((1, 1, 1)),
nn.Flatten(),
nn.Linear(32, len(self.activity_labels))
)
return model
except Exception as e:
print(f"Warning: Could not initialize simplified MobileNet3D model: {e}")
print("Falling back to heuristic-based classification")
return None
def extract_person_bboxes(self, results):
"""Extract person bounding boxes from YOLO results"""
persons = []
if results.boxes is not None:
for box in results.boxes:
# Assuming class 0 is 'person' in your model
if box.cls == 0 and box.conf > 0.5:
persons.append(box.xyxy[0].cpu().numpy())
return persons
def calculate_iou(self, box1, box2):
"""Calculate Intersection over Union between two bounding boxes"""
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
if x2 <= x1 or y2 <= y1:
return 0.0
intersection = (x2 - x1) * (y2 - y1)
area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])
union = area1 + area2 - intersection
return intersection / union if union > 0 else 0
def track_persons(self, current_persons, frame=None):
"""Simple tracking based on IoU"""
if not self.person_tracks:
# Initialize tracks for first frame
for person in current_persons:
self.person_tracks[self.next_person_id] = {
'id': self.next_person_id,
'bbox': person,
'activity_history': deque(maxlen=30),
'frame_crops': deque(maxlen=16), # For MobileNet3D
'last_seen': 0
}
self.next_person_id += 1
return list(self.person_tracks.values())
# Match current persons with existing tracks
tracked_persons = []
used_tracks = set()
for person in current_persons:
best_match = None
best_iou = 0
for track_id, track in self.person_tracks.items():
if track_id in used_tracks:
continue
iou = self.calculate_iou(person, track['bbox'])
if iou > best_iou and iou > 0.3: # Threshold for matching
best_iou = iou
best_match = track_id
if best_match is not None:
# Update existing track
self.person_tracks[best_match]['bbox'] = person
self.person_tracks[best_match]['last_seen'] = 0
# Extract and store frame crop for MobileNet3D
if frame is not None:
x1, y1, x2, y2 = map(int, person)
# Add padding
pad = 20
x1 = max(0, x1 - pad)
y1 = max(0, y1 - pad)
x2 = min(frame.shape[1], x2 + pad)
y2 = min(frame.shape[0], y2 + pad)
if x2 > x1 and y2 > y1:
crop = frame[y1:y2, x1:x2]
self.person_tracks[best_match]['frame_crops'].append(crop)
used_tracks.add(best_match)
tracked_persons.append(self.person_tracks[best_match])
else:
# Create new track
new_track = {
'id': self.next_person_id,
'bbox': person,
'activity_history': deque(maxlen=30),
'frame_crops': deque(maxlen=16), # For MobileNet3D
'last_seen': 0
}
# Extract and store frame crop for MobileNet3D
if frame is not None:
x1, y1, x2, y2 = map(int, person)
# Add padding
pad = 20
x1 = max(0, x1 - pad)
y1 = max(0, y1 - pad)
x2 = min(frame.shape[1], x2 + pad)
y2 = min(frame.shape[0], y2 + pad)
if x2 > x1 and y2 > y1:
crop = frame[y1:y2, x1:x2]
new_track['frame_crops'].append(crop)
self.person_tracks[self.next_person_id] = new_track
used_tracks.add(self.next_person_id)
tracked_persons.append(new_track)
self.next_person_id += 1
# Increment last_seen for unused tracks
for track_id, track in self.person_tracks.items():
if track_id not in used_tracks:
self.person_tracks[track_id]['last_seen'] += 1
# Remove old tracks (not seen for 10 frames)
self.person_tracks = {k: v for k, v in self.person_tracks.items() if v['last_seen'] < 10}
return tracked_persons
def estimate_pose(self, frame, bbox):
"""Estimate pose for a person in bounding box"""
x1, y1, x2, y2 = map(int, bbox)
# Add some padding
pad = 20
x1 = max(0, x1 - pad)
y1 = max(0, y1 - pad)
x2 = min(frame.shape[1], x2 + pad)
y2 = min(frame.shape[0], y2 + pad)
roi = frame[y1:y2, x1:x2]
if roi.size == 0:
return None
# Use YOLO pose model
results = self.pose_model(roi)
return results
def _preprocess_for_mobilenet3d(self, frame_sequence):
"""Preprocess frame sequence for MobileNet3D input"""
if not frame_sequence:
return None
# Convert to tensor and normalize
processed_frames = []
for frame in frame_sequence:
# Resize frame to model input size
resized = cv2.resize(frame, (224, 224))
# Convert BGR to RGB
rgb = cv2.cvtColor(resized, cv2.COLOR_BGR2RGB)
# Normalize
normalized = rgb.astype(np.float32) / 255.0
processed_frames.append(normalized)
# Stack frames into tensor (C, D, H, W) format for 3D conv
if processed_frames:
frames_tensor = np.stack(processed_frames, axis=0) # (D, H, W, C)
frames_tensor = np.transpose(frames_tensor, (3, 0, 1, 2)) # (C, D, H, W)
return torch.tensor(frames_tensor, dtype=torch.float32).unsqueeze(0) # Add batch dimension
return None
def classify_activity(self, pose_results, track):
"""Classify activity using MobileNet3D"""
# Use MobileNet3D if available, otherwise fall back to heuristic
if self.activity_model is not None:
try:
# Extract person crop from track
if 'frame_crops' not in track:
track['frame_crops'] = deque(maxlen=16) # 16 frames for 3D processing
# Add current frame crop to sequence if available
# In a real implementation, you would extract the person crop from the current frame
# For this example, we'll simulate with a placeholder
# If we have enough frames for 3D processing
if len(track['frame_crops']) >= 8:
# Preprocess frames for MobileNet3D
input_tensor = self._preprocess_for_mobilenet3d(list(track['frame_crops']))
if input_tensor is not None:
# Run inference
with torch.no_grad():
outputs = self.activity_model(input_tensor)
probabilities = torch.softmax(outputs, dim=1)
confidence, predicted = torch.max(probabilities, 1)
# Get activity label
activity = self.activity_labels[predicted.item()]
return activity, confidence.item()
# If not enough frames or model fails, fall back to heuristic
pass
except Exception as e:
print(f"MobileNet3D classification failed: {e}")
# Fall back to heuristic classification
pass
# Heuristic-based classification (fallback)
if len(pose_results) == 0:
return "idle", 0.5
keypoints = pose_results[0].keypoints
if keypoints is None or len(keypoints) == 0:
return "idle", 0.5
# Check if keypoints has xy attribute and it's not empty
if not hasattr(keypoints, 'xy') or len(keypoints.xy) == 0:
return "idle", 0.5
# Extract keypoints
kpts = keypoints.xy[0].cpu().numpy() if len(keypoints.xy) > 0 else np.array([])
confs = keypoints.conf[0].cpu().numpy() if hasattr(keypoints, 'conf') and keypoints.conf is not None and len(keypoints.conf) > 0 else np.ones(len(kpts)) if len(kpts) > 0 else np.array([])
# If no keypoints detected, return idle
if len(kpts) == 0:
return "idle", 0.5
# Simple heuristic-based activity classification
# Check if person is using mobile phone (head tilted down, hand near face)
if len(kpts) > 10:
# Check if nose and eyes are visible
nose_visible = confs[0] > 0.5 if len(confs) > 0 else False
left_eye_visible = confs[1] > 0.5 if len(confs) > 1 else False
right_eye_visible = confs[2] > 0.5 if len(confs) > 2 else False
# Check if hands are near head (possible mobile phone use)
left_wrist_visible = confs[9] > 0.5 if len(confs) > 9 else False
right_wrist_visible = confs[10] > 0.5 if len(confs) > 10 else False
if nose_visible and (left_eye_visible or right_eye_visible) and (left_wrist_visible or right_wrist_visible):
# Simple check for hand near head
if len(kpts) > 10:
head_y = kpts[0][1] # nose
left_hand_y = kpts[9][1] if left_wrist_visible else None
right_hand_y = kpts[10][1] if right_wrist_visible else None
if left_hand_y is not None and abs(left_hand_y - head_y) < 100:
return "using_mobile_phone", 0.8
if right_hand_y is not None and abs(right_hand_y - head_y) < 100:
return "using_mobile_phone", 0.8
# Add activity to history
track['activity_history'].append({
'timestamp': time.time(),
'keypoints': kpts.tolist() if len(kpts) > 0 else [],
'confidences': confs.tolist() if len(confs) > 0 else []
})
# Default to idle if no specific activity detected
return "idle", 0.6
def check_sop_compliance(self, activity, confidence, track):
"""Check if activity complies with SOP"""
# Non-compliant activities
if activity in self.non_compliant_activities:
return False, f"Non-compliant activity: {activity}"
# Check for proper SOP sequence
if len(track['activity_history']) > 1:
recent_activities = [act for act in list(track['activity_history'])[-5:] if 'activity' in act]
if len(recent_activities) > 1:
# Simple sequence validation
pass # More complex logic would go here
# Compliant if it's an SOP activity or idle
if activity in self.sop_activities or activity == "idle":
return True, "Following SOP"
# Default to compliant with low confidence activities
if confidence < 0.7:
return True, "Uncertain activity"
return True, "Following SOP"
def log_compliance(self, track_id, activity, compliant, reason, timestamp):
"""Log compliance information"""
log_entry = {
'timestamp': timestamp,
'person_id': track_id,
'activity': activity,
'compliant': compliant,
'reason': reason
}
self.compliance_log.append(log_entry)
def analyze_video(self):
"""Main analysis function"""
cap = cv2.VideoCapture(self.video_path)
if not cap.isOpened():
print("Error opening video file")
return
# Get video properties for output video
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS))
# Initialize video writer
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter('output_video.mp4', fourcc, fps, (frame_width, frame_height))
frame_count = 0
start_time = time.time()
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
frame_count += 1
# Process every 3rd frame for performance
if frame_count % 3 != 0:
# Write unprocessed frame to output video
#out.write(frame)
continue
# Detect persons in frame
results = self.yolo_model(frame)
# Extract person bounding boxes
persons = []
for result in results:
persons.extend(self.extract_person_bboxes(result))
# Track persons across frames
tracked_persons = self.track_persons(persons, frame)
# Analyze each tracked person
for track in tracked_persons:
bbox = track['bbox']
x1, y1, x2, y2 = map(int, bbox)
# Estimate pose
pose_results = self.estimate_pose(frame, bbox)
# Classify activity
activity, confidence = self.classify_activity(pose_results, track)
# Check SOP compliance
compliant, reason = self.check_sop_compliance(activity, confidence, track)
# Log compliance
self.log_compliance(track['id'], activity, compliant, reason, time.time())
# Draw bounding box
color = (0, 255, 0) if compliant else (0, 0, 255)
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
# Add labels
label = f"ID:{track['id']} {activity}"
status = "Compliant" if compliant else "Non-compliant"
cv2.putText(frame, label, (x1, y1 - 30), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
cv2.putText(frame, status, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
# Write processed frame to output video
out.write(frame)
# Display frame
cv2.imshow('SOP Compliance Analyzer', frame)
# Print progress
if frame_count % 30 == 0:
elapsed = time.time() - start_time
fps = frame_count / elapsed if elapsed > 0 else 0
print(f"Frame: {frame_count}, FPS: {fps:.2f}, Persons tracked: {len(tracked_persons)}")
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
out.release()
cv2.destroyAllWindows()
# Save compliance log
self.save_compliance_report()
# Print summary
self.print_summary()
print("Output video saved as 'output_video.mp4'")
def save_compliance_report(self):
"""Save compliance report to file"""
with open('compliance_report.json', 'w') as f:
json.dump(self.compliance_log, f, indent=2)
print("Compliance report saved to compliance_report.json")
def print_summary(self):
"""Print compliance summary"""
if not self.compliance_log:
print("No compliance data recorded")
return
total_checks = len(self.compliance_log)
compliant_checks = sum(1 for log in self.compliance_log if log['compliant'])
non_compliant_checks = total_checks - compliant_checks
print("\n=== SOP Compliance Summary ===")
print(f"Total activity checks: {total_checks}")
print(f"Compliant activities: {compliant_checks} ({compliant_checks/total_checks*100:.1f}%)")
print(f"Non-compliant activities: {non_compliant_checks} ({non_compliant_checks/total_checks*100:.1f}%)")
# Count violations by type
violation_types = {}
for log in self.compliance_log:
if not log['compliant']:
activity = log['activity']
violation_types[activity] = violation_types.get(activity, 0) + 1
if violation_types:
print("\nViolation types:")
for activity, count in sorted(violation_types.items(), key=lambda x: x[1], reverse=True):
print(f" {activity}: {count}")
def main():
# Initialize analyzer
analyzer = ActivityAnalyzer(
yolo_model_path='yolo11m-2_uniform.onnx',
pose_model_path='yolo11s-pose.pt',
video_path='Gayungsari_110825_2.mp4'
)
# Run analysis
print("Starting SOP compliance analysis...")
print("Press 'q' to quit the video display")
analyzer.analyze_video()
if __name__ == "__main__":
main()

2543
compliance_report.json Normal file

File diff suppressed because it is too large Load Diff

61
demo.py Normal file
View File

@ -0,0 +1,61 @@
#!/usr/bin/env python3
"""
Demo script to run the SOP Compliance Analyzer
"""
import subprocess
import sys
import os
def main():
print("SOP Compliance Analyzer Demo")
print("============================")
print()
print("This demo will run the activity analyzer on the sample video.")
print("The analyzer will:")
print("1. Detect people in the video")
print("2. Track them across frames")
print("3. Estimate their poses")
print("4. Classify their activities")
print("5. Check for SOP compliance")
print("6. Generate a compliance report")
print("7. Save annotated video with labels to MP4 format")
print()
print("Press 'q' at any time to quit the video display.")
print()
input("Press Enter to start the analysis...")
# Run the activity analyzer
try:
result = subprocess.run([sys.executable, "activity_analyzer.py"],
capture_output=True, text=True, timeout=300)
print("\nAnalysis completed!")
print(f"Return code: {result.returncode}")
if result.stdout:
print("\nOutput:")
print(result.stdout)
if result.stderr:
print("\nErrors:")
print(result.stderr)
except subprocess.TimeoutExpired:
print("Analysis timed out after 5 minutes")
except Exception as e:
print(f"Error running analysis: {e}")
# Check if report was generated
if os.path.exists("compliance_report.json"):
print("\nCompliance report generated successfully!")
# Get file size
size = os.path.getsize("compliance_report.json")
print(f"Report size: {size} bytes")
else:
print("\nNo compliance report found.")
print("\nDemo completed.")
if __name__ == "__main__":
main()

1
kinetics_classnames.json Normal file

File diff suppressed because one or more lines are too long

View File

@ -22,7 +22,7 @@ while ret:
if ret: if ret:
results = model.predict(frame, conf=0.5) results = model.predict(frame, conf=0.5)
frame_ = results[0].plot() frame_ = results[0].plot()
pose_results = model_pose.predict(frame_, show_boxes=False, show_conf=False, show_labels=False) pose_results = model_pose.predict(frame_, show_boxes=False, show_conf=True, show_labels=True)
results[0].keypoints = pose_results[0].keypoints results[0].keypoints = pose_results[0].keypoints
frame_ = results[0].plot() frame_ = results[0].plot()

149
main3.py
View File

@ -1,50 +1,125 @@
from ultralytics import YOLO
import cv2 import cv2
import os import onnxruntime as ort
import torch
import ultralytics
from ultralytics import YOLO
import numpy as np import numpy as np
import ultralytics
from torchvision import transforms
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
# Load the object detection model yolo_model_path = 'yolo11m-2_uniform.onnx'
det_model = YOLO("yolo11m-2_uniform.onnx") # General object detection model ort_session = ort.InferenceSession(yolo_model_path)
# Load the pose estimation model pose_model_path = 'yolo11s-pose.pt'
pose_model = YOLO("yolo11s-pose.pt") # Pose estimation model pose_model = YOLO(pose_model_path)
video_path = "PF-071124-2.mp4" def extract_person_boxes(yolo_outputs):
"""Extract person bounding boxes from YOLO ONNX output"""
boxes = yolo_outputs[0]
conf_threshold = 0.5
# Filter based on confidence score only
selected_indices = np.where(
boxes[:, 4] > conf_threshold
)[0]
return boxes[selected_indices, :4].astype(int)
def evaluate_sop(pose_output):
"""Basic SOP compliance evaluation (example implementation):
- Checks if both arms are visible (keypoint confidence > 0.6)
- Checks torso vertical angle (placeholder logic)"""
if len(pose_output) == 0 or pose_output[0].keypoints.shape[0] == 0:
return False # No keypoints detected, assume non-compliant
keypoints = pose_output[0].keypoints
# Example conditions
left_shoulder_conf = keypoints[5, 2] if keypoints.shape[0] > 5 else 0
right_shoulder_conf = keypoints[6, 2] if keypoints.shape[0] > 6 else 0
left_elbow_conf = keypoints[7, 2] if keypoints.shape[0] > 7 else 0
right_elbow_conf = keypoints[8, 2] if keypoints.shape[0] > 8 else 0
# Simple compliance criteria
arms_visible = (
left_shoulder_conf > 0.6 and
right_shoulder_conf > 0.6 and
left_elbow_conf > 0.6 and
right_elbow_conf > 0.6
)
# Add more conditions based on actual SOP requirements
return arms_visible # Temporary compliance criteria
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
video_path = 'PF-071124-2.mp4'
cap = cv2.VideoCapture(video_path) cap = cv2.VideoCapture(video_path)
ret = True if not cap.isOpened():
frame_count = 0 # Initialize a frame counter print("Error opening video file")
save_dir = "/results/" # Directory to save the crops exit()
os.makedirs(save_dir, exist_ok=True) # Create the directory if it doesn't exist
while ret: while cap.isOpened():
ret, frame = cap.read() ret, frame = cap.read()
frame_count += 1 # Increment the frame counter if not ret:
frame2_, frame_ = np.empty(2) break
if ret:
# Run object detection
det_results = det_model.predict(frame, conf=0.5)
# Filter detections for persons (class 0) input_frame = cv2.resize(frame, (640, 640))
person_detections = [det for det in det_results if det.names[0] == 'crew'] input_frame = np.transpose(input_frame, (2, 0, 1)).astype(np.float32) / 255.0
input_tensor = np.expand_dims(input_frame, 0)
#for i, det in enumerate(person_detections[0].boxes.xyxy): yolo_outputs = ort_session.run(None, {'images': input_tensor})
# Extract bounding box coordinates print("YOLO Outputs Shape:", [output.shape for output in yolo_outputs])
# x1, y1, x2, y2 = map(int, det[:4]) print("YOLO Outputs:", yolo_outputs)
# crop = frame[y1:y2, x1:x2]
persons = extract_person_boxes(yolo_outputs)
# Run pose estimation on detected persons
for i, person in person_detections: for bbox in persons:
#how to return the tensor to posemodel??? x1, y1, x2, y2 = map(lambda arr: arr[0], bbox)
x1, y1, x2, y2 = map(int, person[i].boxes.xyxy[:4]) #x1, y1, x2, y2 = map(int, bbox)
person_image = frame[y1:y2, x1:x2] # Crop the person from the image roi = frame[y1:y2, x1:x2]
pose_results = pose_model(person_image) roi_resized = cv2.resize(roi, (256, 256))
frame_ = pose_results[0].plot() roi_tensor = transform(roi_resized).unsqueeze(0)
cv2.imshow("frame", frame_)
with torch.no_grad():
if cv2.waitKey(25) & 0xFF == ord('q'): pose_output = pose_model(roi_tensor)
break
print("Pose Output Type:", type(pose_output))
print("Pose Output Keys:", pose_output.keys()) if hasattr(pose_output, 'keys') else print("Pose Output:", pose_output)
if pose_output[0].keypoints.shape[0] > 0:
keypoints = pose_output[0].keypoints
if keypoints.shape[1] > 5: # Ensure there are at least 6 keypoints
compliant = evaluate_sop(pose_output)
else:
compliant = False # Not enough keypoints detected, assume non-compliant
else:
compliant = False # No keypoints detected, assume non-compliant
color = (0, 255, 0) if compliant else (0, 0, 255)
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
label = 'Compliant' if compliant else 'Non-compliant'
cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
# Display the frame using matplotlib
plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
plt.axis('off') # Turn off axis labels
plt.show(block=False)
plt.pause(0.01)
plt.clf() # Clear the current figure
if cv2.waitKey(25) & 0xFF == ord('q'):
break
#if cv2.waitKey(1) & 0xFF == ord('q'):
# break
cap.release() cap.release()
cv2.destroyAllWindows()
cv2.destroyAllWindows()

70
main4.py Normal file
View File

@ -0,0 +1,70 @@
import cv2
import onnxruntime as ort
import torch
import numpy as np
from torchvision import transforms
# Load YOLO model for person detection
yolo_model_path = 'yolo11m-2_uniform.onnx'
ort_session = ort.InferenceSession(yolo_model_path)
# Load YOLO pose estimation model
pose_model_path = 'yolo11s-pose.pt'
pose_model = torch.load(pose_model_path)
pose_model.eval()
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
video_path = 'PF-071124-2.mp4'
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
print("Error opening video file")
exit()
def get_person_bboxes(outputs):
# Implement YOLO output parsing here
return []
def check_sop_compliance(pose_output):
# Implement your SOP compliance logic here
return True
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
input_frame = cv2.resize(frame, (640, 640))
input_frame = np.transpose(input_frame, (2, 0, 1)).astype(np.float32) / 255.0
input_tensor = np.expand_dims(input_frame, 0)
yolo_outputs = ort_session.run(None, {'images': input_tensor})
persons = get_person_bboxes(yolo_outputs)
for bbox in persons:
x1, y1, x2, y2 = map(int, bbox)
roi = frame[y1:y2, x1:x2]
roi_resized = cv2.resize(roi, (256, 256))
roi_tensor = transform(roi_resized).unsqueeze(0)
with torch.no_grad():
pose_output = pose_model(roi_tensor)
compliant = check_sop_compliance(pose_output)
color = (0, 255, 0) if compliant else (0, 0, 255)
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
label = 'Compliant' if compliant else 'Non-compliant'
cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)
cv2.imshow('SOP Compliance', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()

9
requirements.txt Normal file
View File

@ -0,0 +1,9 @@
opencv-python>=4.5.0
onnxruntime>=1.10.0
torch>=1.10.0
torchvision>=0.11.0
ultralytics>=8.0.0
numpy>=1.21.0
# For MobileNet3D support
pytorchvideo>=0.1.5
fvcore>=0.1.5

4
sequence.txt Normal file
View File

@ -0,0 +1,4 @@
1. speaking with customer for taking order
2. input the order
3. give the ordered item to the customer
4. table cleaning when the customer is done and leaving the table

BIN
yolo11m-2_uniform.onnx Normal file

Binary file not shown.