|
import gradio as gr |
|
import cv2 |
|
import numpy as np |
|
from groq import Groq |
|
import time |
|
from PIL import Image as PILImage |
|
import io |
|
import os |
|
import base64 |
|
|
|
def create_monitor_interface():
    """Build the Gradio Blocks UI for the image safety analyzer.

    Reads the Groq API key from the ``GROQ_API_KEY`` environment variable,
    creates a ``SafetyMonitor`` that uses it, and wires an image input to an
    annotated image output plus a text box holding the model's reply.

    Returns:
        The ``gr.Blocks`` app. It is not launched here.
    """
    api_key = os.getenv("GROQ_API_KEY")

    class SafetyMonitor:
        """Send downscaled frames to a Groq vision model and overlay the reply."""

        def __init__(self, model_name: str = "llama-3.2-90b-vision-preview"):
            """Create the Groq client and store the upload settings.

            Args:
                model_name: Identifier of the Groq model used for analysis.
            """
            self.client = Groq(api_key=api_key)
            self.model_name = model_name
            # Frames are shrunk and heavily compressed before upload,
            # presumably to keep the request payload small.
            self.max_image_size = (128, 128)  # (max width, max height)
            self.jpeg_quality = 20

        def resize_image(self, image):
            """Shrink ``image`` to fit ``self.max_image_size``, keeping aspect ratio.

            The longer side is capped at the configured maximum and the other
            side is scaled by the same ratio. Images already within the limit
            keep their size (they are never enlarged).

            Args:
                image: Array whose first two dimensions are (height, width).

            Returns:
                The resized array produced by ``cv2.resize``.

            Raises:
                ValueError: If the image has zero height or width.
            """
            height, width = image.shape[:2]
            if height == 0 or width == 0:
                # The aspect ratio is undefined; fail with a clear message
                # instead of a ZeroDivisionError or an opaque OpenCV error.
                raise ValueError("Cannot resize an empty image")

            aspect = width / height

            if width > height:
                new_width = min(self.max_image_size[0], width)
                # Clamp to 1: int() truncation would otherwise give a zero
                # height for very wide images, which cv2.resize rejects.
                new_height = max(1, int(new_width / aspect))
            else:
                new_height = min(self.max_image_size[1], height)
                # Same clamp for very tall images.
                new_width = max(1, int(new_height * aspect))

            return cv2.resize(
                image, (new_width, new_height), interpolation=cv2.INTER_AREA
            )

        def analyze_frame(self, frame: np.ndarray) -> str:
            """Ask the vision model to list safety issues visible in ``frame``.

            Args:
                frame: Image array; 2-D (grayscale) and 4-channel (RGBA)
                    inputs are converted to 3-channel RGB first.

            Returns:
                The model's text reply, ``"No frame received"`` when ``frame``
                is ``None``, or an ``"Analysis Error: ..."`` message when the
                API call fails.
            """
            if frame is None:
                return "No frame received"

            # JPEG encoding below needs a 3-channel image.
            if len(frame.shape) == 2:
                frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
            elif len(frame.shape) == 3 and frame.shape[2] == 4:
                frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)

            frame = self.resize_image(frame)
            frame_pil = PILImage.fromarray(frame)

            buffered = io.BytesIO()
            frame_pil.save(
                buffered,
                format="JPEG",
                quality=self.jpeg_quality,
                optimize=True,
            )
            img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')

            try:
                # The image must travel as a structured ``image_url`` content
                # part (data URL). Pasting the base64 into the prompt text
                # makes the model see a long string, not a picture.
                completion = self.client.chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": "List safety issues:"},
                                {
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:image/jpeg;base64,{img_base64}"
                                    },
                                },
                            ],
                        }
                    ],
                    model=self.model_name,
                    max_tokens=100,
                    temperature=0.1,
                )
                content = completion.choices[0].message.content
                # Callers split the reply into lines, so never hand back None.
                return content if content is not None else "No analysis returned"
            except Exception as e:
                # UI boundary: report the failure in the text box rather than
                # crashing the callback.
                print(f"Detailed error: {str(e)}")
                return f"Analysis Error: {str(e)}"

        def process_frame(self, frame: np.ndarray) -> tuple[np.ndarray, str]:
            """Analyze ``frame`` and draw the start of the reply onto a copy of it.

            Args:
                frame: Image array to analyze; it is not modified.

            Returns:
                ``(annotated_copy, analysis_text)``, or
                ``(None, "No image provided")`` when ``frame`` is ``None``.
            """
            if frame is None:
                return None, "No image provided"

            analysis = self.analyze_frame(frame)
            display_frame = frame.copy()

            # Darken a banner across the top so the text stays readable:
            # blend a black rectangle at 30% opacity into the frame in place.
            overlay = display_frame.copy()
            height, width = display_frame.shape[:2]
            cv2.rectangle(overlay, (5, 5), (width - 5, 100), (0, 0, 0), -1)
            cv2.addWeighted(overlay, 0.3, display_frame, 0.7, 0, display_frame)

            # Draw the reply line by line, 30 px apart, each cut to 80 chars.
            y_position = 30
            for line in analysis.split('\n'):
                cv2.putText(
                    display_frame,
                    line[:80],
                    (10, y_position),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6,
                    (0, 255, 0),
                    2,
                )
                y_position += 30
                # Stops after the lines at y=30 and y=60 have been drawn.
                if y_position >= 90:
                    break

            return display_frame, analysis

    monitor = SafetyMonitor()

    with gr.Blocks() as demo:
        gr.Markdown("# Safety Analysis System")

        with gr.Row():
            input_image = gr.Image(label="Upload Image")
            output_image = gr.Image(label="Results")

        analysis_text = gr.Textbox(label="Analysis", lines=5)

        def analyze_image(image):
            """Gradio callback: return (annotated image, analysis text)."""
            if image is None:
                return None, "No image provided"
            try:
                processed_frame, analysis = monitor.process_frame(image)
                return processed_frame, analysis
            except Exception as e:
                # Keep the UI responsive: show the error instead of raising.
                print(f"Processing error: {str(e)}")
                return None, f"Error processing image: {str(e)}"

        # Re-run the analysis whenever the input image changes.
        input_image.change(
            fn=analyze_image,
            inputs=input_image,
            outputs=[output_image, analysis_text],
        )

    return demo
|
|
|
# Build the app at import time so tooling that looks for a module-level
# ``demo`` object can still find it.
demo = create_monitor_interface()

# Only start the web server when this file is run as a script, so importing
# the module does not block on a running server.
if __name__ == "__main__":
    demo.launch()