Spaces:
Sleeping
Sleeping
from typing import List | |
import os | |
import cv2 | |
import gradio as gr | |
import numpy as np | |
import supervision as sv | |
import torch | |
from tqdm import tqdm | |
import cv2 | |
from translate import Translator | |
from inference.models.yolo_world.yolo_world import YOLOWorld | |
import datetime | |
import uuid | |
from typing import List | |
def generate_file_name(extension="mp4"): | |
current_datetime = datetime.datetime.now().strftime("%Y%m%d%H%M%S") | |
unique_id = uuid.uuid4() | |
return f"{current_datetime}_{unique_id}.{extension}" | |
def list_files_older_than(directory: str, diff_minutes: int) -> List[str]: | |
diff_seconds = diff_minutes * 60 | |
now = datetime.datetime.now() | |
older_files: List[str] = [] | |
for filename in os.listdir(directory): | |
file_path = os.path.join(directory, filename) | |
if os.path.isfile(file_path): | |
file_mod_time = os.path.getmtime(file_path) | |
file_mod_datetime = datetime.datetime.fromtimestamp(file_mod_time) | |
time_diff = now - file_mod_datetime | |
if time_diff.total_seconds() > diff_seconds: | |
older_files.append(file_path) | |
return older_files | |
def remove_files_older_than(directory: str, diff_minutes: int) -> None: | |
older_files = list_files_older_than(directory, diff_minutes) | |
file_count = len(older_files) | |
for file_path in older_files: | |
os.remove(file_path) | |
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
print( | |
f"[{now}] Removed {file_count} files older than {diff_minutes} minutes from " | |
f"'{directory}' directory." | |
) | |
def calculate_end_frame_index(source_video_path: str) -> int: | |
video_info = sv.VideoInfo.from_video_path(source_video_path) | |
return video_info.total_frames | |
def create_directory(directory_path: str) -> None: | |
if not os.path.exists(directory_path): | |
os.makedirs(directory_path) | |
MARKDOWN = """ | |
<h1>Porto do Itaqui YOLO-World </h1> | |
Este é protótipo em fase de execução que será apresentado ao porto do Itaqui com o objetivo de entregar alguma coisa. | |
""" | |
RESULTS = "results" | |
# IMAGE_EXAMPLES = [ | |
# ['https://media.roboflow.com/dog.jpeg', 'dog, eye, nose, tongue, car', 0.005, 0.1, True, False, False], | |
# ['https://media.roboflow.com/albert-4x.png', 'hand, hair', 0.005, 0.1, True, False, False], | |
# ] | |
# VIDEO_EXAMPLES = [ | |
# ['https://media.roboflow.com/supervision/video-examples/croissant-1280x720.mp4', 'croissant', 0.01, 0.2, False, False, False], | |
# ['https://media.roboflow.com/supervision/video-examples/suitcases-1280x720.mp4', 'suitcase', 0.1, 0.2, False, False, False], | |
# ['https://media.roboflow.com/supervision/video-examples/tokyo-walk-1280x720.mp4', 'woman walking', 0.1, 0.2, False, False, False], | |
# ['https://media.roboflow.com/supervision/video-examples/wooly-mammoth-1280x720.mp4', 'mammoth', 0.01, 0.2, False, False, False], | |
# ] | |
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
# EFFICIENT_SAM_MODEL = load(device=DEVICE) | |
YOLO_WORLD_MODEL = YOLOWorld(model_id="yolo_world/l") | |
BOUNDING_BOX_ANNOTATOR = sv.BoundingBoxAnnotator(thickness=2) | |
MASK_ANNOTATOR = sv.MaskAnnotator() | |
LABEL_ANNOTATOR = sv.LabelAnnotator(text_thickness=2, text_scale=1, text_color=sv.Color.BLACK) | |
# creating video results directory | |
create_directory(directory_path=RESULTS) | |
def process_categories(categories: str) -> List[str]: | |
# Traduzindo as palavras do português para o inglês | |
return [category.strip() for category in categories.split(',')] | |
def annotate_image( | |
input_image: np.ndarray, | |
detections: sv.Detections, | |
categories: List[str], | |
with_confidence: bool = False, | |
) -> np.ndarray: | |
labels = [ | |
( | |
f"{categories[class_id]}: {confidence:.3f}" | |
if with_confidence | |
else f"{categories[class_id]}" | |
) | |
for class_id, confidence in | |
zip(detections.class_id, detections.confidence) | |
] | |
output_image = MASK_ANNOTATOR.annotate(input_image, detections) | |
output_image = BOUNDING_BOX_ANNOTATOR.annotate(output_image, detections) | |
output_image = LABEL_ANNOTATOR.annotate(output_image, detections, labels=labels) | |
return output_image | |
def process_image( | |
input_image: np.ndarray, | |
categories: str, | |
confidence_threshold: float = 0.3, | |
iou_threshold: float = 0.5, | |
# with_segmentation: bool = True, | |
with_confidence: bool = False, | |
with_class_agnostic_nms: bool = False, | |
) -> np.ndarray: | |
# cleanup of old video files | |
remove_files_older_than(RESULTS, 30) | |
categories = process_categories(categories) | |
YOLO_WORLD_MODEL.set_classes(categories) | |
results = YOLO_WORLD_MODEL.infer(input_image, confidence=0.02) | |
detections = sv.Detections.from_inference(results) | |
detections = detections.with_nms( | |
class_agnostic=with_class_agnostic_nms, | |
threshold=iou_threshold | |
) | |
# if with_segmentation: | |
# detections.mask = inference_with_boxes( | |
# image=input_image, | |
# xyxy=detections.xyxy, | |
# model=EFFICIENT_SAM_MODEL, | |
# device=DEVICE | |
# ) | |
output_image = cv2.cvtColor(input_image, cv2.COLOR_RGB2BGR) | |
output_image = annotate_image( | |
input_image=output_image, | |
detections=detections, | |
categories=categories, | |
with_confidence=with_confidence | |
) | |
return cv2.cvtColor(output_image, cv2.COLOR_BGR2RGB) | |
def process_video( | |
input_video: str, | |
categories: str, | |
confidence_threshold: float = 0.3, | |
iou_threshold: float = 0.5, | |
# with_segmentation: bool = True, | |
with_confidence: bool = False, | |
with_class_agnostic_nms: bool = False, | |
progress=gr.Progress(track_tqdm=True) | |
) -> str: | |
# cleanup of old video files | |
remove_files_older_than(RESULTS, 30) | |
categories = process_categories(categories) | |
YOLO_WORLD_MODEL.set_classes(categories) | |
video_info = sv.VideoInfo.from_video_path(input_video) | |
total = calculate_end_frame_index(input_video) | |
frame_generator = sv.get_video_frames_generator( | |
source_path=input_video, | |
end=total | |
) | |
result_file_name = generate_file_name(extension="mp4") | |
result_file_path = os.path.join(RESULTS, result_file_name) | |
with sv.VideoSink(result_file_path, video_info=video_info) as sink: | |
for _ in tqdm(range(total), desc="Processing video..."): | |
frame = next(frame_generator) | |
results = YOLO_WORLD_MODEL.infer(frame, confidence=confidence_threshold) | |
detections = sv.Detections.from_inference(results) | |
detections = detections.with_nms( | |
class_agnostic=with_class_agnostic_nms, | |
threshold=iou_threshold | |
) | |
# if with_segmentation: | |
# detections.mask = inference_with_boxes( | |
# image=frame, | |
# xyxy=detections.xyxy, | |
# model=EFFICIENT_SAM_MODEL, | |
# device=DEVICE | |
# ) | |
frame = annotate_image( | |
input_image=frame, | |
detections=detections, | |
categories=categories, | |
with_confidence=with_confidence | |
) | |
sink.write_frame(frame) | |
return result_file_path | |
def process_image_real_time( | |
input_image: np.ndarray, | |
categories: str = "safety helmet, safety glasses, hearing protectors, protective masks, safety shoes, protective gloves, seat belt, person, bicycle, car, motorcycle, airplane, dining table, bus, train, truck, boat, parking meter, bench, bird, cat, dog, horse, elephant, bear, zebra, giraffe, backpack, umbrella, handbag, tie, suitcase, frisbee, skis, snowboard, sports ball, kite, baseball bat, baseball glove, skateboard, surfboard, tennis racket, bottle, wine glass, cup, fork, knife, spoon, bowl, banana, apple, sandwich, orange, donut, cake, chair, couch, potted plant, bed, toilet, tv, laptop, mouse, remote, keyboard, cell phone, microwave, oven, toaster, sink, refrigerator, book, clock, vase, teddy bear, hair drier, toothbrush", | |
confidence_threshold: float = 0.3, | |
iou_threshold: float = 0.5, | |
# with_segmentation: bool = True, | |
with_confidence: bool = False, | |
with_class_agnostic_nms: bool = False, | |
) -> np.ndarray: | |
# cleanup of old video files | |
remove_files_older_than(RESULTS, 30) | |
categories = process_categories(categories) | |
YOLO_WORLD_MODEL.set_classes(categories) | |
results = YOLO_WORLD_MODEL.infer(input_image, confidence=0.02) | |
detections = sv.Detections.from_inference(results) | |
detections = detections.with_nms( | |
class_agnostic=with_class_agnostic_nms, | |
threshold=iou_threshold | |
) | |
# if with_segmentation: | |
# detections.mask = inference_with_boxes( | |
# image=input_image, | |
# xyxy=detections.xyxy, | |
# model=EFFICIENT_SAM_MODEL, | |
# device=DEVICE | |
# ) | |
output_image = cv2.cvtColor(input_image, cv2.COLOR_RGB2BGR) | |
output_image = annotate_image( | |
input_image=output_image, | |
detections=detections, | |
categories=categories, | |
with_confidence=with_confidence | |
) | |
return cv2.cvtColor(output_image, cv2.COLOR_BGR2RGB) | |
confidence_threshold_component = gr.Slider( | |
minimum=0, | |
maximum=1.0, | |
value=0.03, | |
step=0.01, | |
label="Limite de Confiança", | |
info=( | |
"O limite de confiança para o modelo YOLO-World. Reduza o limite para " | |
"reduzir falsos negativos, aumentando a sensibilidade do modelo para detectar " | |
"objetos procurados. Por outro lado, aumente o limite para minimizar falsos " | |
"positivos, evitando que o modelo identifique objetos que não deveria." | |
)) | |
iou_threshold_component = gr.Slider( | |
minimum=0, | |
maximum=1.0, | |
value=0.1, | |
step=0.01, | |
label="Limite IoU", | |
info=( | |
"Limite de intersecção sobre união (Intersection over Union ou IoU) para supressão não máxima. " | |
"Diminua o valor para diminuir a ocorrência de caixas delimitadoras sobrepostas, " | |
"tornando o processo de detecção mais rigoroso. Por outro lado, aumente o valor " | |
"para permitir mais caixas delimitadoras sobrepostas, acomodando uma gama mais ampla de " | |
"detecções." | |
)) | |
with_segmentation_component = gr.Checkbox( | |
value=False, | |
label="With Segmentation", | |
info=( | |
"Whether to run EfficientSAM for instance segmentation." | |
) | |
) | |
with_confidence_component = gr.Checkbox( | |
value=False, | |
label="Mostrar confiança.", | |
info=( | |
"Mostrar ou não a confiança dos objetos detectados." | |
) | |
) | |
with_class_agnostic_nms_component = gr.Checkbox( | |
value=False, | |
label="Use NMS (Non-Max Supression ou Supressão Não Máxima) independente da classe.", | |
info=( | |
"Suprima caixas delimitadoras sobrepostas em todas as classes." | |
) | |
) | |
with gr.Blocks() as demo: | |
gr.Markdown(MARKDOWN) | |
with gr.Accordion("Configuração", open=False): | |
confidence_threshold_component.render() | |
iou_threshold_component.render() | |
with gr.Row(): | |
# with_segmentation_component.render() | |
with_confidence_component.render() | |
with_class_agnostic_nms_component.render() | |
with gr.Tab(label="Imagem"): | |
with gr.Row(): | |
input_image_component = gr.Image( | |
type='numpy', | |
label='Imagem de entrada' | |
) | |
output_image_component = gr.Image( | |
type='numpy', | |
label='Imagem de saída' | |
) | |
with gr.Row(): | |
image_categories_text_component = gr.Textbox( | |
label='Categorias', | |
placeholder='Digite as categorias separadas por vírgula', | |
scale=7 | |
) | |
image_submit_button_component = gr.Button( | |
value='Submeter', | |
scale=1, | |
variant='primary' | |
) | |
# gr.Examples( | |
# fn=process_image, | |
# examples=IMAGE_EXAMPLES, | |
# inputs=[ | |
# input_image_component, | |
# image_categories_text_component, | |
# confidence_threshold_component, | |
# iou_threshold_component, | |
# with_segmentation_component, | |
# with_confidence_component, | |
# with_class_agnostic_nms_component | |
# ], | |
# outputs=output_image_component | |
# ) | |
with gr.Tab(label="Video"): | |
with gr.Row(): | |
input_video_component = gr.Video( | |
label='Video de entrada' | |
) | |
output_video_component = gr.Video( | |
label='Video de saída' | |
) | |
with gr.Row(): | |
video_categories_text_component = gr.Textbox( | |
label='Categorias', | |
placeholder='Digite as categorias separadas por vírgula', | |
scale=7 | |
) | |
video_submit_button_component = gr.Button( | |
value='Submeter', | |
scale=1, | |
variant='primary' | |
) | |
# gr.Examples( | |
# fn=process_video, | |
# examples=VIDEO_EXAMPLES, | |
# inputs=[ | |
# input_video_component, | |
# video_categories_text_component, | |
# confidence_threshold_component, | |
# iou_threshold_component, | |
# with_segmentation_component, | |
# with_confidence_component, | |
# with_class_agnostic_nms_component | |
# ], | |
# outputs=output_image_component | |
# ) | |
with gr.Tab(label="Tempo Real"): | |
with gr.Row(): | |
input_real_time_component = gr.Image(label="Entrada", sources=["webcam"], streaming=True) | |
output_real_time_component = gr.Image(label="Saída") | |
image_submit_button_component.click( | |
fn=process_image, | |
inputs=[ | |
input_image_component, | |
image_categories_text_component, | |
confidence_threshold_component, | |
iou_threshold_component, | |
# with_segmentation_component, | |
with_confidence_component, | |
with_class_agnostic_nms_component | |
], | |
outputs=output_image_component | |
) | |
video_submit_button_component.click( | |
fn=process_video, | |
inputs=[ | |
input_video_component, | |
video_categories_text_component, | |
confidence_threshold_component, | |
iou_threshold_component, | |
# with_segmentation_component, | |
with_confidence_component, | |
with_class_agnostic_nms_component | |
], | |
outputs=output_video_component | |
) | |
input_real_time_component.change(fn=process_image_real_time, inputs=input_real_time_component, outputs=output_real_time_component) | |
demo.launch(debug=False, show_error=True, max_threads=1, share=True) |