import gradio as gr
import cv2
import numpy as np
from groq import Groq
import time
from PIL import Image as PILImage
import io
import os
import base64
import random
def create_monitor_interface():
api_key = os.getenv("GROQ_API_KEY")
class SafetyMonitor:
def __init__(self):
    """Create the API client and the constants shared by the other methods."""
    # Client is built with no explicit arguments.
    # NOTE(review): presumably it picks up credentials from the environment - confirm.
    self.client = Groq()

    # Palette cycled through (by observation index) when boxes are drawn.
    self.colors = [
        (255, 0, 0),
        (0, 255, 0),
        (0, 0, 255),
        (255, 255, 0),
        (255, 0, 255),
    ]

    # (max width, max height) applied by resize_image before upload;
    # increased size for better visibility.
    self.max_image_size = (800, 800)

    # Vision model requested for every analysis call.
    self.model_name = "llama-3.2-90b-vision-preview"
def resize_image(self, image):
    """Shrink *image* so its longer side fits ``self.max_image_size``.

    The aspect ratio is preserved. Images whose longer side already fits
    are returned unchanged (the same array object, not a copy), which
    avoids a pointless re-interpolation and the one-pixel shrink that
    float truncation could otherwise cause.

    Args:
        image: Array whose first two dimensions are (height, width).

    Returns:
        The original array when no scaling is needed, otherwise a new
        array produced by ``cv2.resize`` with ``INTER_AREA``.
    """
    height, width = image.shape[:2]
    max_width, max_height = self.max_image_size
    landscape = width > height

    # Only the longer side is compared against its limit.
    longer_side_fits = (width <= max_width) if landscape else (height <= max_height)
    if longer_side_fits:
        return image

    aspect = width / height
    if landscape:
        new_width = max_width
        new_height = int(new_width / aspect)
    else:
        new_height = max_height
        new_width = int(new_height * aspect)

    # Very elongated images can truncate the short side to 0, which
    # cv2.resize rejects; keep at least one pixel in each dimension.
    new_width = max(1, new_width)
    new_height = max(1, new_height)

    return cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
def analyze_frame(self, frame: np.ndarray) -> str:
    """Send *frame* to the vision model and return its safety analysis text.

    The frame is converted to three channels when it is 2-D or has four
    channels, resized with ``self.resize_image``, JPEG-encoded (quality 95)
    and embedded as a base64 data URL in a single user message.

    Returns:
        The model's message content, ``"No frame received"`` when *frame*
        is None, or an ``"Analysis Error: ..."`` string when the API call
        raises.
    """
    if frame is None:
        return "No frame received"

    # Normalise channel layout before encoding.
    dims = len(frame.shape)
    if dims == 2:
        frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
    elif dims == 3 and frame.shape[2] == 4:
        frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)

    # High quality JPEG for better analysis.
    jpeg_bytes = io.BytesIO()
    PILImage.fromarray(self.resize_image(frame)).save(
        jpeg_bytes,
        format="JPEG",
        quality=95,
        optimize=True,
    )
    encoded = base64.b64encode(jpeg_bytes.getvalue()).decode('utf-8')
    image_url = f"data:image/jpeg;base64,{encoded}"

    prompt = """Analyze this workplace image for safety conditions and hazards. Focus only on safety aspects such as:
1. Work posture and ergonomics at the shown position
2. Use of PPE and safety equipment
3. Tool handling and work techniques
4. Environmental conditions and surroundings
5. Equipment and machinery safety
6. Ground conditions and trip hazards
Do not identify or describe any individuals. Instead, describe the safety conditions and actions observed.
Format each safety observation as:
- position:safety condition description
Examples:
- center:Improper kneeling posture without knee protection, risking joint injury
- left:Heavy machinery operating in close proximity to work area
- bottom:Uneven ground surface creating trip hazard near work zone"""

    user_message = {
        "role": "user",
        "content": [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": image_url}},
        ],
    }

    try:
        response = self.client.chat.completions.create(
            model=self.model_name,
            messages=[user_message],
            temperature=0.7,
            max_tokens=500,
            stream=False,
        )
        return response.choices[0].message.content
    except Exception as e:
        # Failures are reported as text so the UI can display them.
        print(f"Detailed error: {e!s}")
        return f"Analysis Error: {e!s}"
