"""Helpers to load, display, plot and download evaluation results in the Gradio app."""

import asyncio
import shutil
import tempfile
from collections import defaultdict

import gradio as gr
import pandas as pd
import plotly.express as px

import src.constants as constants
from src.constants import TASKS
from src.hub import glob, load_json_file

# Suffixes of the metric keys that are color-graded in the tables and drawn in the plots.
_METRIC_SUFFIXES = ("acc,none", "acc_norm,none", "exact_match,none")


def fetch_result_paths():
    """Return every JSON path matched by the glob under ``constants.RESULTS_DATASET_ID``."""
    path = f"{constants.RESULTS_DATASET_ID}/**/**/*.json"
    return glob(path)


def sort_result_paths_per_model(paths):
    """Group result paths by model id and sort each group.

    The model id is the part of the path located between the dataset-id
    prefix and the final ``/`` (i.e. the path without its file name).

    Args:
        paths: Iterable of result file paths, each starting with
            ``constants.RESULTS_DATASET_ID`` followed by a separator.

    Returns:
        A dict mapping each model id to the sorted list of its paths.
    """
    prefix_length = len(constants.RESULTS_DATASET_ID) + 1
    grouped = defaultdict(list)
    for path in paths:
        model_id, _ = path[prefix_length:].rsplit("/", 1)
        grouped[model_id].append(path)
    return {model_id: sorted(model_paths) for model_id, model_paths in grouped.items()}


def update_load_results_component():
    """Return the enabled "Load" button twice (one value per output component)."""
    return (gr.Button("Load", interactive=True),) * 2


async def load_results_dataframe(model_id, result_paths_per_model=None):
    """Load and merge all result files of one model into a single-row DataFrame.

    Args:
        model_id: Key into ``result_paths_per_model``.
        result_paths_per_model: Mapping of model id to its result file paths.

    Returns:
        A one-row DataFrame whose columns are the flattened (dot-separated)
        ``results``/``configs`` keys and whose index is the model name, or
        ``None`` when there is nothing to load.
    """
    if not model_id or not result_paths_per_model:
        return None
    result_paths = result_paths_per_model[model_id]
    results = await asyncio.gather(*[load_json_file(path) for path in result_paths])
    if not results:
        # No file to read: previously this path crashed with a NameError on `result`.
        return None
    data = {"results": {}, "configs": {}}
    model_name = "Model"
    for result in results:
        data["results"].update(result["results"])
        data["configs"].update(result["configs"])
        # Later files win, so the name of the last file is the one kept.
        model_name = result.get("model_name", "Model")
    df = pd.json_normalize([data])
    return df.set_index(pd.Index([model_name]))


async def load_results(model_ids, result_paths_per_model=None):
    """Load the results of several models concurrently and stack them.

    Args:
        model_ids: Iterable of model ids to load.
        result_paths_per_model: Mapping of model id to its result file paths.

    Returns:
        A DataFrame with one row per loaded model, or ``None`` when no model
        produced any data.
    """
    dfs = await asyncio.gather(*[load_results_dataframe(model_id, result_paths_per_model) for model_id in model_ids])
    # pd.concat raises ValueError when every item is None, so drop them up front.
    dfs = [df for df in dfs if df is not None]
    if dfs:
        return pd.concat(dfs)
    return None


def display_results(df, task, hide_std_errors, show_only_differences):
    """Render the "results" and "configs" HTML tables for the loaded models.

    Returns:
        A ``(results_html, configs_html)`` tuple, or ``(None, None)`` when
        ``df`` is ``None``.
    """
    if df is None:
        return None, None
    # One column per model, one row per flattened key.
    df = df.T.rename_axis(columns=None)
    return (
        display_tab("results", df, task, hide_std_errors=hide_std_errors),
        display_tab("configs", df, task, show_only_differences=show_only_differences),
    )


def display_tab(tab, df, task, hide_std_errors=True, show_only_differences=False):
    """Render one tab ("results" or "configs") of the comparison table as HTML.

    Args:
        tab: Key prefix selecting the rows to show (rows start with ``"{tab}."``).
        df: DataFrame whose index holds the dot-separated keys.
        task: Task key to restrict the rows to, or ``"All"``.
        hide_std_errors: Hide the rows ending with ``"_stderr,none"``.
        show_only_differences: Hide the rows whose values all equal the first column.

    Returns:
        The styled table as an HTML string.
    """
    any_difference = None
    if show_only_differences:
        any_difference = df.ne(df.iloc[:, 0], axis=0).any(axis=1)

    def is_hidden(row):
        if not row.startswith(f"{tab}.") or row.startswith(f"{tab}.leaderboard.") or row.endswith(".alias"):
            return True
        if task == "All":
            # Hide legacy ARC
            if row.startswith(f"{tab}.leaderboard_arc_challenge"):
                return True
        elif not row.startswith(f"{tab}.{task}"):
            return True
        # Hide MATH fewshot_config.samples
        if row.startswith(f"{tab}.leaderboard_math") and row.endswith("fewshot_config.samples"):
            return True
        # Hide std errors
        if hide_std_errors and row.endswith("_stderr,none"):
            return True
        # Hide non-different rows
        return bool(show_only_differences and not any_difference[row])

    styler = df.style.format(escape="html", na_rep="")
    styler.hide([row for row in df.index if is_hidden(row)], axis="index")

    # Color metric result cells.
    # Apply only on numeric cells, otherwise the background gradient will not work.
    idx = pd.IndexSlice
    colored_rows = idx[[row for row in df.index if row.endswith(_METRIC_SUFFIXES)]]
    subset = idx[colored_rows, idx[:]]
    styler.background_gradient(cmap="PiYG", vmin=0, vmax=1, subset=subset, axis=None)

    # Format index values: remove prefix and suffix
    start = len(f"{tab}.leaderboard_") if task == "All" else len(f"{tab}.{task} ")
    styler.format_index(lambda label: label[start:].removesuffix(",none"), axis="index")

    # Fix overflow
    styler.set_table_styles(
        [
            {
                "selector": "td",
                "props": [("overflow-wrap", "break-word"), ("max-width", "1px")],
            },
            {
                "selector": ".col_heading",
                "props": [("width", f"{100 / len(df.columns)}%")],
            },
        ]
    )
    return styler.to_html()


