Spaces:
Runtime error
Runtime error
import json | |
import os | |
import re | |
from datetime import datetime, timezone | |
import rsa | |
from src.envs import RSA_PUBKEY | |
from src.challenges.result_parsers import parse_challenge_result_dict | |
# email validity checker | |
from email.utils import parseaddr | |
# url validity checker | |
from urllib.parse import urlparse | |
# json parser | |
from json.decoder import JSONDecodeError | |
from src.display.formatting import styled_error, styled_message, styled_warning | |
from src.envs import API, EVAL_REQUESTS_PATH, TOKEN, DATA_REPO | |
from src.submission.check_validity import ( | |
already_submitted_models, | |
check_model_card, | |
get_model_size, | |
is_model_on_hub, | |
) | |
def add_new_eval(
    submission_file,
    algo_name: str,
    algo_info: str,
    algo_link: str,
    submitter_email: str,
):
    """Validate an algorithm submission, archive it and update the master table.

    The raw submission file and a metadata file are uploaded to
    ``upload_history/<id>/`` in ``DATA_REPO``; the per-challenge scores are
    then parsed with ``parse_challenge_result_dict`` and merged into
    ``master_table.json`` in the same repository.

    Args:
        submission_file: Either a path string, or an object whose ``name``
            attribute is a path string, pointing at a JSON file. The JSON
            must be a dict with exactly one entry whose value maps challenge
            names to result dicts.
        algo_name: Display name of the algorithm. The table id is derived
            from it by lower-casing and collapsing every run of characters
            outside ``[a-zA-Z0-9]`` into ``-``.
        algo_info: Free-text description stored with the entry.
        algo_link: Optional URL; when non-blank it must be an http/https URL.
        submitter_email: Contact address. It is RSA-encrypted with
            ``RSA_PUBKEY`` and hex-encoded before it is stored or used in
            commit messages.

    Returns:
        The result of ``styled_error(...)`` when validation or retrieval of
        the master table fails, otherwise the result of
        ``styled_message(...)`` with the success text.
    """
    return_str = 'Success! Your submission will be added to the leaderboard within 24 hours.'

    # validate email and url
    # parseaddr() always returns a 2-tuple (('', '') on failure), which is
    # truthy, so the parsed address itself has to be inspected.
    local_part, at_sign, domain = parseaddr(submitter_email)[1].rpartition("@")
    if not (local_part and at_sign and domain):
        return styled_error("Please enter a valid email address.")
    try:
        submitter_email = rsa.encrypt(submitter_email.encode(), RSA_PUBKEY).hex()
    except OverflowError:
        # rsa.encrypt rejects messages that do not fit in one padded block.
        return styled_error("Please enter a valid email address.")

    algo_link = algo_link.strip()
    if algo_link:
        # Only http(s) links with a host are accepted; this also rejects
        # schemes such as "javascript:" for a link supplied by the submitter.
        try:
            parsed_link = urlparse(algo_link)
        except ValueError:
            parsed_link = None
        if (
            parsed_link is None
            or parsed_link.scheme not in ("http", "https")
            or not parsed_link.netloc
        ):
            return styled_error("Please enter a valid URL (including the http/https protocol).")

    # get file path
    if isinstance(submission_file, str):
        file_path = submission_file
    else:
        file_path = getattr(submission_file, "name", None)
    if not isinstance(file_path, str):
        return styled_error("Invalid submission file: File path not found.")

    # parse the submission file
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            submission_data = json.load(f)
    except OSError:
        return styled_error("Invalid submission file: File path not found.")
    except (JSONDecodeError, UnicodeDecodeError):
        return styled_error("Invalid submission file: JSON parsing failed.")

    # Expected layout: {<single key>: {<challenge name>: {<result dict>}, ...}}
    results_per_challenge = None
    if isinstance(submission_data, dict) and len(submission_data) == 1:
        (results_per_challenge,) = submission_data.values()
    if (
        not isinstance(results_per_challenge, dict)
        or not all(isinstance(challenge, str) for challenge in results_per_challenge)
        or not all(isinstance(result, dict) for result in results_per_challenge.values())
    ):
        return styled_error("Invalid submission file: Incorrect organization of the JSON file.")

    # format the algo name
    algo_name = algo_name.strip()
    algo_name_filename = re.sub(r"[^a-zA-Z0-9]+", "-", algo_name).lower()
    if not algo_name_filename.strip("-"):
        # Without any ASCII letter or digit the id would be "" or "-", which
        # is useless as a repo folder name and master-table key.
        return styled_error("Please enter a name containing at least one English letter or digit.")
    timestamp_filename = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")

    # The raw file is archived before the scores are parsed, so a submission
    # whose scores cannot be parsed is still kept in upload_history.
    print("Uploading submission file")
    API.upload_file(
        path_or_fileobj=file_path,
        path_in_repo=f'upload_history/{algo_name_filename}/{timestamp_filename}.json',
        repo_id=DATA_REPO,
        repo_type="dataset",
        commit_message=f"Add {algo_name} to eval queue by {submitter_email} at {timestamp_filename}",
    )

    # Construct entry in the master table
    eval_entry = {
        "name": algo_name,
        "id": algo_name_filename,
        "info": algo_info,
        "link": algo_link,
        "email": submitter_email,
        "update_timestamp": timestamp_filename,
    }

    # Upload the metadata file
    print("Uploading metadata file")
    metadata_filename = f'./tmp_metadata_{algo_name_filename}_{timestamp_filename}.json'
    with open(metadata_filename, 'w') as f:
        json.dump(eval_entry, f)
    try:
        API.upload_file(
            path_or_fileobj=metadata_filename,
            path_in_repo=f'upload_history/{algo_name_filename}/{timestamp_filename}_metadata.json',
            repo_id=DATA_REPO,
            repo_type="dataset",
            commit_message=f"Add metadata {algo_name} by {submitter_email} at {timestamp_filename}",
        )
    finally:
        # The temporary file is only needed for the upload itself.
        os.remove(metadata_filename)

    # Parse one score per challenge and attach it to the entry
    for challenge, result in results_per_challenge.items():
        try:
            parsed_result = parse_challenge_result_dict(challenge, result)
        except Exception:
            return styled_error(f"Could not parse the score for {challenge}.")
        if not isinstance(parsed_result, float):
            return styled_error(f"Could not parse the score for {challenge}.")
        eval_entry[challenge] = parsed_result

    # Get content of the master table from DATA_REPO
    master_table = {}
    try:
        if API.file_exists(DATA_REPO, "master_table.json", repo_type='dataset'):
            API.hf_hub_download(
                DATA_REPO,
                "master_table.json",
                local_dir=EVAL_REQUESTS_PATH,
                repo_type='dataset',
                force_download=True,
            )
            with open(f"{EVAL_REQUESTS_PATH}/master_table.json", "r") as f:
                master_table = json.load(f)
        else:
            print("No master table found. Will create a new one.")
    except Exception:
        return styled_error("Could not get the master table from the data repository.")

    # Check for duplicate submission
    if algo_name_filename in master_table:
        return_str += ' An existing submission with the same name has been found. Your submission will be used to update the existing one.'
        master_table[algo_name_filename].update(eval_entry)
    else:
        print("Creating eval entry")
        master_table[algo_name_filename] = eval_entry

    # Save the updated master table
    with open("./master_table.json", "w") as f:
        json.dump(master_table, f)

    print("Uploading master table")
    API.upload_file(
        path_or_fileobj="./master_table.json",
        path_in_repo="master_table.json",
        repo_id=DATA_REPO,
        repo_type="dataset",
        commit_message=f"Update master table with {algo_name} by {submitter_email} at {timestamp_filename}",
    )
    return styled_message(return_str)
