idolezal's picture
More informations about progress from `results_dataset_integrity_check()`
4123571
raw
history blame
27.7 kB
import copy
import glob
import json
import os
# `requests` needs these variables to hold either a valid CA bundle path or an
# empty string; otherwise HTTPS connections fail with
# "[Errno 101] Network is unreachable".
os.environ["CURL_CA_BUNDLE"] = os.environ["REQUESTS_CA_BUNDLE"] = (
    "/etc/ssl/certs/ca-certificates.crt"
    if os.path.exists("/etc/ssl/certs/ca-certificates.crt")
    else ""
)
# Log the effective values.
print(f"{os.environ.get('CURL_CA_BUNDLE') = }")
print(f"{os.environ.get('REQUESTS_CA_BUNDLE') = }")
import hashlib
import time
import requests
from collections import namedtuple
from xml.sax.saxutils import escape as xmlEscape, quoteattr as xmlQuoteAttr
from threading import Lock
import gradio as gr
import pandas as pd
import numpy as np
from huggingface_hub import HfApi, snapshot_download
from compare_significance import SUPPORTED_METRICS
# Metrics that `LeaderboardServer.get_leaderboard` looks up for each task:
# everything `compare_significance` supports plus "macro_f1".
VISIBLE_METRICS = SUPPORTED_METRICS + ["macro_f1"]
# Hugging Face Hub client used for the `upload_file` calls below.
api = HfApi()
ORG = "CZLC"
# Repository id used by `LeaderboardServer` as `server_address` (a "dataset" repo).
REPO = f"{ORG}/LLM_benchmark_data"
# Hub access token taken from the environment; `None` when the variable is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
# JSON file loaded into `LeaderboardServer.TASKS_METADATA`.
TASKS_METADATA_PATH = "./tasks_metadata.json"
# Maps characters with a special meaning in Markdown to numeric character
# references.  It is passed as the `entities` argument of
# `xml.sax.saxutils.escape`, which applies the replacements in dictionary
# order.  "#" therefore MUST stay first: every replacement value contains "#",
# and escaping it later would corrupt the references already inserted.
MARKDOWN_SPECIAL_CHARACTERS = {
    "#": "&#35;",  # must be first, see the note above
    "\\": "&#92;",
    "`": "&#96;",
    "*": "&#42;",
    "_": "&#95;",
    "{": "&#123;",
    "}": "&#125;",
    "[": "&#91;",
    "]": "&#93;",
    "(": "&#40;",
    ")": "&#41;",
    "+": "&#43;",
    "-": "&#45;",
    ".": "&#46;",
    "!": "&#33;",
    "=": "&#61;",
    "|": "&#124;",
}
def check_significance_send_task(model_a_path, model_b_path):
    """Submit two model files to the significance-comparison service.

    Both files are uploaded in one multipart POST request.

    Args:
        model_a_path: Path of the file sent as the ``model_a`` field.
        model_b_path: Path of the file sent as the ``model_b`` field.

    Returns:
        ``response.url`` of the accepted (HTTP 202) request; it is the URL
        that `check_significance_wait_for_result` polls.

    Raises:
        RuntimeError: The server answered 429 (busy) or any status other
            than 202.
    """
    endpoint = 'https://czechllm.fit.vutbr.cz/benczechmark-leaderboard/compare_significance/'

    # Both files have to stay open while the request is being sent.
    with open(model_a_path, 'rb') as model_a_fp, open(model_b_path, 'rb') as model_b_fp:
        response = requests.post(
            endpoint,
            files={'model_a': model_a_fp, 'model_b': model_b_fp},
            timeout=60 * 5,
        )

    status = response.status_code
    if status == 429:
        # TODO: catch this in the caller and re-raise it as gr.Error
        raise RuntimeError('Server is too busy. Please try again later.')
    if status != 202:
        # TODO: catch this in the caller and re-raise it as gr.Error
        raise RuntimeError(f'Failed to submit task. Status code: {status}')
    return response.url
