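# app.py — Gradio workspace for the Polish ASR BIGOS project.
# Combines a speech-recording collection app (volunteers read prompts aloud;
# recordings and metadata are uploaded to a Hugging Face dataset repo) with a
# currently disabled voicebot playground (Whisper ASR -> ChatGPT -> gTTS).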
import gradio as gr
import whisper
import numpy as np
import openai
import os
from gtts import gTTS
import json
import hashlib
import random
import string
import uuid
from datetime import date, datetime
from huggingface_hub import Repository, upload_file
import shutil

from helpers import dict_origin, dict_promptset

HF_TOKEN_WRITE = os.environ.get("HF_TOKEN_WRITE")
# Report whether the write token is configured without leaking the secret to the logs
print("HF_TOKEN_WRITE set:", HF_TOKEN_WRITE is not None)

today = date.today()
today_ymd = today.strftime("%Y%m%d")
def greet(name):
    return "Hello " + name + "!!"

with open('app.css', 'r') as f:
    css_file = f.read()

markdown = """
# Polish ASR BIGOS workspace
"""
# TODO move to config
WORKING_DATASET_REPO_URL = "https://huggingface.co/datasets/goodmike31/working-db"
REPO_NAME = "goodmike31/working-db"
REPOSITORY_DIR = "data"
LOCAL_DIR = "data_local"

os.makedirs(LOCAL_DIR, exist_ok=True)
def dump_json(thing, file):
    with open(file, 'w+', encoding="utf8") as f:
        json.dump(thing, f)

def get_unique_name():
    return ''.join([random.choice(string.ascii_letters + string.digits) for n in range(32)])
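# dict_promptset (from helpers) is assumed here to map a project name to a
# list of prompt strings — inferred from the slicing below, not verified
# against helpers.py itself.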
def get_prompts(project_name, size, language_code, prompts_left_info):
    # the original f-string referenced the built-in `type`, which printed "<class 'type'>"
    print(f"Retrieving prompts for project {project_name} for language_code {language_code} of size {size}")
    size = int(size)
    promptset = dict_promptset[project_name][0:size]
    prompts_left_info = size
    return promptset, promptset[0], prompts_left_info
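# Saves one recording plus its metadata: copies the wav into
# data_local/<project>/<YYYYMMDD>/<speaker>/audio, writes a JSON sidecar under
# .../meta, uploads both to the working dataset repo, and returns the next
# prompt for the UI.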
def save_recording_and_meta(project_name, recording, prompt_text, language_code, spk_name, spk_age,
                            spk_accent, spk_city, spk_gender, spk_nativity, promptset, prompt_number,
                            prompts_left_info):
    # TODO save user data in the next version
    current_prompt = prompt_text.strip()
    print("current_prompt: ", current_prompt)
    # initialize the prompt number on the first call
    if prompt_number is None:
        prompt_number = 1
    prompt_index = prompt_number - 1  # currently unused
    print("prompt_number: ", prompt_number)
    print("promptset: ", promptset)
    if prompt_number == len(promptset):
        next_prompt = "All prompts recorded. Thank you! You can close the app now:)"
    else:
        next_prompt = promptset[prompt_number]
    print("next_prompt: ", next_prompt)
    # remove leading and trailing spaces
    next_prompt = next_prompt.strip()
    # increment prompt number
    prompt_number = prompt_number + 1
    speaker_metadata = {}
    speaker_metadata['name'] = spk_name if spk_name is not None else 'unknown'
    speaker_metadata['gender'] = spk_gender if spk_gender is not None else 'unknown'
    speaker_metadata['age'] = spk_age if spk_age is not None else 'unknown'
    speaker_metadata['accent'] = spk_accent if spk_accent is not None else 'unknown'
    speaker_metadata['city'] = spk_city if spk_city is not None else 'unknown'
    speaker_metadata['nativity'] = spk_nativity if spk_nativity is not None else 'unknown'
    # TODO get ISO-639-1 codes
    SAVE_ROOT_DIR = os.path.join(LOCAL_DIR, project_name, today_ymd, spk_name)
    SAVE_DIR_AUDIO = os.path.join(SAVE_ROOT_DIR, "audio")
    SAVE_DIR_META = os.path.join(SAVE_ROOT_DIR, "meta")
    os.makedirs(SAVE_DIR_AUDIO, exist_ok=True)
    os.makedirs(SAVE_DIR_META, exist_ok=True)
    # Write audio to file
    # audio_name = get_unique_name()  # superseded by uuid4 below
    uuid_name = str(uuid.uuid4())
    audio_fn = uuid_name + ".wav"
    audio_output_fp = os.path.join(SAVE_DIR_AUDIO, audio_fn)
    print(f"Saving {recording} as {audio_output_fp}")
    shutil.copy2(recording, audio_output_fp)

    # Write metadata to file
    meta_fn = uuid_name + '.metadata.jsonl'
    json_file_path = os.path.join(SAVE_DIR_META, meta_fn)
    now = datetime.now()
    timestamp_str = now.strftime("%d/%m/%Y %H:%M:%S")
    metadata = {'id': uuid_name,
                'audio_file': audio_fn,
                'language_code': language_code,
                'prompt_number': prompt_number,  # note: already incremented to the next prompt at this point
                'prompt': current_prompt,
                'name': speaker_metadata['name'],
                'age': speaker_metadata['age'],
                'gender': speaker_metadata['gender'],
                'accent': speaker_metadata['accent'],
                'nativity': speaker_metadata['nativity'],
                'city': speaker_metadata['city'],
                'date': today_ymd,
                'timestamp': timestamp_str}
    dump_json(metadata, json_file_path)
    # Upload the audio file and metadata using the hub's upload_file
    # Upload the audio
    repo_audio_path = os.path.join(REPOSITORY_DIR, project_name, today_ymd, spk_name, "audio", audio_fn)
    _ = upload_file(path_or_fileobj=audio_output_fp,
                    path_in_repo=repo_audio_path,
                    repo_id=REPO_NAME,
                    repo_type='dataset',
                    token=HF_TOKEN_WRITE)
    # Upload the metadata
    repo_json_path = os.path.join(REPOSITORY_DIR, project_name, today_ymd, spk_name, "meta", meta_fn)
    _ = upload_file(path_or_fileobj=json_file_path,
                    path_in_repo=repo_json_path,
                    repo_id=REPO_NAME,
                    repo_type='dataset',
                    token=HF_TOKEN_WRITE)
    # print returns None, so the original `output = print(...)` assignment was dropped
    print(f"Recording {audio_fn} and meta file {meta_fn} successfully saved to repo!")

    prompts_left_info = prompts_left_info - 1
    return [next_prompt, prompt_number, None, prompts_left_info]
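# Note: whisper_model_change and the voicebot helpers below are wired up only
# in the commented-out 'Voicebot playground' tab further down, so they are
# currently unused by the live UI.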
def whisper_model_change(radio_whisper_model):
    whisper_model = whisper.load_model(radio_whisper_model)
    return whisper_model
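# Uses the pre-1.0 openai SDK interface (openai.ChatCompletion); with
# openai>=1.0 this call would need porting to OpenAI(api_key=...).chat.completions.create.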
def prompt_gpt_assistant(input_text, api_key, temperature):
    # TODO add option to specify instruction
    openai.api_key = api_key
    # TODO add specific message for specific role
    system_role_message = "You are a helpful assistant"
    messages = [{"role": "system", "content": system_role_message}]
    if input_text:
        messages.append({"role": "user", "content": input_text})
    chat_completion = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        messages=messages,
        temperature=temperature
    )
    reply = chat_completion.choices[0].message.content
    # TODO save chat completion for future reuse
    return reply
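# The original pipeline called the helpers below without the arguments they
# require; a minimal fix, assuming the caller supplies the same settings the
# individual playground buttons use, is to thread those parameters through.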
def voicebot_pipeline(audio, language_code, whisper_model, whisper_model_type, api_key, temperature):
    asr_out = transcribe(audio, language_code, whisper_model, whisper_model_type)
    gpt_out = prompt_gpt_assistant(asr_out, api_key, temperature)
    tts_out = synthesize_speech(gpt_out, language_code)
    return tts_out
def transcribe(audio, language_code, whisper_model, whisper_model_type):
    if not whisper_model:
        whisper_model = init_whisper_model(whisper_model_type)
    print(f"Transcribing {audio} for language_code {language_code} and model {whisper_model_type}")
    audio = whisper.load_audio(audio)
    audio = whisper.pad_or_trim(audio)
    mel = whisper.log_mel_spectrogram(audio)
    options = whisper.DecodingOptions(language=language_code, without_timestamps=True, fp16=False)
    result = whisper.decode(whisper_model, mel, options)
    result_text = result.text
    return result_text
def init_whisper_model(whisper_model_type):
    print("Initializing whisper model:", whisper_model_type)
    whisper_model = whisper.load_model(whisper_model_type)
    return whisper_model
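# Caveat: gTTS writes to a single fixed file, so concurrent users would
# overwrite each other's audio; a per-request temp file would avoid that.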
def synthesize_speech(text, language_code):
    audioobj = gTTS(text=text, lang=language_code, slow=False)
    audioobj.save("Temp.mp3")
    return "Temp.mp3"
block = gr.Blocks(css=css_file)

with block:
    # state variables
    project_name = gr.State("voicebot")  # default for the playground; the recording app selects e.g. "bridge"
    language_code = gr.State("pl")
    prompts_type = gr.State()
    promptset = gr.State("test.prompts.txt")
    prompt_history = gr.State()
    current_prompt = gr.State()
    prompt_number = gr.State()
    finished_recording = gr.State()
    temperature = gr.State(0)
    whisper_model_type = gr.State("base")
    whisper_model = gr.State()
    openai_api_key = gr.State()
    google_api_key = gr.State()
    azure_api_key = gr.State()
    spk_age = gr.State("unknown")
    spk_accent = gr.State("unknown")
    spk_city = gr.State("unknown")
    spk_gender = gr.State("unknown")
    spk_nativity = gr.State("unknown")
    spk_name = gr.State("unknown")

    cities = sorted(dict_origin["Poland"]["cities"])
    # state handling functions
    def change_project(choice):
        print("Changing project to", choice)
        return choice

    def change_prompts_type(choice):
        print("Changing promptset type to", choice)
        return choice

    def change_nativity(choice):
        print("Changing speaker nativity to", choice)
        return choice

    def change_accent(choice):
        print("Changing speaker accent to", choice)
        return choice

    def change_age(choice):
        print("Changing speaker age to", choice)
        return choice

    def change_city(choice):
        print("Changing speaker city to", choice)
        return choice

    def change_gender(choice):
        print("Changing speaker gender to", choice)
        return choice
    def change_language(choice):
        if choice == "Polish":
            language_code = "pl"
            print("Switching to Polish")
        elif choice == "English":
            language_code = "en"
            print("Switching to English")
        else:
            # fall back to the default so language_code is always bound
            language_code = "pl"
        print("language_code:", language_code)
        return language_code
    def change_whisper_model(choice):
        whisper_model_type = choice
        print("Switching Whisper model to", whisper_model_type)
        whisper_model = init_whisper_model(whisper_model_type)
        return [whisper_model_type, whisper_model]

    def change_prompts_left(prompts_left, current_prompt, promptset_size):
        prompts_left = promptset_size - current_prompt
        return [prompts_left]
    gr.Markdown(markdown)

    with gr.Tabs():
"""with gr.TabItem('General settings'): | |
radio_lang = gr.Radio(["Polish", "English"], label="Language", info="If none is selected, Polish is used") | |
radio_asr_type = gr.Radio(["Local", "Cloud"], label="Select ASR type", info="Cloud models are faster and more accurate, but costs money") | |
with gr.Accordion(label="Local ASR settings", open=False): | |
#radio_asr_type = gr.Radio(["Local", "Cloud"], label="Select ASR type", info="Cloud models are faster and more accurate, but costs money") | |
#radio_cloud_asr = gr.Radio(["Whisper", "Google", "Azure"], label="Select Cloud ASR provider", info="You need to provide API keys for specific service") | |
radio_whisper_model = gr.Radio(["tiny", "base", "small", "medium", "large"], label="Whisper ASR model (local)", info="Larger models are more accurate, but slower. Default - base") | |
with gr.Accordion(label="Cloud ASR settings", open=False): | |
radio_cloud_asr = gr.Radio(["Whisper", "Google", "Azure"], label="Select Cloud ASR provider", info="You need to provide API keys for specific service") | |
with gr.Accordion(label="Cloud API Keys",open=False): | |
gr.HTML("<p class=\"apikey\">Open AI API Key:</p>") | |
# API key textbox (password-style) | |
openai_api_key = gr.Textbox(label="", elem_id="pw") | |
gr.HTML("<p class=\"apikey\">Google Cloud API Key:</p>") | |
# API key textbox (password-style) | |
google_api_key = gr.Textbox(label="", elem_id="pw") | |
gr.HTML("<p class=\"apikey\">Azure Cloud API Key:</p>") | |
# API key textbox (password-style) | |
azure_api_key = gr.Textbox(label="", elem_id="pw") | |
with gr.Accordion(label="Chat GPT settings",open=False): | |
slider_temp = gr.Slider(minimum=0, maximum= 2, step=0.2, label="ChatGPT temperature") | |
""" | |
        with gr.TabItem('Speaker information'):
            with gr.Row():
                spk_name = gr.Textbox(placeholder="Your name", label="Name")
                dropdown_spk_nativity = gr.Dropdown(["Polish", "Other"], label="Native language", info="")
                dropdown_spk_gender = gr.Dropdown(["Male", "Female", "Other", "Prefer not to say"], label="Gender", info="")
                dropdown_spk_age = gr.Dropdown(["under 20", "20-29", "30-39", "40-49", "50-59", "over 60"], label="Age", info="")
                dropdown_spk_origin_city = gr.Dropdown(cities, label="Hometown", visible=True, info="Closest city to speaker's place of birth and upbringing")
                # radio_gdpr_consent = gr.Radio(["Yes", "No"], label="Personal data processing consent", info="Do you agree to your personal data being processed according to the policy (link)")
            # fix: the nativity dropdown must update spk_nativity, not spk_age
            dropdown_spk_nativity.change(fn=change_nativity, inputs=dropdown_spk_nativity, outputs=spk_nativity)
            dropdown_spk_gender.change(fn=change_gender, inputs=dropdown_spk_gender, outputs=spk_gender)
            dropdown_spk_age.change(fn=change_age, inputs=dropdown_spk_age, outputs=spk_age)
            dropdown_spk_origin_city.change(fn=change_city, inputs=dropdown_spk_origin_city, outputs=spk_city)
"""with gr.TabItem('Voicebot playground'): | |
mic_recording = gr.Audio(source="microphone", type="filepath", label='Record your voice') | |
with gr.Row(): | |
button_transcribe = gr.Button("Transcribe speech") | |
button_save_audio_and_trans = gr.Button("Save audio recording and transcription") | |
out_asr = gr.Textbox(placeholder="ASR output", | |
lines=2, | |
max_lines=5, | |
show_label=False) | |
with gr.Row(): | |
button_prompt_gpt = gr.Button("Prompt ChatGPT") | |
button_save_gpt_response = gr.Button("Save ChatGPT response") | |
out_gpt = gr.Textbox(placeholder="ChatGPT output", | |
lines=4, | |
max_lines=10, | |
show_label=False) | |
with gr.Row(): | |
button_synth_speech = gr.Button("Synthesize speech") | |
button_save_synth_audio = gr.Button("Save synthetic audio") | |
synth_recording = gr.Audio() | |
# Events actions | |
button_save_audio_and_trans.click(save_recording_and_meta, inputs=[project_name, mic_recording, out_asr, language_code, spk_age, spk_accent, spk_city, spk_gender, spk_nativity], outputs=[]) | |
button_transcribe.click(transcribe, inputs=[mic_recording, language_code, whisper_model,whisper_model_type], outputs=out_asr) | |
button_prompt_gpt.click(prompt "dates":["20230922"], | |
"speakers":["Test"]_gpt_assistant, inputs=[out_asr, openai_api_key, slider_temp], outputs=out_gpt) | |
button_synth_speech.click(synthesize_speech, inputs=[out_gpt, language_code], outputs=synth_recording) | |
radio_lang.change(fn=change_language, inputs=radio_lang, outputs=language_code) | |
radio_whisper_model.change(fn=change_whisper_model, inputs=radio_whisper_model, outputs=[whisper_model_type, whisper_model]) | |
""" | |
        with gr.TabItem('Speech recordings app'):
            with gr.Accordion(label="Project settings"):
                radio_project = gr.Dropdown(["bridge"], label="Select project", info="")
                # radio_promptset_type = gr.Radio(["New promptset generation", "Existing promptset use"], label="Language", value="Existing promptset use", info="New promptset is generated on the fly. Requires providing an OpenAI key in the general settings tab")
                var_promptset_size = gr.Textbox(label="How many recordings do you intend to make? (max 200)")
                button_get_prompts = gr.Button("Save settings and get first prompt!")
            # gr.Number does not accept Textbox-style kwargs (placeholder/lines/max_lines); keep only valid ones
            prompts_left_info = gr.Number(label="Recordings left", interactive=False)
            prompt_text = gr.Textbox(placeholder='Prompt to read during recording', label="Prompt to read")
            speech_recording = gr.Audio(source="microphone", label="Select 'record from microphone' and read the prompt displayed above", type="filepath")

            radio_project.change(fn=change_project, inputs=radio_project, outputs=project_name)
            # radio_promptset_type.change(fn=change_prompts_type, inputs=radio_promptset_type, outputs=prompts_type)
            # prompts_left.change(change_prompts_left, inputs=[prompts_left, current_prompt, var_promptset_size], outputs=[prompts_left])
            button_save_and_next = gr.Button("Save recording and get the next prompt")

            # TODO - add option to generate a new promptset on the fly for new projects
            button_get_prompts.click(get_prompts, inputs=[radio_project, var_promptset_size, language_code, prompts_left_info], outputs=[promptset, prompt_text, prompts_left_info])
            button_save_and_next.click(save_recording_and_meta, inputs=[project_name, speech_recording, prompt_text, language_code, spk_name, spk_age, spk_accent, spk_city, spk_gender, spk_nativity, promptset, prompt_number, prompts_left_info], outputs=[prompt_text, prompt_number, speech_recording, prompts_left_info])

block.launch()