def draw_observations(self, image, observations):
    """Draw a labelled rectangle on *image* for every observation.

    The image is divided into a 3x3 grid (plus the full-frame
    ``background`` and bottom-strip ``ground`` regions). Each observation's
    ``'location'`` text is mapped to one of those regions; unknown
    locations fall back to ``center``.

    Args:
        image: Array whose first two dimensions are (height, width). It is
            drawn on in place.
        observations: Sequence of dicts with ``'location'`` and
            ``'description'`` keys.

    Returns:
        The same *image* object that was passed in.
    """
    if not observations:
        return image

    height, width = image.shape[:2]
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.5
    thickness = 2
    padding = 10

    # Region table is loop-invariant, so build it once per call.
    regions = {
        'center': (width//3, height//3, 2*width//3, 2*height//3),
        'background': (0, 0, width, height),
        'top-left': (0, 0, width//3, height//3),
        'top': (width//3, 0, 2*width//3, height//3),
        'top-right': (2*width//3, 0, width, height//3),
        'left': (0, height//3, width//3, 2*height//3),
        'right': (2*width//3, height//3, width, 2*height//3),
        'bottom-left': (0, 2*height//3, width//3, height),
        'bottom': (width//3, 2*height//3, 2*width//3, height),
        'bottom-right': (2*width//3, 2*height//3, width, height),
        'ground': (0, 2*height//3, width, height),
    }
    # Try the longest names first: with plain dict order, "top-right" would
    # match 'top' and "bottom-left" would match 'left' before the compound
    # region was ever considered.
    names_by_specificity = sorted(regions, key=len, reverse=True)

    def get_region_coordinates(position: str) -> tuple:
        """Return (x1, y1, x2, y2) for the region named in *position*."""
        # Treat spaces, underscores and hyphens alike: "Top Left" -> "top-left".
        tokens = position.lower().replace('_', ' ').replace('-', ' ').split()
        normalized = '-'.join(tokens)
        for name in names_by_specificity:
            if name in normalized:
                return regions[name]
        return regions['center']  # Default to center if no match

    for idx, obs in enumerate(observations):
        color = self.colors[idx % len(self.colors)]

        x1, y1, x2, y2 = get_region_coordinates(obs['location'])
        cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)

        # Keep labels short enough to stay readable on the image.
        description = obs['description']
        label = description[:50] + "..." if len(description) > 50 else description
        label_size, _ = cv2.getTextSize(label, font, font_scale, thickness)

        # Place the text above the box, but never above the top edge.
        text_x = max(0, x1)
        text_y = max(label_size[1] + padding, y1 - padding)

        # Filled background behind the label.
        cv2.rectangle(image,
                      (text_x, text_y - label_size[1] - padding),
                      (text_x + label_size[0] + padding, text_y),
                      color, -1)
        cv2.putText(image, label,
                    (text_x + padding//2, text_y - padding//2),
                    font, font_scale, (255, 255, 255), thickness)

    return image
def process_frame(self, frame: np.ndarray) -> tuple[np.ndarray, str]:
    """Analyze *frame* and return an annotated copy plus the analysis text.

    The analysis is expected to contain bullet lines of the form
    ``- position:description`` (the format requested in the prompt sent by
    ``analyze_frame``). Each such line becomes one observation passed to
    ``draw_observations``. Lines that are not bullets, or that lack a
    position or a description, are ignored.

    Args:
        frame: Input image array, or None.

    Returns:
        ``(None, "No image provided")`` when *frame* is None; otherwise a
        tuple of the (possibly annotated) copy of *frame* and the raw
        analysis text.
    """
    if frame is None:
        return None, "No image provided"

    analysis = self.analyze_frame(frame)
    display_frame = frame.copy()  # never draw on the caller's array

    bullet_prefixes = ("-", "*", "\u2022")
    observations = []
    for raw_line in (analysis or "").splitlines():
        line = raw_line.strip()
        if not line.startswith(bullet_prefixes):
            continue
        # Split on the first colon only: descriptions may contain colons.
        location, separator, description = (
            line.lstrip("-*\u2022 \t").partition(":")
        )
        # Drop markdown emphasis the model may wrap around the position.
        location = location.strip(" *`")
        description = description.strip()
        if separator and location and description:
            observations.append({
                'location': location,
                'description': description
            })

    if observations:
        annotated_frame = self.draw_observations(display_frame, observations)
        return annotated_frame, analysis
    return display_frame, analysis
# Create the main interface: a single SafetyMonitor instance is built here
# and captured by the analyze_image callback below.
monitor = SafetyMonitor()
with gr.Blocks() as demo:
gr.Markdown("# Safety Analysis System powered by Llama 3.2 90b vision")
# Input and annotated output images are created under a Row.
# NOTE(review): indentation is not visible in this view - confirm whether
# the Textbox below belongs inside the Row or directly under Blocks.
with gr.Row():
input_image = gr.Image(label="Upload Image")
output_image = gr.Image(label="Annotated Results")
analysis_text = gr.Textbox(label="Detailed Analysis", lines=5)
# Callback for the image component: returns (annotated image, analysis text).
# Any exception from processing is printed and reported as text, with None
# returned for the image.
def analyze_image(image):
if image is None:
return None, "No image provided"
try:
processed_frame, analysis = monitor.process_frame(image)
return processed_frame, analysis
except Exception as e:
print(f"Processing error: {str(e)}")
return None, f"Error processing image: {str(e)}"
# Run the analysis whenever the input image component changes; the two
# return values map to the output image and the analysis textbox.
input_image.change(
fn=analyze_image,
inputs=input_image,
outputs=[output_image, analysis_text]
)
# Hand the assembled Blocks app back to the caller.
return demo
# Build the app at import time and bind it to the module-level name `demo`.
demo = create_monitor_interface()
# Start the app. This runs unconditionally (there is no __main__ guard), so
# importing this module also launches it.
demo.launch()