"""Gradio app that asks a Groq-hosted vision model for workplace-safety observations.

The uploaded image is sent to the model together with a prompt that requests
one bullet per observation in the form ``- position:description``.  Each
parsed observation is drawn on a copy of the image as a coloured rectangle
over a coarse 3x3 grid region, with a short text label.
"""

import base64
import io
import os
import random
import time

import cv2
import gradio as gr
import numpy as np
from groq import Groq
from PIL import Image as PILImage

# Prompt sent with every image.  The "- position:description" format it asks
# for is what SafetyMonitor._parse_observations() relies on.
_ANALYSIS_PROMPT = """Analyze this workplace image for safety conditions and hazards. Focus only on safety aspects such as:
1. Work posture and ergonomics at the shown position
2. Use of PPE and safety equipment
3. Tool handling and work techniques
4. Environmental conditions and surroundings
5. Equipment and machinery safety
6. Ground conditions and trip hazards

Do not identify or describe any individuals. Instead, describe the safety conditions and actions observed.
Format each safety observation as:
- position:safety condition description

Examples:
- center:Improper kneeling posture without knee protection, risking joint injury
- left:Heavy machinery operating in close proximity to work area
- bottom:Uneven ground surface creating trip hazard near work zone"""

# Characters that may precede an observation on a line (markdown bullets).
_BULLET_CHARS = ("-", "*", "\u2022")


