import os import json import requests import gradio as gr import pandas as pd from huggingface_hub import HfApi, hf_hub_download, snapshot_download from huggingface_hub.repocard import metadata_load from apscheduler.schedulers.background import BackgroundScheduler from tqdm.contrib.concurrent import thread_map from utils import * DATASET_REPO_URL = "https://huggingface.co/datasets/huph22/drlc-leaderboard-data" DATASET_REPO_ID = "huph22/drlc-leaderboard-data" HF_TOKEN = os.environ.get("HF_TOKEN") block = gr.Blocks() api = HfApi(token=HF_TOKEN) # Containing the data rl_envs = [ { "rl_env_beautiful": "LunarLander-v2 πŸš€", "rl_env": "LunarLander-v2", "video_link": "", "global": None }, { "rl_env_beautiful": "CartPole-v1", "rl_env": "CartPole-v1", "video_link": "https://huggingface.co/sb3/ppo-CartPole-v1/resolve/main/replay.mp4", "global": None }, { "rl_env_beautiful": "CarRacing-v0 🏎️", "rl_env": "CarRacing-v0", "video_link": "", "global": None }, { "rl_env_beautiful": "CarRacing-v2 🏎️", "rl_env": "CarRacing-v2", "video_link": "", "global": None }, { "rl_env_beautiful": "MountainCar-v0 ⛰️", "rl_env": "MountainCar-v0", "video_link": "", "global": None }, { "rl_env_beautiful": "BipedalWalker-v3 πŸš€", "rl_env": "BipedalWalker-v3", "video_link": "", "global": None } ] def restart(): print("RESTART") api.restart_space(repo_id="huph22/Deep-Reinforcement-Learning-Leaderboard") def get_metadata(model_id): try: readme_path = hf_hub_download(model_id, filename="README.md", etag_timeout=180) return metadata_load(readme_path) except requests.exceptions.HTTPError: # 404 README.md not found return None def parse_metrics_accuracy(meta): if "model-index" not in meta: return None result = meta["model-index"][0]["results"] metrics = result[0]["metrics"] accuracy = metrics[0]["value"] return accuracy # We keep the worst case episode def parse_rewards(accuracy): default_std = -1000 default_reward=-1000 if accuracy != None: accuracy = str(accuracy) parsed = accuracy.split('+/-') if len(parsed)>1: mean_reward = float(parsed[0].strip()) std_reward = float(parsed[1].strip()) elif len(parsed)==1: #only mean reward mean_reward = float(parsed[0].strip()) std_reward = float(0) else: mean_reward = float(default_std) std_reward = float(default_reward) else: mean_reward = float(default_std) std_reward = float(default_reward) return mean_reward, std_reward def get_model_ids(rl_env): api = HfApi() models = api.list_models(filter=rl_env) model_ids = [x.modelId for x in models] return model_ids # Parralelized version def update_leaderboard_dataset_parallel(rl_env, path): # Get model ids associated with rl_env model_ids = get_model_ids(rl_env) def process_model(model_id): meta = get_metadata(model_id) #LOADED_MODEL_METADATA[model_id] = meta if meta is not None else '' if meta is None: return None user_id = model_id.split('/')[0] row = {} row["User"] = user_id row["Model"] = model_id accuracy = parse_metrics_accuracy(meta) mean_reward, std_reward = parse_rewards(accuracy) mean_reward = mean_reward if not pd.isna(mean_reward) else 0 std_reward = std_reward if not pd.isna(std_reward) else 0 row["Results"] = mean_reward - std_reward row["Mean Reward"] = mean_reward row["Std Reward"] = std_reward return row data = list(thread_map(process_model, model_ids, desc="Processing models")) # Filter out None results (models with no metadata) data = [row for row in data if row is not None] ranked_dataframe = rank_dataframe(pd.DataFrame.from_records(data)) new_history = ranked_dataframe file_path = path + "/" + rl_env + ".csv" new_history.to_csv(file_path, index=False) return ranked_dataframe def download_leaderboard_dataset(): path = snapshot_download(repo_id=DATASET_REPO_ID, repo_type="dataset") return path path_ = download_leaderboard_dataset() from datetime import datetime def run_update_dataset(): global path_ for i in range(0, len(rl_envs)): rl_env = rl_envs[i] update_leaderboard_dataset_parallel(rl_env["rl_env"], path_) # θŽ·ε–ε½“ε‰ζ—Άι—΄ current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # ζž„ε»ΊεΈ¦ζ—Άι—΄ζˆ³ηš„ commit_message commit_message = f"Update dataset at {current_time}" api.upload_folder( folder_path=path_, repo_id="huph22/drlc-leaderboard-data", repo_type="dataset", commit_message=commit_message) def get_data(rl_env, path) -> pd.DataFrame: """ Get data from rl_env :return: data as a pandas DataFrame """ csv_path = path + "/" + rl_env + ".csv" data = pd.read_csv(csv_path) for index, row in data.iterrows(): user_id = row["User"] data.loc[index, "User"] = make_clickable_user(user_id) model_id = row["Model"] data.loc[index, "Model"] = make_clickable_model(model_id) return data def get_data_no_html(rl_env, path) -> pd.DataFrame: """ Get data from rl_env :return: data as a pandas DataFrame """ csv_path = path + "/" + rl_env + ".csv" data = pd.read_csv(csv_path) return data def rank_dataframe(dataframe): dataframe = dataframe.sort_values(by=['Results', 'User', 'Model'], ascending=False) if not 'Ranking' in dataframe.columns: dataframe.insert(0, 'Ranking', [i for i in range(1,len(dataframe)+1)]) else: dataframe['Ranking'] = [i for i in range(1,len(dataframe)+1)] return dataframe def filter_data(rl_env, path, user_id): data_df = get_data_no_html(rl_env, path) models = [] models = data_df[data_df["User"] == user_id] for index, row in models.iterrows(): user_id = row["User"] models.loc[index, "User"] = make_clickable_user(user_id) model_id = row["Model"] models.loc[index, "Model"] = make_clickable_model(model_id) return models run_update_dataset() with block: gr.Markdown(f""" # πŸ† The Deep Reinforcement Learning Course Leaderboard πŸ† This is the leaderboard of trained agents during the Deep Reinforcement Learning Course. A free course from beginner to expert. ### We only display the best 100 models If you want to **find yours, type your user id and click on Search my models.** You **can click on the model's name** to be redirected to its model card, including documentation. ### How are the results calculated? We use **lower bound result to sort the models: mean_reward - std_reward.** ### I can't find my model 😭 The leaderboard is **updated every two hours** if you can't find your models, just wait for the next update. ### The Deep RL Course πŸ€– You want to try to train your agents? Check the Hugging Face free Deep Reinforcement Learning Course πŸ€— . πŸ”§ There is an **environment missing?** Please open an issue. """) print(f"path:{path_}") for i in range(0, len(rl_envs)): rl_env = rl_envs[i] with gr.TabItem(rl_env["rl_env_beautiful"]) as rl_tab: with gr.Row(): markdown = """ # {name_leaderboard} """.format(name_leaderboard = rl_env["rl_env_beautiful"], video_link = rl_env["video_link"]) gr.Markdown(markdown) with gr.Row(): gr.Markdown(""" ## Search your models Simply type your user id to find your models """) with gr.Row(): user_id = gr.Textbox(label= "Your user id") search_btn = gr.Button("Search my models πŸ”Ž") reset_btn = gr.Button("Clear my search") env = gr.Variable(rl_env["rl_env"]) grpath = gr.Variable(path_) with gr.Row(): gr_dataframe = gr.components.Dataframe(value=get_data(rl_env["rl_env"], path_), headers=["Ranking πŸ†", "User πŸ€—", "Model id πŸ€–", "Results", "Mean Reward", "Std Reward"], datatype=["number", "markdown", "markdown", "number", "number", "number"], row_count=(100, 'fixed')) with gr.Row(): #gr_search_dataframe = gr.components.Dataframe(headers=["Ranking πŸ†", "User πŸ€—", "Model id πŸ€–", "Results", "Mean Reward", "Std Reward"], datatype=["number", "markdown", "markdown", "number", "number", "number"], visible=False) search_btn.click(fn=filter_data, inputs=[env, grpath, user_id], outputs=gr_dataframe, api_name="filter_data") with gr.Row(): #search_btn.click(fn=filter_data, inputs=[env, grpath, user_id], outputs=gr_dataframe, api_name="filter_data") reset_btn.click(fn=get_data, inputs=[env, grpath], outputs=gr_dataframe, api_name="get_data") """ block.load( download_leaderboard_dataset, inputs=[], outputs=[ grpath ], ) """ import threading scheduler = BackgroundScheduler() scheduler.add_job(run_update_dataset, 'interval', seconds=3600) scheduler.start() def start_block_launch(): block.launch() launch_thread = threading.Thread(target=start_block_launch) launch_thread.start() import time try: while True: time.sleep(60) # δΈ»ηΊΏη¨‹δΏζŒζ΄»θ·ƒ except (KeyboardInterrupt, SystemExit): scheduler.shutdown()