import torch import tensorflow as tf import time import os import logging import queue from pathlib import Path from typing import List, NamedTuple import av import cv2 import numpy as np import streamlit as st from streamlit_webrtc import WebRtcMode, webrtc_streamer from utils.download import download_file from utils.turn import get_ice_servers from mtcnn import MTCNN # Import MTCNN for face detection from PIL import Image, ImageDraw # Import PIL for image processing from transformers import pipeline # Import Hugging Face transformers pipeline import requests from io import BytesIO # Import for handling byte streams # Named tuple to store detection results class Detection(NamedTuple): class_id: int label: str score: float box: np.ndarray # Queue to store detection results result_queue: "queue.Queue[List[Detection]]" = queue.Queue() # CHANGE CODE BELOW HERE, USE TO REPLACE WITH YOUR WANTED ANALYSIS. # Update below string to set display title of analysis # Appropriate imports needed for analysis # Initialize the Hugging Face pipeline for facial emotion detection emotion_pipeline = pipeline("image-classification", model="trpakov/vit-face-expression") # Default title - "Facial Sentiment Analysis" ANALYSIS_TITLE = "Facial Sentiment Analysis" # CHANGE THE CONTENTS OF THIS FUNCTION, USE TO REPLACE WITH YOUR WANTED ANALYSIS. # # # Function to analyze an input frame and generate an analyzed frame # This function takes an input video frame, detects faces in it using MTCNN, # then for each detected face, it analyzes the sentiment (emotion) using the analyze_sentiment function, # draws a rectangle around the face, and overlays the detected emotion on the frame. # It also records the time taken to process the frame and stores it in a global container. # Constants for text and line size in the output image TEXT_SIZE = 1 LINE_SIZE = 2 # Set analysis results in img_container and result queue for display # img_container["input"] - holds the input frame contents - of type np.ndarray # img_container["analyzed"] - holds the analyzed frame with any added annotations - of type np.ndarray # img_container["analysis_time"] - holds how long the analysis has taken in miliseconds # result_queue - holds the analysis metadata results - of type queue.Queue[List[Detection]] def analyze_frame(frame: np.ndarray): start_time = time.time() # Start timing the analysis img_container["input"] = frame # Store the input frame frame = frame.copy() # Create a copy of the frame to modify results = mtcnn.detect_faces(frame) # Detect faces in the frame for result in results: x, y, w, h = result["box"] # Get the bounding box of the detected face face = frame[y: y + h, x: x + w] # Extract the face from the frame # Analyze the sentiment of the face sentiment = analyze_sentiment(face) result["label"] = sentiment # Draw a rectangle around the face cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), LINE_SIZE) text_size = cv2.getTextSize(sentiment, cv2.FONT_HERSHEY_SIMPLEX, TEXT_SIZE, 2)[ 0 ] text_x = x text_y = y - 10 background_tl = (text_x, text_y - text_size[1]) background_br = (text_x + text_size[0], text_y + 5) # Draw a black background for the text cv2.rectangle(frame, background_tl, background_br, (0, 0, 0), cv2.FILLED) # Put the sentiment text on the image cv2.putText( frame, sentiment, (text_x, text_y), cv2.FONT_HERSHEY_SIMPLEX, TEXT_SIZE, (255, 255, 255), 2, ) end_time = time.time() # End timing the analysis execution_time_ms = round( (end_time - start_time) * 1000, 2 ) # Calculate execution time in milliseconds # Store the execution time img_container["analysis_time"] = execution_time_ms result_queue.put(results) # Put the results in the result queue img_container["analyzed"] = frame # Store the analyzed frame return # End of the function # Function to analyze the sentiment (emotion) of a detected face # This function converts the face from BGR to RGB format, then converts it to a PIL image, # uses a pre-trained emotion detection model to get emotion predictions, # and finally returns the most dominant emotion detected. def analyze_sentiment(face): # Convert face to RGB format rgb_face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB) pil_image = Image.fromarray(rgb_face) # Convert to PIL image results = emotion_pipeline(pil_image) # Run emotion detection on the image dominant_emotion = max(results, key=lambda x: x["score"])[ "label" ] # Get the dominant emotion return dominant_emotion # Return the detected emotion # # # DO NOT TOUCH THE BELOW CODE (NOT NEEDED) # # # Suppress FFmpeg logs os.environ["FFMPEG_LOG_LEVEL"] = "quiet" # Suppress TensorFlow or PyTorch progress bars tf.get_logger().setLevel("ERROR") os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" # Suppress PyTorch logs logging.getLogger().setLevel(logging.WARNING) torch.set_num_threads(1) logging.getLogger("torch").setLevel(logging.ERROR) # Suppress Streamlit logs using the logging module logging.getLogger("streamlit").setLevel(logging.ERROR) # Container to hold image data and analysis results img_container = {"input": None, "analyzed": None, "analysis_time": None} # Initialize MTCNN for face detection mtcnn = MTCNN() # Logger for debugging and information logger = logging.getLogger(__name__) # Callback function to process video frames # This function is called for each video frame in the WebRTC stream. # It converts the frame to a numpy array in RGB format, analyzes the frame, # and returns the original frame. def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame: # Convert frame to numpy array in RGB format img = frame.to_ndarray(format="rgb24") analyze_frame(img) # Analyze the frame return frame # Return the original frame # Get ICE servers for WebRTC ice_servers = get_ice_servers() # Streamlit UI configuration st.set_page_config(layout="wide") # Custom CSS for the Streamlit page st.markdown( """ """, unsafe_allow_html=True, ) # Streamlit page title and subtitle st.title("Computer Vision Playground") # Add a link to the README file st.markdown( """

