import gradio as gr
import bittensor as bt
import typing
from bittensor.extrinsics.serving import get_metadata
from dataclasses import dataclass
import requests
import wandb
import math
import os
import datetime
import time
import functools
import multiprocessing
from dotenv import load_dotenv
from huggingface_hub import HfApi
from apscheduler.schedulers.background import BackgroundScheduler
from tqdm import tqdm
load_dotenv()
# Static text fragments rendered through gr.HTML() when the page is built.
# FONT and IMAGE are intentionally empty placeholders in this file.
FONT = """"""
TITLE = """
Subnet 17 Leaderboard
"""
IMAGE = """"""
HEADER = """
Subnet 17 is a Bittensor subnet that incentivizes the creation of the best image open models by evaluating submissions on a constant stream of newly generated synthetic MidJourney v6 data. The models with the best head-to-head loss on the evaluation data receive a steady emission of TAO."""
# Legend shown under the leaderboard table ("is represents" typo fixed).
EVALUATION_DETAILS = """Name is the 🤗 Hugging Face model name (click to go to the model card). Rewards / Day are the expected rewards per day for each model. Perplexity represents the loss on all of the evaluation data for the model as calculated by the validator (lower is better). UID is the Bittensor user id of the submitter. Block is the Bittensor block that the model was submitted in. More stats on taostats."""
EVALUATION_HEADER = """
Shows the latest internal evaluation statistics as calculated by a validator run by Nous Research
"""
# W&B project path whose runs are read by get_scores(). Required: indexing
# os.environ raises KeyError at import time when the variable is unset.
VALIDATOR_WANDB_PROJECT = os.environ["VALIDATOR_WANDB_PROJECT"]
# Optional Hugging Face token; None when unset. Used for the Hub API client
# below and passed again when restarting the Space.
H4_TOKEN = os.environ.get("H4_TOKEN", None)
API = HfApi(token=H4_TOKEN)
# Repository id of the Space that restart_space() restarts.
REPO_ID = "PlixAI/pixel-subnet-leaderboard"
# Retry policy shared by get_subtensor_and_metagraph() and get_tao_price():
# number of attempts and the pause between them in seconds.
METAGRAPH_RETRIES = 10
METAGRAPH_DELAY_SECS = 30
# Seconds allowed for each per-hotkey metadata lookup run in a subprocess.
METADATA_TTL = 10
# Subnet id used for the metagraph and the subnet hyperparameters.
NETUID = 17
# Block from which tempo boundaries are counted in next_tempo().
SUBNET_START_BLOCK = 2225782
# Used to turn a block count into a wall-clock estimate.
SECONDS_PER_BLOCK = 12
# Passed to bt.subtensor(); None when the variable is unset.
SUBTENSOR = os.environ.get("SUBTENSOR")
@dataclass
class Competition:
    """A competition that submitted models are grouped under on the leaderboard."""

    # Short identifier; compared against ModelData.competition and passed to
    # get_scores() as the competition id to filter history rows by.
    id: str
    # Display name; used as the title of the competition's tab in the UI.
    name: str
