# VideoAnalyzer / app.py
# Listing metadata: uploaded by Zeph27, commit 66c23b6 ("delete info"), 4.87 kB.
import gradio as gr
import torch
from transformers import AutoModel, AutoTokenizer, pipeline
from PIL import Image
from decord import VideoReader, cpu
import base64
import io
import spaces
import time
import os
from transformers.pipelines.audio_utils import ffmpeg_read
import moviepy.editor as mp
# --- Vision-language model ---------------------------------------------------
# MiniCPM-V is loaded in bfloat16 with its repository-provided code
# (trust_remote_code=True), moved to the CUDA device and put in eval mode.
model_path = 'openbmb/MiniCPM-V-2_6'
model = AutoModel.from_pretrained(
    model_path,
    trust_remote_code=True,
    torch_dtype=torch.bfloat16,
).to(device='cuda')
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
model.eval()

# --- Speech recognition ------------------------------------------------------
# Whisper runs through the transformers pipeline on 30-second chunks; it uses
# CUDA when available and falls back to the CPU otherwise.
whisper_model = "openai/whisper-large-v3"
asr_pipeline = pipeline(
    task="automatic-speech-recognition",
    model=whisper_model,
    chunk_length_s=30,
    device="cuda" if torch.cuda.is_available() else "cpu",
)

# Upper bound on the number of frames sampled from a single video.
MAX_NUM_FRAMES = 64
def encode_image(image):
    """Return *image* as a PIL image whose longest side is at most 448 * 16 px.

    Args:
        image: A ``PIL.Image.Image``, or any value accepted by ``Image.open``;
            in the latter case it is opened and converted to RGB.

    Returns:
        The image itself when it already fits, otherwise a bicubic-resized
        copy that keeps the aspect ratio.
    """
    if not isinstance(image, Image.Image):
        image = Image.open(image).convert("RGB")

    max_size = 448 * 16
    width, height = image.size
    if max(width, height) <= max_size:
        return image

    # Scale the longer side to max_size. The shorter side is clamped to 1 px:
    # for extreme aspect ratios int() would truncate it to 0, which
    # Image.resize rejects.
    if width > height:
        new_size = (max_size, max(1, int(height * max_size / width)))
    else:
        new_size = (max(1, int(width * max_size / height)), max_size)
    return image.resize(new_size, resample=Image.BICUBIC)
def encode_video(video_path):
    """Sample about one frame per second from a video as PIL images.

    Frame indices are taken every ``round(average fps)`` frames starting at
    frame 0. Only the first ``MAX_NUM_FRAMES`` sampled indices are kept, so
    for long videos the frames come from the beginning of the clip.

    Args:
        video_path: Path of the video file to decode.

    Returns:
        A list of PIL images, each passed through ``encode_image``.
    """
    vr = VideoReader(video_path, ctx=cpu(0))

    # round() gives 0 for streams reporting fewer than 0.5 fps, and range()
    # raises ValueError on a zero step, so never step by less than one frame.
    step = max(1, round(vr.get_avg_fps()))
    frame_idx = list(range(0, len(vr), step)[:MAX_NUM_FRAMES])

    frames = vr.get_batch(frame_idx).asnumpy()
    return [encode_image(Image.fromarray(frame.astype('uint8'))) for frame in frames]
def extract_audio(video_path):
    """Extract the audio track of a video into a temporary WAV file.

    ffmpeg is invoked with the video stream disabled (``-vn``), two channels
    and a 44100 Hz sample rate. Each call writes to its own temporary file so
    that concurrent requests cannot overwrite each other's audio.

    Args:
        video_path: Path of the input video.

    Returns:
        Path of the WAV file that was written. The caller is responsible for
        deleting it.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        OSError: ffmpeg could not be started (for example, it is not installed).
    """
    import subprocess
    import tempfile

    fd, audio_path = tempfile.mkstemp(prefix="temp_audio_", suffix=".wav")
    os.close(fd)  # ffmpeg reopens the path itself; only the unique name is needed

    command = [
        'ffmpeg',
        '-y',  # mkstemp already created the file, so overwriting must be allowed
        '-i', video_path,
        '-ab', '160k',
        '-ac', '2',
        '-ar', '44100',
        '-vn',
        audio_path,
    ]
    try:
        # check=True surfaces ffmpeg failures here instead of as a confusing
        # error later; stdin is detached so ffmpeg can never wait for input.
        subprocess.run(command, check=True, stdin=subprocess.DEVNULL)
    except Exception:
        # Do not leave an empty or partial file behind on failure.
        try:
            os.remove(audio_path)
        except FileNotFoundError:
            pass
        raise
    return audio_path
