efraim1011's picture
feature 'real time' added
9ff5c58 verified
raw
history blame
15.1 kB
from typing import List
import os
import cv2
import gradio as gr
import numpy as np
import supervision as sv
import torch
from tqdm import tqdm
import cv2
from translate import Translator
from inference.models.yolo_world.yolo_world import YOLOWorld
import datetime
import uuid
from typing import List
def generate_file_name(extension="mp4"):
current_datetime = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
unique_id = uuid.uuid4()
return f"{current_datetime}_{unique_id}.{extension}"
def list_files_older_than(directory: str, diff_minutes: int) -> List[str]:
diff_seconds = diff_minutes * 60
now = datetime.datetime.now()
older_files: List[str] = []
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
if os.path.isfile(file_path):
file_mod_time = os.path.getmtime(file_path)
file_mod_datetime = datetime.datetime.fromtimestamp(file_mod_time)
time_diff = now - file_mod_datetime
if time_diff.total_seconds() > diff_seconds:
older_files.append(file_path)
return older_files
def remove_files_older_than(directory: str, diff_minutes: int) -> None:
older_files = list_files_older_than(directory, diff_minutes)
file_count = len(older_files)
for file_path in older_files:
os.remove(file_path)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(
f"[{now}] Removed {file_count} files older than {diff_minutes} minutes from "
f"'{directory}' directory."
)
def calculate_end_frame_index(source_video_path: str) -> int:
video_info = sv.VideoInfo.from_video_path(source_video_path)
return video_info.total_frames
def create_directory(directory_path: str) -> None:
if not os.path.exists(directory_path):
os.makedirs(directory_path)
MARKDOWN = """
<h1>Porto do Itaqui YOLO-World </h1>
Este é protótipo em fase de execução que será apresentado ao porto do Itaqui com o objetivo de entregar alguma coisa.
"""
RESULTS = "results"
# IMAGE_EXAMPLES = [
# ['https://media.roboflow.com/dog.jpeg', 'dog, eye, nose, tongue, car', 0.005, 0.1, True, False, False],
# ['https://media.roboflow.com/albert-4x.png', 'hand, hair', 0.005, 0.1, True, False, False],
# ]
# VIDEO_EXAMPLES = [
# ['https://media.roboflow.com/supervision/video-examples/croissant-1280x720.mp4', 'croissant', 0.01, 0.2, False, False, False],
# ['https://media.roboflow.com/supervision/video-examples/suitcases-1280x720.mp4', 'suitcase', 0.1, 0.2, False, False, False],
# ['https://media.roboflow.com/supervision/video-examples/tokyo-walk-1280x720.mp4', 'woman walking', 0.1, 0.2, False, False, False],
# ['https://media.roboflow.com/supervision/video-examples/wooly-mammoth-1280x720.mp4', 'mammoth', 0.01, 0.2, False, False, False],
# ]
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# EFFICIENT_SAM_MODEL = load(device=DEVICE)
YOLO_WORLD_MODEL = YOLOWorld(model_id="yolo_world/l")
BOUNDING_BOX_ANNOTATOR = sv.BoundingBoxAnnotator(thickness=2)
MASK_ANNOTATOR = sv.MaskAnnotator()
LABEL_ANNOTATOR = sv.LabelAnnotator(text_thickness=2, text_scale=1, text_color=sv.Color.BLACK)
# creating video results directory
create_directory(directory_path=RESULTS)
def process_categories(categories: str) -> List[str]:
# Traduzindo as palavras do português para o inglês
translator = Translator(from_lang="pt", to_lang="en")
translation = translator.translate(categories)
return [category.strip() for category in translation.split(',')]
def annotate_image(
input_image: np.ndarray,
detections: sv.Detections,
categories: List[str],
with_confidence: bool = False,
) -> np.ndarray:
labels = [
(
f"{categories[class_id]}: {confidence:.3f}"
if with_confidence
else f"{categories[class_id]}"
)
for class_id, confidence in
zip(detections.class_id, detections.confidence)
]
output_image = MASK_ANNOTATOR.annotate(input_image, detections)
output_image = BOUNDING_BOX_ANNOTATOR.annotate(output_image, detections)
output_image = LABEL_ANNOTATOR.annotate(output_image, detections, labels=labels)
return output_image
def process_image(
input_image: np.ndarray,
categories: str,
confidence_threshold: float = 0.3,
iou_threshold: float = 0.5,
# with_segmentation: bool = True,
with_confidence: bool = False,
with_class_agnostic_nms: bool = False,
) -> np.ndarray:
# cleanup of old video files
remove_files_older_than(RESULTS, 30)
categories = process_categories(categories)
YOLO_WORLD_MODEL.set_classes(categories)
results = YOLO_WORLD_MODEL.infer(input_image, confidence=0.02)
detections = sv.Detections.from_inference(results)
detections = detections.with_nms(
class_agnostic=with_class_agnostic_nms,
threshold=iou_threshold
)
# if with_segmentation:
# detections.mask = inference_with_boxes(
# image=input_image,
# xyxy=detections.xyxy,
# model=EFFICIENT_SAM_MODEL,
# device=DEVICE
# )
output_image = cv2.cvtColor(input_image, cv2.COLOR_RGB2BGR)
output_image = annotate_image(
input_image=output_image,
detections=detections,
categories=categories,
with_confidence=with_confidence
)
return cv2.cvtColor(output_image, cv2.COLOR_BGR2RGB)
def process_video(
input_video: str,
categories: str,
confidence_threshold: float = 0.3,
iou_threshold: float = 0.5,
# with_segmentation: bool = True,
with_confidence: bool = False,
with_class_agnostic_nms: bool = False,
progress=gr.Progress(track_tqdm=True)
) -> str:
# cleanup of old video files
remove_files_older_than(RESULTS, 30)
categories = process_categories(categories)
YOLO_WORLD_MODEL.set_classes(categories)
video_info = sv.VideoInfo.from_video_path(input_video)
total = calculate_end_frame_index(input_video)
frame_generator = sv.get_video_frames_generator(
source_path=input_video,
end=total
)
result_file_name = generate_file_name(extension="mp4")
result_file_path = os.path.join(RESULTS, result_file_name)
with sv.VideoSink(result_file_path, video_info=video_info) as sink:
for _ in tqdm(range(total), desc="Processing video..."):
frame = next(frame_generator)
results = YOLO_WORLD_MODEL.infer(frame, confidence=confidence_threshold)
detections = sv.Detections.from_inference(results)
detections = detections.with_nms(
class_agnostic=with_class_agnostic_nms,
threshold=iou_threshold
)
# if with_segmentation:
# detections.mask = inference_with_boxes(
# image=frame,
# xyxy=detections.xyxy,
# model=EFFICIENT_SAM_MODEL,
# device=DEVICE
# )
frame = annotate_image(
input_image=frame,
detections=detections,
categories=categories,
with_confidence=with_confidence
)
sink.write_frame(frame)
return result_file_path
def process_image_real_time(
input_image: np.ndarray,
categories: str = "safety helmet, safety glasses, hearing protectors, protective masks, safety shoes, protective gloves, seat belt, person, bicycle, car, motorcycle, airplane, dining table, bus, train, truck, boat, traffic light, fire hydrant, stop sign, parking meter, bench, bird, cat, dog, horse, sheep, cow, elephant, bear, zebra, giraffe, backpack, umbrella, handbag, tie, suitcase, frisbee, skis, snowboard, sports ball, kite, baseball bat, baseball glove, skateboard, surfboard, tennis racket, bottle, wine glass, cup, fork, knife, spoon, bowl, banana, apple, sandwich, orange, broccoli, carrot, hot dog, pizza, donut, cake, chair, couch, potted plant, bed, toilet, tv, laptop, mouse, remote, keyboard, cell phone, microwave, oven, toaster, sink, refrigerator, book, clock, vase, scissors, teddy bear, hair drier, toothbrush",
confidence_threshold: float = 0.3,
iou_threshold: float = 0.5,
# with_segmentation: bool = True,
with_confidence: bool = False,
with_class_agnostic_nms: bool = False,) -> np.ndarray:
# cleanup of old video files
remove_files_older_than(RESULTS, 30)
categories = process_categories(categories)
YOLO_WORLD_MODEL.set_classes(categories)
results = YOLO_WORLD_MODEL.infer(input_image, confidence=0.02)
detections = sv.Detections.from_inference(results)
detections = detections.with_nms(
class_agnostic=with_class_agnostic_nms,
threshold=iou_threshold
)
# if with_segmentation:
# detections.mask = inference_with_boxes(
# image=input_image,
# xyxy=detections.xyxy,
# model=EFFICIENT_SAM_MODEL,
# device=DEVICE
# )
output_image = cv2.cvtColor(input_image, cv2.COLOR_RGB2BGR)
output_image = annotate_image(
input_image=output_image,
detections=detections,
categories=categories,
with_confidence=with_confidence
)
return cv2.cvtColor(output_image, cv2.COLOR_BGR2RGB)
confidence_threshold_component = gr.Slider(
minimum=0,
maximum=1.0,
value=0.03,
step=0.01,
label="Limite de Confiança",
info=(
"O limite de confiança para o modelo YOLO-World. Reduza o limite para "
"reduzir falsos negativos, aumentando a sensibilidade do modelo para detectar "
"objetos procurados. Por outro lado, aumente o limite para minimizar falsos "
"positivos, evitando que o modelo identifique objetos que não deveria."
))
iou_threshold_component = gr.Slider(
minimum=0,
maximum=1.0,
value=0.1,
step=0.01,
label="Limite IoU",
info=(
"Limite de intersecção sobre união (Intersection over Union ou IoU) para supressão não máxima. "
"Diminua o valor para diminuir a ocorrência de caixas delimitadoras sobrepostas, "
"tornando o processo de detecção mais rigoroso. Por outro lado, aumente o valor "
"para permitir mais caixas delimitadoras sobrepostas, acomodando uma gama mais ampla de "
"detecções."
))
with_segmentation_component = gr.Checkbox(
value=False,
label="With Segmentation",
info=(
"Whether to run EfficientSAM for instance segmentation."
)
)
with_confidence_component = gr.Checkbox(
value=False,
label="Mostrar confiança.",
info=(
"Mostrar ou não a confiança dos objetos detectados."
)
)
with_class_agnostic_nms_component = gr.Checkbox(
value=False,
label="Use NMS (Non-Max Supression ou Supressão Não Máxima) independente da classe.",
info=(
"Suprima caixas delimitadoras sobrepostas em todas as classes."
)
)
with gr.Blocks() as demo:
gr.Markdown(MARKDOWN)
with gr.Accordion("Configuração", open=False):
confidence_threshold_component.render()
iou_threshold_component.render()
with gr.Row():
# with_segmentation_component.render()
with_confidence_component.render()
with_class_agnostic_nms_component.render()
with gr.Tab(label="Imagem"):
with gr.Row():
input_image_component = gr.Image(
type='numpy',
label='Imagem de entrada'
)
output_image_component = gr.Image(
type='numpy',
label='Imagem de saída'
)
with gr.Row():
image_categories_text_component = gr.Textbox(
label='Categorias',
placeholder='Digite as categorias separadas por vírgula',
scale=7
)
image_submit_button_component = gr.Button(
value='Submeter',
scale=1,
variant='primary'
)
# gr.Examples(
# fn=process_image,
# examples=IMAGE_EXAMPLES,
# inputs=[
# input_image_component,
# image_categories_text_component,
# confidence_threshold_component,
# iou_threshold_component,
# with_segmentation_component,
# with_confidence_component,
# with_class_agnostic_nms_component
# ],
# outputs=output_image_component
# )
with gr.Tab(label="Video"):
with gr.Row():
input_video_component = gr.Video(
label='Video de entrada'
)
output_video_component = gr.Video(
label='Video de saída'
)
with gr.Row():
video_categories_text_component = gr.Textbox(
label='Categorias',
placeholder='Digite as categorias separadas por vírgula',
scale=7
)
video_submit_button_component = gr.Button(
value='Submeter',
scale=1,
variant='primary'
)
# gr.Examples(
# fn=process_video,
# examples=VIDEO_EXAMPLES,
# inputs=[
# input_video_component,
# video_categories_text_component,
# confidence_threshold_component,
# iou_threshold_component,
# with_segmentation_component,
# with_confidence_component,
# with_class_agnostic_nms_component
# ],
# outputs=output_image_component
# )
with gr.Tab(label="Tempo Real"):
with gr.Row():
input_real_time_component = gr.Image(label="Entrada", sources=["webcam"], streaming=True)
output_real_time_component = gr.Image(label="Saída")
image_submit_button_component.click(
fn=process_image,
inputs=[
input_image_component,
image_categories_text_component,
confidence_threshold_component,
iou_threshold_component,
# with_segmentation_component,
with_confidence_component,
with_class_agnostic_nms_component
],
outputs=output_image_component
)
video_submit_button_component.click(
fn=process_video,
inputs=[
input_video_component,
video_categories_text_component,
confidence_threshold_component,
iou_threshold_component,
# with_segmentation_component,
with_confidence_component,
with_class_agnostic_nms_component
],
outputs=output_video_component
)
input_real_time_component.change(fn=process_image_real_time,inputs=input_real_time_component, outputs=output_real_time_component)
demo.launch(debug=False, show_error=True, max_threads=1, share=True)