import gradio as gr from huggingface_hub import snapshot_download from threading import Thread import os import time import gradio as gr import base64 import numpy as np import requests from server import serve repo_id = "gpt-omni/mini-omni" snapshot_download(repo_id, local_dir="./checkpoint", revision="main") IP='0.0.0.0' PORT=60808 thread = Thread(target=serve, daemon=True) thread.start() API_URL = "http://0.0.0.0:60808/chat" # recording parameters IN_FORMAT = pyaudio.paInt16 IN_CHANNELS = 1 IN_RATE = 24000 IN_CHUNK = 1024 IN_SAMPLE_WIDTH = 2 VAD_STRIDE = 0.5 # playing parameters OUT_FORMAT = pyaudio.paInt16 OUT_CHANNELS = 1 OUT_RATE = 24000 OUT_SAMPLE_WIDTH = 2 OUT_CHUNK = 5760 OUT_CHUNK = 4096 OUT_RATE = 24000 OUT_CHANNELS = 1 def run_vad(ori_audio, sr): _st = time.time() try: audio = np.frombuffer(ori_audio, dtype=np.int16) audio = audio.astype(np.float32) / 32768.0 sampling_rate = 16000 if sr != sampling_rate: audio = librosa.resample(audio, orig_sr=sr, target_sr=sampling_rate) vad_parameters = {} vad_parameters = VadOptions(**vad_parameters) speech_chunks = get_speech_timestamps(audio, vad_parameters) audio = collect_chunks(audio, speech_chunks) duration_after_vad = audio.shape[0] / sampling_rate if sr != sampling_rate: # resample to original sampling rate vad_audio = librosa.resample(audio, orig_sr=sampling_rate, target_sr=sr) else: vad_audio = audio vad_audio = np.round(vad_audio * 32768.0).astype(np.int16) vad_audio_bytes = vad_audio.tobytes() return duration_after_vad, vad_audio_bytes, round(time.time() - _st, 4) except Exception as e: msg = f"[asr vad error] audio_len: {len(ori_audio)/(sr*2):.3f} s, trace: {traceback.format_exc()}" print(msg) return -1, ori_audio, round(time.time() - _st, 4) def warm_up(): frames = b"\x00\x00" * 1024 * 2 # 1024 frames of 2 bytes each dur, frames, tcost = run_vad(frames, 16000) print(f"warm up done, time_cost: {tcost:.3f} s") warm_up() def determine_pause(): temp_audio = b"" vad_audio = b"" start_talking = False last_temp_audio = None while st.session_state.recording: status.success("Listening...") audio_bytes = stream.read(IN_CHUNK) temp_audio += audio_bytes if len(temp_audio) > IN_SAMPLE_WIDTH * IN_RATE * IN_CHANNELS * VAD_STRIDE: dur_vad, vad_audio_bytes, time_vad = run_vad(temp_audio, IN_RATE) print(f"duration_after_vad: {dur_vad:.3f} s, time_vad: {time_vad:.3f} s") if dur_vad > 0.2 and not start_talking: if last_temp_audio is not None: st.session_state.frames.append(last_temp_audio) start_talking = True if start_talking: st.session_state.frames.append(temp_audio) if dur_vad < 0.1 and start_talking: st.session_state.recording = False print(f"speech end detected. excit") last_temp_audio = temp_audio temp_audio = b"" def process_audio(audio): filepath = audio print(f"filepath: {filepath}") if filepath is None: return cnt = 0 with open(filepath, "rb") as f: data = f.read() base64_encoded = str(base64.b64encode(data), encoding="utf-8") files = {"audio": base64_encoded} tik = time.time() with requests.post(API_URL, json=files, stream=True) as response: try: for chunk in response.iter_content(chunk_size=OUT_CHUNK): if chunk: # Convert chunk to numpy array if cnt == 0: print(f"first chunk time cost: {time.time() - tik:.3f}") cnt += 1 audio_data = np.frombuffer(chunk, dtype=np.int16) audio_data = audio_data.reshape(-1, OUT_CHANNELS) yield OUT_RATE, audio_data.astype(np.int16) except Exception as e: print(f"error: {e}") def greet(name): return "Hello " + name + "!!" demo = gr.Interface(fn=greet, inputs="text", outputs="text") demo.launch()