# Competitions shown on the leaderboard; one UI tab is built per entry.
COMPETITIONS = [Competition(id="m6", name="midjourney-6-sdxl")]
# Competition assigned to commitments whose compressed string does not name one.
DEFAULT_COMPETITION_ID = "m6"
def run_in_subprocess(func: functools.partial, ttl: int) -> typing.Any:
    """Runs the provided function on a subprocess with 'ttl' seconds to complete.

    The result is read from the queue *before* the child is joined. A child
    that has put data on a multiprocessing queue does not exit until that data
    has been flushed to the pipe, so joining first would make any result
    larger than the pipe buffer look like a timeout.

    Args:
        func (functools.partial): Function to be run.
        ttl (int): How long to try for in seconds.

    Returns:
        Any: The value returned by 'func'

    Raises:
        TimeoutError: No result arrived within 'ttl' seconds (the child is
            terminated).
        queue.Empty: The child exited without putting anything on the queue.
        Exception: Whatever 'func' raised in the child; a non-Exception
            BaseException is re-raised wrapped in a plain Exception.
    """
    # Local import: only queue.Empty is needed, and the file does not import
    # the module at top level.
    import queue as queue_errors

    def wrapped_func(func: functools.partial, result_queue: multiprocessing.Queue):
        try:
            result_queue.put(func())
        except BaseException as e:
            # Catch everything here so the parent can re-raise it.
            result_queue.put(e)

    # Use "fork" (the default on all POSIX except macOS), because pickling doesn't seem
    # to work on "spawn".
    ctx = multiprocessing.get_context("fork")
    result_queue = ctx.Queue()
    process = ctx.Process(target=wrapped_func, args=[func, result_queue])
    process.start()

    deadline = time.monotonic() + ttl
    while True:
        remaining = deadline - time.monotonic()
        try:
            # Wait in short slices so a child that died without a result is
            # noticed promptly instead of after the full ttl.
            result = result_queue.get(timeout=max(0.0, min(remaining, 0.05)))
            break
        except queue_errors.Empty:
            if not process.is_alive():
                # The child is gone. One last non-blocking read covers a result
                # flushed just before exit; otherwise queue.Empty propagates.
                result = result_queue.get(block=False)
                break
            if remaining <= 0:
                process.terminate()
                process.join()
                name = getattr(getattr(func, "func", func), "__name__", repr(func))
                raise TimeoutError(f"Failed to {name} after {ttl} seconds") from None

    # The result is already in hand; give the child the rest of the budget to
    # exit and reap it forcibly if it lingers.
    process.join(timeout=max(0.0, deadline - time.monotonic()))
    if process.is_alive():
        process.terminate()
        process.join()

    # If we put an exception on the queue then raise instead of returning.
    if isinstance(result, Exception):
        raise result
    if isinstance(result, BaseException):
        raise Exception(f"BaseException raised in subprocess: {str(result)}")
    return result
def get_subtensor_and_metagraph() -> typing.Tuple[bt.subtensor, bt.metagraph]:
    """Connect to the subtensor and pull the full (non-lite) metagraph for NETUID.

    Makes up to METAGRAPH_RETRIES attempts, sleeping METAGRAPH_DELAY_SECS
    between them. The exception from the final attempt is re-raised.

    Returns:
        The connected subtensor and the metagraph pulled from it.

    Raises:
        Exception: Whatever the last attempt raised.
        RuntimeError: If METAGRAPH_RETRIES allows no attempts at all.
    """
    for attempt in range(1, METAGRAPH_RETRIES + 1):
        try:
            print("Connecting to subtensor...")
            subtensor: bt.subtensor = bt.subtensor(SUBTENSOR)
            print("Pulling metagraph...")
            metagraph: bt.metagraph = subtensor.metagraph(NETUID, lite=False)
            return subtensor, metagraph
        except Exception as err:
            print(err)
            out_of_attempts = attempt == METAGRAPH_RETRIES
            if out_of_attempts:
                raise
            print(f"Error connecting to subtensor or pulling metagraph, retry {attempt} of {METAGRAPH_RETRIES} in {METAGRAPH_DELAY_SECS} seconds...")
            time.sleep(METAGRAPH_DELAY_SECS)
    # Only reached when the loop body never ran.
    raise RuntimeError()
@dataclass
class ModelData:
    """One miner's model commitment joined with its metagraph statistics."""

    # Identity of the submitter.
    uid: int
    hotkey: str
    # Fields parsed from the colon-separated commitment string.
    namespace: str
    name: str
    commit: str
    hash: str
    # Values supplied by the caller alongside the commitment string.
    block: int
    incentive: float
    emission: float
    # Competition id parsed from the string, or DEFAULT_COMPETITION_ID.
    competition: str

    @classmethod
    def from_compressed_str(cls, uid: int, hotkey: str, cs: str, block: int, incentive: float, emission: float) -> "ModelData":
        """Returns an instance of this class from a compressed string representation

        The string is split on ':' into namespace, name, commit, hash and an
        optional fifth competition token. A literal "None" for commit or hash
        becomes an empty string; a missing or "None" competition token falls
        back to DEFAULT_COMPETITION_ID.

        Raises:
            IndexError: If 'cs' has fewer than four ':'-separated tokens.
        """
        tokens = cs.split(":")
        # Build through cls (not the hard-coded class name) so subclasses get
        # instances of their own type.
        return cls(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=tokens[2] if tokens[2] != "None" else "",
            hash=tokens[3] if tokens[3] != "None" else "",
            competition=tokens[4] if len(tokens) > 4 and tokens[4] != "None" else DEFAULT_COMPETITION_ID,
            block=block,
            incentive=incentive,
            emission=emission,
        )
