persian-tts-piper / piper /download.py
aigmixer
upload piper scripts
479b96c
raw
history blame
4.6 kB
"""Utility for downloading Piper voices."""
import json
import logging
import shutil
from pathlib import Path
from typing import Any, Dict, Iterable, Set, Tuple, Union
from urllib.request import urlopen
from .file_hash import get_file_hash
URL_FORMAT = "https://huggingface.co/rhasspy/piper-voices/resolve/v1.0.0/{file}"
_DIR = Path(__file__).parent
_LOGGER = logging.getLogger(__name__)
_SKIP_FILES = {"MODEL_CARD"}
class VoiceNotFoundError(Exception):
pass
def get_voices(
download_dir: Union[str, Path], update_voices: bool = False
) -> Dict[str, Any]:
"""Loads available voices from downloaded or embedded JSON file."""
download_dir = Path(download_dir)
voices_download = download_dir / "voices.json"
if update_voices:
# Download latest voices.json
voices_url = URL_FORMAT.format(file="voices.json")
_LOGGER.debug("Downloading %s to %s", voices_url, voices_download)
with urlopen(voices_url) as response, open(
voices_download, "wb"
) as download_file:
shutil.copyfileobj(response, download_file)
# Prefer downloaded file to embedded
voices_embedded = _DIR / "voices.json"
voices_path = voices_download if voices_download.exists() else voices_embedded
_LOGGER.debug("Loading %s", voices_path)
with open(voices_path, "r", encoding="utf-8") as voices_file:
return json.load(voices_file)
def ensure_voice_exists(
name: str,
data_dirs: Iterable[Union[str, Path]],
download_dir: Union[str, Path],
voices_info: Dict[str, Any],
):
assert data_dirs, "No data dirs"
if name not in voices_info:
raise VoiceNotFoundError(name)
voice_info = voices_info[name]
voice_files = voice_info["files"]
files_to_download: Set[str] = set()
for data_dir in data_dirs:
data_dir = Path(data_dir)
# Check sizes/hashes
for file_path, file_info in voice_files.items():
if file_path in files_to_download:
# Already planning to download
continue
file_name = Path(file_path).name
if file_name in _SKIP_FILES:
continue
data_file_path = data_dir / file_name
_LOGGER.debug("Checking %s", data_file_path)
if not data_file_path.exists():
_LOGGER.debug("Missing %s", data_file_path)
files_to_download.add(file_path)
continue
expected_size = file_info["size_bytes"]
actual_size = data_file_path.stat().st_size
if expected_size != actual_size:
_LOGGER.warning(
"Wrong size (expected=%s, actual=%s) for %s",
expected_size,
actual_size,
data_file_path,
)
files_to_download.add(file_path)
continue
expected_hash = file_info["md5_digest"]
actual_hash = get_file_hash(data_file_path)
if expected_hash != actual_hash:
_LOGGER.warning(
"Wrong hash (expected=%s, actual=%s) for %s",
expected_hash,
actual_hash,
data_file_path,
)
files_to_download.add(file_path)
continue
if (not voice_files) and (not files_to_download):
raise ValueError(f"Unable to find or download voice: {name}")
# Download missing files
download_dir = Path(download_dir)
for file_path in files_to_download:
file_name = Path(file_path).name
if file_name in _SKIP_FILES:
continue
file_url = URL_FORMAT.format(file=file_path)
download_file_path = download_dir / file_name
download_file_path.parent.mkdir(parents=True, exist_ok=True)
_LOGGER.debug("Downloading %s to %s", file_url, download_file_path)
with urlopen(file_url) as response, open(
download_file_path, "wb"
) as download_file:
shutil.copyfileobj(response, download_file)
_LOGGER.info("Downloaded %s (%s)", download_file_path, file_url)
def find_voice(name: str, data_dirs: Iterable[Union[str, Path]]) -> Tuple[Path, Path]:
for data_dir in data_dirs:
data_dir = Path(data_dir)
onnx_path = data_dir / f"{name}.onnx"
config_path = data_dir / f"{name}.onnx.json"
if onnx_path.exists() and config_path.exists():
return onnx_path, config_path
raise ValueError(f"Missing files for voice {name}")