def check_significance_wait_for_result(result_url):
    """Poll `result_url` until the comparison finishes and return its result.

    The URL is requested every 5 seconds for as long as the server answers
    HTTP 202.

    Args:
        result_url: URL returned by `check_significance_send_task`.

    Returns:
        The ``"result"`` entry of the JSON response when its ``"state"`` is
        ``"COMPLETED"``.

    Raises:
        RuntimeError: The server answered with a status other than 200/202,
            or the task ended in a state other than ``"COMPLETED"`` (the
            message is then the ``"error"`` entry of the result).
    """
    response = requests.get(result_url, timeout=60 * 5)
    while response.status_code == 202:  # still being processed
        time.sleep(5)
        response = requests.get(result_url, timeout=60 * 5)

    if response.status_code != 200:
        # TODO: catch this in the caller and re-raise it as gr.Error
        raise RuntimeError(f'Failed to get result. Status code: {response.status_code}')

    payload = response.json()
    if payload["state"] != "COMPLETED":
        raise RuntimeError(payload['result']['error'])
    return payload['result']
def check_significance(model_a_path, model_b_path):
    """Submit a comparison of two model files and block until its result arrives.

    Convenience wrapper chaining `check_significance_send_task` and
    `check_significance_wait_for_result`; it raises whatever they raise.
    """
    return check_significance_wait_for_result(
        check_significance_send_task(model_a_path, model_b_path)
    )
# Module-level lock that `LeaderboardServer` stores as `self.pre_submit_lock`
# and holds whenever it reads or changes its `pre_submit` attribute.
pre_submit_lock = Lock()
class _ReadLock:
    """Context manager that counts how many readers are currently active.

    The counter `reading` is only modified while the shared `lock` is held.
    The lock is released again before the ``with`` body runs, so any number
    of readers can be inside their ``with`` blocks at the same time.
    """

    def __init__(self, lock):
        self.reading = 0
        self._lock = lock

    def _adjust(self, delta):
        """Add `delta` to the reader counter under the shared lock."""
        self._lock.acquire()
        try:
            self.reading = self.reading + delta
        finally:
            self._lock.release()

    def __enter__(self):
        self._adjust(+1)

    def __exit__(self, exc_type, exc_value, traceback):
        self._adjust(-1)
class ReadWriteLock:
    """
    Lock ensuring that nobody reads while a write is in progress and that
    only one writer writes at a time.

    Use ``with lock.ro:`` around reads and ``with lock.rw:`` around writes
    (`rw` is the lock object itself).
    """

    def __init__(self):
        self._lock = Lock()
        self.ro = _ReadLock(self._lock)
        self.rw = self

    def __enter__(self):
        # The writer keeps `_lock` for the whole `with` body, which keeps new
        # readers (who need it to register) and other writers out.
        self._lock.acquire()
        while self.ro.reading > 0:
            # Readers are still active: let them deregister, then retry.
            self._lock.release()
            time.sleep(1)
            self._lock.acquire()
        if self.ro.reading < 0:
            # More reader exits than entries; the counter is corrupted.
            self._lock.release()
            raise RuntimeError()

    def __exit__(self, exc_type, exc_value, traceback):
        self._lock.release()