def get_tao_price() -> float:
    """Fetch the average TAO/USDT price from the MEXC public API.

    Makes up to METAGRAPH_RETRIES attempts, sleeping METAGRAPH_DELAY_SECS
    between them. Each request carries a timeout so an unresponsive endpoint
    cannot block startup indefinitely, and an HTTP error status counts as a
    failed attempt.

    Returns:
        The price parsed from the "price" field of the JSON response.

    Raises:
        Exception: Whatever the last attempt raised.
        RuntimeError: If METAGRAPH_RETRIES allows no attempts at all.
    """
    for i in range(0, METAGRAPH_RETRIES):
        try:
            response = requests.get(
                "https://api.mexc.com/api/v3/avgPrice?symbol=TAOUSDT",
                timeout=10,  # seconds; without it requests waits forever
            )
            response.raise_for_status()
            return float(response.json()["price"])
        except Exception as e:
            print(e)
            if i == METAGRAPH_RETRIES - 1:
                raise
            time.sleep(METAGRAPH_DELAY_SECS)
    # Only reached when the loop body never ran.
    raise RuntimeError()
def get_validator_weights(metagraph: bt.metagraph) -> typing.Dict[int, typing.Tuple[float, int, typing.Dict[int, float]]]:
    """Collect, per validator, its trust, its stake and the weights it assigns.

    A uid counts as a validator when its validator_trust is greater than zero.
    Weights are rounded to four decimals. Only strictly positive rounded
    weights on *other* uids are kept.

    Returns:
        Mapping of validator uid -> (validator_trust, metagraph.S value,
        {target uid: rounded weight}).
    """
    all_uids = metagraph.uids.tolist()
    validators = {}
    for validator_uid in all_uids:
        trust = metagraph.validator_trust[validator_uid].item()
        # Written as "not >" so that a NaN trust is skipped, as with a plain "> 0" test.
        if not trust > 0:
            continue
        stake = metagraph.S[validator_uid].item()
        assigned = {}
        for target_uid in all_uids:
            if target_uid == validator_uid:
                continue
            rounded = round(metagraph.weights[validator_uid][target_uid].item(), 4)
            if rounded > 0:
                assigned[target_uid] = rounded
        validators[validator_uid] = (trust, stake, assigned)
    return validators
def get_subnet_data(subtensor: bt.subtensor, metagraph: bt.metagraph) -> typing.List[ModelData]:
    """Build a ModelData entry for every uid that has a parseable commitment.

    For each uid the commitment metadata is fetched in a subprocess bounded by
    METADATA_TTL. Uids whose lookup fails, that have no metadata, or whose
    commitment cannot be decoded and parsed are skipped. The commitment is
    data written by miners, so a malformed one must never abort the scan.

    Args:
        subtensor: Connection handed to get_metadata.
        metagraph: Source of uids, hotkeys, incentive and emission.

    Returns:
        One ModelData per uid with a valid commitment, in metagraph uid order.
    """
    result = []
    for uid in tqdm(metagraph.uids.tolist(), desc="Metadata for hotkeys"):
        hotkey = metagraph.hotkeys[uid]
        try:
            # Wrap calls to the subtensor in a subprocess with a timeout to handle potential hangs.
            partial = functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            metadata = run_in_subprocess(partial, METADATA_TTL)
        except Exception as e:
            # KeyboardInterrupt and SystemExit are not Exception subclasses and
            # still propagate.
            print(f"Failed to fetch metadata for UID {uid}: {e!r}")
            metadata = None
        if not metadata:
            continue

        incentive = metagraph.incentive[uid].nan_to_num().item()
        emission = metagraph.emission[uid].nan_to_num().item() * 20  # convert to daily TAO

        try:
            commitment = metadata["info"]["fields"][0]
            # The first value of the field is a hex string; the first two
            # characters are dropped (presumably a "0x" prefix) before decoding.
            hex_data = next(iter(commitment.values()))[2:]
            chain_str = bytes.fromhex(hex_data).decode()
            block = metadata["block"]
            model_data = ModelData.from_compressed_str(uid, hotkey, chain_str, block, incentive, emission)
        except Exception as e:
            print(f"Skipping UID {uid}, malformed commitment: {e!r}")
            continue

        result.append(model_data)
    return result
def floatable(x) -> bool:
    """Return True when x is an int, or a float that is neither NaN nor infinite."""
    return (isinstance(x, float) and not math.isnan(x) and not math.isinf(x)) or isinstance(x, int)