def add_new_challenge(
    submission_files,
    challenge_name: str,
    challenge_info: str,
    challenge_link: str,
    submitter_email: str,
):
    """Validate a challenge proposal and archive its files in ``DATA_REPO``.

    Every submitted file is checked first; only when all of them are valid
    are the files and a metadata file uploaded to
    ``upload_history/<id>/`` in ``DATA_REPO``.

    Args:
        submission_files: ``None`` (treated as no files) or a list of path
            strings. Each file must contain a JSON dict whose values are all
            dicts.
        challenge_name: Display name of the challenge. The folder id is
            derived from it by lower-casing and collapsing every run of
            characters outside ``[a-zA-Z0-9]`` into ``-``.
        challenge_info: Free-text description stored in the metadata file.
        challenge_link: Optional URL; when non-blank it must be an
            http/https URL.
        submitter_email: Contact address. It is RSA-encrypted with
            ``RSA_PUBKEY`` and hex-encoded before it is stored or used in
            commit messages.

    Returns:
        The result of ``styled_error(...)`` when validation fails, otherwise
        the result of ``styled_message(...)`` with the success text.
    """
    return_str = 'Success! We are working to incorporate your submitted challenge into the leaderboard, and will get back to you when we encounter problems.'

    # validate email and url
    # parseaddr() always returns a 2-tuple (('', '') on failure), which is
    # truthy, so the parsed address itself has to be inspected.
    local_part, at_sign, domain = parseaddr(submitter_email)[1].rpartition("@")
    if not (local_part and at_sign and domain):
        return styled_error("Please enter a valid email address.")
    try:
        submitter_email = rsa.encrypt(submitter_email.encode(), RSA_PUBKEY).hex()
    except OverflowError:
        # rsa.encrypt rejects messages that do not fit in one padded block.
        return styled_error("Please enter a valid email address.")

    challenge_link = challenge_link.strip()
    if challenge_link:
        # Only http(s) links with a host are accepted; this also rejects
        # schemes such as "javascript:" for a link supplied by the submitter.
        try:
            parsed_link = urlparse(challenge_link)
        except ValueError:
            parsed_link = None
        if (
            parsed_link is None
            or parsed_link.scheme not in ("http", "https")
            or not parsed_link.netloc
        ):
            return styled_error("Please enter a valid URL (including the http/https protocol).")

    # get file path
    if submission_files is None:
        submission_files = []
    elif not isinstance(submission_files, list) or not all(
        isinstance(path, str) for path in submission_files
    ):
        return styled_error("Invalid submission file: File path not found.")

    # format the challenge name
    challenge_name = challenge_name.strip()
    challenge_name_filename = re.sub(r"[^a-zA-Z0-9]+", "-", challenge_name).lower()
    if not challenge_name_filename.strip("-"):
        # Without any ASCII letter or digit the id would be "" or "-", which
        # is useless as a repo folder name.
        return styled_error("Please enter a name containing at least one English letter or digit.")
    timestamp_filename = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")

    # Validate every file before uploading anything, so that an invalid file
    # late in the list does not leave earlier files already uploaded.
    for file_path in submission_files:
        base_name = os.path.basename(file_path)
        # parse the submission file
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                submission_data = json.load(f)
        except OSError:
            return styled_error("Invalid submission file: File path not found.")
        except (JSONDecodeError, UnicodeDecodeError):
            return styled_error(f"Invalid submission file {base_name}: JSON parsing failed.")
        if not isinstance(submission_data, dict) or not all(
            isinstance(result, dict) for result in submission_data.values()
        ):
            return styled_error(f"Invalid submission file {base_name}: Incorrect organization of the JSON file.")

    for num, file_path in enumerate(submission_files):
        print("Uploading submission file")
        API.upload_file(
            path_or_fileobj=file_path,
            path_in_repo=f'upload_history/{challenge_name_filename}/{timestamp_filename}_file{num}_{os.path.basename(file_path)}.json',
            repo_id=DATA_REPO,
            repo_type="dataset",
            commit_message=f"Add {challenge_name} to eval queue by {submitter_email} at {timestamp_filename}",
        )

    print("Uploading metadata file")
    filename = f'./tmp_metadata_{challenge_name_filename}_{timestamp_filename}.json'
    challenge_entry = {
        "name": challenge_name,
        "info": challenge_info,
        "link": challenge_link,
        "email": submitter_email,
        "update_timestamp": timestamp_filename,
    }
    with open(filename, 'w') as f:
        json.dump(challenge_entry, f)
    try:
        API.upload_file(
            path_or_fileobj=filename,
            path_in_repo=f'upload_history/{challenge_name_filename}/{timestamp_filename}_metadata.json',
            repo_id=DATA_REPO,
            repo_type="dataset",
            commit_message=f"Add metadata {challenge_name} by {submitter_email} at {timestamp_filename}",
        )
    finally:
        # The temporary file is only needed for the upload itself.
        os.remove(filename)
    return styled_message(return_str)