def transcribe_audio(audio_file):
    """Run the Whisper ASR pipeline on an audio file and return its text.

    The file is read as bytes, decoded with ``ffmpeg_read`` at the sampling
    rate of the pipeline's feature extractor, and passed to ``asr_pipeline``
    with the "translate" generation task and timestamps disabled.

    Args:
        audio_file: Path of the audio file to transcribe.

    Returns:
        The ``"text"`` field of the pipeline output.
    """
    with open(audio_file, "rb") as audio_handle:
        raw_bytes = audio_handle.read()

    sampling_rate = asr_pipeline.feature_extractor.sampling_rate
    waveform = ffmpeg_read(raw_bytes, sampling_rate)

    result = asr_pipeline(
        {"array": waveform, "sampling_rate": sampling_rate},
        batch_size=8,
        generate_kwargs={"task": "translate"},
        return_timestamps=False,
    )
    return result["text"]
@spaces.GPU
def analyze_video(prompt, video, progress=gr.Progress()):
    """Analyze a video with MiniCPM-V, using its frames and audio transcript.

    Steps: sample frames, extract and transcribe the audio, then ask the
    model to answer *prompt* given the frames and the transcript.

    Args:
        prompt: The user's instruction for the model.
        video: Either a path string or an object whose ``name`` attribute is
            the path of the uploaded video.
        progress: Gradio progress tracker, updated after each stage.

    Returns:
        A ``(analysis_result, processing_time)`` tuple of display strings.

    Raises:
        gr.Error: No video was provided.
    """
    start_time = time.time()
    progress(0, desc="Initializing")

    # Clicking the button without an upload passes None; report it clearly
    # instead of failing on ``None.name``.
    if video is None:
        raise gr.Error("Please upload a video before running the analysis.")
    video_path = video if isinstance(video, str) else video.name

    progress(0.1, desc="Encoding video")
    encoded_video = encode_video(video_path)

    progress(0.3, desc="Extracting audio")
    audio_path = extract_audio(video_path)
    try:
        progress(0.5, desc="Transcribing audio")
        transcription = transcribe_audio(audio_path)
    finally:
        # Remove the temporary audio file even when transcription fails, so a
        # stale file can never be picked up by a later request.
        try:
            os.remove(audio_path)
        except FileNotFoundError:
            pass
    print(f"Transcription: {transcription}")

    progress(0.7, desc="Preparing context")
    context = [
        {"role": "user", "content": encoded_video},
        {"role": "assistant", "content": f"Transcription of the video: {transcription}"},
        {"role": "user", "content": prompt},
    ]
    params = {
        'sampling': True,
        'top_p': 0.8,
        'top_k': 100,
        'temperature': 0.7,
        'repetition_penalty': 1.05,
        "max_new_tokens": 2048,
        "max_inp_length": 4352,
        "use_image_id": False,
        # Use a single slice per frame once more than 16 frames are sent.
        "max_slice_nums": 1 if len(encoded_video) > 16 else 2,
    }

    progress(0.8, desc="Generating response")
    response = model.chat(image=None, msgs=context, tokenizer=tokenizer, **params)

    progress(0.9, desc="Finalizing")
    elapsed = time.time() - start_time
    analysis_result = f"Analysis Result:\n{response}\n\n"
    processing_time = f"Processing Time: {elapsed:.2f} seconds"

    progress(1, desc="Complete")
    return analysis_result, processing_time
# Gradio UI: an input section (video + prompt), an output section (analysis
# text + processing time) and a button that runs analyze_video.
# NOTE(review): the nesting below is the most plausible reading of the layout;
# confirm that the button belongs at the Blocks level rather than inside the
# "Output" accordion.
with gr.Blocks(theme="NoCrypt/miku") as demo:
    gr.Label("Video Analyzer with MiniCPM-V-2_6 and Whisper")
    with gr.Accordion("Input (Work best with English videos)"):
        with gr.Row():
            video_input = gr.Video(label="Upload Video")
            prompt_input = gr.Textbox(label="Prompt", value="Analyze this video, give me advice on how to improve it and score from 0 to 100 for each point")
    with gr.Accordion("Output"):
        with gr.Row():
            analysis_result = gr.Textbox(label="Analysis Result")
            processing_time = gr.Textbox(label="Processing Time")
    analyze_button = gr.Button("Analyze Video")
    # Inputs are passed in the order analyze_video expects: (prompt, video).
    # Its two return values fill the two output textboxes.
    analyze_button.click(fn=analyze_video, inputs=[prompt_input, video_input], outputs=[analysis_result, processing_time])
# Start the web server for the app.
demo.launch()