File size: 4,432 Bytes
7b04d4e
49a323c
7b04d4e
 
 
 
49a323c
7b04d4e
33fd6ad
 
1cddd79
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7b04d4e
1cddd79
 
 
 
49a323c
1cddd79
 
 
33fd6ad
1cddd79
 
 
 
 
 
33fd6ad
1cddd79
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7b04d4e
1cddd79
 
 
 
 
 
7b04d4e
1cddd79
 
 
 
 
 
 
 
 
 
7b04d4e
1cddd79
7b04d4e
1cddd79
 
 
 
 
7b04d4e
1cddd79
49a323c
 
1cddd79
49a323c
7b04d4e
49a323c
b6ce847
49a323c
b6ce847
1cddd79
33fd6ad
49a323c
 
 
1cddd79
 
49a323c
 
 
 
 
 
 
7b04d4e
1cddd79
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import gradio as gr
from gradio.components import Image, Textbox, Markdown
import cv2
import numpy as np
from groq import Groq
import time
from PIL import Image as PILImage
import io
import os

def create_monitor_interface():
    api_key = os.getenv("GROQ_API_KEY")
    
    if not api_key:
        with gr.Blocks() as demo:
            gr.Markdown("""
            # ⚠️ Groq API Key Required
            
            ## Setup Instructions for Hugging Face Space:
            1. Go to your Space's Settings tab
            2. Scroll down to "Repository Secrets"
            3. Click "New Secret"
            4. Enter:
               - Secret name: `GROQ_API_KEY`
               - Secret value: Your Groq API key
            5. Click "Add secret"
            6. Rebuild the Space
            
            Once configured, the safety monitoring system will be available.
            """)
        return demo

    class SafetyMonitor:
        def __init__(self, model_name: str = "mixtral-8x7b-vision"):
            self.client = Groq(api_key=api_key)
            self.model_name = model_name

        def analyze_frame(self, frame: np.ndarray) -> str:
            if frame is None:
                return "No frame received"
                
            frame_pil = PILImage.fromarray(frame)
            img_byte_arr = io.BytesIO()
            frame_pil.save(img_byte_arr, format='JPEG')
            img_byte_arr = img_byte_arr.getvalue()
            
            prompt = """Analyze this image for workplace safety issues. Focus on:
            1. PPE usage (helmets, vests, etc.)
            2. Unsafe behaviors
            3. Equipment safety
            4. Environmental hazards
            Provide specific observations."""
            
            try:
                completion = self.client.chat.completions.create(
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": prompt},
                                {"type": "image", "image": img_byte_arr}
                            ]
                        }
                    ],
                    model=self.model_name,
                    max_tokens=200,
                    temperature=0.2
                )
                return completion.choices[0].message.content
            except Exception as e:
                return f"Analysis Error: {str(e)}"

        def process_frame(self, frame: np.ndarray) -> tuple[np.ndarray, str]:
            if frame is None:
                return None, "No frame received"
                
            analysis = self.analyze_frame(frame)
            display_frame = frame.copy()
            
            # Add text overlay
            overlay = display_frame.copy()
            cv2.rectangle(overlay, (5, 5), (640, 200), (0, 0, 0), -1)
            cv2.addWeighted(overlay, 0.3, display_frame, 0.7, 0, display_frame)
            
            y_position = 30
            for line in analysis.split('\n'):
                cv2.putText(display_frame, line[:80], (10, y_position),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
                y_position += 30

            return display_frame, analysis

    # Create the main interface
    monitor = SafetyMonitor()
    
    with gr.Blocks() as demo:
        gr.Markdown("# Real-time Safety Monitoring System")
        
        with gr.Row():
            input_image = Image(label="Input Image", type="numpy", tool="upload")
            output_image = Image(label="Analysis Feed", type="numpy")
        
        analysis_text = Textbox(label="Safety Analysis", lines=5)
            
        def analyze_image(image):
            if image is None:
                return None, "No image provided"
            processed_frame, analysis = monitor.process_frame(image)
            return processed_frame, analysis
            
        input_image.change(
            fn=analyze_image,
            inputs=[input_image],
            outputs=[output_image, analysis_text],
        )
        
        gr.Markdown("""
        ## Instructions:
        1. Upload an image using the input panel
        2. The system will automatically analyze the image for safety concerns
        3. Results will be shown in the analysis feed and text box
        """)

    return demo

demo = create_monitor_interface()
demo.launch()