def get_float_score(key: str, history, competition_id: str) -> typing.Tuple[typing.Optional[float], bool]:
    """Return the latest score logged under 'key' for the given competition.

    Rows are scanned from newest to oldest and rows belonging to other
    competitions are ignored. If the newest matching row holds a usable number
    it is returned as fresh. Otherwise the most recent usable number from an
    older matching row is returned as stale.

    Args:
        key: Column name of the metric.
        history: Mapping-like object supporting 'in' and item access by column
            name (a dict of lists works). Must also have a "competition_id"
            column.
        competition_id: Competition whose rows are considered.

    Returns:
        (value, fresh). (None, False) when a column is missing, the history
        is empty, no row belongs to the competition, or no matching row holds
        a usable number.
    """
    if key not in history or "competition_id" not in history:
        return None, False

    values = list(history[key])
    competitions = list(history["competition_id"])

    # Stays True only while looking at the newest matching row.
    fresh = True
    # Walking a reversed zip (instead of popping) simply ends when the rows
    # run out, so a history with no matching row yields (None, False).
    for value, row_competition in zip(reversed(values), reversed(competitions)):
        if row_competition != competition_id:
            continue
        if floatable(value):
            return float(value), fresh
        fresh = False
    return None, False
def get_sample(uid, history, competition_id: str) -> typing.Optional[typing.Tuple[str, str, str]]:
    """Return the newest (prompt, response, truth) sample logged for 'uid'.

    Only the newest row belonging to 'competition_id' is inspected. The sample
    is returned when all three of its fields are strings.

    Args:
        uid: Miner uid used to build the three sample column names.
        history: Mapping-like object supporting 'in' and item access by column
            name (a dict of lists works).
        competition_id: Competition whose rows are considered.

    Returns:
        The (prompt, response, truth) tuple, or None when a column is missing,
        no row belongs to the competition, or the newest matching row does not
        hold three strings.
    """
    prompt_key = f"sample_prompt_data.{uid}"
    response_key = f"sample_response_data.{uid}"
    truth_key = f"sample_truth_data.{uid}"
    required = (prompt_key, response_key, truth_key, "competition_id")
    if not all(column in history for column in required):
        return None

    newest_first = zip(
        reversed(list(history[prompt_key])),
        reversed(list(history[response_key])),
        reversed(list(history[truth_key])),
        reversed(list(history["competition_id"])),
    )
    # Walking a reversed zip (instead of popping) simply ends when the rows
    # run out, so a history with no matching row yields None.
    for prompt, response, truth, row_competition in newest_first:
        if row_competition != competition_id:
            continue
        if isinstance(prompt, str) and isinstance(response, str) and isinstance(truth, str):
            return prompt, response, truth
        # The newest matching row has no usable sample; older rows are not consulted.
        break
    return None
def get_scores(uids: typing.List[int], competition_id: str) -> typing.Dict[int, typing.Dict[str, typing.Optional[float | str]]]:
    """Collect the validator's latest logged statistics for each uid.

    Runs of VALIDATOR_WANDB_PROJECT are visited in the order the W&B API
    returns them. A uid is filled from the first run visited and is not
    revisited.

    NOTE(review): because every uid receives an entry (possibly all None) from
    the first run, later runs are only reached when 'uids' contains
    duplicates. Confirm whether falling through to older runs for uids
    without data was intended.

    Args:
        uids: Miner uids to look up.
        competition_id: Competition whose history rows are considered.

    Returns:
        Mapping uid -> {"perplexity", "win_rate", "win_total", "weight",
        "sample", "fresh"}. Every requested uid is present, even when the
        project has no runs. In that case the values are None and "fresh" is
        False, so callers can index the result without guarding.
    """
    api = wandb.Api()
    result = {}
    # Iterate the runs collection directly rather than materialising it. It is
    # expected to fetch pages on demand, and normally only the first run is used.
    for run in api.runs(VALIDATOR_WANDB_PROJECT):
        history = run.history()
        for uid in uids:
            if uid in result:
                continue
            perplexity, perplexity_fresh = get_float_score(f"perplexity_data.{uid}", history, competition_id)
            win_rate, win_rate_fresh = get_float_score(f"win_rate_data.{uid}", history, competition_id)
            win_total, win_total_fresh = get_float_score(f"win_total_data.{uid}", history, competition_id)
            # The freshness of the weight does not take part in "fresh".
            weight, _ = get_float_score(f"weight_data.{uid}", history, competition_id)
            sample = get_sample(uid, history, competition_id)
            result[uid] = {
                "perplexity": perplexity,
                "win_rate": win_rate,
                "win_total": win_total,
                "weight": weight,
                "sample": sample,
                "fresh": perplexity_fresh and win_rate_fresh and win_total_fresh,
            }
        if len(result) == len(uids):
            break

    # Guarantee an entry per uid (e.g. when the project has no runs yet).
    for uid in uids:
        result.setdefault(uid, {
            "perplexity": None,
            "win_rate": None,
            "win_total": None,
            "weight": None,
            "sample": None,
            "fresh": False,
        })
    return result
