#!/usr/bin/env python3
# Face-recognition microservice (Flask).
from flask import Flask, request, jsonify
|
|
from flask_cors import CORS
|
|
import face_recognition
|
|
import numpy as np
|
|
import base64
|
|
import cv2
|
|
import os
|
|
|
|
app = Flask(__name__)
CORS(app)  # Enable CORS for all routes

# In-memory storage for known face encodings and names.
# The three lists below are parallel: index i describes one known face.
known_face_encodings = []  # encodings produced by face_recognition.face_encodings
known_face_names = []  # display name for the encoding at the same index
known_face_ids = []  # partner id for the encoding at the same index (0 = unknown)
|
|
|
|
def load_known_faces():
    """Load known face encodings from the local ``faces`` directory.

    Image files are expected to be named ``<partner_id>_<name>.<ext>``
    (e.g. ``42_Jane_Doe.jpg``). Files whose prefix is not numeric fall
    back to id 0 with the whole stem used as the name. Populates the
    module-level ``known_face_encodings`` / ``known_face_names`` /
    ``known_face_ids`` parallel lists.

    NOTE(review): in a real production scenario these would be fetched
    from Odoo or shared storage rather than the local filesystem.
    """
    global known_face_encodings, known_face_names, known_face_ids

    faces_dir = "faces"
    if not os.path.exists(faces_dir):
        os.makedirs(faces_dir)
        print(f"Created {faces_dir} directory. Please add images there.")
        return

    for filename in os.listdir(faces_dir):
        # Tuple argument = one endswith call; lower() accepts .JPG etc.
        if not filename.lower().endswith((".jpg", ".png", ".jpeg")):
            continue
        try:
            # Filename format: ID_Name.ext
            name_part = os.path.splitext(filename)[0]
            parts = name_part.split('_', 1)
            # Bug fix: the original did int(parts[0]) unconditionally, so a
            # non-numeric prefix (e.g. "john_smith.jpg") raised ValueError
            # and skipped the file instead of taking the id-0 fallback.
            if len(parts) == 2 and parts[0].isdigit():
                p_id = int(parts[0])
                name = parts[1]
            else:
                p_id = 0
                name = name_part

            image_path = os.path.join(faces_dir, filename)
            image = face_recognition.load_image_file(image_path)
            encodings = face_recognition.face_encodings(image)

            if encodings:
                known_face_encodings.append(encodings[0])
                known_face_names.append(name)
                known_face_ids.append(p_id)
                print(f"Loaded face for: {name} (ID: {p_id})")
            else:
                # Make silently-skipped images visible to the operator.
                print(f"No face detected in {filename}; skipped.")
        except Exception as e:
            # Bug fix: the original message printed the literal "(unknown)"
            # instead of identifying which file failed.
            print(f"Error loading {filename}: {e}")