def create_monitor_interface():
    """Build and return the Gradio ``Blocks`` UI for the safety analyser.

    Returns:
        The ``gr.Blocks`` instance; the caller is responsible for launching it.
    """
    # Groq() also falls back to this environment variable on its own; passing
    # it explicitly just makes the dependency visible.
    api_key = os.getenv("GROQ_API_KEY")

    class SafetyMonitor:
        """Send frames to the vision model and annotate them with its findings."""

        def __init__(self):
            self.client = Groq(api_key=api_key)
            # NOTE(review): "preview" model IDs may be retired by the provider;
            # confirm this one is still served.
            self.model_name = "llama-3.2-90b-vision-preview"
            self.max_image_size = (800, 800)  # (max width, max height) in pixels
            self.colors = [
                (255, 0, 0),
                (0, 255, 0),
                (0, 0, 255),
                (255, 255, 0),
                (255, 0, 255),
            ]

        def resize_image(self, image):
            """Shrink *image* to fit ``max_image_size``, keeping aspect ratio.

            Images already within the limit are returned unchanged (never
            upscaled).  Each output side is clamped to at least 1 pixel so
            extreme aspect ratios cannot produce an empty target size.
            """
            height, width = image.shape[:2]
            aspect = width / height

            if width > height:
                new_width = min(self.max_image_size[0], width)
                new_height = int(new_width / aspect)
            else:
                new_height = min(self.max_image_size[1], height)
                new_width = int(new_height * aspect)

            new_width = max(1, new_width)
            new_height = max(1, new_height)
            if (new_width, new_height) == (width, height):
                return image

            return cv2.resize(
                image, (new_width, new_height), interpolation=cv2.INTER_AREA
            )

        def analyze_frame(self, frame: np.ndarray) -> str:
            """Return the model's textual safety analysis of *frame*.

            The frame is converted to 3-channel RGB, downsized, JPEG-encoded
            and sent inline as a base64 data URL.  API failures are reported
            in the returned string (prefixed ``"Analysis Error: "``) rather
            than raised.
            """
            if frame is None:
                return "No frame received"

            # Normalise to 3-channel RGB before JPEG encoding.
            if len(frame.shape) == 2:
                frame = cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
            elif len(frame.shape) == 3 and frame.shape[2] == 4:
                frame = cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)

            frame = self.resize_image(frame)
            frame_pil = PILImage.fromarray(frame)

            # High quality image for better analysis
            buffered = io.BytesIO()
            frame_pil.save(buffered, format="JPEG", quality=95, optimize=True)
            img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
            image_url = f"data:image/jpeg;base64,{img_base64}"

            try:
                completion = self.client.chat.completions.create(
                    model=self.model_name,
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": _ANALYSIS_PROMPT},
                                {
                                    "type": "image_url",
                                    "image_url": {"url": image_url},
                                },
                            ],
                        }
                    ],
                    temperature=0.7,
                    max_tokens=500,
                    stream=False,
                )
            except Exception as e:
                # Boundary with a remote service: report instead of crashing the UI.
                print(f"Detailed error: {str(e)}")
                return f"Analysis Error: {str(e)}"

            content = completion.choices[0].message.content
            # Guard against an empty/None message so callers always get a str.
            return content if content else "No analysis returned"

        @staticmethod
        def _region_coordinates(position: str, width: int, height: int) -> tuple:
            """Map a free-text position to an ``(x1, y1, x2, y2)`` box.

            The image is split into a 3x3 grid plus the special regions
            ``background`` (whole image) and ``ground`` (bottom third).
            Unrecognised positions fall back to the centre cell.
            """
            regions = {
                'center': (width//3, height//3, 2*width//3, 2*height//3),
                'background': (0, 0, width, height),
                'top-left': (0, 0, width//3, height//3),
                'top': (width//3, 0, 2*width//3, height//3),
                'top-right': (2*width//3, 0, width, height//3),
                'left': (0, height//3, width//3, 2*height//3),
                'right': (2*width//3, height//3, width, 2*height//3),
                'bottom-left': (0, 2*height//3, width//3, height),
                'bottom': (width//3, 2*height//3, 2*width//3, height),
                'bottom-right': (2*width//3, 2*height//3, width, height),
                'ground': (0, 2*height//3, width, height),
            }

            # Accept "top left" / "top_left" as well as "top-left".
            normalized = position.lower().replace('_', '-').replace(' ', '-')

            # Try the longest names first: a plain substring scan in dict
            # order would match "top" inside "top-right" and "left" inside
            # "bottom-left", putting the box in the wrong cell.
            for key in sorted(regions, key=len, reverse=True):
                if key in normalized:
                    return regions[key]

            return regions['center']  # Default to center if no match

        @staticmethod
        def _parse_observations(analysis: str) -> list:
            """Extract ``{'location', 'description'}`` dicts from model output.

            Only bullet lines of the form ``- position:description`` are
            considered, so headings, prose and error messages are ignored.
            """
            observations = []
            for raw_line in analysis.splitlines():
                line = raw_line.strip()
                if not line.startswith(_BULLET_CHARS):
                    continue

                body = line.lstrip("-*\u2022 \t")
                location, separator, description = body.partition(':')
                if not separator:
                    continue

                # Drop markdown emphasis the model may wrap the position in.
                location = location.strip(" *_`")
                description = description.strip()
                if location and description:
                    observations.append({
                        'location': location,
                        'description': description,
                    })
            return observations

        def draw_observations(self, image, observations):
            """Draw one labelled rectangle per observation onto *image*.

            *image* is modified in place and also returned.  Each observation
            is a dict with ``'location'`` and ``'description'`` keys; labels
            longer than 50 characters are truncated with ``"..."``.
            """
            height, width = image.shape[:2]
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.5
            thickness = 2
            padding = 10

            for idx, obs in enumerate(observations):
                # Cycle through the palette when there are more observations
                # than colours.
                color = self.colors[idx % len(self.colors)]

                x1, y1, x2, y2 = self._region_coordinates(
                    obs['location'], width, height
                )
                cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)

                description = obs['description']
                label = description[:50] + "..." if len(description) > 50 else description
                label_size, _ = cv2.getTextSize(label, font, font_scale, thickness)

                # Place the label just above the box, but keep it inside the
                # image when the box touches the top edge.
                text_x = max(0, x1)
                text_y = max(label_size[1] + padding, y1 - padding)

                # Filled background so the white text stays readable.
                cv2.rectangle(
                    image,
                    (text_x, text_y - label_size[1] - padding),
                    (text_x + label_size[0] + padding, text_y),
                    color,
                    -1,
                )
                cv2.putText(
                    image,
                    label,
                    (text_x + padding//2, text_y - padding//2),
                    font,
                    font_scale,
                    (255, 255, 255),
                    thickness,
                )

            return image

        def process_frame(self, frame: np.ndarray) -> tuple[np.ndarray, str]:
            """Analyse *frame* and return ``(annotated copy, analysis text)``.

            The input array is never modified; when no observation can be
            parsed from the analysis, the unannotated copy is returned.
            """
            if frame is None:
                return None, "No image provided"

            analysis = self.analyze_frame(frame)
            display_frame = frame.copy()

            observations = self._parse_observations(analysis)
            if observations:
                annotated_frame = self.draw_observations(display_frame, observations)
                return annotated_frame, analysis

            return display_frame, analysis

    # Create the main interface
    monitor = SafetyMonitor()

    with gr.Blocks() as demo:
        gr.Markdown("# Safety Analysis System powered by Llama 3.2 90b vision")

        with gr.Row():
            input_image = gr.Image(label="Upload Image")
            output_image = gr.Image(label="Annotated Results")

        analysis_text = gr.Textbox(label="Detailed Analysis", lines=5)

        def analyze_image(image):
            """Gradio callback: return ``(annotated image, analysis text)``."""
            if image is None:
                return None, "No image provided"
            try:
                processed_frame, analysis = monitor.process_frame(image)
                return processed_frame, analysis
            except Exception as e:
                # UI boundary: surface the failure in the text box.
                print(f"Processing error: {str(e)}")
                return None, f"Error processing image: {str(e)}"

        input_image.change(
            fn=analyze_image,
            inputs=input_image,
            outputs=[output_image, analysis_text],
        )

    return demo


demo = create_monitor_interface()
demo.launch()