def format_score(uid, scores, key) -> typing.Optional[float]:
    """Return scores[uid][key] rounded to four decimals, or None if unavailable.

    None is returned when the uid has no entry, the entry lacks 'key', or the
    stored value is not a usable number according to floatable().
    """
    if uid not in scores:
        return None
    entry = scores[uid]
    if key not in entry:
        return None
    value = entry[key]
    return round(entry[key], 4) if floatable(value) else None
def next_tempo(start_block, tempo, block):
    """Return the first tempo boundary strictly after 'block'.

    Boundaries lie at start_block + k * tempo. The first one considered is
    start_block + tempo, and a block that sits exactly on a boundary maps to
    the following one.
    """
    first_boundary = start_block + tempo
    completed = (block - first_boundary) // tempo
    return first_boundary + (completed + 1) * tempo
# ---- Startup data load -------------------------------------------------------
# Everything below runs once when the module is executed. The values are
# module-level state that the UI construction further down reads.

# Chain connection, full metagraph and current TAO price.
subtensor, metagraph = get_subtensor_and_metagraph()
tao_price = get_tao_price()
# Despite the name this is a plain list of ModelData, sorted in place by
# incentive, highest first.
leaderboard_df = get_subnet_data(subtensor, metagraph)
leaderboard_df.sort(key=lambda x: x.incentive, reverse=True)
# competition id -> get_scores() result for the uids entered in that competition.
competition_scores = {
    y.id: get_scores([x.uid for x in leaderboard_df if x.competition == y.id], y.id)
    for y in COMPETITIONS
}
# Estimate when the next tempo boundary falls, in blocks and in wall-clock time.
current_block = metagraph.block.item()
next_update = next_tempo(
    SUBNET_START_BLOCK,
    subtensor.get_subnet_hyperparameters(NETUID).tempo,
    current_block
)
blocks_to_go = next_update - current_block
current_time = datetime.datetime.now()
next_update_time = current_time + datetime.timedelta(seconds=blocks_to_go * SECONDS_PER_BLOCK)
# Despite the name this is a dict: validator uid -> (v-trust, stake, weights).
validator_df = get_validator_weights(metagraph)
# Union of all uids that receive a positive weight from at least one validator.
weight_keys = set()
for uid, stats in validator_df.items():
    weight_keys.update(stats[-1].keys())
