import os
import json
import requests
import gradio as gr
import pandas as pd
from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from huggingface_hub.repocard import metadata_load
from apscheduler.schedulers.background import BackgroundScheduler
from tqdm.contrib.concurrent import thread_map
from utils import *
DATASET_REPO_URL = "https://huggingface.co/datasets/huph22/drlc-leaderboard-data"
DATASET_REPO_ID = "huph22/drlc-leaderboard-data"
HF_TOKEN = os.environ.get("HF_TOKEN")
block = gr.Blocks()
api = HfApi(token=HF_TOKEN)
# Containing the data
rl_envs = [
{
"rl_env_beautiful": "LunarLander-v2 π",
"rl_env": "LunarLander-v2",
"video_link": "",
"global": None
},
{
"rl_env_beautiful": "CartPole-v1",
"rl_env": "CartPole-v1",
"video_link": "https://huggingface.co/sb3/ppo-CartPole-v1/resolve/main/replay.mp4",
"global": None
},
{
"rl_env_beautiful": "CarRacing-v0 ποΈ",
"rl_env": "CarRacing-v0",
"video_link": "",
"global": None
},
{
"rl_env_beautiful": "CarRacing-v2 ποΈ",
"rl_env": "CarRacing-v2",
"video_link": "",
"global": None
},
{
"rl_env_beautiful": "MountainCar-v0 β°οΈ",
"rl_env": "MountainCar-v0",
"video_link": "",
"global": None
},
{
"rl_env_beautiful": "BipedalWalker-v3 π",
"rl_env": "BipedalWalker-v3",
"video_link": "",
"global": None
}
]
def restart():
print("RESTART")
api.restart_space(repo_id="huph22/Deep-Reinforcement-Learning-Leaderboard")
def get_metadata(model_id):
try:
readme_path = hf_hub_download(model_id, filename="README.md", etag_timeout=180)
return metadata_load(readme_path)
except requests.exceptions.HTTPError:
# 404 README.md not found
return None
def parse_metrics_accuracy(meta):
if "model-index" not in meta:
return None
result = meta["model-index"][0]["results"]
metrics = result[0]["metrics"]
accuracy = metrics[0]["value"]
return accuracy
# We keep the worst case episode
def parse_rewards(accuracy):
default_std = -1000
default_reward=-1000
if accuracy != None:
accuracy = str(accuracy)
parsed = accuracy.split('+/-')
if len(parsed)>1:
mean_reward = float(parsed[0].strip())
std_reward = float(parsed[1].strip())
elif len(parsed)==1: #only mean reward
mean_reward = float(parsed[0].strip())
std_reward = float(0)
else:
mean_reward = float(default_std)
std_reward = float(default_reward)
else:
mean_reward = float(default_std)
std_reward = float(default_reward)
return mean_reward, std_reward
def get_model_ids(rl_env):
api = HfApi()
models = api.list_models(filter=rl_env)
model_ids = [x.modelId for x in models]
return model_ids
# Parralelized version
def update_leaderboard_dataset_parallel(rl_env, path):
# Get model ids associated with rl_env
model_ids = get_model_ids(rl_env)
def process_model(model_id):
meta = get_metadata(model_id)
#LOADED_MODEL_METADATA[model_id] = meta if meta is not None else ''
if meta is None:
return None
user_id = model_id.split('/')[0]
row = {}
row["User"] = user_id
row["Model"] = model_id
accuracy = parse_metrics_accuracy(meta)
mean_reward, std_reward = parse_rewards(accuracy)
mean_reward = mean_reward if not pd.isna(mean_reward) else 0
std_reward = std_reward if not pd.isna(std_reward) else 0
row["Results"] = mean_reward - std_reward
row["Mean Reward"] = mean_reward
row["Std Reward"] = std_reward
return row
data = list(thread_map(process_model, model_ids, desc="Processing models"))
# Filter out None results (models with no metadata)
data = [row for row in data if row is not None]
ranked_dataframe = rank_dataframe(pd.DataFrame.from_records(data))
new_history = ranked_dataframe
file_path = path + "/" + rl_env + ".csv"
new_history.to_csv(file_path, index=False)
return ranked_dataframe
def download_leaderboard_dataset():
path = snapshot_download(repo_id=DATASET_REPO_ID, repo_type="dataset")
return path
path_ = download_leaderboard_dataset()
from datetime import datetime
def run_update_dataset():
global path_
for i in range(0, len(rl_envs)):
rl_env = rl_envs[i]
update_leaderboard_dataset_parallel(rl_env["rl_env"], path_)
# θ·εε½εζΆι΄
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# ζε»ΊεΈ¦ζΆι΄ζ³η commit_message
commit_message = f"Update dataset at {current_time}"
api.upload_folder(
folder_path=path_,
repo_id="huph22/drlc-leaderboard-data",
repo_type="dataset",
commit_message=commit_message)
def get_data(rl_env, path) -> pd.DataFrame:
"""
Get data from rl_env
:return: data as a pandas DataFrame
"""
csv_path = path + "/" + rl_env + ".csv"
data = pd.read_csv(csv_path)
for index, row in data.iterrows():
user_id = row["User"]
data.loc[index, "User"] = make_clickable_user(user_id)
model_id = row["Model"]
data.loc[index, "Model"] = make_clickable_model(model_id)
return data
def get_data_no_html(rl_env, path) -> pd.DataFrame:
"""
Get data from rl_env
:return: data as a pandas DataFrame
"""
csv_path = path + "/" + rl_env + ".csv"
data = pd.read_csv(csv_path)
return data
def rank_dataframe(dataframe):
dataframe = dataframe.sort_values(by=['Results', 'User', 'Model'], ascending=False)
if not 'Ranking' in dataframe.columns:
dataframe.insert(0, 'Ranking', [i for i in range(1,len(dataframe)+1)])
else:
dataframe['Ranking'] = [i for i in range(1,len(dataframe)+1)]
return dataframe
def filter_data(rl_env, path, user_id):
data_df = get_data_no_html(rl_env, path)
models = []
models = data_df[data_df["User"] == user_id]
for index, row in models.iterrows():
user_id = row["User"]
models.loc[index, "User"] = make_clickable_user(user_id)
model_id = row["Model"]
models.loc[index, "Model"] = make_clickable_model(model_id)
return models
run_update_dataset()
with block:
gr.Markdown(f"""
# π The Deep Reinforcement Learning Course Leaderboard π
This is the leaderboard of trained agents during the Deep Reinforcement Learning Course. A free course from beginner to expert.
### We only display the best 100 models
If you want to **find yours, type your user id and click on Search my models.**
You **can click on the model's name** to be redirected to its model card, including documentation.
### How are the results calculated?
We use **lower bound result to sort the models: mean_reward - std_reward.**
### I can't find my model π
The leaderboard is **updated every two hours** if you can't find your models, just wait for the next update.
### The Deep RL Course
π€ You want to try to train your agents? Check the Hugging Face free Deep Reinforcement Learning Course π€ .
π§ There is an **environment missing?** Please open an issue.
""")
print(f"path:{path_}")
for i in range(0, len(rl_envs)):
rl_env = rl_envs[i]
with gr.TabItem(rl_env["rl_env_beautiful"]) as rl_tab:
with gr.Row():
markdown = """
# {name_leaderboard}
""".format(name_leaderboard = rl_env["rl_env_beautiful"], video_link = rl_env["video_link"])
gr.Markdown(markdown)
with gr.Row():
gr.Markdown("""
## Search your models
Simply type your user id to find your models
""")
with gr.Row():
user_id = gr.Textbox(label= "Your user id")
search_btn = gr.Button("Search my models π")
reset_btn = gr.Button("Clear my search")
env = gr.Variable(rl_env["rl_env"])
grpath = gr.Variable(path_)
with gr.Row():
gr_dataframe = gr.components.Dataframe(value=get_data(rl_env["rl_env"], path_), headers=["Ranking π", "User π€", "Model id π€", "Results", "Mean Reward", "Std Reward"], datatype=["number", "markdown", "markdown", "number", "number", "number"], row_count=(100, 'fixed'))
with gr.Row():
#gr_search_dataframe = gr.components.Dataframe(headers=["Ranking π", "User π€", "Model id π€", "Results", "Mean Reward", "Std Reward"], datatype=["number", "markdown", "markdown", "number", "number", "number"], visible=False)
search_btn.click(fn=filter_data, inputs=[env, grpath, user_id], outputs=gr_dataframe, api_name="filter_data")
with gr.Row():
#search_btn.click(fn=filter_data, inputs=[env, grpath, user_id], outputs=gr_dataframe, api_name="filter_data")
reset_btn.click(fn=get_data, inputs=[env, grpath], outputs=gr_dataframe, api_name="get_data")
"""
block.load(
download_leaderboard_dataset,
inputs=[],
outputs=[
grpath
],
)
"""
import threading
scheduler = BackgroundScheduler()
scheduler.add_job(run_update_dataset, 'interval', seconds=3600)
scheduler.start()
def start_block_launch():
block.launch()
launch_thread = threading.Thread(target=start_block_launch)
launch_thread.start()
import time
try:
while True:
time.sleep(60) # δΈ»ηΊΏη¨δΏζζ΄»θ·
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()