|
import gradio as gr |
|
import cv2 |
|
import numpy as np |
|
from groq import Groq |
|
import time |
|
from PIL import Image as PILImage |
|
import io |
|
import os |
|
import base64 |
|
|
|
class SafetyMonitor:
    """Detect workplace safety hazards in an image with a Groq vision model.

    The pipeline is: normalise the frame (RGB, bounded size), ask the model
    for a scene inventory, ask it for hazards in a tagged format, parse the
    tagged hazards, and draw one coarse region box plus a label per hazard.
    """

    # Prompt requesting a "- Element: location" inventory of the scene.
    _SCENE_PROMPT = (
        "Describe the key areas and elements visible in this "
        "construction/workplace image. Include:\n"
        "1. Worker locations and activities\n"
        "2. Equipment and machinery positions\n"
        "3. Material storage or work areas\n"
        "4. Environmental features\n"
        "5. Access ways and pathways\n"
        "\n"
        "Format as:\n"
        "- Element: precise location description"
    )

    # Prompt requesting hazards wrapped in <location>area:description</location>.
    _HAZARD_PROMPT = (
        "Analyze this workplace image for safety concerns. "
        "For each identified hazard:\n"
        "1. Specify the exact location where the hazard exists\n"
        "2. Describe the specific safety issue\n"
        "3. Note any violations or risks\n"
        "\n"
        "Format each observation exactly as:\n"
        "- <location>area:detailed hazard description</location>\n"
        "\n"
        "Consider all safety aspects:\n"
        "- PPE compliance\n"
        "- Ergonomic risks\n"
        "- Equipment safety\n"
        "- Environmental hazards\n"
        "- Material handling\n"
        "- Access/egress\n"
        "- Work procedures\n"
    )

    def __init__(self):
        """Create the Groq client and set model, size limit and box colours."""
        self.client = Groq()
        self.model_name = "llama-3.2-90b-vision-preview"
        # (max_width, max_height) of images sent to the model.
        self.max_image_size = (800, 800)
        # Colours cycled through when drawing observations.
        self.colors = [(0, 0, 255), (255, 0, 0), (0, 255, 0), (255, 255, 0), (255, 0, 255)]

    @staticmethod
    def _to_rgb(frame):
        """Return *frame* as 3-channel RGB (grayscale and RGBA are converted)."""
        if frame.ndim == 2:
            return cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
        if frame.ndim == 3 and frame.shape[2] == 4:
            return cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)
        return frame

    def _target_size(self, width, height):
        """Return the (width, height) that fits ``max_image_size``.

        The aspect ratio is preserved, images that already fit are left at
        their size, and neither side is ever allowed to collapse to 0.
        """
        max_w, max_h = self.max_image_size
        if width <= max_w and height <= max_h:
            return width, height
        # Integer cross-multiplication picks the binding side without float error.
        if width * max_h >= height * max_w:
            return max_w, max(1, height * max_w // width)
        return max(1, width * max_h // height), max_h

    def _ask_vision_model(self, prompt, image_url, *, temperature, max_tokens):
        """Send one text+image user message and return the reply text ('' if none)."""
        completion = self.client.chat.completions.create(
            model=self.model_name,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": image_url}},
                    ],
                }
            ],
            temperature=temperature,
            max_tokens=max_tokens,
            stream=False,
        )
        # The message content may be None; callers expect a string.
        return completion.choices[0].message.content or ""

    def preprocess_image(self, frame):
        """Prepare image for analysis: convert to RGB, then bound its size."""
        return self.resize_image(self._to_rgb(frame))

    def resize_image(self, image):
        """Resize image to fit ``max_image_size`` while maintaining aspect ratio.

        Images that already fit are returned unchanged (same object).
        """
        height, width = image.shape[:2]
        new_size = self._target_size(width, height)
        if new_size == (width, height):
            return image
        return cv2.resize(image, new_size, interpolation=cv2.INTER_AREA)

    def encode_image(self, frame):
        """Convert image to a base64 JPEG ``data:`` URL."""
        frame_pil = PILImage.fromarray(frame)
        # JPEG cannot store alpha/palette modes; normalise anything unusual.
        if frame_pil.mode not in ("RGB", "L"):
            frame_pil = frame_pil.convert("RGB")
        buffered = io.BytesIO()
        frame_pil.save(buffered, format="JPEG", quality=95)
        img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
        return f"data:image/jpeg;base64,{img_base64}"

    def get_scene_context(self, image: np.ndarray) -> str:
        """Get scene understanding to determine context.

        Returns the model's reply, or ``""`` if encoding or the request fails
        (the error is printed; scene context is best-effort).
        """
        try:
            image_url = self.encode_image(image)
            return self._ask_vision_model(
                self._SCENE_PROMPT, image_url, temperature=0.3, max_tokens=200
            )
        except Exception as e:
            print(f"Scene analysis error: {str(e)}")
            return ""

    def analyze_frame(self, frame: np.ndarray) -> tuple[str, dict]:
        """Analyze frame and return both safety analysis and scene context.

        Returns ``(analysis_text, scene_regions)``. On a request failure the
        text is ``"Analysis Error: ..."`` and the regions gathered so far are
        still returned.
        """
        if frame is None:
            return "No frame received", {}

        # Normalise first so BOTH requests get an RGB, size-bounded image.
        frame = self.preprocess_image(frame)

        scene_context = self.get_scene_context(frame)
        scene_regions = self.parse_scene_context(scene_context)

        image_url = self.encode_image(frame)
        try:
            analysis = self._ask_vision_model(
                self._HAZARD_PROMPT, image_url, temperature=0.5, max_tokens=500
            )
            return analysis, scene_regions
        except Exception as e:
            print(f"Analysis error: {str(e)}")
            return f"Analysis Error: {str(e)}", scene_regions

    def parse_scene_context(self, context: str) -> dict:
        """Parse scene context to get region mapping.

        Each ``- Element: location`` bullet becomes ``{element: location}``.
        Only the first colon separates the two parts, so a location that
        itself contains a colon is kept intact. Empty/None input gives ``{}``.
        """
        regions = {}
        if not context:
            return regions
        for raw_line in context.splitlines():
            line = raw_line.strip()
            if not line.startswith('-'):
                continue
            element, sep, location = line.strip('- ').partition(':')
            element = element.strip()
            if sep and element:
                regions[element] = location.strip()
        return regions

    def get_region_coordinates(self, location: str, image_shape: tuple) -> tuple:
        """Convert location description to coordinates.

        Keywords are matched as substrings of the lower-cased description:
        left/right/center choose the horizontal span and
        top/bottom/middle/center the vertical span; an axis without a keyword
        spans the whole image. Returns ``(x1, y1, x2, y2)``.
        """
        height, width = image_shape[:2]
        location = location.lower()
        x1, y1, x2, y2 = 0, 0, width, height

        # Horizontal span.
        if 'left' in location:
            x2 = width // 2
        elif 'right' in location:
            x1 = width // 2
        elif 'center' in location:
            x1 = width // 4
            x2 = 3 * width // 4

        # Vertical span.
        if 'top' in location:
            y2 = height // 2
        elif 'bottom' in location:
            y1 = height // 2
        elif 'middle' in location or 'center' in location:
            y1 = height // 4
            y2 = 3 * height // 4

        return (x1, y1, x2, y2)

    def draw_observations(self, image: np.ndarray, observations: list, scene_regions: dict) -> np.ndarray:
        """Draw safety observations using scene context.

        Draws on *image* in place and returns it. Each observation is a dict
        with ``'location'`` and ``'description'`` keys. ``scene_regions`` is
        accepted for interface compatibility and is not consulted.
        """
        width = image.shape[1]
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 0.5
        thickness = 2
        padding = 10

        for idx, obs in enumerate(observations):
            color = self.colors[idx % len(self.colors)]

            x1, y1, x2, y2 = self.get_region_coordinates(obs['location'], image.shape)
            cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)

            description = obs['description']
            label = description[:50] + "..." if len(description) > 50 else description
            (label_w, label_h), _ = cv2.getTextSize(label, font, font_scale, thickness)

            # Keep the label inside the image: shift left if it would run off
            # the right edge, and never place it above the top edge.
            text_x = max(0, min(x1, width - label_w - padding))
            text_y = max(label_h + padding, y1 - padding)

            # Filled background, then white text on top.
            cv2.rectangle(image,
                          (text_x, text_y - label_h - padding),
                          (text_x + label_w + padding, text_y),
                          color, -1)
            cv2.putText(image, label,
                        (text_x + padding // 2, text_y - padding // 2),
                        font, font_scale, (255, 255, 255), thickness)

        return image

    def _parse_observations(self, analysis):
        """Extract ``<location>area:description</location>`` items from model text.

        One observation is read per line. A leading ``-`` bullet is accepted
        but not required, since the reply may use other list markers.
        """
        open_tag, close_tag = '<location>', '</location>'
        observations = []
        for raw_line in (analysis or "").splitlines():
            line = raw_line.strip()
            start = line.find(open_tag)
            if start == -1:
                continue
            start += len(open_tag)
            # Look for the closing tag only AFTER the opening one.
            end = line.find(close_tag, start)
            if end == -1:
                continue
            location, sep, description = line[start:end].strip().partition(':')
            if sep:
                observations.append({
                    'location': location.strip(),
                    'description': description.strip()
                })
        return observations

    def process_frame(self, frame: np.ndarray) -> tuple[np.ndarray, str]:
        """Process frame with safety analysis and visualization.

        Returns ``(annotated_image, analysis_text)``, or
        ``(None, "No image provided")`` when *frame* is None. The returned
        image is an RGB copy at the input resolution; the input is untouched.
        """
        if frame is None:
            return None, "No image provided"

        analysis, scene_regions = self.analyze_frame(frame)

        # Draw on a 3-channel RGB copy so the box colours are rendered as
        # given (a 3-tuple colour on an RGBA frame would write alpha 0).
        display_frame = self._to_rgb(frame.copy())

        observations = self._parse_observations(analysis)
        if observations:
            display_frame = self.draw_observations(display_frame, observations, scene_regions)

        return display_frame, analysis
|
|
|
def create_monitor_interface():
    """Build and return the Gradio Blocks app for the safety analyser.

    The app shows an upload image, an annotated output image and a text box;
    every change of the upload image runs ``SafetyMonitor.process_frame``.
    """
    safety_monitor = SafetyMonitor()

    def analyze_image(image):
        """Gradio callback: return (annotated image, analysis text)."""
        if image is None:
            return None, "No image provided"
        try:
            annotated, report = safety_monitor.process_frame(image)
        except Exception as e:
            # UI boundary: report the failure in the text box instead of crashing.
            message = str(e)
            print(f"Processing error: {message}")
            return None, f"Error processing image: {message}"
        else:
            return annotated, report

    with gr.Blocks() as demo:
        gr.Markdown("# Safety Analysis System powered by Llama 3.2 90b vision")

        with gr.Row():
            input_image = gr.Image(label="Upload Image")
            output_image = gr.Image(label="Safety Analysis")

        analysis_text = gr.Textbox(label="Detailed Analysis", lines=5)

        # Re-run the analysis whenever the uploaded image changes.
        input_image.change(
            fn=analyze_image,
            inputs=input_image,
            outputs=[output_image, analysis_text],
        )

        gr.Markdown("""
        ## Instructions:
        1. Upload any workplace/safety-related image
        2. View identified hazards and their locations
        3. Read detailed analysis of safety concerns
        """)

    return demo
|
|
|
if __name__ == "__main__":
    # Script entry point: build the Gradio app and start its web server.
    demo = create_monitor_interface()
    demo.launch()
|
|