def get_next_update():
    """Return the "next reward update" banner text.

    The block count is the module-level value computed at startup. The minutes
    figure is recomputed against the current time on every call.
    """
    remaining = next_update_time - datetime.datetime.now()
    minutes = int(remaining.total_seconds() // 60)
    return f"""
Next reward update: {blocks_to_go} blocks (~{minutes} minutes)
"""
def leaderboard_data(show_stale: bool, scores: typing.Dict[int, typing.Dict[str, typing.Optional[float | str]]], competition_id: str):
    """Build the rows of the leaderboard table for one competition.

    Args:
        show_stale: When False, only models whose scores are marked fresh are
            listed; when True, every model of the competition is listed.
        scores: Per-uid statistics as produced by get_scores().
        competition_id: Competition whose models are listed.

    Returns:
        A list of rows [markdown link, win rate, perplexity, weight, uid,
        block], in leaderboard_df order.
    """
    rows = []
    for c in leaderboard_df:
        if c.competition != competition_id:
            continue
        # A uid missing from 'scores' is treated as not fresh instead of
        # raising KeyError, matching how format_score() tolerates missing uids.
        is_fresh = scores.get(c.uid, {}).get("fresh")
        if not (is_fresh or show_stale):
            continue
        rows.append([
            f'[{c.namespace}/{c.name} ({c.commit[0:8]}, UID={c.uid})](https://huggingface.co/{c.namespace}/{c.name}/commit/{c.commit})',
            format_score(c.uid, scores, "win_rate"),
            format_score(c.uid, scores, "perplexity"),
            format_score(c.uid, scores, "weight"),
            c.uid,
            c.block,
        ])
    return rows
# ---- UI construction ---------------------------------------------------------
# The page is built once from the module-level data loaded above.
demo = gr.Blocks(css=".typewriter {font-family: 'JMH Typewriter', sans-serif;}")
with demo:
    gr.HTML(FONT)
    gr.HTML(TITLE)
    gr.HTML(IMAGE)
    gr.HTML(HEADER)
    gr.HTML(value=get_next_update())

    with gr.Tabs():
        for competition in COMPETITIONS:
            with gr.Tab(competition.name):
                scores = competition_scores[competition.id]
                print(scores)

                # Reward share among the overall top-10 entries (by incentive)
                # that belong to this competition and have a non-zero incentive.
                top_entries = [
                    entry for entry in leaderboard_df[:10]
                    if entry.incentive and entry.competition == competition.id
                ]
                class_denominator = sum(entry.incentive for entry in top_entries)
                class_values = {
                    f"{entry.namespace}/{entry.name} ({entry.commit[0:8]}, UID={entry.uid}) · ${round(entry.emission * tao_price, 2):,} (τ{round(entry.emission, 2):,})":
                        entry.incentive / class_denominator
                    for entry in top_entries
                }
                gr.Label(
                    value=class_values,
                    num_top_classes=10,
                )

                with gr.Accordion("Evaluation Stats"):
                    gr.HTML(EVALUATION_HEADER)

                    # One tab per model that has a logged sample.
                    with gr.Tabs():
                        for entry in leaderboard_df:
                            if entry.competition == competition.id:
                                # .get() so a uid without a scores entry is
                                # skipped instead of raising KeyError.
                                sample = scores.get(entry.uid, {}).get("sample")
                                if sample is not None:
                                    name = f"{entry.namespace}/{entry.name} ({entry.commit[0:8]}, UID={entry.uid})"
                                    with gr.Tab(name):
                                        gr.Chatbot([(sample[0], sample[1])])
                                        # gr.Chatbot([(sample[0], f"*{name}*: {sample[1]}"), (None, f"*GPT-4*: {sample[2]}")])

                    show_stale = gr.Checkbox(label="Show Stale", interactive=True)
                    leaderboard_table = gr.components.Dataframe(
                        value=leaderboard_data(show_stale.value, scores, competition.id),
                        headers=["Name", "Win Rate", "Perplexity", "Weight", "UID", "Block"],
                        datatype=["markdown", "number", "number", "number", "number", "number"],
                        elem_id="leaderboard-table",
                        interactive=False,
                        visible=True,
                    )
                    gr.HTML(EVALUATION_DETAILS)
                    # Bind this iteration's scores and competition id as
                    # defaults. A plain closure would read the loop variables
                    # when the event fires and always see the last competition.
                    show_stale.change(
                        lambda x, scores=scores, competition_id=competition.id: leaderboard_data(x, scores, competition_id),
                        [show_stale],
                        leaderboard_table,
                    )

    with gr.Accordion("Validator Stats"):
        # Columns: one per model with a non-zero incentive. Rows: validators,
        # highest stake first.
        scored_models = [c for c in leaderboard_df if c.incentive]
        validators_by_stake = sorted(validator_df, key=lambda v: validator_df[v][1], reverse=True)
        validator_table = gr.components.Dataframe(
            value=[
                [uid, int(validator_df[uid][1]), round(validator_df[uid][0], 4)]
                + [validator_df[uid][-1].get(c.uid) for c in scored_models]
                for uid in validators_by_stake
            ],
            # "τ" replaces the mis-encoded "Ï„" in the stake header.
            headers=["UID", "Stake (τ)", "V-Trust"] + [f"{c.namespace}/{c.name} ({c.commit[0:8]}, UID={c.uid})" for c in scored_models],
            datatype=["number", "number", "number"] + ["number" for _ in scored_models],
            interactive=False,
            visible=True,
        )
def restart_space():
    """Ask the Hugging Face Hub to restart the Space identified by REPO_ID.

    NOTE(review): presumably a restart re-runs this module, which is how the
    startup data gets refreshed — confirm.
    """
    API.restart_space(repo_id=REPO_ID, token=H4_TOKEN)
# Schedule the periodic Space restart on a background scheduler, then start
# serving the Gradio app.
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=60 * 15) # restart every 15 minutes
scheduler.start()
demo.launch()