import streamlit as st
import sparknlp
import os
import pandas as pd
import librosa

from sparknlp.base import *
from sparknlp.common import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline
from sparknlp.pretrained import PretrainedPipeline
from pyspark.sql.types import *
import pyspark.sql.functions as F

# Page configuration
st.set_page_config(
    layout="wide", 
    initial_sidebar_state="auto"
)

# Custom CSS for styling
st.markdown("""
    <style>
        .main-title {
            font-size: 36px;
            color: #4A90E2;
            font-weight: bold;
            text-align: center;
        }
        .section {
            background-color: #f9f9f9;
            padding: 10px;
            border-radius: 10px;
            margin-top: 10px;
        }
        .section p, .section ul {
            color: #666666;
        }
    </style>
""", unsafe_allow_html=True)

@st.cache_resource
def init_spark():
    """Initialize Spark NLP."""
    return sparknlp.start()
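
# Note: sparknlp.start() creates (or reuses) a local SparkSession with the Spark NLP
# jars attached; on a GPU machine, sparknlp.start(gpu=True) can be used instead.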

@st.cache_resource
def create_pipeline(model):
    """Create a Spark NLP pipeline for audio processing."""
    # Convert the raw audio floats into Spark NLP's AUDIO annotation type
    audio_assembler = AudioAssembler() \
        .setInputCol("audio_content") \
        .setOutputCol("audio_assembler")

    # Pretrained Hubert CTC model that transcribes the assembled audio to text
    speech_to_text = HubertForCTC \
        .pretrained(model) \
        .setInputCols("audio_assembler") \
        .setOutputCol("text")

    pipeline = Pipeline(stages=[
        audio_assembler,
        speech_to_text
    ])
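
    # Sketch of how the fitted pipeline is consumed downstream (see fit_data below):
    #   model = pipeline.fit(one_row_audio_df)
    #   LightPipeline(model).fullAnnotate(audio_floats)[0]["text"][0].result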
    return pipeline

def fit_data(pipeline, fed_data):
    """Load the audio file, fit the pipeline, and return the annotated transcription."""
    # Hubert models expect 16 kHz mono audio, so resample on load
    data, sampling_rate = librosa.load(fed_data, sr=16000)
    data = data.tolist()
    # A single-row DataFrame (built with the module-level `spark` session) is enough
    # to fit the pretrained pipeline stages
    spark_df = spark.createDataFrame([[data]], ["audio_content"])

    model = pipeline.fit(spark_df)
    # LightPipeline runs the fitted model locally on the raw float list
    lp = LightPipeline(model)
    lp_result = lp.fullAnnotate(data)[0]
    return lp_result

def save_uploadedfile(uploadedfile, path):
    """Save the uploaded file to the specified path."""
    filepath = os.path.join(path, uploadedfile.name)
    with open(filepath, "wb") as f:
        # Streamlit's UploadedFile exposes getbuffer(); fall back to read() for
        # plain file-like objects
        if hasattr(uploadedfile, 'getbuffer'):
            f.write(uploadedfile.getbuffer())
        else:
            f.write(uploadedfile.read())
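
# Note: save_uploadedfile assumes `path` already exists; if the inputs directory
# might be missing, os.makedirs(path, exist_ok=True) could be called before writing.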

# Sidebar content
model_list = ["asr_hubert_large_ls960"]
model = st.sidebar.selectbox(
    "Choose the pretrained model",
    model_list,
    help="For more info about the models visit: https://sparknlp.org/models"
)

# Main content
st.markdown('<div class="main-title">Speech Recognition With HubertForCTC</div>', unsafe_allow_html=True)
st.markdown('<div class="section"><p>This demo transcribes audio files into text using the <code>HubertForCTC</code> Annotator and advanced speech recognition models.</p></div>', unsafe_allow_html=True)

# Reference notebook link in sidebar
st.sidebar.markdown('Reference notebook:')
st.sidebar.markdown("""
    <a href="https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/open-source-nlp/17.0.Speech_Recognition.ipynb">
        <img src="https://colab.research.google.com/assets/colab-badge.svg" style="zoom: 1.3" alt="Open In Colab"/>
    </a>
""", unsafe_allow_html=True)

# Load examples
AUDIO_FILE_PATH = "inputs"
audio_files = sorted(os.listdir(AUDIO_FILE_PATH))

selected_audio = st.selectbox("Select an audio", audio_files)

# Supported audio file types for upload
audio_file_types = ["mp3", "flac", "wav", "aac", "ogg", "aiff", "wma", "m4a", "ape", "dsf", "dff", "midi", "mid", "opus", "amr"]
uploadedfile = st.file_uploader("Try it for yourself!", type=audio_file_types)

if uploadedfile:
    selected_audio = f"{AUDIO_FILE_PATH}/{uploadedfile.name}"
    save_uploadedfile(uploadedfile, AUDIO_FILE_PATH)
elif selected_audio:
    selected_audio = f"{AUDIO_FILE_PATH}/{selected_audio}"

# Audio playback and transcription
st.subheader("Play Audio")

with open(selected_audio, 'rb') as audio_file:
    audio_bytes = audio_file.read()
st.audio(audio_bytes)

spark = init_spark()
pipeline = create_pipeline(model)
output = fit_data(pipeline, selected_audio)

st.subheader("Transcription:")
st.markdown(output['text'][0].result.title())