|
|
|
|
@app.route('/recognize', methods=['POST'])
def recognize_face():
    """Match faces in a base64-encoded image against the known set.

    Expects JSON ``{"image": "<base64>"}``. Returns
    ``{"matches": [{"id", "name", "probability"}, ...]}`` sorted by
    descending probability. Responds 400 on missing/invalid input and
    500 on an unexpected processing error.
    """
    # Bug fix: request.json is None for non-JSON bodies, and
    # `'image' not in None` raises TypeError -> unhandled 500.
    data = request.get_json(silent=True)
    if not data or 'image' not in data:
        return jsonify({'error': 'No image provided'}), 400

    try:
        # Decode base64 payload into an OpenCV BGR frame.
        image_data = base64.b64decode(data['image'])
        nparr = np.frombuffer(image_data, np.uint8)
        frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

        if frame is None:
            return jsonify({'error': 'Invalid image data'}), 400

        # Convert BGR (OpenCV) to RGB (face_recognition) and ensure a
        # contiguous uint8 buffer, which the dlib backend requires.
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        rgb_frame = np.ascontiguousarray(rgb_frame, dtype=np.uint8)

        # Find all faces in the current frame.
        face_locations = face_recognition.face_locations(rgb_frame)
        face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)

        matches_list = []

        for face_encoding in face_encodings:
            # Tolerance 0.4 is stricter than the library default of 0.6.
            matches = face_recognition.compare_faces(
                known_face_encodings, face_encoding, tolerance=0.4)
            name = "Unknown"
            p_id = 0
            probability = 0.0

            # Use the known face with the smallest distance to this face.
            face_distances = face_recognition.face_distance(
                known_face_encodings, face_encoding)
            if len(face_distances) > 0:
                best_match_index = np.argmin(face_distances)
                # Bug fix: `distance` was assigned only inside the match
                # branch but printed unconditionally below, raising
                # NameError for a detected-but-unmatched face. Compute it
                # up front.
                distance = face_distances[best_match_index]
                if matches[best_match_index]:
                    name = known_face_names[best_match_index]
                    p_id = known_face_ids[best_match_index]
                    # Simple probability estimate from distance:
                    # distance 0.0 -> 1.0, distance 0.3 -> 0.7.
                    probability = max(0, 1.0 - distance)

                print(f"Best match for current face: {name} (ID: {p_id}) with distance {distance:.3f} (Prob: {probability:.3f})")

            # Only report reasonably confident matches.
            if probability >= 0.6:
                matches_list.append({
                    'id': p_id,
                    'name': name,
                    'probability': probability
                })

        # Highest-confidence matches first.
        matches_list.sort(key=lambda x: x['probability'], reverse=True)

        return jsonify({'matches': matches_list})

    except Exception as e:
        print(f"Error processing image: {e}")
        return jsonify({'error': str(e)}), 500
|
|
|
|
@app.route('/train', methods=['POST'])
def train_face():
    """Store training images for a partner and update the in-memory model.

    Expects JSON ``{"partner_id": <int>, "name": <str>,
    "images": [<base64>, ...]}``. Each image is saved under ``faces/`` as
    ``<id>_<name>_<idx>.jpg`` and, when a face is detected, its encoding
    is appended to the known-face lists. Responds 400 on missing input.
    """
    # Bug fix: request.json is None for non-JSON bodies; guard before
    # membership tests to avoid a TypeError -> unhandled 500.
    data = request.get_json(silent=True)
    if not data or 'partner_id' not in data or 'images' not in data:
        return jsonify({'error': 'Missing partner_id or images'}), 400

    partner_id = data['partner_id']
    name = data.get('name', 'Unknown')
    images = data['images']  # list of base64 strings

    faces_dir = "faces"
    # exist_ok avoids the check-then-create race of the original.
    os.makedirs(faces_dir, exist_ok=True)

    processed_count = 0

    # NOTE(review): existing images for this partner are not cleared, so
    # repeated training appends; stale files may accumulate on disk.
    for idx, img_data in enumerate(images):
        if not img_data:
            continue

        try:
            # Decode base64 and persist to disk.
            image_bytes = base64.b64decode(img_data)
            filename = f"{partner_id}_{name.replace(' ', '_')}_{idx}.jpg"
            filepath = os.path.join(faces_dir, filename)

            with open(filepath, "wb") as f:
                f.write(image_bytes)

            # Update in-memory knowledge immediately so the new face is
            # recognizable without a restart.
            image = face_recognition.load_image_file(filepath)
            encodings = face_recognition.face_encodings(image)

            if encodings:
                known_face_encodings.append(encodings[0])
                known_face_names.append(name)
                known_face_ids.append(partner_id)
                processed_count += 1
                print(f"Trained face for: {name} (ID: {partner_id}) - Image {idx}")

        except Exception as e:
            print(f"Error saving/training image {idx} for {name}: {e}")

    return jsonify({'message': f'Processed {processed_count} images for {name}'})
|
|
|
|
# Populate the in-memory face lists from ./faces at import time so the
# service can recognize faces as soon as the first request arrives.
load_known_faces()

if __name__ == '__main__':
    # Listen on all interfaces on the default Flask port.
    app.run(host='0.0.0.0', port=5000)
|