RMSnow's picture
add backend inference and inferface output
0883aa1
# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
# This module is modified from [Whisper](https://github.com/openai/whisper.git).
# ## Citations
# ```bibtex
# @inproceedings{openai-whisper,
# author = {Alec Radford and
# Jong Wook Kim and
# Tao Xu and
# Greg Brockman and
# Christine McLeavey and
# Ilya Sutskever},
# title = {Robust Speech Recognition via Large-Scale Weak Supervision},
# booktitle = {{ICML}},
# series = {Proceedings of Machine Learning Research},
# volume = {202},
# pages = {28492--28518},
# publisher = {{PMLR}},
# year = {2023}
# }
# ```
#
import argparse
import os
import sys
import warnings
from typing import List, Optional, Tuple, Union, TYPE_CHECKING
import numpy as np
import torch
import tqdm
from .audio import SAMPLE_RATE, N_FRAMES, HOP_LENGTH, pad_or_trim, log_mel_spectrogram
from .decoding import DecodingOptions, DecodingResult
from .tokenizer import LANGUAGES, TO_LANGUAGE_CODE, get_tokenizer
from .utils import (
exact_div,
format_timestamp,
optional_int,
optional_float,
str2bool,
write_txt,
write_vtt,
write_srt,
)
if TYPE_CHECKING:
from .model import Whisper
def transcribe(
model: "Whisper",
audio: Union[str, np.ndarray, torch.Tensor],
*,
verbose: Optional[bool] = None,
temperature: Union[float, Tuple[float, ...]] = (0.0, 0.2, 0.4, 0.6, 0.8, 1.0),
compression_ratio_threshold: Optional[float] = 2.4,
logprob_threshold: Optional[float] = -1.0,
no_speech_threshold: Optional[float] = 0.6,
condition_on_previous_text: bool = True,
**decode_options,
):
"""
Transcribe an audio file using Whisper
Parameters
----------
model: Whisper
The Whisper model instance
audio: Union[str, np.ndarray, torch.Tensor]
The path to the audio file to open, or the audio waveform
verbose: bool
Whether to display the text being decoded to the console. If True, displays all the details,
If False, displays minimal details. If None, does not display anything
temperature: Union[float, Tuple[float, ...]]
Temperature for sampling. It can be a tuple of temperatures, which will be successively used
upon failures according to either `compression_ratio_threshold` or `logprob_threshold`.
compression_ratio_threshold: float
If the gzip compression ratio is above this value, treat as failed
logprob_threshold: float
If the average log probability over sampled tokens is below this value, treat as failed
no_speech_threshold: float
If the no_speech probability is higher than this value AND the average log probability
over sampled tokens is below `logprob_threshold`, consider the segment as silent
condition_on_previous_text: bool
if True, the previous output of the model is provided as a prompt for the next window;
disabling may make the text inconsistent across windows, but the model becomes less prone to
getting stuck in a failure loop, such as repetition looping or timestamps going out of sync.
decode_options: dict
Keyword arguments to construct `DecodingOptions` instances
Returns
-------
A dictionary containing the resulting text ("text") and segment-level details ("segments"), and
the spoken language ("language"), which is detected when `decode_options["language"]` is None.
"""
dtype = torch.float16 if decode_options.get("fp16", True) else torch.float32
if model.device == torch.device("cpu"):
if torch.cuda.is_available():
warnings.warn("Performing inference on CPU when CUDA is available")
if dtype == torch.float16:
warnings.warn("FP16 is not supported on CPU; using FP32 instead")
dtype = torch.float32
if dtype == torch.float32:
decode_options["fp16"] = False
mel = log_mel_spectrogram(audio)
if decode_options.get("language", None) is None:
if not model.is_multilingual:
decode_options["language"] = "en"
else:
if verbose:
print(
"Detecting language using up to the first 30 seconds. Use `--language` to specify the language"
)
segment = pad_or_trim(mel, N_FRAMES).to(model.device).to(dtype)
_, probs = model.detect_language(segment)
decode_options["language"] = max(probs, key=probs.get)
if verbose is not None:
print(
f"Detected language: {LANGUAGES[decode_options['language']].title()}"
)
language = decode_options["language"]
task = decode_options.get("task", "transcribe")
tokenizer = get_tokenizer(model.is_multilingual, language=language, task=task)
def decode_with_fallback(segment: torch.Tensor) -> DecodingResult:
temperatures = (
[temperature] if isinstance(temperature, (int, float)) else temperature
)
decode_result = None
for t in temperatures:
kwargs = {**decode_options}
if t > 0:
# disable beam_size and patience when t > 0
kwargs.pop("beam_size", None)
kwargs.pop("patience", None)
else:
# disable best_of when t == 0
kwargs.pop("best_of", None)
options = DecodingOptions(**kwargs, temperature=t)
decode_result = model.decode(segment, options)
needs_fallback = False
if (
compression_ratio_threshold is not None
and decode_result.compression_ratio > compression_ratio_threshold
):
needs_fallback = True # too repetitive
if (
logprob_threshold is not None
and decode_result.avg_logprob < logprob_threshold
):
needs_fallback = True # average log probability is too low
if not needs_fallback:
break
return decode_result
seek = 0
input_stride = exact_div(
N_FRAMES, model.dims.n_audio_ctx
) # mel frames per output token: 2
time_precision = (
input_stride * HOP_LENGTH / SAMPLE_RATE
) # time per output token: 0.02 (seconds)
all_tokens = []
all_segments = []
prompt_reset_since = 0
initial_prompt = decode_options.pop("initial_prompt", None) or []
if initial_prompt:
initial_prompt = tokenizer.encode(" " + initial_prompt.strip())
all_tokens.extend(initial_prompt)
def add_segment(
*, start: float, end: float, text_tokens: torch.Tensor, result: DecodingResult
):
text = tokenizer.decode(
[token for token in text_tokens if token < tokenizer.eot]
)
if len(text.strip()) == 0: # skip empty text output
return
all_segments.append(
{
"id": len(all_segments),
"seek": seek,
"start": start,
"end": end,
"text": text,
"tokens": text_tokens.tolist(),
"temperature": result.temperature,
"avg_logprob": result.avg_logprob,
"compression_ratio": result.compression_ratio,
"no_speech_prob": result.no_speech_prob,
}
)
if verbose:
line = f"[{format_timestamp(start)} --> {format_timestamp(end)}] {text}\n"
# compared to just `print(line)`, this replaces any character not representable using
# the system default encoding with an '?', avoiding UnicodeEncodeError.
sys.stdout.buffer.write(
line.encode(sys.getdefaultencoding(), errors="replace")
)
sys.stdout.flush()
# show the progress bar when verbose is False (otherwise the transcribed text will be printed)
num_frames = mel.shape[-1]
previous_seek_value = seek
with tqdm.tqdm(
total=num_frames, unit="frames", disable=verbose is not False
) as pbar:
while seek < num_frames:
timestamp_offset = float(seek * HOP_LENGTH / SAMPLE_RATE)
segment = pad_or_trim(mel[:, seek:], N_FRAMES).to(model.device).to(dtype)
segment_duration = segment.shape[-1] * HOP_LENGTH / SAMPLE_RATE
decode_options["prompt"] = all_tokens[prompt_reset_since:]
result: DecodingResult = decode_with_fallback(segment)
tokens = torch.tensor(result.tokens)
if no_speech_threshold is not None:
# no voice activity check
should_skip = result.no_speech_prob > no_speech_threshold
if (
logprob_threshold is not None
and result.avg_logprob > logprob_threshold
):
# don't skip if the logprob is high enough, despite the no_speech_prob
should_skip = False
if should_skip:
seek += segment.shape[
-1
] # fast-forward to the next segment boundary
continue
timestamp_tokens: torch.Tensor = tokens.ge(tokenizer.timestamp_begin)
consecutive = torch.where(timestamp_tokens[:-1] & timestamp_tokens[1:])[
0
].add_(1)
if (
len(consecutive) > 0
): # if the output contains two consecutive timestamp tokens
last_slice = 0
for current_slice in consecutive:
sliced_tokens = tokens[last_slice:current_slice]
start_timestamp_position = (
sliced_tokens[0].item() - tokenizer.timestamp_begin
)
end_timestamp_position = (
sliced_tokens[-1].item() - tokenizer.timestamp_begin
)
add_segment(
start=timestamp_offset
+ start_timestamp_position * time_precision,
end=timestamp_offset + end_timestamp_position * time_precision,
text_tokens=sliced_tokens[1:-1],
result=result,
)
last_slice = current_slice
last_timestamp_position = (
tokens[last_slice - 1].item() - tokenizer.timestamp_begin
)
seek += last_timestamp_position * input_stride
all_tokens.extend(tokens[: last_slice + 1].tolist())
else:
duration = segment_duration
timestamps = tokens[timestamp_tokens.nonzero().flatten()]
if (
len(timestamps) > 0
and timestamps[-1].item() != tokenizer.timestamp_begin
):
# no consecutive timestamps but it has a timestamp; use the last one.
# single timestamp at the end means no speech after the last timestamp.
last_timestamp_position = (
timestamps[-1].item() - tokenizer.timestamp_begin
)
duration = last_timestamp_position * time_precision
add_segment(
start=timestamp_offset,
end=timestamp_offset + duration,
text_tokens=tokens,
result=result,
)
seek += segment.shape[-1]
all_tokens.extend(tokens.tolist())
if not condition_on_previous_text or result.temperature > 0.5:
# do not feed the prompt tokens if a high temperature was used
prompt_reset_since = len(all_tokens)
# update progress bar
pbar.update(min(num_frames, seek) - previous_seek_value)
previous_seek_value = seek
return dict(
text=tokenizer.decode(all_tokens[len(initial_prompt) :]),
segments=all_segments,
language=language,
)
def cli():
from . import available_models
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument(
"audio", nargs="+", type=str, help="audio file(s) to transcribe"
)
parser.add_argument(
"--model",
default="small",
choices=available_models(),
help="name of the Whisper model to use",
)
parser.add_argument(
"--model_dir",
type=str,
default=None,
help="the path to save model files; uses ~/.cache/whisper by default",
)
parser.add_argument(
"--device",
default="cuda" if torch.cuda.is_available() else "cpu",
help="device to use for PyTorch inference",
)
parser.add_argument(
"--output_dir",
"-o",
type=str,
default=".",
help="directory to save the outputs",
)
parser.add_argument(
"--verbose",
type=str2bool,
default=True,
help="whether to print out the progress and debug messages",
)
parser.add_argument(
"--task",
type=str,
default="transcribe",
choices=["transcribe", "translate"],
help="whether to perform X->X speech recognition ('transcribe') or X->English translation ('translate')",
)
parser.add_argument(
"--language",
type=str,
default=None,
choices=sorted(LANGUAGES.keys())
+ sorted([k.title() for k in TO_LANGUAGE_CODE.keys()]),
help="language spoken in the audio, specify None to perform language detection",
)
parser.add_argument(
"--temperature", type=float, default=0, help="temperature to use for sampling"
)
parser.add_argument(
"--best_of",
type=optional_int,
default=5,
help="number of candidates when sampling with non-zero temperature",
)
parser.add_argument(
"--beam_size",
type=optional_int,
default=5,
help="number of beams in beam search, only applicable when temperature is zero",
)
parser.add_argument(
"--patience",
type=float,
default=None,
help="optional patience value to use in beam decoding, as in https://arxiv.org/abs/2204.05424, the default (1.0) is equivalent to conventional beam search",
)
parser.add_argument(
"--length_penalty",
type=float,
default=None,
help="optional token length penalty coefficient (alpha) as in https://arxiv.org/abs/1609.08144, uses simple length normalization by default",
)
parser.add_argument(
"--suppress_tokens",
type=str,
default="-1",
help="comma-separated list of token ids to suppress during sampling; '-1' will suppress most special characters except common punctuations",
)
parser.add_argument(
"--initial_prompt",
type=str,
default=None,
help="optional text to provide as a prompt for the first window.",
)
parser.add_argument(
"--condition_on_previous_text",
type=str2bool,
default=True,
help="if True, provide the previous output of the model as a prompt for the next window; disabling may make the text inconsistent across windows, but the model becomes less prone to getting stuck in a failure loop",
)
parser.add_argument(
"--fp16",
type=str2bool,
default=True,
help="whether to perform inference in fp16; True by default",
)
parser.add_argument(
"--temperature_increment_on_fallback",
type=optional_float,
default=0.2,
help="temperature to increase when falling back when the decoding fails to meet either of the thresholds below",
)
parser.add_argument(
"--compression_ratio_threshold",
type=optional_float,
default=2.4,
help="if the gzip compression ratio is higher than this value, treat the decoding as failed",
)
parser.add_argument(
"--logprob_threshold",
type=optional_float,
default=-1.0,
help="if the average log probability is lower than this value, treat the decoding as failed",
)
parser.add_argument(
"--no_speech_threshold",
type=optional_float,
default=0.6,
help="if the probability of the <|nospeech|> token is higher than this value AND the decoding has failed due to `logprob_threshold`, consider the segment as silence",
)
parser.add_argument(
"--threads",
type=optional_int,
default=0,
help="number of threads used by torch for CPU inference; supercedes MKL_NUM_THREADS/OMP_NUM_THREADS",
)
args = parser.parse_args().__dict__
model_name: str = args.pop("model")
model_dir: str = args.pop("model_dir")
output_dir: str = args.pop("output_dir")
device: str = args.pop("device")
os.makedirs(output_dir, exist_ok=True)
if model_name.endswith(".en") and args["language"] not in {"en", "English"}:
if args["language"] is not None:
warnings.warn(
f"{model_name} is an English-only model but receipted '{args['language']}'; using English instead."
)
args["language"] = "en"
temperature = args.pop("temperature")
temperature_increment_on_fallback = args.pop("temperature_increment_on_fallback")
if temperature_increment_on_fallback is not None:
temperature = tuple(
np.arange(temperature, 1.0 + 1e-6, temperature_increment_on_fallback)
)
else:
temperature = [temperature]
threads = args.pop("threads")
if threads > 0:
torch.set_num_threads(threads)
from . import load_model
model = load_model(model_name, device=device, download_root=model_dir)
for audio_path in args.pop("audio"):
result = transcribe(model, audio_path, temperature=temperature, **args)
audio_basename = os.path.basename(audio_path)
# save TXT
with open(
os.path.join(output_dir, audio_basename + ".txt"), "w", encoding="utf-8"
) as txt:
write_txt(result["segments"], file=txt)
# save VTT
with open(
os.path.join(output_dir, audio_basename + ".vtt"), "w", encoding="utf-8"
) as vtt:
write_vtt(result["segments"], file=vtt)
# save SRT
with open(
os.path.join(output_dir, audio_basename + ".srt"), "w", encoding="utf-8"
) as srt:
write_srt(result["segments"], file=srt)
if __name__ == "__main__":
cli()