from typing import List

import datetime
import os
import uuid

import cv2
import gradio as gr
import numpy as np
import supervision as sv
import torch
from tqdm import tqdm
from translate import Translator

from inference.models.yolo_world.yolo_world import YOLOWorld
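
# Gradio demo: open-vocabulary object detection with YOLO-World for the Porto do Itaqui
# prototype. Category prompts are typed in Portuguese, translated to English, passed to
# YOLO-World as the class list, and the detections are drawn with supervision annotators
# for both images and videos. Result videos are written to the RESULTS directory and
# purged after 30 minutes.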
def generate_file_name(extension="mp4"):
    current_datetime = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
    unique_id = uuid.uuid4()
    return f"{current_datetime}_{unique_id}.{extension}"


def list_files_older_than(directory: str, diff_minutes: int) -> List[str]:
    diff_seconds = diff_minutes * 60
    now = datetime.datetime.now()
    older_files: List[str] = []
    for filename in os.listdir(directory):
        file_path = os.path.join(directory, filename)
        if os.path.isfile(file_path):
            file_mod_time = os.path.getmtime(file_path)
            file_mod_datetime = datetime.datetime.fromtimestamp(file_mod_time)
            time_diff = now - file_mod_datetime
            if time_diff.total_seconds() > diff_seconds:
                older_files.append(file_path)
    return older_files


def remove_files_older_than(directory: str, diff_minutes: int) -> None:
    older_files = list_files_older_than(directory, diff_minutes)
    file_count = len(older_files)
    for file_path in older_files:
        os.remove(file_path)
    now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(
        f"[{now}] Removed {file_count} files older than {diff_minutes} minutes from "
        f"'{directory}' directory."
    )


def calculate_end_frame_index(source_video_path: str) -> int:
    video_info = sv.VideoInfo.from_video_path(source_video_path)
    return video_info.total_frames


def create_directory(directory_path: str) -> None:
    if not os.path.exists(directory_path):
        os.makedirs(directory_path)
MARKDOWN = """ | |
<h1>Porto do Itaqui YOLO-World </h1> | |
Este é protótipo em fase de execução que será apresentado ao porto do Itaqui com o objetivo de entregar alguma coisa. | |
""" | |
RESULTS = "results" | |
# IMAGE_EXAMPLES = [
#     ['https://media.roboflow.com/dog.jpeg', 'dog, eye, nose, tongue, car', 0.005, 0.1, True, False, False],
#     ['https://media.roboflow.com/albert-4x.png', 'hand, hair', 0.005, 0.1, True, False, False],
# ]
# VIDEO_EXAMPLES = [
#     ['https://media.roboflow.com/supervision/video-examples/croissant-1280x720.mp4', 'croissant', 0.01, 0.2, False, False, False],
#     ['https://media.roboflow.com/supervision/video-examples/suitcases-1280x720.mp4', 'suitcase', 0.1, 0.2, False, False, False],
#     ['https://media.roboflow.com/supervision/video-examples/tokyo-walk-1280x720.mp4', 'woman walking', 0.1, 0.2, False, False, False],
#     ['https://media.roboflow.com/supervision/video-examples/wooly-mammoth-1280x720.mp4', 'mammoth', 0.01, 0.2, False, False, False],
# ]
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# EFFICIENT_SAM_MODEL = load(device=DEVICE)
YOLO_WORLD_MODEL = YOLOWorld(model_id="yolo_world/l")
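# "yolo_world/l" selects the large YOLO-World checkpoint served by the Roboflow
# `inference` package; smaller variants (e.g. "yolo_world/s", "yolo_world/m") are
# assumed to be available if a lighter model is needed on CPU-only hardware.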
BOUNDING_BOX_ANNOTATOR = sv.BoundingBoxAnnotator(thickness=2)
MASK_ANNOTATOR = sv.MaskAnnotator()
LABEL_ANNOTATOR = sv.LabelAnnotator(text_thickness=2, text_scale=1, text_color=sv.Color.BLACK)

# creating video results directory
create_directory(directory_path=RESULTS)
def process_categories(categories: str) -> List[str]:
    # Translate the category prompt from Portuguese to English before feeding YOLO-World
    translator = Translator(from_lang="pt", to_lang="en")
    translation = translator.translate(categories)
    return [category.strip() for category in translation.split(',')]
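# Illustrative example (the exact wording returned by the `translate` provider may vary):
# process_categories("navio, contêiner, caminhão") -> ["ship", "container", "truck"]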
def annotate_image(
    input_image: np.ndarray,
    detections: sv.Detections,
    categories: List[str],
    with_confidence: bool = False,
) -> np.ndarray:
    labels = [
        (
            f"{categories[class_id]}: {confidence:.3f}"
            if with_confidence
            else f"{categories[class_id]}"
        )
        for class_id, confidence in
        zip(detections.class_id, detections.confidence)
    ]
    output_image = MASK_ANNOTATOR.annotate(input_image, detections)
    output_image = BOUNDING_BOX_ANNOTATOR.annotate(output_image, detections)
    output_image = LABEL_ANNOTATOR.annotate(output_image, detections, labels=labels)
    return output_image
def process_image(
    input_image: np.ndarray,
    categories: str,
    confidence_threshold: float = 0.3,
    iou_threshold: float = 0.5,
    # with_segmentation: bool = True,
    with_confidence: bool = False,
    with_class_agnostic_nms: bool = False,
) -> np.ndarray:
    # cleanup of old video files
    remove_files_older_than(RESULTS, 30)
    categories = process_categories(categories)
    YOLO_WORLD_MODEL.set_classes(categories)
    results = YOLO_WORLD_MODEL.infer(input_image, confidence=confidence_threshold)
    detections = sv.Detections.from_inference(results)
    detections = detections.with_nms(
        class_agnostic=with_class_agnostic_nms,
        threshold=iou_threshold
    )
    # if with_segmentation:
    #     detections.mask = inference_with_boxes(
    #         image=input_image,
    #         xyxy=detections.xyxy,
    #         model=EFFICIENT_SAM_MODEL,
    #         device=DEVICE
    #     )
    # Gradio provides RGB images; convert to BGR for the OpenCV-based annotators, then back
    output_image = cv2.cvtColor(input_image, cv2.COLOR_RGB2BGR)
    output_image = annotate_image(
        input_image=output_image,
        detections=detections,
        categories=categories,
        with_confidence=with_confidence
    )
    return cv2.cvtColor(output_image, cv2.COLOR_BGR2RGB)
def process_video(
    input_video: str,
    categories: str,
    confidence_threshold: float = 0.3,
    iou_threshold: float = 0.5,
    # with_segmentation: bool = True,
    with_confidence: bool = False,
    with_class_agnostic_nms: bool = False,
    progress=gr.Progress(track_tqdm=True)
) -> str:
    # cleanup of old video files
    remove_files_older_than(RESULTS, 30)
    categories = process_categories(categories)
    YOLO_WORLD_MODEL.set_classes(categories)
    video_info = sv.VideoInfo.from_video_path(input_video)
    total = calculate_end_frame_index(input_video)
    frame_generator = sv.get_video_frames_generator(
        source_path=input_video,
        end=total
    )
    result_file_name = generate_file_name(extension="mp4")
    result_file_path = os.path.join(RESULTS, result_file_name)
    with sv.VideoSink(result_file_path, video_info=video_info) as sink:
        for _ in tqdm(range(total), desc="Processing video..."):
            frame = next(frame_generator)
            results = YOLO_WORLD_MODEL.infer(frame, confidence=confidence_threshold)
            detections = sv.Detections.from_inference(results)
            detections = detections.with_nms(
                class_agnostic=with_class_agnostic_nms,
                threshold=iou_threshold
            )
            # if with_segmentation:
            #     detections.mask = inference_with_boxes(
            #         image=frame,
            #         xyxy=detections.xyxy,
            #         model=EFFICIENT_SAM_MODEL,
            #         device=DEVICE
            #     )
            frame = annotate_image(
                input_image=frame,
                detections=detections,
                categories=categories,
                with_confidence=with_confidence
            )
            sink.write_frame(frame)
    return result_file_path
confidence_threshold_component = gr.Slider(
    minimum=0,
    maximum=1.0,
    value=0.03,
    step=0.01,
    label="Limite de Confiança",
    info=(
        "O limite de confiança para o modelo YOLO-World. Reduza o limite para "
        "reduzir falsos negativos, aumentando a sensibilidade do modelo para detectar "
        "objetos procurados. Por outro lado, aumente o limite para minimizar falsos "
        "positivos, evitando que o modelo identifique objetos que não deveria."
    ))

iou_threshold_component = gr.Slider(
    minimum=0,
    maximum=1.0,
    value=0.1,
    step=0.01,
    label="Limite IoU",
    info=(
        "Limite de intersecção sobre união (Intersection over Union ou IoU) para supressão não máxima. "
        "Diminua o valor para reduzir a ocorrência de caixas delimitadoras sobrepostas, "
        "tornando o processo de detecção mais rigoroso. Por outro lado, aumente o valor "
        "para permitir mais caixas delimitadoras sobrepostas, acomodando uma gama mais ampla de "
        "detecções."
    ))
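# For reference: IoU(A, B) = area(A ∩ B) / area(A ∪ B). During non-max suppression,
# a box that overlaps an already-kept box with IoU above this threshold is discarded.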
with_segmentation_component = gr.Checkbox(
    value=False,
    label="With Segmentation",
    info=(
        "Whether to run EfficientSAM for instance segmentation."
    )
)

with_confidence_component = gr.Checkbox(
    value=False,
    label="Mostrar confiança",
    info=(
        "Mostrar ou não a confiança dos objetos detectados."
    )
)

with_class_agnostic_nms_component = gr.Checkbox(
    value=False,
    label="Use NMS (Non-Max Suppression ou Supressão Não Máxima) independente da classe",
    info=(
        "Suprima caixas delimitadoras sobrepostas em todas as classes."
    )
)
with gr.Blocks() as demo:
    gr.Markdown(MARKDOWN)
    with gr.Accordion("Configuração", open=False):
        confidence_threshold_component.render()
        iou_threshold_component.render()
        with gr.Row():
            # with_segmentation_component.render()
            with_confidence_component.render()
            with_class_agnostic_nms_component.render()
    with gr.Tab(label="Imagem"):
        with gr.Row():
            input_image_component = gr.Image(
                type='numpy',
                label='Imagem de entrada'
            )
            output_image_component = gr.Image(
                type='numpy',
                label='Imagem de saída'
            )
        with gr.Row():
            image_categories_text_component = gr.Textbox(
                label='Categorias',
                placeholder='Digite as categorias separadas por vírgula',
                scale=7
            )
            image_submit_button_component = gr.Button(
                value='Submeter',
                scale=1,
                variant='primary'
            )
        # gr.Examples(
        #     fn=process_image,
        #     examples=IMAGE_EXAMPLES,
        #     inputs=[
        #         input_image_component,
        #         image_categories_text_component,
        #         confidence_threshold_component,
        #         iou_threshold_component,
        #         with_segmentation_component,
        #         with_confidence_component,
        #         with_class_agnostic_nms_component
        #     ],
        #     outputs=output_image_component
        # )
with gr.Tab(label="Video"): | |
with gr.Row(): | |
input_video_component = gr.Video( | |
label='Video de entrada' | |
) | |
output_video_component = gr.Video( | |
label='Video de saída' | |
) | |
with gr.Row(): | |
video_categories_text_component = gr.Textbox( | |
label='Categorias', | |
placeholder='Digite as categorias separadas por vírgula', | |
scale=7 | |
) | |
video_submit_button_component = gr.Button( | |
value='Submeter', | |
scale=1, | |
variant='primary' | |
) | |
        # gr.Examples(
        #     fn=process_video,
        #     examples=VIDEO_EXAMPLES,
        #     inputs=[
        #         input_video_component,
        #         video_categories_text_component,
        #         confidence_threshold_component,
        #         iou_threshold_component,
        #         with_segmentation_component,
        #         with_confidence_component,
        #         with_class_agnostic_nms_component
        #     ],
        #     outputs=output_video_component
        # )
    image_submit_button_component.click(
        fn=process_image,
        inputs=[
            input_image_component,
            image_categories_text_component,
            confidence_threshold_component,
            iou_threshold_component,
            # with_segmentation_component,
            with_confidence_component,
            with_class_agnostic_nms_component
        ],
        outputs=output_image_component
    )
    video_submit_button_component.click(
        fn=process_video,
        inputs=[
            input_video_component,
            video_categories_text_component,
            confidence_threshold_component,
            iou_threshold_component,
            # with_segmentation_component,
            with_confidence_component,
            with_class_agnostic_nms_component
        ],
        outputs=output_video_component
    )

demo.launch(debug=False, show_error=True, max_threads=1)
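# Note: max_threads=1 restricts Gradio's worker pool to a single thread, so requests are
# handled one at a time — presumably to avoid concurrent use of the shared model instance.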