# -*- coding: utf-8 -*-
"""Assignment-2-IT164_ajchri5

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1RtE7mmtyUWwiuowgyQq4eCuH-ep_D1QQ
"""

# mount gd
from google.colab import drive
drive.mount('/content/drive')

# Commented out IPython magic to ensure Python compatibility.
# # token
# %%capture
# from google.colab import userdata
# hftoken=userdata.get('hftoken')

# Commented out IPython magic to ensure Python compatibility.
# # pi
# %%capture
# !pip install gradio
# !pip install huggingface_hub

# packages required for colab
!pip install gradio
!pip install transformers
!pip install torchaudio
!pip install fasttext

# fastText for language detection
!wget https://dl.fbaipublicfiles.com/fasttext/supervised-models/lid.176.bin

# imports required for colab
import gradio as gr
from transformers import WhisperProcessor, WhisperForConditionalGeneration, pipeline, EncoderDecoderCache
import torchaudio
import warnings
import fasttext
import pandas as pd
import csv
import os

# hides warnings with pysoundfile
warnings.filterwarnings("ignore", category=UserWarning, message="PySoundFile failed.*")

# load model 1 transcription
whisper_model_name = "openai/whisper-large"
processor = WhisperProcessor.from_pretrained(whisper_model_name)
whisper_model = WhisperForConditionalGeneration.from_pretrained(whisper_model_name)

# load model 2 translation
translation_model = pipeline("translation", model="Helsinki-NLP/opus-mt-ROMANCE-en")

# load additional model 3 language detection
lang_model = fasttext.load_model('lid.176.bin')  # pre-trained model

# app usage history
history_data = []

# save data csv
def saveData(text, language, translated_text, confidence_score):
    # gd path
    file_path = '/content/drive/MyDrive/IT164/a2prompt.csv'

    # check if file exists, if not make new one with headers
    file_exists = os.path.isfile(file_path)

    # open csv file to append data
    with open(file_path, 'a', newline='', encoding='utf-8') as f:
        w = csv.writer(f)
        if not file_exists:
            # write header if file is created
            w.writerow(['Text', 'Language', 'Translation', 'Confidence Score'])
        # write new data row
        w.writerow([text, language, translated_text, confidence_score])

# load audio input and transcribe
def transcribe_audio(audio_file, sampling_rate=48000):  # set to 48 kHz
    # load audio file with torchaudio
    waveform, sr = torchaudio.load(audio_file, normalize=True)

    # max 16kHz (resample)
    if sr != 16000:
        transform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=16000)  # resample to 16 kHz
        waveform = transform(waveform)
        sr = 16000  # update as 16 kHz

    # whisperprocessor
    inputs = processor(waveform.squeeze(0).numpy(), return_tensors="pt", sampling_rate=sr)

    # generate transcription and handle "past_key_values deprecation" error
    past_key_values = None
    generated_ids = whisper_model.generate(
        inputs["input_features"],
        past_key_values=past_key_values
    )

    # encoderdecodercache (to handle past_key_values)
    if past_key_values is not None:
        past_key_values = EncoderDecoderCache.from_legacy_cache(past_key_values)

    return processor.decode(generated_ids[0], skip_special_tokens=True)

# detect language using fastText
def detect_language(text):
    result = lang_model.predict(text)  # predict language with fasttext
    language = result[0][0].replace('__label__', '')  # extract the predicted language label
    score = result[1][0]  # confidence score
    return language, score

# translate text (to english)
def translate_text_to_english(text, source_lang="fr"):
    # translate detected language
    translation = translation_model(text, src_lang=source_lang, tgt_lang="en")
    return translation[0]['translation_text']

# function to track history (save results to the list and save to csv)
def save_to_history(text, language, translation, confidence_score):
    history_data.append([text, language, translation, confidence_score])
    # save csv
    saveData(text, language, translation, confidence_score)

# process audio, transcribe, detect language, and translate
def process_audio(audio_file):
    transcription = transcribe_audio(audio_file, sampling_rate=48000)  # use 48 kHz initially (mac rate)
    language, score = detect_language(transcription)  # detect language of the transcription
    translated_text = translate_text_to_english(transcription, source_lang=language)  # translate
    save_to_history(transcription, language, translated_text, score)  # save results
    return transcription, language, score, translated_text

# update visibility of the history table in gradio
def update_vis(radio_value):
    if radio_value == 'show':
        return gr.DataFrame(pd.DataFrame(history_data, columns=["Text", "Language", "Translation", "Confidence Score"]), visible=True)
    else:
        return gr.DataFrame(pd.DataFrame(history_data, columns=["Text", "Language", "Translation", "Confidence Score"]), visible=False)

# gradio interface
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column():
            audio_input = gr.Audio(label="Record your voice", type="filepath")  # audio input
            transcription_output = gr.Textbox(label="Transcription")  # transcription output
            language_output = gr.Textbox(label="Detected Language")  # detected language output
            score_output = gr.Textbox(label="Confidence Score")  # confidence score output
            translated_output = gr.Textbox(label="Translated Text to English")  # translated text output
            process_button = gr.Button("Process Audio")  # button to process the audio

        with gr.Column():
            history = gr.Radio(['show', 'hide'], label="App usage history")  # "show" or "hide" (history)
            dataframe = gr.DataFrame(pd.DataFrame(history_data, columns=["Text", "Language", "Translation", "Confidence Score"]), visible=False)

        # button click (process audio and display output)
        process_button.click(fn=process_audio, inputs=[audio_input], outputs=[transcription_output, language_output, score_output, translated_output])
        history.change(fn=update_vis, inputs=history, outputs=dataframe)

demo.launch(debug=True)