class LeaderboardServer:
    """Leaderboard state backed by a Hugging Face dataset repository.

    The repository snapshot contains one JSON file per submission in
    ``data/`` and a ``tournament.json`` file.  ``tournament_results`` maps
    ``submission_id -> competitor_id -> task -> value`` where the value is the
    ``"significant"`` field reported by the significance service; a truthy
    value is counted as a win of ``submission_id`` over ``competitor_id``.

    Shared dictionaries are guarded by ``var_lock`` (``.ro`` for reading,
    ``.rw`` for writing).  ``pre_submit_lock`` serialises tournaments and
    guards ``pre_submit``, the not-yet-confirmed submission.
    """

    def __init__(self):
        self.server_address = REPO
        self.repo_type = "dataset"
        self.local_leaderboard = self._download_snapshot()

        with open(TASKS_METADATA_PATH) as tasks_metadata_file:
            self.TASKS_METADATA = json.load(tasks_metadata_file)
        self.TASKS_CATEGORIES = {task["category"] for task in self.TASKS_METADATA.values()}
        self.TASKS_CATEGORY_OVERALL = "Overall"
        self.CATEGORY_TO_TASK_ABBREVIATION_TO_NAME = self._prepare_category_to_task_abbr_to_name()

        self.var_lock = ReadWriteLock()
        self.submission_ids = set()
        self.submission_id_to_file = {}  # Map submission ids to file paths
        self.submission_id_to_model_title = {}
        self.submission_id_to_data = {}  # Only data (results and metadata) used by the leaderboard
        self.fetch_existing_models()
        self.tournament_results = self.load_tournament_results()

        self.pre_submit_lock = pre_submit_lock
        self.pre_submit = None

        # Check integrity of the results dataset after (re)start of the Hugging Face Space
        self.results_dataset_integrity_check()

    def _download_snapshot(self):
        """Download the dataset repository into ``./`` and return the local path."""
        return snapshot_download(
            self.server_address,
            repo_type=self.repo_type,
            token=HF_TOKEN,
            local_dir="./",
        )

    def update_leaderboard(self):
        """Re-download the repository and reload submissions and tournament results."""
        self.local_leaderboard = self._download_snapshot()
        self.fetch_existing_models()
        # Read the file first so the write lock is held only for the assignment.
        tournament_results = self.load_tournament_results()
        with self.var_lock.rw:
            self.tournament_results = tournament_results

    def load_tournament_results(self):
        """Return the parsed ``tournament.json``, or ``{}`` when the file does not exist."""
        metadata_rank_paths = os.path.join(self.local_leaderboard, "tournament.json")
        if not os.path.exists(metadata_rank_paths):
            return {}
        with open(metadata_rank_paths) as ranks_file:
            return json.load(ranks_file)

    def _prepare_category_to_task_abbr_to_name(self):
        """Build ``category -> {task abbreviation -> task name}``.

        The inner dictionaries are ordered by abbreviation.
        """
        tasks_per_category = {}
        for task, task_metadata in self.TASKS_METADATA.items():
            tasks_per_category.setdefault(task_metadata["category"], []).append(task)

        category2abbreviation2name = {}
        for category, tasks in tasks_per_category.items():
            abbreviation2name = {
                self.TASKS_METADATA[t]["abbreviation"]: self.TASKS_METADATA[t]["name"]
                for t in tasks
            }
            category2abbreviation2name[category] = dict(sorted(abbreviation2name.items()))
        return category2abbreviation2name

    def fetch_existing_models(self):
        """Register every ``data/*.json`` submission file of the local snapshot.

        Files without a ``"metadata"`` entry are skipped.  Entries are only
        added or overwritten here, never removed.
        """
        for submission_file in glob.glob(os.path.join(self.local_leaderboard, "data") + "/*.json"):
            with open(submission_file) as submission_fp:
                data = json.load(submission_fp)
            metadata = data.get('metadata')
            if metadata is None:
                continue
            submission_id = metadata["submission_id"]
            with self.var_lock.rw:
                self.submission_ids.add(submission_id)
                self.submission_id_to_file[submission_id] = submission_file
                self.submission_id_to_model_title[submission_id] = metadata["team_name"] + "/" + metadata["model_name"]
                self.submission_id_to_data[submission_id] = {"results": data["results"], "metadata": metadata}

    def _tournament_results_consistent(self):
        """Return True when tournament results and known submissions match.

        The caller is expected to hold ``var_lock.ro``.
        """
        known_ids = self.submission_ids
        results = self.tournament_results
        # Is every `submission_id` in the results known?
        if results.keys() - known_ids:
            return False
        # Was every `submission_id` in some match?
        if known_ids - results.keys():
            return False
        for submission_id in known_ids:
            competitors = results[submission_id].keys()
            # Are all competitors known, and was there a match with every one of them?
            if competitors - known_ids or known_ids - competitors:
                return False
        return True

    def results_dataset_integrity_check(self):
        """Verify the tournament results and rebuild them when they are inconsistent.

        Checks that:
          - ``tournament.json`` contains no ``submission_id`` without a file
            in the ``data`` directory,
          - every submission has a result against every submission.

        When a check fails, the tournament results are reset and the
        tournament is re-run for every known submission, then uploaded.
        The whole procedure runs under ``pre_submit_lock`` and waits while
        another submission is pending, so new submissions cannot be
        confirmed in the meantime.
        """
        while True:
            with self.pre_submit_lock:
                if self.pre_submit is None:
                    gr.Info('Checking integrity...', duration=15)
                    self.update_leaderboard()

                    with self.var_lock.ro:
                        consistent = self._tournament_results_consistent()
                        # Snapshot taken under the read lock; used below without it.
                        submission_files = {
                            submission_id: self.submission_id_to_file[submission_id]
                            for submission_id in self.submission_ids
                        }

                    if not consistent:
                        gr.Info('Running tournament...', duration=15)
                        with self.var_lock.rw:
                            self.tournament_results = {}
                        tournament_results = {}
                        for submission_id, submission_file in submission_files.items():
                            tournament_results = self.start_tournament(submission_id, submission_file)
                            with self.var_lock.rw:
                                self.tournament_results = tournament_results

                        gr.Info('Uploading tournament results...', duration=5)
                        if tournament_results:
                            self._upload_tournament_results(tournament_results)
                    break
            gr.Info("Waiting in queue...", duration=5)
            time.sleep(10)

        gr.Info('Integrity of the results dataset is checked', duration=5)

    @staticmethod
    def _model_tournament_table_highlight_true_and_false(x):
        """Return CSS per cell: green background for True, red for False, else None.

        `==` is intentional: `x` is compared element-wise.
        """
        return np.where(
            x == True,
            'background-color: rgba(0, 255, 0, 0.1);',
            np.where(
                x == False,
                'background-color: rgba(255, 0, 0, 0.1);',
                None
            )
        )

    def _model_link_html(self, metadata):
        """Return the HTML link (abbreviated team and model name) for a submission."""
        model_link = metadata["link_to_model"]
        model_title = metadata["team_name"] + "/" + metadata["model_name"]
        model_title_abbr_team_name = self.abbreviate(metadata["team_name"], 28)
        model_title_abbr_model_name = self.abbreviate(metadata["model_name"], 28)
        model_title_abbr_html = f'<div style="font-size: 10px;">{xmlEscape(model_title_abbr_team_name, MARKDOWN_SPECIAL_CHARACTERS)}</div>{xmlEscape(model_title_abbr_model_name, MARKDOWN_SPECIAL_CHARACTERS)}'
        return f'<a href={xmlQuoteAttr(model_link)} title={xmlQuoteAttr(model_title)}>{model_title_abbr_html}</a>'

    def _ordered_columns(self, dataframe, first_attributes):
        """Return the dataframe's columns: `first_attributes` first, the rest sorted."""
        candidates = first_attributes + sorted(
            list(self.TASKS_METADATA.keys())
            + list(dataframe.columns)
        )
        # dict.fromkeys removes duplicates while keeping the first occurrence.
        return [key for key in dict.fromkeys(candidates) if key in dataframe.columns]

    def get_model_tournament_table(self, submission_id, category):
        """Return a styled table of match results of one model within a category.

        One row per competitor, one boolean column per task of `category`
        plus the "Competitor" link column.  Returns None for the overall
        category.
        """
        if category == self.TASKS_CATEGORY_OVERALL:
            return None

        model_tournament_table = []
        with self.var_lock.ro:
            matches = self.tournament_results[submission_id]
            for competitor_id in matches.keys() - {submission_id}:  # without self
                data = self.submission_id_to_data[competitor_id]
                match_results = {
                    task: bool(won)
                    for task, won in matches[competitor_id].items()
                    if self.TASKS_METADATA[task]["category"] == category
                }
                match_results["model"] = self._model_link_html(data["metadata"])
                model_tournament_table.append(match_results)

        dataframe = pd.DataFrame.from_records(model_tournament_table)

        dataframe = dataframe[self._ordered_columns(dataframe, ["model"])]

        attributes_map_word_to_header = {key: value["abbreviation"] for key, value in self.TASKS_METADATA.items()}
        attributes_map_word_to_header.update({"model": "Competitor"})
        dataframe = dataframe.rename(columns=attributes_map_word_to_header)
        dataframe = dataframe.style.apply(self._model_tournament_table_highlight_true_and_false, axis=None)
        return dataframe

    def _leaderboard_row(self, submission_id, data, tournament_results, category, metric_headers):
        """Compute one leaderboard record for `submission_id`.

        A task score is the percentage of competitors beaten on the task
        (100 when there are no competitors).  `metric_headers` is updated in
        place with the display headers of the per-task metric columns.
        """
        local_results = {}
        win_score = {}
        matches = tournament_results[submission_id]
        competitor_ids = matches.keys() - {submission_id}  # without self

        for task, task_metadata in self.TASKS_METADATA.items():
            task_category = task_metadata["category"]
            if category not in (self.TASKS_CATEGORY_OVERALL, task_category):
                continue

            num_of_wins = sum(1 for competitor_id in competitor_ids if matches[competitor_id][task])
            task_score = num_of_wins / len(competitor_ids) * 100 if competitor_ids else 100
            win_score.setdefault(task_category, []).append(task_score)

            if category == task_category:
                local_results[task] = task_score
                for metric in VISIBLE_METRICS:
                    metric_headers[task + "_" + metric] = task_metadata["abbreviation"] + " " + metric
                    metric_value = data['results'][task].get(metric)
                    if metric_value is not None:
                        local_results[task + "_" + metric] = metric_value if metric == "word_perplexity" else metric_value * 100
                        break  # Only the first metric of every task

        category_scores = {c: sum(scores) / len(scores) for c, scores in win_score.items()}
        if category == self.TASKS_CATEGORY_OVERALL:
            local_results.update(category_scores)
            local_results["average_score"] = sum(category_scores.values()) / len(category_scores)
        else:
            local_results["average_score"] = category_scores[category]

        metadata = data["metadata"]
        local_results["model"] = self._model_link_html(metadata)
        release = metadata.get("submission_timestamp")
        local_results["release"] = time.strftime("%Y-%m-%d", time.gmtime(release)) if release else "N/A"
        local_results["model_type"] = metadata["model_type"]
        local_results["parameters"] = metadata["parameters"]
        return local_results

    def get_leaderboard(self, pre_submit=None, category=None):
        """Return the leaderboard as a `pandas.DataFrame`.

        Args:
            pre_submit: Optional `PreSubmit`; when given, its tournament
                results are shown and its submission is placed first.
            category: Task category to show; the overall view when falsy.

        Raises:
            gr.Error: A submission in the tournament results has no data, or
                its data belongs to a different submission id.
        """
        category = category if category else self.TASKS_CATEGORY_OVERALL
        visible_metrics_map_word_to_header = {}
        processed_results = []

        with self.var_lock.ro:
            tournament_results = pre_submit.tournament_results if pre_submit else self.tournament_results

            if len(tournament_results) == 0:
                return pd.DataFrame(columns=['No submissions yet'])

            for submission_id in tournament_results:
                is_pre_submit = bool(pre_submit) and submission_id == pre_submit.submission_id

                if submission_id in self.submission_id_to_data:
                    data = self.submission_id_to_data[submission_id]
                elif is_pre_submit:
                    # The pending submission is not registered yet; read its file.
                    with open(pre_submit.file) as pre_submit_fp:
                        data = json.load(pre_submit_fp)
                else:
                    raise gr.Error(f"Internal error: Submission [{submission_id}] not found")

                if submission_id != data["metadata"]["submission_id"]:
                    raise gr.Error(f"Proper submission [{submission_id}] not found")

                local_results = self._leaderboard_row(
                    submission_id,
                    data,
                    tournament_results,
                    category,
                    visible_metrics_map_word_to_header,
                )

                if is_pre_submit:
                    processed_results.insert(0, local_results)
                else:
                    processed_results.append(local_results)

        dataframe = pd.DataFrame.from_records(processed_results)

        extra_attributes_map_word_to_header = {
            "model": "Model",
            "release": "Release",
            "average_score": "Average ⬆️",
            "team_name": "Team name",
            "model_name": "Model name",
            "model_type": "Type",
            "parameters": "# θ (B)",
            "input_length": "Input length (# tokens)",
            "precision": "Precision",
            "description": "Description",
            "link_to_model": "Link to model"
        }
        first_attributes = [
            "model",
            "release",
            "model_type",
            "parameters",
            "average_score",
        ]
        dataframe = dataframe[self._ordered_columns(dataframe, first_attributes)]

        attributes_map_word_to_header = {key: value["abbreviation"] for key, value in self.TASKS_METADATA.items()}
        attributes_map_word_to_header.update(extra_attributes_map_word_to_header)
        attributes_map_word_to_header.update(visible_metrics_map_word_to_header)
        dataframe = dataframe.rename(columns=attributes_map_word_to_header)
        return dataframe

    def start_tournament(self, new_submission_id, new_model_file):
        """Run matches of a new model against every other known submission.

        Each match is evaluated in both directions by the significance
        service.  At most 5 competitors (10 service tasks) are in flight at
        a time.

        Args:
            new_submission_id: Id of the model entering the tournament.
            new_model_file: Path of that model's submission file.

        Returns:
            A deep copy of the current tournament results extended with the
            new matches; ``self.tournament_results`` is not modified.
        """
        max_competitors_in_flight = 5  # 5*2==10 tasks

        with self.var_lock.ro:
            new_tournament = copy.deepcopy(self.tournament_results)
            rest_of_competitors = list(self.submission_ids - {new_submission_id})  # without self
            competitor_files = {
                competitor_id: self.submission_id_to_file[competitor_id]
                for competitor_id in rest_of_competitors
            }

        new_tournament[new_submission_id] = {
            new_submission_id: {task: False for task in self.TASKS_METADATA},
        }

        num_of_competitors = len(rest_of_competitors)
        result_url = {}
        result_inverse_url = {}

        def send_match(competitor_id):
            # Submit both directions of the match to the significance service.
            competitor_file = competitor_files[competitor_id]
            result_url[competitor_id] = check_significance_send_task(new_model_file, competitor_file)
            result_inverse_url[competitor_id] = check_significance_send_task(competitor_file, new_model_file)

        # Fill the window of matches in flight.
        next_competitors = []
        while rest_of_competitors and len(next_competitors) < max_competitors_in_flight:
            competitor_id = rest_of_competitors.pop()
            next_competitors.append(competitor_id)
            send_match(competitor_id)

        while next_competitors:
            competitor_id = next_competitors.pop(0)
            result = check_significance_wait_for_result(result_url.pop(competitor_id))
            result_inverse = check_significance_wait_for_result(result_inverse_url.pop(competitor_id))

            # A slot became free: submit the next waiting competitor.
            if rest_of_competitors:
                new_competitor_id = rest_of_competitors.pop()
                next_competitors.append(new_competitor_id)
                send_match(new_competitor_id)

            new_tournament[new_submission_id][competitor_id] = {
                task: data["significant"] for task, data in result.items()
            }
            # The competitor may have no entry yet (e.g. while the whole
            # tournament is being rebuilt from an empty result set).
            new_tournament.setdefault(competitor_id, {})[new_submission_id] = {
                task: data["significant"] for task, data in result_inverse.items()
            }

            num_of_competitors_done = num_of_competitors - len(next_competitors) - len(rest_of_competitors)
            gr.Info(f"Tournament: {num_of_competitors_done}/{num_of_competitors} = {(num_of_competitors_done) * 100 // num_of_competitors}% done")

        return new_tournament

    @staticmethod
    def abbreviate(s, max_length, dots_place="center"):
        """Shorten `s` to at most `max_length` characters using "…".

        `dots_place` selects where the ellipsis goes: "begin", "center"
        (needs `max_length` >= 3, otherwise handled like "end") or "end".
        Whitespace next to the ellipsis is stripped.
        """
        if len(s) <= max_length:
            return s
        if max_length <= 1:
            return "…"
        if dots_place == "begin":
            return "…" + s[-max_length + 1:].lstrip()
        if dots_place == "center" and max_length >= 3:
            max_length_begin = max_length // 2
            max_length_end = max_length - max_length_begin - 1
            return s[:max_length_begin].rstrip() + "…" + s[-max_length_end:].lstrip()
        # dots_place == "end"
        return s[:max_length - 1].rstrip() + "…"

    @staticmethod
    def create_submission_id(metadata):
        """Build a submission id from the first 7 characters of four metadata fields.

        The id length must be limited because the id is used in a file name;
        "/" and newlines are replaced by "_" for the same reason.
        """
        submission_id = "_".join([metadata[key][:7] for key in (
            "team_name",
            "model_name",
            "model_predictions_sha256",
            "model_results_sha256",
        )])
        submission_id = submission_id.replace("/", "_").replace("\n", "_").strip()
        return submission_id

    @staticmethod
    def get_sha256_hexdigest(obj):
        """Return the SHA-256 hex digest of the canonical JSON form of `obj`.

        Canonical form: sorted keys, compact separators, ASCII-only output.
        """
        data = json.dumps(
            obj,
            separators=(',', ':'),
            sort_keys=True,
            ensure_ascii=True,
        ).encode()
        return hashlib.sha256(data).hexdigest()

    PreSubmit = namedtuple('PreSubmit', 'tournament_results, submission_id, file')

    def prepare_model_for_submission(self, file, metadata) -> PreSubmit:
        """Attach metadata to a submission file and run its tournament.

        `metadata` is modified in place (hashes, submission id, timestamp)
        and the file is rewritten with the metadata included.  The call
        waits until no other submission is pending.

        Returns:
            The `PreSubmit` created for this submission; it stays stored in
            ``self.pre_submit`` until `save_pre_submit` is called.
        """
        with open(file, "r") as f:
            data = json.load(f)

        data["metadata"] = metadata
        metadata["model_predictions_sha256"] = self.get_sha256_hexdigest(data["predictions"])
        metadata["model_results_sha256"] = self.get_sha256_hexdigest(data["results"])
        submission_id = self.create_submission_id(metadata)
        metadata["submission_id"] = submission_id
        metadata["submission_timestamp"] = time.time()  # timestamp

        with open(file, "w") as f:
            json.dump(data, f, separators=(',', ':'))  # compact JSON

        while True:
            with self.pre_submit_lock:
                if self.pre_submit is None:
                    gr.Info('Running tournament...', duration=15)
                    self.update_leaderboard()
                    tournament_results = self.start_tournament(submission_id, file)
                    pre_submit = self.PreSubmit(tournament_results, submission_id, file)
                    self.pre_submit = pre_submit
                    break
            gr.Info("Waiting in queue...", duration=5)
            time.sleep(10)

        # Return the local object: `self.pre_submit` may already have been
        # changed by another thread once the lock is released.
        return pre_submit

    def save_pre_submit(self):
        """Upload the pending submission and its tournament results, then clear it."""
        with self.pre_submit_lock:
            if self.pre_submit:
                tournament_results, submission_id, file = self.pre_submit
                self._upload_submission(submission_id, file)
                self._upload_tournament_results(tournament_results)
                self.pre_submit = None

    def _upload_submission(self, submission_id, file):
        """Upload `file` to the repository as ``data/<submission_id>.json``."""
        api.upload_file(
            path_or_fileobj=file,
            path_in_repo=f"data/{submission_id}.json",
            repo_id=self.server_address,
            repo_type=self.repo_type,
            token=HF_TOKEN,
        )

    def _upload_tournament_results(self, tournament_results):
        """Write `tournament_results` to the local ``tournament.json`` and upload it."""
        # Temporary save tournament results
        tournament_results_path = os.path.join(self.local_leaderboard, "tournament.json")
        with open(tournament_results_path, "w") as f:
            json.dump(tournament_results, f, sort_keys=True, indent=2)  # readable JSON

        api.upload_file(
            path_or_fileobj=tournament_results_path,
            path_in_repo="tournament.json",
            repo_id=self.server_address,
            repo_type=self.repo_type,
            token=HF_TOKEN,
        )

    def get_model_detail(self, submission_id):
        """Return the metadata of a submission.

        Raises:
            gr.Error: The submission id is unknown.
        """
        # Look up and read in one step while registered as a reader.
        with self.var_lock.ro:
            data = self.submission_id_to_data.get(submission_id)
        if data is None:
            raise gr.Error(f"Submission [{submission_id}] not found")
        return data["metadata"]