Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import json | |
| import ffmpeg | |
| import os | |
| from pathlib import Path | |
| import time | |
| from transformers import pipeline | |
| import torch | |
| # checkpoint = "openai/whisper-tiny" | |
| # checkpoint = "openai/whisper-base" | |
| checkpoint = "openai/whisper-small" | |
| if torch.cuda.is_available() and torch.cuda.device_count() > 0: | |
| from transformers import ( | |
| AutomaticSpeechRecognitionPipeline, | |
| WhisperForConditionalGeneration, | |
| WhisperProcessor, | |
| ) | |
| model = WhisperForConditionalGeneration.from_pretrained( | |
| checkpoint).to("cuda").half() | |
| processor = WhisperProcessor.from_pretrained(checkpoint) | |
| pipe = AutomaticSpeechRecognitionPipeline( | |
| model=model, | |
| tokenizer=processor.tokenizer, | |
| feature_extractor=processor.feature_extractor, | |
| batch_size=8, | |
| torch_dtype=torch.float16, | |
| device="cuda:0" | |
| ) | |
| else: | |
| pipe = pipeline(model=checkpoint) | |
| # TODO: no longer need to set these manually once the models have been updated on the Hub | |
| # whisper-tiny | |
| # pipe.model.generation_config.alignment_heads = [[2, 2], [3, 0], [3, 2], [3, 3], [3, 4], [3, 5]] | |
| # whisper-base | |
| # pipe.model.generation_config.alignment_heads = [[3, 1], [4, 2], [4, 3], [4, 7], [5, 1], [5, 2], [5, 4], [5, 6]] | |
| # whisper-small | |
| pipe.model.generation_config.alignment_heads = [[5, 3], [5, 9], [ | |
| 8, 0], [8, 4], [8, 7], [8, 8], [9, 0], [9, 7], [9, 9], [10, 5]] | |
| videos_out_path = Path("./videos_out") | |
| videos_out_path.mkdir(parents=True, exist_ok=True) | |
| samples_data = sorted(Path('examples').glob('*.json')) | |
| SAMPLES = [] | |
| for file in samples_data: | |
| with open(file) as f: | |
| sample = json.load(f) | |
| SAMPLES.append(sample) | |
| VIDEOS = list(map(lambda x: [x['video']], SAMPLES)) | |
| async def speech_to_text(video_in): | |
| """ | |
| Takes a video path to convert to audio, transcribe audio channel to text and char timestamps | |
| Using https://huggingface.co/tasks/automatic-speech-recognition pipeline | |
| """ | |
| video_in = video_in[0] if isinstance(video_in, list) else video_in | |
| if (video_in == None): | |
| raise ValueError("Video input undefined") | |
| video_path = Path(video_in.name) | |
| try: | |
| # convert video to audio 16k using PIPE to audio_memory | |
| audio_memory, _ = ffmpeg.input(video_path).output( | |
| '-', format="wav", ac=1, ar=pipe.feature_extractor.sampling_rate).overwrite_output().global_args('-loglevel', 'quiet').run(capture_stdout=True) | |
| except Exception as e: | |
| raise RuntimeError("Error converting video to audio") | |
| try: | |
| print(f'Transcribing via local model') | |
| output = pipe(audio_memory, chunk_length_s=10, | |
| stride_length_s=[4, 2], return_timestamps="word") | |
| transcription = output["text"] | |
| chunks = output["chunks"] | |
| timestamps_var = [{"word": chunk["text"], "timestamp":( | |
| chunk["timestamp"][0], chunk["timestamp"][1]), "state": True} for chunk in chunks] | |
| words = [(word['word'], '+' if word['state'] else '-') | |
| for word in timestamps_var] | |
| return (words, timestamps_var, video_in.name) | |
| except Exception as e: | |
| raise RuntimeError("Error Running inference with local model", e) | |
| async def cut_timestamps_to_video(video_in, timestamps_var): | |
| video_in = video_in[0] if isinstance(video_in, list) else video_in | |
| if (video_in == None or timestamps_var == None): | |
| raise ValueError("Inputs undefined") | |
| video_path = Path(video_in.name) | |
| video_file_name = video_path.stem | |
| timestamps_to_cut = [ | |
| (timestamps_var[i]['timestamp'][0], timestamps_var[i]['timestamp'][1]) | |
| for i in range(len(timestamps_var)) if timestamps_var[i]['state']] | |
| between_str = '+'.join( | |
| map(lambda t: f'between(t,{t[0]},{t[1]})', timestamps_to_cut)) | |
| if timestamps_to_cut: | |
| video_file = ffmpeg.input(video_path) | |
| video = video_file.video.filter( | |
| "select", f'({between_str})').filter("setpts", "N/FRAME_RATE/TB") | |
| audio = video_file.audio.filter( | |
| "aselect", f'({between_str})').filter("asetpts", "N/SR/TB") | |
| output_video = f'./videos_out/{video_file_name}.mp4' | |
| ffmpeg.concat(video, audio, v=1, a=1).output( | |
| output_video).overwrite_output().global_args('-loglevel', 'quiet').run() | |
| else: | |
| output_video = video_path | |
| return output_video | |
| css = """ | |
| #words-container { | |
| max-height: 400px; | |
| overflow-y: scroll !important; | |
| } | |
| """ | |
| with gr.Blocks(css=css) as demo: | |
| timestamps_var = gr.JSON(visible=False) | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown(""" | |
| # Whisper: Word-Level Video Trimming | |
| Quick edit a video by trimming out words. | |
| Using the [Huggingface Automatic Speech Recognition Pipeline](https://huggingface.co/tasks/automatic-speech-recognition) | |
| with [Whisper](https://huggingface.co/docs/transformers/model_doc/whisper) | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| file_upload = gr.File( | |
| label="Upload Video File", file_count=1, scale=1) | |
| video_preview = gr.Video( | |
| label="Video Preview", scale=3, interactive=False) | |
| # with gr.Row(): | |
| # transcribe_btn = gr.Button( | |
| # "Transcribe Audio") | |
| with gr.Column(): | |
| text_in = gr.HighlightedText( | |
| label="Transcription", combine_adjacent=False, show_legend=True, color_map={"+": "green", "-": "red"}, elem_id="words-container") | |
| with gr.Row(): | |
| cut_btn = gr.Button("Cut Video") | |
| select_all_words = gr.Button("Select All Words") | |
| reset_words = gr.Button("Reset Words") | |
| video_out = gr.Video(label="Video Out") | |
| with gr.Row(): | |
| gr.Examples( | |
| fn=speech_to_text, | |
| examples=["./examples/ShiaLaBeouf.mp4", | |
| "./examples/zuckyuval.mp4", | |
| "./examples/cooking.mp4"], | |
| inputs=[file_upload], | |
| outputs=[text_in, timestamps_var, video_preview], | |
| cache_examples=True) | |
| with gr.Row(): | |
| gr.Markdown(""" | |
| #### Video Credits | |
| 1. [Cooking](https://vimeo.com/573792389) | |
| 1. [Shia LaBeouf "Just Do It"](https://www.youtube.com/watch?v=n2lTxIk_Dr0) | |
| 1. [Mark Zuckerberg & Yuval Noah Harari in Conversation](https://www.youtube.com/watch?v=Boj9eD0Wug8) | |
| """) | |
| def select_text(evt: gr.SelectData, timestamps_var): | |
| index = evt.index | |
| timestamps_var[index]['state'] = not timestamps_var[index]['state'] | |
| words = [(word['word'], '+' if word['state'] else '-') | |
| for word in timestamps_var] | |
| return timestamps_var, words | |
| def words_selection(timestamps_var, reset=False): | |
| if reset: | |
| for word in timestamps_var: | |
| word['state'] = True | |
| else: | |
| # reverse the state of all words | |
| for word in timestamps_var: | |
| word['state'] = False | |
| words = [(word['word'], '+' if word['state'] else '-') | |
| for word in timestamps_var] | |
| return timestamps_var, words | |
| file_upload.upload(speech_to_text, inputs=[file_upload], outputs=[ | |
| text_in, timestamps_var, video_preview]) | |
| select_all_words.click(words_selection, inputs=[timestamps_var], outputs=[ | |
| timestamps_var, text_in], queue=False, show_progress=False) | |
| reset_words.click(lambda x: words_selection(x, True), inputs=[timestamps_var], outputs=[ | |
| timestamps_var, text_in], queue=False, show_progress=False) | |
| text_in.select(select_text, inputs=timestamps_var, | |
| outputs=[timestamps_var, text_in], queue=False, show_progress=False) | |
| # transcribe_btn.click(speech_to_text, inputs=[file_upload], outputs=[ | |
| # text_in, transcription_var, timestamps_var, video_preview]) | |
| cut_btn.click(cut_timestamps_to_video, [ | |
| file_upload, timestamps_var], [video_out]) | |
| demo.queue() | |
| if __name__ == "__main__": | |
| demo.launch(debug=True) | |