File size: 9,709 Bytes
7b04d4e 49a323c 7b04d4e 33fd6ad 75c2b7c 33fd6ad 1cddd79 5f3406b 9bf83e0 5f3406b bda20be 5f3406b 18cd948 bda20be 18cd948 b122109 1cddd79 18cd948 bda20be 18cd948 30f620c 27eab0f 30f620c 27eab0f fc9e0d8 49a323c 27eab0f 46f4ca8 27eab0f 9fd1d46 bda20be 27eab0f f2ae346 33fd6ad 1cddd79 f2ae346 1cddd79 5f3406b bda20be 5f3406b f2ae346 5f3406b 1cddd79 bda20be 46f4ca8 1cddd79 18cd948 1cddd79 bda20be 18cd948 740f7c7 bda20be 9bf83e0 46f4ca8 9bf83e0 18cd948 9bf83e0 46f4ca8 9bf83e0 46f4ca8 bd1163f 46f4ca8 bd1163f 46f4ca8 bda20be 46f4ca8 bd1163f 9bf83e0 bda20be 46f4ca8 18cd948 bd1163f bda20be bd1163f bda20be 9bf83e0 18cd948 1cddd79 7e6153d 7b04d4e 1cddd79 b4f3ea6 18cd948 1cddd79 18cd948 7b04d4e b4f3ea6 b6ce847 49a323c 27eab0f 9fd1d46 27eab0f 33fd6ad b4f3ea6 1cddd79 7b04d4e bda20be 1cddd79 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
import gradio as gr
import cv2
import numpy as np
from groq import Groq
import time
from PIL import Image as PILImage
import io
import os
import base64
def create_monitor_interface():
    """Build the Gradio demo that analyzes workplace images for safety hazards.

    An uploaded image is sent to a Groq-hosted vision model together with a
    fixed prompt. Reply lines written as
    ``- <location>name</location>: description`` are drawn onto a copy of the
    image as labelled rectangles, and the full reply is shown as text.

    Returns:
        The assembled ``gr.Blocks`` application; the caller launches it.

    Raises:
        Whatever ``Groq(...)`` raises when the client cannot be constructed
        (for example when no API key is available).
    """
    # Forwarded explicitly to the client below instead of being read and dropped.
    api_key = os.getenv("GROQ_API_KEY")

    class SafetyMonitor:
        """Query the vision model about a frame and annotate its findings."""

        # Instruction sent with every image. The parser in this class relies
        # on the "<location>...</location>: ..." line format requested here.
        ANALYSIS_PROMPT = (
            "Analyze this workplace image for safety conditions and hazards. Focus on:\n"
            "1. Work posture and ergonomics\n"
            "2. PPE and safety equipment usage\n"
            "3. Tool handling and techniques\n"
            "4. Environmental conditions\n"
            "5. Equipment and machinery safety\n"
            "6. Ground conditions and hazards\n"
            "Describe each safety condition observed, using this exact format:\n"
            "- <location>position</location>: detailed safety observation\n"
            "Examples:\n"
            "- <location>center</location>: Improper kneeling posture without knee protection, risking joint injury\n"
            "- <location>background</location>: Heavy machinery operating in close proximity creating hazard zone\n"
            "- <location>ground</location>: Uneven surface and debris creating trip hazards\n"
            "Be specific about locations and safety concerns."
        )

        def __init__(self, api_key=None):
            """Create the API client and set the analysis/drawing defaults.

            Args:
                api_key: Groq API key; ``None`` leaves key discovery to the
                    Groq client itself.
            """
            self.client = Groq(api_key=api_key)
            self.model_name = "llama-3.2-90b-vision-preview"
            # (max width, max height) of the image sent to the model.
            self.max_image_size = (800, 800)
            # Box colours, cycled per observation.
            self.colors = [(0, 0, 255), (255, 0, 0), (0, 255, 0), (255, 255, 0), (255, 0, 255)]

        @staticmethod
        def _to_rgb(frame):
            """Return *frame* as 3-channel RGB (grayscale and RGBA are converted)."""
            if frame.ndim == 2:
                return cv2.cvtColor(frame, cv2.COLOR_GRAY2RGB)
            if frame.ndim == 3 and frame.shape[2] == 4:
                return cv2.cvtColor(frame, cv2.COLOR_RGBA2RGB)
            return frame

        @staticmethod
        def _parse_observations(analysis):
            """Extract ``{'location', 'description'}`` dicts from the model reply.

            Only lines containing ``<location>...</location>`` followed by a
            non-empty description are kept; everything else is ignored.
            """
            observations = []
            for line in analysis.split('\n'):
                _, opened, rest = line.partition('<location>')
                location, closed, tail = rest.partition('</location>')
                if not (opened and closed):
                    continue
                location = location.strip()
                # The colon after the closing tag is optional in practice;
                # strip it only when present so no description text is lost.
                description = tail.strip().lstrip(':').strip()
                if location and description:
                    observations.append({
                        'location': location,
                        'description': description,
                    })
            return observations

        def resize_image(self, image):
            """Shrink *image* to fit ``self.max_image_size``, keeping aspect ratio.

            Images already within the limit are returned unchanged (same
            object). Neither output dimension is allowed to drop below 1 px.
            """
            height, width = image.shape[:2]
            max_width, max_height = self.max_image_size
            if width <= max_width and height <= max_height:
                return image
            # Cross-multiplying finds the limiting side with exact integer
            # arithmetic and also works for non-square size limits.
            if width * max_height >= height * max_width:
                new_width = max_width
                new_height = max(1, height * max_width // width)
            else:
                new_height = max_height
                new_width = max(1, width * max_height // height)
            return cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)

        def analyze_frame(self, frame: np.ndarray) -> str:
            """Ask the vision model to describe safety conditions in *frame*.

            Args:
                frame: Image array (grayscale, RGB or RGBA), or ``None``.

            Returns:
                The model's text reply, ``"No frame received"`` for ``None``
                input, or an ``"Analysis Error: ..."`` string if the API call
                fails. API errors are reported in the return value, not raised.
            """
            if frame is None:
                return "No frame received"

            # Convert and resize image
            frame = self.resize_image(self._to_rgb(frame))

            # High quality image for better analysis
            buffered = io.BytesIO()
            PILImage.fromarray(frame).save(
                buffered,
                format="JPEG",
                quality=95,
                optimize=True,
            )
            img_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
            image_url = f"data:image/jpeg;base64,{img_base64}"

            try:
                completion = self.client.chat.completions.create(
                    model=self.model_name,
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": self.ANALYSIS_PROMPT},
                                {"type": "image_url", "image_url": {"url": image_url}},
                            ],
                        }
                    ],
                    temperature=0.5,
                    max_tokens=500,
                    stream=False,
                )
                # Guard against an empty reply so callers always get a str.
                return completion.choices[0].message.content or "No analysis returned"
            except Exception as e:
                print(f"Analysis error: {str(e)}")
                return f"Analysis Error: {str(e)}"

        def process_frame(self, frame: np.ndarray) -> "tuple[np.ndarray | None, str]":
            """Analyze *frame* and return ``(annotated image, analysis text)``.

            The input array is never modified. The returned image is RGB so
            the coloured annotations render correctly even when the input is
            grayscale or carries an alpha channel. For ``None`` input the
            result is ``(None, "No image provided")``.
            """
            if frame is None:
                return None, "No image provided"

            analysis = self.analyze_frame(frame)

            rgb = self._to_rgb(frame)
            # _to_rgb returns the input itself when no conversion was needed;
            # copy in that case so drawing never touches the caller's array.
            display_frame = rgb.copy() if rgb is frame else rgb

            # Parse observations from the formatted response
            observations = self._parse_observations(analysis)

            # Draw observations if we found any
            if observations:
                annotated_frame = self.draw_observations(display_frame, observations)
                return annotated_frame, analysis
            return display_frame, analysis

        def draw_observations(self, image, observations):
            """Draw a labelled rectangle on *image* for each observation.

            Locations are mapped to coarse, fixed regions of the image (a 3x3
            grid plus a few named areas); unknown locations fall back to the
            centre region. *image* is modified in place and also returned.
            """
            height, width = image.shape[:2]
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.5
            thickness = 2
            padding = 10

            # (x1, y1, x2, y2) for every recognised location name.
            regions = {
                'center': (width//3, height//3, 2*width//3, 2*height//3),
                'background': (0, 0, width, height),
                'top-left': (0, 0, width//3, height//3),
                'top': (width//3, 0, 2*width//3, height//3),
                'top-right': (2*width//3, 0, width, height//3),
                'left': (0, height//3, width//3, 2*height//3),
                'right': (2*width//3, height//3, width, 2*height//3),
                'bottom-left': (0, 2*height//3, width//3, height),
                'bottom': (width//3, 2*height//3, 2*width//3, height),
                'bottom-right': (2*width//3, 2*height//3, width, height),
                'ground': (0, 2*height//3, width, height),
                'machinery': (0, 0, width//2, height),
                'work-area': (width//4, height//4, 3*width//4, 3*height//4),
            }
            # Try the longest names first so that e.g. 'bottom-left' is not
            # shadowed by the shorter 'left' it contains.
            names_by_length = sorted(regions, key=len, reverse=True)

            def get_region_coordinates(position: str) -> tuple:
                """Get coordinates based on position description."""
                # "Top right" / "top_right" should match the 'top-right' key.
                normalized = "-".join(position.lower().replace("_", " ").split())
                for name in names_by_length:
                    if name in normalized:
                        return regions[name]
                return regions['center']

            for idx, obs in enumerate(observations):
                color = self.colors[idx % len(self.colors)]

                # Get coordinates for this observation
                x1, y1, x2, y2 = get_region_coordinates(obs['location'])

                # Draw rectangle
                cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)

                # Add label with background
                description = obs['description']
                label = description[:50] + "..." if len(description) > 50 else description
                label_size, _ = cv2.getTextSize(label, font, font_scale, thickness)

                # Position text above the box, clamped so it stays on the image
                text_x = max(0, x1)
                text_y = max(label_size[1] + padding, y1 - padding)

                # Draw text background
                cv2.rectangle(
                    image,
                    (text_x, text_y - label_size[1] - padding),
                    (text_x + label_size[0] + padding, text_y),
                    color,
                    -1,
                )

                # Draw text
                cv2.putText(
                    image,
                    label,
                    (text_x + padding//2, text_y - padding//2),
                    font,
                    font_scale,
                    (255, 255, 255),
                    thickness,
                )
            return image

    # Create the main interface
    monitor = SafetyMonitor(api_key=api_key)

    with gr.Blocks() as demo:
        gr.Markdown("# Safety Analysis System powered by Llama 3.2 90b vision")

        with gr.Row():
            input_image = gr.Image(label="Upload Image")
            output_image = gr.Image(label="Annotated Results")

        analysis_text = gr.Textbox(label="Detailed Analysis", lines=5)

        def analyze_image(image):
            """Gradio callback: return (annotated image, analysis text)."""
            if image is None:
                return None, "No image provided"
            try:
                processed_frame, analysis = monitor.process_frame(image)
                return processed_frame, analysis
            except Exception as e:
                # UI boundary: report the failure in the text box rather
                # than letting the callback raise.
                print(f"Processing error: {str(e)}")
                return None, f"Error processing image: {str(e)}"

        input_image.change(
            fn=analyze_image,
            inputs=input_image,
            outputs=[output_image, analysis_text],
        )

        gr.Markdown(
            "\n"
            "## Instructions:\n"
            "1. Upload an image to analyze safety conditions\n"
            "2. View annotated results showing safety concerns\n"
            "3. Read detailed analysis of identified issues\n"
        )

    return demo
# Build the app at import time so `demo` remains available as a module-level name.
demo = create_monitor_interface()

if __name__ == "__main__":
    # Start the web server only when this file is run as a script, so that
    # importing the module does not launch it as a side effect.
    demo.launch()