import json
import os
import unicodedata
from datetime import datetime, timezone

import gradio as gr
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import snapshot_download
from src.display.about import (
CITATION_BUTTON_LABEL,
CITATION_BUTTON_TEXT,
EVALUATION_QUEUE_TEXT,
INTRODUCTION_TEXT,
LLM_BENCHMARKS_TEXT,
FAQ_TEXT,
TITLE,
)
from src.display.css_html_js import custom_css
from src.display.utils import (
BENCHMARK_COLS,
COLS,
EVAL_COLS,
EVAL_TYPES,
NUMERIC_INTERVALS,
TYPES,
AutoEvalColumn,
ModelType,
fields,
WeightType,
Precision
)
from src.envs import API, EVAL_REQUESTS_PATH, DYNAMIC_INFO_REPO, DYNAMIC_INFO_FILE_PATH, DYNAMIC_INFO_PATH, EVAL_RESULTS_PATH, H4_TOKEN, IS_PUBLIC, QUEUE_REPO, REPO_ID, RESULTS_REPO
from src.populate import get_evaluation_queue_df, get_leaderboard_df
from src.submission.submit import add_new_eval
from src.tools.collections import update_collections
from src.tools.plots import (
create_metric_plot_obj,
create_plot_df,
create_scores_df,
)
def restart_space():
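    """Restart this Space via the Hub API (recovery path, also run on a schedule below)."""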
API.restart_space(repo_id=REPO_ID, token=H4_TOKEN)
def init_space():
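    """Download the queue, dynamic-info, and results datasets from the Hub, then build the leaderboard dataframes."""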
try:
print(EVAL_REQUESTS_PATH)
snapshot_download(
repo_id=QUEUE_REPO, local_dir=EVAL_REQUESTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30
)
except Exception:
restart_space()
try:
print(DYNAMIC_INFO_PATH)
snapshot_download(
repo_id=DYNAMIC_INFO_REPO, local_dir=DYNAMIC_INFO_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30
)
except Exception:
restart_space()
try:
print(EVAL_RESULTS_PATH)
snapshot_download(
repo_id=RESULTS_REPO, local_dir=EVAL_RESULTS_PATH, repo_type="dataset", tqdm_class=None, etag_timeout=30
)
except Exception:
restart_space()
raw_data, original_df = get_leaderboard_df(
results_path=EVAL_RESULTS_PATH,
requests_path=EVAL_REQUESTS_PATH,
dynamic_path=DYNAMIC_INFO_FILE_PATH,
cols=COLS,
benchmark_cols=BENCHMARK_COLS
)
update_collections(original_df.copy())
leaderboard_df = original_df.copy()
plot_df = create_plot_df(create_scores_df(raw_data))
(
finished_eval_queue_df,
running_eval_queue_df,
pending_eval_queue_df,
) = get_evaluation_queue_df(EVAL_REQUESTS_PATH, EVAL_COLS)
return leaderboard_df, original_df, plot_df, finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df
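# Module-level dataframes, populated once at startup and reused by the callbacks below.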
leaderboard_df, original_df, plot_df, finished_eval_queue_df, running_eval_queue_df, pending_eval_queue_df = init_space()
# Searching and filtering
def update_table(
hidden_df: pd.DataFrame,
columns: list,
type_query: list,
precision_query: str,
size_query: list,
show_deleted: bool,
show_merges: bool,
show_moe: bool,
show_flagged: bool,
query: str,
):
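    """Apply the type/precision/size/visibility filters and the search query, then project the selected columns."""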
filtered_df = filter_models(hidden_df, type_query, size_query, precision_query, show_deleted, show_merges, show_moe, show_flagged)
filtered_df = filter_queries(query, filtered_df)
df = select_columns(filtered_df, columns)
return df
def load_query(request: gr.Request):  # triggered only once at startup => read the query parameter if it exists
    query = request.query_params.get("query") or ""
    return query, query  # one copy for the search bar, one for a hidden component that triggers a reload only when the value changes
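# Example: loading the Space with ".../?query=llama" pre-fills the search bar with "llama".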
def search_table(df: pd.DataFrame, query: str) -> pd.DataFrame:
    # Case-insensitive substring match against the hidden "dummy" model-name column.
    return df[df[AutoEvalColumn.dummy.name].str.contains(query, case=False)]
def select_columns(df: pd.DataFrame, columns: list) -> pd.DataFrame:
always_here_cols = [c.name for c in fields(AutoEvalColumn) if c.never_hidden]
dummy_col = [AutoEvalColumn.dummy.name]
    # We use COLS to preserve the canonical column ordering.
filtered_df = df[
always_here_cols + [c for c in COLS if c in df.columns and c in columns] + dummy_col
]
return filtered_df
def filter_queries(query: str, filtered_df: pd.DataFrame):
"""Added by Abishek"""
final_df = []
if query != "":
queries = [q.strip() for q in query.split(";")]
for _q in queries:
_q = _q.strip()
if _q != "":
temp_filtered_df = search_table(filtered_df, _q)
if len(temp_filtered_df) > 0:
final_df.append(temp_filtered_df)
if len(final_df) > 0:
filtered_df = pd.concat(final_df)
filtered_df = filtered_df.drop_duplicates(
subset=[AutoEvalColumn.model.name, AutoEvalColumn.precision.name, AutoEvalColumn.revision.name]
)
return filtered_df
def filter_models(
df: pd.DataFrame, type_query: list, size_query: list, precision_query: list, show_deleted: bool, show_merges: bool, show_moe:bool, show_flagged: bool
) -> pd.DataFrame:
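    """Filter the leaderboard by model type, size bucket, precision, and the various visibility toggles."""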
    # Show all models, or only those still present on the Hub
    if show_deleted:
        filtered_df = df
    else:
        filtered_df = df[df[AutoEvalColumn.still_on_hub.name] == True]
if not show_merges:
filtered_df = filtered_df[filtered_df[AutoEvalColumn.merged.name] == False]
if not show_moe:
filtered_df = filtered_df[filtered_df[AutoEvalColumn.moe.name] == False]
if not show_flagged:
filtered_df = filtered_df[filtered_df[AutoEvalColumn.flagged.name] == False]
    type_emoji = [t[0] for t in type_query]
    filtered_df = filtered_df.loc[filtered_df[AutoEvalColumn.model_type_symbol.name].isin(type_emoji)]
    filtered_df = filtered_df.loc[filtered_df[AutoEvalColumn.precision.name].isin(precision_query + ["None"])]

    # Keep only models whose parameter count falls inside one of the selected size buckets.
    numeric_interval = pd.IntervalIndex(sorted([NUMERIC_INTERVALS[s] for s in size_query]))
    params_column = pd.to_numeric(filtered_df[AutoEvalColumn.params.name], errors="coerce")
    mask = params_column.apply(lambda x: any(numeric_interval.contains(x)))
    return filtered_df.loc[mask]
leaderboard_df = filter_models(
df=leaderboard_df,
type_query=[t.to_str(" : ") for t in ModelType],
size_query=list(NUMERIC_INTERVALS.keys()),
precision_query=[i.value.name for i in Precision],
show_deleted=False,
show_merges=False,
show_moe=True,
show_flagged=False
)
def is_valid_unicode(char):
    """Return True if `char` is a named Unicode character, False otherwise."""
    try:
        unicodedata.name(char)
        return True
    except ValueError:
        return False

def remove_invalid_unicode(input_string):
    """Strip characters that have no Unicode name (unassigned code points, surrogates, controls)."""
    if isinstance(input_string, str):
        return ''.join(char for char in input_string if is_valid_unicode(char))
    return input_string  # Return non-string values unchanged
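# Example: remove_invalid_unicode("llama\ud800-7b") returns "llama-7b" (lone surrogates have no Unicode name).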
dummy1 = gr.Textbox(visible=False)
hidden_leaderboard_table_for_search = gr.components.Dataframe(
headers=COLS,
datatype=TYPES,
visible=False,
line_breaks=False,
interactive=False
)
def display(x, y):
    """API endpoint: sanitize string columns, then return the leaderboard as a dataframe."""
    # Both inputs are ignored; they only exist to satisfy the Interface signature.
    for column in leaderboard_df.columns:
        if leaderboard_df[column].dtype == 'object':
            leaderboard_df[column] = leaderboard_df[column].apply(remove_invalid_unicode)
    return leaderboard_df[COLS]
INTRODUCTION_TEXT = """
This is a copy of the Open LLM Leaderboard Space. Instead of displaying
the results as a table, this Space was modified to simply provide a Gradio API interface.
Using the Python script below, users can easily access the full leaderboard data.
```python
# Import dependencies
from gradio_client import Client
# Initialize the Gradio client with the API URL
client = Client("https://rodrigomasini-data-only-enterprise-scenarios-leaderboard.hf.space/")
try:
    # Perform the API call
    response = client.predict("", "", api_name='/predict')

    # Check whether the response is directly accessible
    if len(response) > 0:
        print("Response received!")
        headers = response.get('headers', [])
        data = response.get('data', [])
        print(headers)

        # Uncomment the block below to save the dataset in CSV format
        # (this also requires `import csv` at the top of the script)
        #csv_file_path = 'foundational-models-benchmark.csv'
        #with open(csv_file_path, mode='w', newline='', encoding='utf-8') as file:
        #    writer = csv.writer(file)
        #    writer.writerow(headers)   # write the header row
        #    for row in data:           # then the data rows
        #        writer.writerow(row)
        #print(f"Results saved to {csv_file_path}")

    # If the response prints as a string that looks like JSON, parse it with json.loads(response);
    # otherwise, adjust the handling to the actual structure of `response`.
except Exception as e:
    print(f"An error occurred: {e}")
```
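
For example, the returned headers and rows can be loaded into a pandas DataFrame
(a minimal sketch, assuming the response is a dict with `headers` and `data` keys as above):

```python
import pandas as pd

# Build a DataFrame from the API payload and preview the first rows.
df = pd.DataFrame(data, columns=headers)
print(df.head())
```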
"""
interface = gr.Interface(
fn=display,
inputs=[gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text"), dummy1],
outputs=[hidden_leaderboard_table_for_search]
)
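# gr.Interface registers display() under the default "/predict" endpoint used by the client snippet above;
# the Markdown input merely renders the introduction text and is ignored by display().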
scheduler = BackgroundScheduler()
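# Restart the Space every 30 minutes so init_space() re-downloads the latest results on boot.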
scheduler.add_job(restart_space, "interval", seconds=1800)
scheduler.start()
interface.launch()