from torch import cuda, bfloat16
from transformers import (
    BitsAndBytesConfig,
    AutoTokenizer,
    AutoModelForCausalLM,
    pipeline,
)

# These imports must come after torch because of a torch/datamapplot issue on Zero GPU
import spaces
import gradio as gr

import logging
import os

import datamapplot
import duckdb
import numpy as np
import requests

from dotenv import load_dotenv
from gradio_huggingfacehub_search import HuggingfaceHubSearch
from bertopic import BERTopic
from bertopic.representation import KeyBERTInspired
from bertopic.representation import TextGeneration
from cuml.manifold import UMAP
from cuml.cluster import HDBSCAN
from huggingface_hub import HfApi
from sklearn.feature_extraction.text import CountVectorizer
from sentence_transformers import SentenceTransformer
from prompts import REPRESENTATION_PROMPT

"""
TODOs:
- Improve the representation layer (try with llama.cpp or TextGeneration)
- Make it run on Zero GPU
- Try with more rows (Current: 50_000/10_000 -> Minimal target: 1_000_000/20_000)
- Export interactive plots and serve their HTML content (It doesn't work with gr.HTML)
"""

load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")
assert HF_TOKEN is not None, "You need to set HF_TOKEN in your environment variables"


EXPORTS_REPOSITORY = os.getenv("EXPORTS_REPOSITORY")
assert (
    EXPORTS_REPOSITORY is not None
), "You need to set EXPORTS_REPOSITORY in your environment variables"

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

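# Cap on the number of rows read from a split, and the chunk size used for incremental processing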
MAX_ROWS = 50_000
CHUNK_SIZE = 10_000


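# Shared HTTP session for datasets-server requests and the sentence-transformers embedding model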
session = requests.Session()
sentence_model = SentenceTransformer("all-MiniLM-L6-v2")

# Representation model
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=bfloat16,
)

model_id = "meta-llama/Llama-2-7b-chat-hf"
tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    trust_remote_code=True,
    quantization_config=bnb_config,
    device_map="auto",
)
model.eval()
generator = pipeline(
    model=model,
    tokenizer=tokenizer,
    task="text-generation",
    temperature=0.1,
    max_new_tokens=500,
    repetition_penalty=1.1,
)
representation_model = TextGeneration(generator, prompt=REPRESENTATION_PROMPT)
# End of representation model

vectorizer_model = CountVectorizer(stop_words="english")

global_topic_model = None


def get_split_rows(dataset, config, split):
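    """Return the number of rows in a split, as reported by the datasets-server /size endpoint."""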
    config_size = session.get(
        f"https://datasets-server.huggingface.co/size?dataset={dataset}&config={config}",
        timeout=20,
    ).json()
    if "error" in config_size:
        raise Exception(f"Error fetching config size: {config_size['error']}")
    split_size = next(
        (s for s in config_size["size"]["splits"] if s["split"] == split),
        None,
    )
    if split_size is None:
        raise Exception(f"Error fetching split {split} in config {config}")
    return split_size["num_rows"]


def get_parquet_urls(dataset, config, split):
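    """Return the split's parquet file URLs as a comma-separated list of quoted strings, ready to interpolate into a DuckDB query."""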
    parquet_files = session.get(
        f"https://datasets-server.huggingface.co/parquet?dataset={dataset}&config={config}&split={split}",
        timeout=20,
    ).json()
    if "error" in parquet_files:
        raise Exception(f"Error fetching parquet files: {parquet_files['error']}")
    parquet_urls = [file["url"] for file in parquet_files["parquet_files"]]
    logging.debug(f"Parquet files: {parquet_urls}")
    return ",".join(f"'{url}'" for url in parquet_urls)


def get_docs_from_parquet(parquet_urls, column, offset, limit):
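    """Read a LIMIT/OFFSET window of `column` from the parquet files with DuckDB and return it as a list of documents."""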
    SQL_QUERY = f"SELECT {column} FROM read_parquet([{parquet_urls}]) LIMIT {limit} OFFSET {offset};"
    df = duckdb.sql(SQL_QUERY).to_df()
    logging.debug(f"Dataframe: {df.head(5)}")
    return df[column].tolist()


@spaces.GPU
def calculate_embeddings(docs):
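    """Encode documents into embeddings with the SentenceTransformer model (GPU-backed via @spaces.GPU)."""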
    return sentence_model.encode(docs, show_progress_bar=True, batch_size=32)


def calculate_n_neighbors_and_components(n_rows):
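    """Derive UMAP hyperparameters from the row count: n_neighbors scales with size (clamped to [15, 100]); n_components is 10 for more than 1000 rows, else 5."""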
    n_neighbors = min(max(n_rows // 20, 15), 100)
    n_components = 10 if n_rows > 1000 else 5  # Higher components for larger datasets
    return n_neighbors, n_components


@spaces.GPU
def fit_model(docs, embeddings, n_neighbors, n_components):
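    """Fit a fresh BERTopic model (cuML UMAP + HDBSCAN, LLM-based topic labels) on one chunk of documents and store it in global_topic_model."""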
    global global_topic_model

    umap_model = UMAP(
        n_neighbors=n_neighbors,
        n_components=n_components,
        min_dist=0.0,
        metric="cosine",
        random_state=42,
    )

    hdbscan_model = HDBSCAN(
        min_cluster_size=max(
            5, n_neighbors // 2
        ),  # Reducing min_cluster_size for fewer outliers
        metric="euclidean",
        cluster_selection_method="eom",
        prediction_data=True,
    )

    new_model = BERTopic(
        language="english",
        # Sub-models
        embedding_model=sentence_model,  # Step 1 - Extract embeddings
        umap_model=umap_model,  # Step 2 - UMAP model
        hdbscan_model=hdbscan_model,  # Step 3 - Cluster reduced embeddings
        vectorizer_model=vectorizer_model,  # Step 4 - Tokenize topics
        representation_model=representation_model,  # Step 5 - Label topics
        # Hyperparameters
        top_n_words=10,
        verbose=True,
        min_topic_size=n_neighbors,  # Coherent with n_neighbors?
    )
    logging.info("Fitting new model")
    new_model.fit(docs, embeddings)
    logging.info("End fitting new model")

    global_topic_model = new_model

    logging.info("Global model updated")


def _push_to_hub(
    dataset_id,
    file_path,
):
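    """Upload a local file to the EXPORTS_REPOSITORY dataset repo on the Hub."""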
    logging.info(f"Pushing file to hub: {dataset_id} on file {file_path}")

    file_name = os.path.basename(file_path)
    api = HfApi(token=HF_TOKEN)
    try:
        logging.info(f"About to push {file_path} - {dataset_id}")
        api.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=file_name,
            repo_id=EXPORTS_REPOSITORY,
            repo_type="dataset",
        )
    except Exception as e:
        logging.error(f"Failed to push file: {e}")
        raise


def generate_topics(dataset, config, split, column, nested_column, plot_type):
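    """Generator that streams progress to the UI: fetch documents chunk by chunk, fit a
    model per chunk, merge it into a running base model, refresh the topic table and
    plot after each chunk, and finally export the plot as a PNG to the Hub."""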
    global global_topic_model
    logging.info(
        f"Generating topics for {dataset} with config {config} {split} {column} {nested_column}"
    )

    parquet_urls = get_parquet_urls(dataset, config, split)
    split_rows = get_split_rows(dataset, config, split)
    logging.info(f"Split rows: {split_rows}")

    limit = min(split_rows, MAX_ROWS)
    n_neighbors, n_components = calculate_n_neighbors_and_components(limit)

    reduce_umap_model = UMAP(
        n_neighbors=n_neighbors,
        n_components=2,  # Reduce to 2D for visualization
        min_dist=0.0,
        metric="cosine",
        random_state=42,
    )

    offset = 0
    rows_processed = 0

    base_model = None
    all_docs = []
    reduced_embeddings_list = []
    topics_info, topic_plot = None, None
    full_processing = split_rows <= MAX_ROWS
    message = (
        f"⚙️ Processing full dataset: 0 of {split_rows} rows"
        if full_processing
        else f"⚙️ Processing partial dataset: 0 of {limit} rows"
    )
    yield (
        gr.Accordion(open=False),
        gr.DataFrame(value=[], interactive=False, visible=True),
        gr.Plot(value=None, visible=True),
        gr.Label({message: rows_processed / limit}, visible=True),
        "",
    )
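    # Process the dataset chunk by chunk, merging each chunk's model into the base model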
    while offset < limit:
        docs = get_docs_from_parquet(parquet_urls, column, offset, CHUNK_SIZE)
        if not docs:
            break

        logging.info(
            f"----> Processing chunk: {offset=} {CHUNK_SIZE=} with {len(docs)} docs"
        )

        embeddings = calculate_embeddings(docs)
        fit_model(docs, embeddings, n_neighbors, n_components)

        if base_model is None:
            base_model = global_topic_model
        else:
            updated_model = BERTopic.merge_models([base_model, global_topic_model])
            nr_new_topics = len(set(updated_model.topics_)) - len(
                set(base_model.topics_)
            )
            new_topics = (
                list(updated_model.topic_labels_.values())[-nr_new_topics:]
                if nr_new_topics > 0
                else []
            )
            logging.info(f"The following topics are newly found: {new_topics}")
            base_model = updated_model

        reduced_embeddings = reduce_umap_model.fit_transform(embeddings)
        reduced_embeddings_list.append(reduced_embeddings)

        all_docs.extend(docs)
        reduced_embeddings_array = np.vstack(reduced_embeddings_list)

        topics_info = base_model.get_topic_info()
        all_topics, _ = base_model.transform(all_docs)
        all_topics = np.array(all_topics)

        topic_plot = (
            base_model.visualize_document_datamap(
                docs=all_docs,
                reduced_embeddings=reduced_embeddings_array,
                title=dataset,
                width=800,
                height=700,
                arrowprops={
                    "arrowstyle": "wedge,tail_width=0.5",
                    "connectionstyle": "arc3,rad=0.05",
                    "linewidth": 0,
                    "fc": "#33333377",
                },
                dynamic_label_size=False,
                # label_wrap_width=12,
                # label_over_points=True,
                # dynamic_label_size=True,
                # max_font_size=36,
                # min_font_size=4,
            )
            if plot_type == "DataMapPlot"
            else base_model.visualize_documents(
                docs=all_docs,
                reduced_embeddings=reduced_embeddings_array,
                custom_labels=True,
                title=dataset,
            )
        )

        rows_processed += len(docs)
        progress = min(rows_processed / limit, 1.0)
        logging.info(f"Progress: {progress:.0%} - {rows_processed} of {limit} rows")
        message = (
            f"⚙️ Processing full dataset: {rows_processed} of {limit} rows"
            if full_processing
            else f"⚙️ Processing partial dataset: {rows_processed} of {limit} rows"
        )

        yield (
            gr.Accordion(open=False),
            topics_info,
            topic_plot,
            gr.Label({message: progress}, visible=True),
            "",
        )

        offset += CHUNK_SIZE

    logging.info("Finished processing all data")

    plot_png = f"{dataset.replace('/', '-')}-{plot_type.lower()}.png"
    if plot_type == "DataMapPlot":
        topic_plot.savefig(plot_png, format="png", dpi=300)
    else:
        topic_plot.write_image(plot_png)

    _push_to_hub(dataset, plot_png)
    plot_png_link = (
        f"https://huggingface.co/datasets/{EXPORTS_REPOSITORY}/blob/main/{plot_png}"
    )
    yield (
        gr.Accordion(open=False),
        topics_info,
        topic_plot,
        gr.Label(
            {f"✅ Done: {rows_processed} rows have been processed": 1.0}, visible=True
        ),
        f"[![Download as PNG](https://img.shields.io/badge/Download_as-PNG-red)]({plot_png_link})",
    )
    cuda.empty_cache()


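# Gradio UI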
with gr.Blocks() as demo:
    gr.Markdown("# 💠 Dataset Topic Discovery 🔭")
    gr.Markdown("## Select dataset and text column")
    data_details_accordion = gr.Accordion("Data details", open=True)
    with data_details_accordion:
        with gr.Row():
            with gr.Column(scale=3):
                dataset_name = HuggingfaceHubSearch(
                    label="Hub Dataset ID",
                    placeholder="Search for dataset id on Huggingface",
                    search_type="dataset",
                )
            subset_dropdown = gr.Dropdown(label="Subset", visible=False)
            split_dropdown = gr.Dropdown(label="Split", visible=False)

        with gr.Accordion("Dataset preview", open=False):

            @gr.render(inputs=[dataset_name, subset_dropdown, split_dropdown])
            def embed(name, subset, split):
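                """Embed the dataset viewer iframe for the selected dataset/subset/split."""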
                html_code = f"""
                <iframe
                src="https://huggingface.co/datasets/{name}/embed/viewer/{subset}/{split}"
                frameborder="0"
                width="100%"
                height="600px"
                ></iframe>
                    """
                return gr.HTML(value=html_code)

        with gr.Row():
            text_column_dropdown = gr.Dropdown(label="Text column name")
            nested_text_column_dropdown = gr.Dropdown(
                label="Nested text column name", visible=False
            )
            plot_type_radio = gr.Radio(
                ["DataMapPlot", "Plotly"],
                value="DataMapPlot",
                label="Choose the plot type",
                interactive=True,
            )
        generate_button = gr.Button("Generate Topics", variant="primary")

    gr.Markdown("## Data map")
    full_topics_generation_label = gr.Label(visible=False, show_label=False)
    open_png_label = gr.Markdown()
    topics_plot = gr.Plot()
    with gr.Accordion("Topics Info", open=False):
        topics_df = gr.DataFrame(interactive=False, visible=True)
    generate_button.click(
        generate_topics,
        inputs=[
            dataset_name,
            subset_dropdown,
            split_dropdown,
            text_column_dropdown,
            nested_text_column_dropdown,
            plot_type_radio,
        ],
        outputs=[
            data_details_accordion,
            topics_df,
            topics_plot,
            full_topics_generation_label,
            open_png_label,
        ],
    )

    def _resolve_dataset_selection(
        dataset: str, default_subset: str, default_split: str, text_feature
    ):
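        """Resolve the subset/split/column dropdowns for a dataset: query the datasets-server
        /info endpoint, pick defaults, and offer string features (and nested string features)
        as text column choices."""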
        if "/" not in dataset.strip().strip("/"):
            return {
                subset_dropdown: gr.Dropdown(visible=False),
                split_dropdown: gr.Dropdown(visible=False),
                text_column_dropdown: gr.Dropdown(label="Text column name"),
                nested_text_column_dropdown: gr.Dropdown(visible=False),
            }
        info_resp = session.get(
            f"https://datasets-server.huggingface.co/info?dataset={dataset}", timeout=20
        ).json()
        if "error" in info_resp:
            return {
                subset_dropdown: gr.Dropdown(visible=False),
                split_dropdown: gr.Dropdown(visible=False),
                text_column_dropdown: gr.Dropdown(label="Text column name"),
                nested_text_column_dropdown: gr.Dropdown(visible=False),
            }
        subsets: list[str] = list(info_resp["dataset_info"])
        subset = default_subset if default_subset in subsets else subsets[0]
        splits: list[str] = list(info_resp["dataset_info"][subset]["splits"])
        split = default_split if default_split in splits else splits[0]
        features = info_resp["dataset_info"][subset]["features"]

        def _is_string_feature(feature):
            return isinstance(feature, dict) and feature.get("dtype") == "string"

        text_features = [
            feature_name
            for feature_name, feature in features.items()
            if _is_string_feature(feature)
        ]
        nested_features = [
            feature_name
            for feature_name, feature in features.items()
            if isinstance(feature, dict)
            and isinstance(next(iter(feature.values())), dict)
        ]
        nested_text_features = [
            feature_name
            for feature_name in nested_features
            if any(
                _is_string_feature(nested_feature)
                for nested_feature in features[feature_name].values()
            )
        ]
        if not text_feature:
            return {
                subset_dropdown: gr.Dropdown(
                    value=subset, choices=subsets, visible=len(subsets) > 1
                ),
                split_dropdown: gr.Dropdown(
                    value=split, choices=splits, visible=len(splits) > 1
                ),
                text_column_dropdown: gr.Dropdown(
                    choices=text_features + nested_text_features,
                    label="Text column name",
                ),
                nested_text_column_dropdown: gr.Dropdown(visible=False),
            }
        if text_feature in nested_text_features:
            nested_keys = [
                feature_name
                for feature_name, feature in features[text_feature].items()
                if _is_string_feature(feature)
            ]
            return {
                subset_dropdown: gr.Dropdown(
                    value=subset, choices=subsets, visible=len(subsets) > 1
                ),
                split_dropdown: gr.Dropdown(
                    value=split, choices=splits, visible=len(splits) > 1
                ),
                text_column_dropdown: gr.Dropdown(
                    choices=text_features + nested_text_features,
                    label="Text column name",
                ),
                nested_text_column_dropdown: gr.Dropdown(
                    value=nested_keys[0],
                    choices=nested_keys,
                    label="Nested text column name",
                    visible=True,
                ),
            }
        return {
            subset_dropdown: gr.Dropdown(
                value=subset, choices=subsets, visible=len(subsets) > 1
            ),
            split_dropdown: gr.Dropdown(
                value=split, choices=splits, visible=len(splits) > 1
            ),
            text_column_dropdown: gr.Dropdown(
                choices=text_features + nested_text_features, label="Text column name"
            ),
            nested_text_column_dropdown: gr.Dropdown(visible=False),
        }

    @dataset_name.change(
        inputs=[dataset_name],
        outputs=[
            subset_dropdown,
            split_dropdown,
            text_column_dropdown,
            nested_text_column_dropdown,
        ],
    )
    def show_input_from_dataset_dropdown(dataset: str) -> dict:
        return _resolve_dataset_selection(
            dataset, default_subset="default", default_split="train", text_feature=None
        )

    @subset_dropdown.change(
        inputs=[dataset_name, subset_dropdown],
        outputs=[
            subset_dropdown,
            split_dropdown,
            text_column_dropdown,
            nested_text_column_dropdown,
        ],
    )
    def show_input_from_subset_dropdown(dataset: str, subset: str) -> dict:
        return _resolve_dataset_selection(
            dataset, default_subset=subset, default_split="train", text_feature=None
        )

    @split_dropdown.change(
        inputs=[dataset_name, subset_dropdown, split_dropdown],
        outputs=[
            subset_dropdown,
            split_dropdown,
            text_column_dropdown,
            nested_text_column_dropdown,
        ],
    )
    def show_input_from_split_dropdown(dataset: str, subset: str, split: str) -> dict:
        return _resolve_dataset_selection(
            dataset, default_subset=subset, default_split=split, text_feature=None
        )

    @text_column_dropdown.change(
        inputs=[dataset_name, subset_dropdown, split_dropdown, text_column_dropdown],
        outputs=[
            subset_dropdown,
            split_dropdown,
            text_column_dropdown,
            nested_text_column_dropdown,
        ],
    )
    def show_input_from_text_column_dropdown(
        dataset: str, subset: str, split: str, text_column
    ) -> dict:
        return _resolve_dataset_selection(
            dataset,
            default_subset=subset,
            default_split=split,
            text_feature=text_column,
        )


demo.launch()