See the README to learn how to use this code to help you start your computer vision exploration.

""", unsafe_allow_html=True, ) st.subheader(ANALYSIS_TITLE) # Columns for input and output streams col1, col2 = st.columns(2) with col1: st.header("Input Stream") st.subheader("input") # WebRTC streamer to get video input from the webcam webrtc_ctx = webrtc_streamer( key="input-webcam", mode=WebRtcMode.SENDRECV, rtc_configuration=ice_servers, video_frame_callback=video_frame_callback, media_stream_constraints={"video": True, "audio": False}, async_processing=True, ) # File uploader for images st.subheader("Upload an Image") uploaded_file = st.file_uploader( "Choose an image...", type=["jpg", "jpeg", "png"]) # Text input for image URL st.subheader("Or Enter Image URL") image_url = st.text_input("Image URL") # File uploader for videos st.subheader("Upload a Video") uploaded_video = st.file_uploader( "Choose a video...", type=["mp4", "avi", "mov", "mkv"] ) # Text input for video URL st.subheader("Or Enter Video Download URL") video_url = st.text_input("Video URL") # Streamlit footer st.markdown( """

If you want to set up your own computer vision playground see here.

""", unsafe_allow_html=True ) # Function to initialize the analysis UI # This function sets up the placeholders and UI elements in the analysis section. # It creates placeholders for input and output frames, analysis time, and detected labels. def analysis_init(): global analysis_time, show_labels, labels_placeholder, input_placeholder, output_placeholder with col2: st.header("Analysis") st.subheader("Input Frame") input_placeholder = st.empty() # Placeholder for input frame st.subheader("Output Frame") output_placeholder = st.empty() # Placeholder for output frame analysis_time = st.empty() # Placeholder for analysis time show_labels = st.checkbox( "Show the detected labels", value=True ) # Checkbox to show/hide labels labels_placeholder = st.empty() # Placeholder for labels # Function to publish frames and results to the Streamlit UI # This function retrieves the latest frames and results from the global container and result queue, # and updates the placeholders in the Streamlit UI with the current input frame, analyzed frame, analysis time, and detected labels. def publish_frame(): if not result_queue.empty(): result = result_queue.get() if show_labels: labels_placeholder.table( result ) # Display labels if the checkbox is checked img = img_container["input"] if img is None: return input_placeholder.image(img, channels="RGB") # Display the input frame analyzed = img_container["analyzed"] if analyzed is None: return # Display the analyzed frame output_placeholder.image(analyzed, channels="RGB") time = img_container["analysis_time"] if time is None: return # Display the analysis time analysis_time.text(f"Analysis Time: {time} ms") # If the WebRTC streamer is playing, initialize and publish frames if webrtc_ctx.state.playing: analysis_init() # Initialize the analysis UI while True: publish_frame() # Publish the frames and results time.sleep(0.1) # Delay to control frame rate # If an image is uploaded or a URL is provided, process the image if uploaded_file is not None or image_url: analysis_init() # Initialize the analysis UI if uploaded_file is not None: image = Image.open(uploaded_file) # Open the uploaded image img = np.array(image.convert("RGB")) # Convert the image to RGB format else: response = requests.get(image_url) # Download the image from the URL # Open the downloaded image image = Image.open(BytesIO(response.content)) img = np.array(image.convert("RGB")) # Convert the image to RGB format analyze_frame(img) # Analyze the image publish_frame() # Publish the results # Function to process video files # This function reads frames from a video file, analyzes each frame for face detection and sentiment analysis, # and updates the Streamlit UI with the current input frame, analyzed frame, and detected labels. def process_video(video_path): cap = cv2.VideoCapture(video_path) # Open the video file while cap.isOpened(): ret, frame = cap.read() # Read a frame from the video if not ret: break # Exit the loop if no more frames are available # Display the current frame as the input frame input_placeholder.image(frame) analyze_frame( frame ) # Analyze the frame for face detection and sentiment analysis publish_frame() # Publish the results if not result_queue.empty(): result = result_queue.get() if show_labels: labels_placeholder.table( result ) # Display labels if the checkbox is checked cap.release() # Release the video capture object # If a video is uploaded or a URL is provided, process the video if uploaded_video is not None or video_url: analysis_init() # Initialize the analysis UI if uploaded_video is not None: video_path = uploaded_video.name # Get the name of the uploaded video with open(video_path, "wb") as f: # Save the uploaded video to a file f.write(uploaded_video.getbuffer()) else: # Download the video from the URL video_path = download_file(video_url) process_video(video_path) # Process the video