def _tasks_radio(visible):
    """Build the "Tasks" radio selector, reset to "All", with the given visibility."""
    return gr.Radio(
        ["All"] + list(constants.TASKS.values()),
        label="Tasks",
        info="Evaluation tasks to be displayed",
        value="All",
        visible=visible,
    )


def update_tasks_component():
    """Return the visible "Tasks" selector twice (one value per output component)."""
    return (_tasks_radio(visible=True),) * 2


def clear_results():
    """Return the reset values for every results-related component.

    Order: model_ids, dataframe, load_results_btn, load_configs_btn,
    results_task, configs_task.
    """
    return (
        gr.Dropdown(value=[]),
        None,
        *(gr.Button("Load", interactive=False),) * 2,
        *(_tasks_radio(visible=False),) * 2,
    )


def display_loading_message_for_results():
    """Return the "Loading..." placeholder twice (one value per output component)."""
    # NOTE(review): the message is only surrounded by blank lines here; it looks like
    # HTML markup around it may have been lost — confirm against the upstream file.
    return ("\n\nLoading...\n\n",) * 2


def plot_results(df, task):
    """Build the bar chart and the radar chart of the metric scores.

    Args:
        df: DataFrame with one row per model and flattened keys as columns,
            or ``None``.
        task: Task key to plot, or ``"All"`` for the per-benchmark overview.

    Returns:
        A ``(bar_figure, radar_figure)`` tuple, or ``(None, None)`` when
        ``df`` is ``None``.
    """
    if df is None:
        return None, None

    df = df[[col for col in df.columns if col.startswith("results.") and col.endswith(_METRIC_SUFFIXES)]]
    tasks = {key: tupl[0] for key, tupl in TASKS.items()}
    tasks["leaderboard_math"] = tasks["leaderboard_math_hard"]
    subtasks = {tupl[1]: tupl[0] for tupl in constants.SUBTASKS.get(task, [])}
    if task == "All":
        df = df[[col for col in df.columns if col.split(".")[1] in tasks]]
        # - IFEval: Calculate average of both strict accuracies
        ifeval_mean = df[
            [
                "results.leaderboard_ifeval.inst_level_strict_acc,none",
                "results.leaderboard_ifeval.prompt_level_strict_acc,none",
            ]
        ].mean(axis=1)
        df = df.drop(columns=[col for col in df.columns if col.split(".")[1] == "leaderboard_ifeval"])
        loc = df.columns.get_loc("results.leaderboard_math_hard.exact_match,none")
        # NOTE(review): this places the IFEval average at position `loc - 1`, i.e. before
        # the column that precedes MATH — confirm the offset is intended and that MATH is
        # never the first column.
        df.insert(loc - 1, "results.leaderboard_ifeval", ifeval_mean)
        # Rename
        df = df.rename(columns=lambda col: tasks[col.split(".")[1]])
    else:
        df = df[[col for col in df.columns if col.startswith(f"results.{task}")]]
        # - IFEval: Return 4 accuracies
        if task == "leaderboard_ifeval":
            df = df.rename(columns=lambda col: col.split(".")[2].removesuffix(",none"))
        else:
            df = df.rename(columns=lambda col: tasks.get(col.split(".")[1], subtasks.get(col.split(".")[1])))

    fig_1 = px.bar(
        df.T.rename_axis(columns="Model"),
        barmode="group",
        labels={"index": "Benchmark" if task == "All" else "Subtask", "value": "Score"},
        color_discrete_sequence=px.colors.qualitative.Safe,  # TODO: https://plotly.com/python/discrete-color/
    )
    fig_1.update_yaxes(range=[0, 1])
    fig_2 = px.line_polar(
        df.melt(ignore_index=False, var_name="Benchmark", value_name="Score").reset_index(names="Model"),
        r="Score",
        theta="Benchmark",
        color="Model",
        line_close=True,
        range_r=[0, 1],
        color_discrete_sequence=px.colors.qualitative.Safe,  # TODO: https://plotly.com/python/discrete-color/
    )
    # Avoid bug with radar:
    fig_2.update_layout(
        title_text="",
        title_font_size=1,
    )
    return fig_1, fig_2


# Temporary directory holding the last exported results file (None when there is none).
tmpdirname = None


def download_results(results):
    """Write the results HTML to a fresh temporary file and expose it for download.

    Any directory created by a previous export is removed first.

    Args:
        results: HTML string to export; nothing is written when it is empty.

    Returns:
        A visible ``gr.File`` pointing at the written file, or ``None`` when
        ``results`` is empty.
    """
    global tmpdirname
    if not results:
        return None
    if tmpdirname:
        shutil.rmtree(tmpdirname)
    tmpdirname = tempfile.mkdtemp()
    path = f"{tmpdirname}/results.html"
    # Explicit encoding: the platform default may not be able to encode the table.
    with open(path, "w", encoding="utf-8") as f:
        f.write(results)
    return gr.File(path, visible=True)


def clear_results_file():
    """Delete the exported results file, if any, and hide the download component."""
    global tmpdirname
    if tmpdirname:
        shutil.rmtree(tmpdirname)
        tmpdirname = None
    return gr.File(visible=False)