import os

# Set HF_HOME before importing huggingface_hub so the cache location applies
# to every download in this process (speeds up Space restarts).
os.environ["HF_HOME"] = "/data/.huggingface"

import importlib.util
import io
import sys
import time
import traceback

import numpy as np
import scipy.io.wavfile as wavfile
import spaces
import tiktoken
import torch
from huggingface_hub import hf_hub_download
def load_module_from_file(module_name, file_path):
    """Load a Python module from a file path."""
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module {module_name} from {file_path}")
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    return module
# Download and load the required Python modules from the model repo
py_modules = ["istftnet", "plbert", "models"]
for py_module in py_modules:
    path = hf_hub_download(repo_id="hexgrad/Kokoro-82M", filename=f"{py_module}.py")
    load_module_from_file(py_module, path)

# Load the kokoro module
kokoro_path = hf_hub_download(repo_id="hexgrad/Kokoro-82M", filename="kokoro.py")
kokoro = load_module_from_file("kokoro", kokoro_path)

# Import the required functions
generate = kokoro.generate
normalize_text = kokoro.normalize_text
models = sys.modules['models']
build_model = models.build_model
class TTSModel:
    """Self-contained TTS model manager for Hugging Face Spaces"""

    def __init__(self):
        self.model = None
        self.voices_dir = "voices"
        self.model_repo = "hexgrad/Kokoro-82M"
        os.makedirs(self.voices_dir, exist_ok=True)
    def initialize(self):
        """Initialize the model and download voices."""
        try:
            print("Initializing model...")

            # Download the model weights and config
            model_path = hf_hub_download(
                repo_id=self.model_repo,
                filename="kokoro-v0_19.pth"
            )
            config_path = hf_hub_download(
                repo_id=self.model_repo,
                filename="config.json"
            )

            # Build the model directly on the GPU (assumes CUDA is available)
            with torch.cuda.device(0):
                torch.cuda.set_device(0)
                self.model = build_model(model_path, 'cuda')
                self._model_on_gpu = True

            # Download all available voices
            voices = [
                "af_bella.pt", "af_nicole.pt", "af_sarah.pt", "af_sky.pt", "af.pt",
                "am_adam.pt", "am_michael.pt",
                "bf_emma.pt", "bf_isabella.pt",
                "bm_george.pt", "bm_lewis.pt"
            ]
            for voice in voices:
                try:
                    voice_path = os.path.join(self.voices_dir, voice)
                    print(f"Attempting to download voice {voice} to {voice_path}")

                    # Download into the voices directory; depending on the
                    # huggingface_hub version the file may land in a nested
                    # voices/ subfolder, which is handled below
                    downloaded_path = hf_hub_download(
                        repo_id=self.model_repo,
                        filename=f"voices/{voice}",
                        local_dir=self.voices_dir
                    )
                    print(f"Download completed to: {downloaded_path}")

                    # Verify the file exists at the expected path; if not,
                    # move it there from wherever it was downloaded
                    if not os.path.exists(voice_path):
                        print(f"Warning: File not found at expected path {voice_path}")
                        print(f"Checking download location: {downloaded_path}")
                        if os.path.exists(downloaded_path):
                            print(f"Moving file from {downloaded_path} to {voice_path}")
                            os.rename(downloaded_path, voice_path)
                    else:
                        print(f"Verified voice file exists: {voice_path}")
                except Exception as e:
                    print(f"Error downloading voice {voice}: {str(e)}")
                    traceback.print_exc()

            print("Model initialization complete")
            return True
        except Exception as e:
            print(f"Error initializing model: {str(e)}")
            return False
    def list_voices(self):
        """List available voices."""
        voices = []
        try:
            # Verify the voices directory exists
            if not os.path.exists(self.voices_dir):
                print(f"Voices directory does not exist: {self.voices_dir}")
                return voices

            files = os.listdir(self.voices_dir)
            print(f"Found {len(files)} files in voices directory")

            # Filter for .pt files
            for file in files:
                if file.endswith(".pt"):
                    voices.append(file[:-3])  # Strip the .pt extension
                    print(f"Found voice: {file[:-3]}")

            if not voices:
                print("No voice files found in voices directory")
        except Exception as e:
            print(f"Error listing voices: {str(e)}")
            traceback.print_exc()
        return sorted(voices)
    def _ensure_model_on_gpu(self):
        """Ensure the model is on the GPU and stays there."""
        if not getattr(self, '_model_on_gpu', False):
            print("Moving model to GPU...")
            with torch.cuda.device(0):
                torch.cuda.set_device(0)
                if hasattr(self.model, 'to'):
                    # Standard torch.nn.Module
                    self.model.to('cuda')
                else:
                    # Fallback for Munch-style dicts: move tensors individually
                    for name in self.model:
                        if isinstance(self.model[name], torch.Tensor):
                            self.model[name] = self.model[name].cuda()
            self._model_on_gpu = True
    def _generate_audio(self, text: str, voicepack: torch.Tensor, lang: str, speed: float) -> np.ndarray:
        """GPU-accelerated audio generation."""
        try:
            with torch.cuda.device(0):
                torch.cuda.set_device(0)

                # Move the model and voicepack to the GPU in a single context
                self._ensure_model_on_gpu()
                voicepack = voicepack.cuda()

                # Run generation with everything on the GPU
                audio, _ = generate(
                    self.model,
                    text,
                    voicepack,
                    lang=lang,
                    speed=speed
                )
                return audio
        except Exception as e:
            print(f"Error in audio generation: {str(e)}")
            raise
    def chunk_text(self, text: str, max_chars: int = 300) -> list[str]:
        """Break text into chunks at natural boundaries."""
        chunks = []
        current_chunk = ""

        # Split on sentence boundaries first, keeping the punctuation attached
        sentences = text.replace(".", ".|").replace("!", "!|").replace("?", "?|").replace(";", ";|").split("|")

        for sentence in sentences:
            if not sentence.strip():
                continue

            # If the sentence is already too long, break it on commas
            if len(sentence) > max_chars:
                parts = sentence.split(",")
                for i, part in enumerate(parts):
                    # Re-attach the comma that split() removed (not after the last part)
                    suffix = "," if i < len(parts) - 1 else ""
                    if len(current_chunk) + len(part) <= max_chars:
                        current_chunk += part + suffix
                    elif len(part) > max_chars:
                        # If the part is still too long, break it on whitespace
                        words = part.split()
                        for word in words:
                            if len(current_chunk) + len(word) > max_chars:
                                chunks.append(current_chunk.strip())
                                current_chunk = word + " "
                            else:
                                current_chunk += word + " "
                    else:
                        chunks.append(current_chunk.strip())
                        current_chunk = part + suffix
            elif len(current_chunk) + len(sentence) <= max_chars:
                current_chunk += sentence
            else:
                chunks.append(current_chunk.strip())
                current_chunk = sentence

        if current_chunk:
            chunks.append(current_chunk.strip())
        return chunks
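    # Illustrative behavior of chunk_text (hypothetical input, not from the
    # source): with the default max_chars=300, short sentences are packed into
    # one chunk, while an over-long sentence falls back to comma and then
    # whitespace splits, e.g.
    #   TTSModel().chunk_text("Hello there. How are you?")
    #   -> ["Hello there. How are you?"]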
    def generate_speech(self, text: str, voice_name: str, speed: float = 1.0) -> tuple[np.ndarray, float]:
        """Generate speech from text. Returns (audio_array, duration)."""
        try:
            if not text or not voice_name:
                raise ValueError("Text and voice name are required")

            start_time = time.time()

            # Count tokens for throughput metrics (cl100k_base is only a rough
            # proxy; Kokoro does its own phoneme-level tokenization internally)
            enc = tiktoken.get_encoding("cl100k_base")
            total_tokens = len(enc.encode(text))

            # Normalize text
            text = normalize_text(text)
            if not text:
                raise ValueError("Text is empty after normalization")

            # Load the voice and process everything within one GPU context
            with torch.cuda.device(0):
                torch.cuda.set_device(0)

                voice_path = os.path.join(self.voices_dir, f"{voice_name}.pt")
                if not os.path.exists(voice_path):
                    raise ValueError(f"Voice not found: {voice_name}")

                # Load the voicepack directly onto the GPU
                voicepack = torch.load(voice_path, map_location='cuda', weights_only=True)

                # Break text into chunks for better memory management
                chunks = self.chunk_text(text)
                print(f"Processing {len(chunks)} chunks...")

                # Ensure the model is initialized and on the GPU
                if self.model is None:
                    print("Model not initialized, reinitializing...")
                    if not self.initialize():
                        raise ValueError("Failed to initialize model")
                self._ensure_model_on_gpu()

                # Process all chunks within the same GPU context
                audio_chunks = []
                for i, chunk in enumerate(chunks):
                    chunk_start = time.time()
                    chunk_audio = self._generate_audio(
                        text=chunk,
                        voicepack=voicepack,
                        lang=voice_name[0],  # first letter selects the language variant
                        speed=speed
                    )
                    chunk_time = time.time() - chunk_start
                    print(f"Chunk {i+1}/{len(chunks)} processed in {chunk_time:.2f}s")
                    audio_chunks.append(chunk_audio)

                # Concatenate audio chunks
                audio = np.concatenate(audio_chunks)

                # Report throughput metrics
                total_time = time.time() - start_time
                tokens_per_second = total_tokens / total_time
                print("\nProcessing Metrics:")
                print(f"Total tokens: {total_tokens}")
                print(f"Total time: {total_time:.2f}s")
                print(f"Tokens per second: {tokens_per_second:.2f}")

                # Kokoro outputs 24 kHz audio, so duration = samples / 24000
                return audio, len(audio) / 24000
        except Exception as e:
            print(f"Error generating speech: {str(e)}")
            raise
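
# Minimal local usage sketch, not part of the Space itself (the Space wires
# TTSModel into a UI instead). Assumes a CUDA device is available and that the
# "af_bella" voice downloaded successfully; the output filename is illustrative.
if __name__ == "__main__":
    tts = TTSModel()
    if tts.initialize():
        print("Available voices:", tts.list_voices())
        audio, duration = tts.generate_speech("Hello from Kokoro!", "af_bella")
        print(f"Generated {duration:.2f}s of audio")
        # Write the mono float32 waveform at Kokoro's 24 kHz sample rate
        wavfile.write("output.wav", 24000, audio)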