fish-agent / app.py
PoTaTo721's picture
Update app.py
1c46b88 verified
raw
history blame
8.56 kB
import re
import gradio as gr
import numpy as np
import os
import threading
import subprocess
import sys
import time
from huggingface_hub import snapshot_download
from tools.fish_e2e import FishE2EAgent, FishE2EEventType
from tools.schema import ServeMessage, ServeTextPart, ServeVQPart
# Download Weights
os.makedirs("checkpoints", exist_ok=True)
snapshot_download(repo_id="fishaudio/fish-speech-1.4", local_dir="./checkpoints/fish-speech-1.4")
snapshot_download(repo_id="fishaudio/fish-agent-v0.1-3b", local_dir="./checkpoints/fish-agent-v0.1-3b")
class ChatState:
def __init__(self):
self.conversation = []
self.added_systext = False
self.added_sysaudio = False
def get_history(self):
results = []
for msg in self.conversation:
results.append({"role": msg.role, "content": self.repr_message(msg)})
# Process assistant messages to extract questions and update user messages
for i, msg in enumerate(results):
if msg["role"] == "assistant":
match = re.search(r"Question: (.*?)\n\nResponse:", msg["content"])
if match and i > 0 and results[i - 1]["role"] == "user":
# Update previous user message with extracted question
results[i - 1]["content"] += "\n" + match.group(1)
# Remove the Question/Answer format from assistant message
msg["content"] = msg["content"].split("\n\nResponse: ", 1)[1]
return results
def repr_message(self, msg: ServeMessage):
response = ""
for part in msg.parts:
if isinstance(part, ServeTextPart):
response += part.text
elif isinstance(part, ServeVQPart):
response += f"<audio {len(part.codes[0]) / 21:.2f}s>"
return response
def clear_fn():
return [], ChatState(), None, None, None
async def process_audio_input(
sys_audio_input, sys_text_input, audio_input, state: ChatState, text_input: str
):
if audio_input is None and not text_input:
raise gr.Error("No input provided")
agent = FishE2EAgent() # Create new agent instance for each request
# Convert audio input to numpy array
if isinstance(audio_input, tuple):
sr, audio_data = audio_input
elif text_input:
sr = 44100
audio_data = None
else:
raise gr.Error("Invalid audio format")
if isinstance(sys_audio_input, tuple):
sr, sys_audio_data = sys_audio_input
elif text_input:
sr = 44100
sys_audio_data = None
else:
raise gr.Error("Invalid audio format")
def append_to_chat_ctx(
part: ServeTextPart | ServeVQPart, role: str = "assistant"
) -> None:
if not state.conversation or state.conversation[-1].role != role:
state.conversation.append(ServeMessage(role=role, parts=[part]))
else:
state.conversation[-1].parts.append(part)
if state.added_systext is False and sys_text_input:
state.added_systext = True
append_to_chat_ctx(ServeTextPart(text=sys_text_input), role="system")
if text_input:
append_to_chat_ctx(ServeTextPart(text=text_input), role="user")
audio_data = None
result_audio = b""
async for event in agent.stream(
sys_audio_data,
audio_data,
sr,
1,
chat_ctx={
"messages": state.conversation,
"added_sysaudio": state.added_sysaudio,
},
):
if event.type == FishE2EEventType.USER_CODES:
append_to_chat_ctx(ServeVQPart(codes=event.vq_codes), role="user")
elif event.type == FishE2EEventType.SPEECH_SEGMENT:
result_audio += event.frame.data
np_audio = np.frombuffer(result_audio, dtype=np.int16)
append_to_chat_ctx(ServeVQPart(codes=event.vq_codes))
yield state.get_history(), (44100, np_audio), None, None
elif event.type == FishE2EEventType.TEXT_SEGMENT:
append_to_chat_ctx(ServeTextPart(text=event.text))
if result_audio:
np_audio = np.frombuffer(result_audio, dtype=np.int16)
yield state.get_history(), (44100, np_audio), None, None
else:
yield state.get_history(), None, None, None
np_audio = np.frombuffer(result_audio, dtype=np.int16)
yield state.get_history(), (44100, np_audio), None, None
async def process_text_input(
sys_audio_input, sys_text_input, state: ChatState, text_input: str
):
async for event in process_audio_input(
sys_audio_input, sys_text_input, None, state, text_input
):
yield event
def create_demo():
with gr.Blocks() as demo:
state = gr.State(ChatState())
with gr.Row():
# Left column (70%) for chatbot and notes
with gr.Column(scale=7):
chatbot = gr.Chatbot(
[],
elem_id="chatbot",
bubble_full_width=False,
height=600,
type="messages",
)
notes = gr.Markdown(
"""
# Fish Agent
1. This demo is the Fish Audio self-developed end-to-end language model Fish Agent 3B version.
2. You can find the code and weights in our official repository, but all related content is released under the CC BY-NC-SA 4.0 license.
3. The demo is an early beta version, and inference speed is yet to be optimized.
# Features
1. This model automatically integrates ASR and TTS components, requiring no external models, making it truly end-to-end rather than a three-stage process (ASR+LLM+TTS).
2. The model can use reference audio to control speaking voice.
3. It can generate audio with strong emotions and prosody.
"""
)
# Right column (30%) for controls
with gr.Column(scale=3):
sys_audio_input = gr.Audio(
sources=["upload"],
type="numpy",
label="Give a timbre for your assistant",
)
sys_text_input = gr.Textbox(
label="What is your assistant's role?",
value='You are a voice assistant created by Fish Audio, offering end-to-end voice interaction for a seamless user experience. You are required to first transcribe the user's speech, then answer it in the following format: "Question: [USER_SPEECH]\n\nResponse: [YOUR_RESPONSE]\n"。You are required to use the following voice in this conversation.',
type="text",
)
audio_input = gr.Audio(
sources=["microphone"], type="numpy", label="Speak your message"
)
text_input = gr.Textbox(label="Or type your message", type="text")
output_audio = gr.Audio(label="Assistant's Voice", type="numpy")
send_button = gr.Button("Send", variant="primary")
clear_button = gr.Button("Clear")
# Event handlers
audio_input.stop_recording(
process_audio_input,
inputs=[sys_audio_input, sys_text_input, audio_input, state, text_input],
outputs=[chatbot, output_audio, audio_input, text_input],
show_progress=True,
)
send_button.click(
process_text_input,
inputs=[sys_audio_input, sys_text_input, state, text_input],
outputs=[chatbot, output_audio, audio_input, text_input],
show_progress=True,
)
text_input.submit(
process_text_input,
inputs=[sys_audio_input, sys_text_input, state, text_input],
outputs=[chatbot, output_audio, audio_input, text_input],
show_progress=True,
)
clear_button.click(
clear_fn,
inputs=[],
outputs=[chatbot, state, audio_input, output_audio, text_input],
)
return demo
def run_api():
subprocess.run([sys.executable, "-m", "tools.api"])
if __name__ == "__main__":
# 创建并启动 API 线程
api_thread = threading.Thread(target=run_api, daemon=True)
api_thread.start()
# 给 API 一些时间启动
time.sleep(90)
# 创建并启动 Gradio demo
demo = create_demo()
demo.launch(share=True)