import streamlit as st
import time
from typing import List, Tuple
from streamlit_webrtc import webrtc_streamer, WebRtcMode
import av
import numpy as np
import onnxruntime as rt
import threading
import mediapipe as mp
import os
from twilio.rest import Client
import cv2
from skimage.transform import SimilarityTransform
from types import SimpleNamespace
from sklearn.metrics.pairwise import cosine_distances

class Detection(SimpleNamespace):
    idx: int = None
    bbox: List[List[float]] = None
    landmarks: List[List[float]] = None
    confidence: float = None


class Identity(SimpleNamespace):
    detection: Detection = Detection()
    name: str = None
    embedding: np.ndarray = None
    face: np.ndarray = None


class Match(SimpleNamespace):
    subject_id: Identity = Identity()
    gallery_id: Identity = Identity()
    distance: float = None
    name: str = None

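# These SimpleNamespace subclasses act as lightweight records whose fields are
# set freely at construction time. A minimal sketch with hypothetical values
# (the embedding size depends on the ONNX model; 512 is only an assumption):
#   det = Detection(idx=0, bbox=[[10, 20], [110, 140]], landmarks=[[30, 60]] * 5)
#   subject = Identity(detection=det, embedding=np.zeros(512), face=None)
#   match = Match(subject_id=subject, gallery_id=subject, distance=0.0)
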
class Grabber:
    def __init__(self, video_receiver) -> None:
        self.currentFrame = None
        self.capture = video_receiver
        self.thread = threading.Thread(target=self.update_frame)
        self.thread.daemon = True

    def update_frame(self) -> None:
        # Keep overwriting the current frame so consumers always see the latest one
        while True:
            self.currentFrame = self.capture.get_frame()

    def get_frame(self) -> av.VideoFrame:
        return self.currentFrame

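# The grabber decouples frame capture from processing: the daemon thread drains
# the WebRTC receiver continuously, so the main loop below always works on the
# most recent frame instead of falling behind the stream while inference runs.
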
# Cosine-distance threshold for face matching; cosine distance ranges from
# 0 (identical direction) to 2 (opposite), so lower values are stricter
SIMILARITY_THRESHOLD = 1.2

# Get Twilio ICE server configuration using Twilio credentials from environment
# variables (set in Streamlit secrets)
# Ref: https://www.twilio.com/docs/stun-turn/api
client = Client(os.environ["TWILIO_ACCOUNT_SID"], os.environ["TWILIO_AUTH_TOKEN"])
token = client.tokens.create()
ICE_SERVERS = token.ice_servers
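# `token.ice_servers` is a list of STUN/TURN server dicts (urls, username,
# credential) that can be passed straight to the streamer's `rtc_configuration`
# below, so the WebRTC connection can traverse NATs.
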
# Set page layout for Streamlit to wide
st.set_page_config(layout="wide", page_title="Live Face Recognition", page_icon=":sunglasses:")

# Streamlit app
st.title("Live Webcam Face Recognition")
st.markdown("**Live Stream**")
ctx_container = st.container()
stream_container = st.empty()
st.markdown("**Matches**")
matches_container = st.info("No matches found yet ...")
st.markdown("**Info**")
info_container = st.empty()

# Init face detector and face recognizer
face_recognizer = rt.InferenceSession("model.fixed.onnx", providers=rt.get_available_providers())
face_detector = mp.solutions.face_mesh.FaceMesh(
    refine_landmarks=True,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
    max_num_faces=5,
)
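# Note: refine_landmarks=True is what adds the iris landmarks (indices 468-477)
# to the face mesh; detect_faces below relies on two of them (470 and 475) as
# eye reference points for alignment.
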
def detect_faces(frame: np.ndarray) -> List[Detection]:
    # Process the frame with the face detector
    result = face_detector.process(frame)

    # Initialize an empty list to store the detected faces
    detections = []

    # Check if any faces were detected
    if result.multi_face_landmarks:
        # Iterate over each detected face
        for count, face in enumerate(result.multi_face_landmarks):
            # Select five reference landmarks for alignment
            # (two iris points, nose tip, two mouth points)
            five_landmarks = np.asarray(face.landmark)[[470, 475, 1, 57, 287]]

            # Extract the pixel coordinates of the landmarks of interest
            landmarks = [[landmark.x * frame.shape[1], landmark.y * frame.shape[0]] for landmark in five_landmarks]

            # Extract the x and y coordinates of all landmarks
            all_x_coords = [landmark.x * frame.shape[1] for landmark in face.landmark]
            all_y_coords = [landmark.y * frame.shape[0] for landmark in face.landmark]

            # Compute the bounding box of the face
            x_min, x_max = int(min(all_x_coords)), int(max(all_x_coords))
            y_min, y_max = int(min(all_y_coords)), int(max(all_y_coords))
            bbox = [[x_min, y_min], [x_max, y_max]]

            # Create a Detection object for the face
            detection = Detection(
                idx=count,
                bbox=bbox,
                landmarks=landmarks,
                confidence=None,
            )

            # Add the detection to the list
            detections.append(detection)

    # Return the list of detections
    return detections

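# Minimal usage sketch (assumes `img` is an RGB uint8 array):
#   detections = detect_faces(img)
#   for det in detections:
#       print(det.idx, det.bbox, len(det.landmarks))  # each has 5 landmarks
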
def recognize_faces(frame: np.ndarray, detections: List[Detection]) -> List[Identity]:
    if not detections:
        return []

    identities = []
    for detection in detections:
        # ALIGNMENT -----------------------------------------------------------
        # Target landmark coordinates (as used in training)
        landmarks_target = np.array(
            [
                [38.2946, 51.6963],
                [73.5318, 51.5014],
                [56.0252, 71.7366],
                [41.5493, 92.3655],
                [70.7299, 92.2041],
            ],
            dtype=np.float32,
        )
        # Estimate the similarity transform that maps the detected landmarks
        # onto the target template, then warp the face to 112x112 pixels
        tform = SimilarityTransform()
        tform.estimate(detection.landmarks, landmarks_target)
        tmatrix = tform.params[0:2, :]
        face_aligned = cv2.warpAffine(frame, tmatrix, (112, 112), borderValue=0.0)
        # ---------------------------------------------------------------------

        # INFERENCE -----------------------------------------------------------
        # Scale pixel values to [0, 1] and run the embedding model with onnxruntime
        input_image = (np.asarray([face_aligned]).astype(np.float32) / 255.0).clip(0.0, 1.0)
        embedding = face_recognizer.run(None, {"input_image": input_image})[0][0]
        # ---------------------------------------------------------------------

        # Create Identity object
        identities.append(Identity(detection=detection, embedding=embedding, face=face_aligned))

    return identities

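# The five target coordinates above match the widely used ArcFace-style 112x112
# alignment template (eye centers, nose tip, mouth corners) that many public
# face recognition models are trained with; whether they are exactly right for
# model.fixed.onnx depends on how that model was trained.
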
def match_faces(subjects: List[Identity], gallery: List[Identity]) -> List[Match]:
    if len(gallery) == 0 or len(subjects) == 0:
        return []

    # Get embeddings
    embs_gal = np.asarray([identity.embedding for identity in gallery])
    embs_det = np.asarray([identity.embedding for identity in subjects])

    # Calculate cosine distances between all subject/gallery pairs
    cos_distances = cosine_distances(embs_det, embs_gal)

    # Find matches
    matches = []
    for ident_idx, identity in enumerate(subjects):
        dists_to_identity = cos_distances[ident_idx]
        idx_min = np.argmin(dists_to_identity)
        if dists_to_identity[idx_min] < SIMILARITY_THRESHOLD:
            matches.append(
                Match(
                    subject_id=identity,
                    gallery_id=gallery[idx_min],
                    distance=dists_to_identity[idx_min],
                )
            )

    # Sort matches by gallery name
    matches = sorted(matches, key=lambda match: match.gallery_id.name)

    return matches

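# Worked example (hypothetical embeddings): cosine_distances returns a
# (num_subjects, num_gallery) matrix, e.g.
#   cosine_distances(np.eye(2), np.eye(2)) -> [[0., 1.], [1., 0.]]
# so each subject row is matched to the gallery column with the smallest
# distance, provided it stays below SIMILARITY_THRESHOLD.
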
def draw_annotations(frame: np.ndarray, detections: List[Detection], matches: List[Match]) -> np.ndarray:
    global timestamp
    shape = np.asarray(frame.shape[:2][::-1])

    # Upscale frame to 1080p for better visualization of drawn annotations
    frame = cv2.resize(frame, (1920, 1080))
    upscale_factor = np.asarray([1920 / shape[0], 1080 / shape[1]])
    shape = np.asarray(frame.shape[:2][::-1])

    # Ensure the frame is writeable so annotations can be drawn in place
    frame.flags.writeable = True

    fps = 1 / (time.time() - timestamp)
    timestamp = time.time()

    # Draw FPS
    cv2.putText(
        frame,
        f"FPS: {fps:.1f}",
        (20, 40),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (0, 255, 0),
        2,
    )

    # Draw Detections
    for detection in detections:
        # Draw Landmarks
        for landmark in detection.landmarks:
            cv2.circle(
                frame,
                (landmark * upscale_factor).astype(int),
                2,
                (255, 255, 255),
                -1,
            )

        # Draw Bounding Box
        cv2.rectangle(
            frame,
            (detection.bbox[0] * upscale_factor).astype(int),
            (detection.bbox[1] * upscale_factor).astype(int),
            (255, 0, 0),
            2,
        )

        # Draw Index
        cv2.putText(
            frame,
            str(detection.idx),
            (
                ((detection.bbox[1][0] + 2) * upscale_factor[0]).astype(int),
                ((detection.bbox[1][1] + 2) * upscale_factor[1]).astype(int),
            ),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (0, 0, 0),
            2,
        )

    # Draw Matches
    for match in matches:
        detection = match.subject_id.detection
        name = match.gallery_id.name

        # Draw Bounding Box in green
        cv2.rectangle(
            frame,
            (detection.bbox[0] * upscale_factor).astype(int),
            (detection.bbox[1] * upscale_factor).astype(int),
            (0, 255, 0),
            2,
        )

        # Draw Banner
        cv2.rectangle(
            frame,
            (
                (detection.bbox[0][0] * upscale_factor[0]).astype(int),
                (detection.bbox[0][1] * upscale_factor[1] - (shape[1] // 25)).astype(int),
            ),
            (
                (detection.bbox[1][0] * upscale_factor[0]).astype(int),
                (detection.bbox[0][1] * upscale_factor[1]).astype(int),
            ),
            (255, 255, 255),
            -1,
        )

        # Draw Name
        cv2.putText(
            frame,
            name,
            (
                ((detection.bbox[0][0] + shape[0] // 400) * upscale_factor[0]).astype(int),
                ((detection.bbox[0][1] - shape[1] // 50) * upscale_factor[1]).astype(int),
            ),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.7,
            (0, 0, 0),
            2,
        )

        # Draw Distance
        cv2.putText(
            frame,
            f" Distance: {match.distance:.2f}",
            (
                ((detection.bbox[0][0] + shape[0] // 400) * upscale_factor[0]).astype(int),
                ((detection.bbox[0][1] - shape[1] // 350) * upscale_factor[1]).astype(int),
            ),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.5,
            (0, 0, 0),
            2,
        )

    return frame

def video_frame_callback(frame: av.VideoFrame) -> Tuple[av.VideoFrame, List[Match]]:
    # Convert frame to numpy array
    frame = frame.to_ndarray(format="rgb24")

    # Run face detection
    detections = detect_faces(frame)

    # Run face recognition
    subjects = recognize_faces(frame, detections)

    # Run face matching
    matches = match_faces(subjects, gallery)

    # Draw annotations
    frame = draw_annotations(frame, detections, matches)

    # Convert frame back to av.VideoFrame
    frame = av.VideoFrame.from_ndarray(frame, format="rgb24")

    return frame, matches

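# Note: despite the callback-style name, this function is not registered with
# streamlit_webrtc (the streamer runs in SENDONLY mode); it is called manually
# from the polling loop below, which is what lets it return the matches
# alongside the rendered frame.
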
# Sidebar for face gallery
with st.sidebar:
    st.markdown("# Face Gallery")
    files = st.file_uploader(
        "Upload images to gallery",
        type=["png", "jpg", "jpeg"],
        accept_multiple_files=True,
        label_visibility="collapsed",
    )

    # Init gallery
    gallery = []
    for file in files:
        # Read file bytes
        file_bytes = np.asarray(bytearray(file.read()), dtype=np.uint8)

        # Decode image and convert from BGR to RGB
        img = cv2.cvtColor(cv2.imdecode(file_bytes, cv2.IMREAD_COLOR), cv2.COLOR_BGR2RGB)

        # Detect faces
        detections = detect_faces(img)

        if detections:
            # Recognize the first detected face only
            subjects = recognize_faces(img, detections[:1])

            # Add subject to gallery, named after the uploaded file
            gallery.append(
                Identity(
                    name=os.path.splitext(file.name)[0],
                    embedding=subjects[0].embedding,
                    face=subjects[0].face,
                )
            )

    # Show gallery images
    st.image(
        image=[identity.face for identity in gallery],
        caption=[identity.name for identity in gallery],
    )
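# Note: Streamlit reruns the whole script on every interaction, so the gallery
# embeddings are recomputed from the uploaded files on each rerun.
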
# Start streaming component
with ctx_container:
    ctx = webrtc_streamer(
        key="LiveFaceRecognition",
        mode=WebRtcMode.SENDONLY,
        rtc_configuration={"iceServers": ICE_SERVERS},
        media_stream_constraints={"video": {"width": 1920}, "audio": False},
    )

# Initialize frame grabber
grabber = Grabber(ctx.video_receiver)

if ctx.state.playing:
    # Start frame grabber in background thread
    grabber.thread.start()
    timestamp = time.time()

    # Start main loop
    while True:
        frame = grabber.get_frame()
        if frame is not None:
            # Show frame timestamp in Streamlit
            info_container.write(f"Frame timestamp: {frame.time}")

            # Run face detection, recognition, matching and annotation
            frame, matches = video_frame_callback(frame)

            # Convert frame to numpy array
            frame = frame.to_ndarray(format="rgb24")

            # Show Stream
            stream_container.image(frame, channels="RGB")

            # Show Matches
            if matches:
                matches_container.image(
                    image=[match.subject_id.face for match in matches],
                    caption=[match.gallery_id.name for match in matches],
                )
            else:
                matches_container.info("No matches found yet ...")