import json
import re
import random
from collections import defaultdict
from datetime import datetime, timezone
import hashlib

from dotenv import load_dotenv

# Environment variables must be loaded before the modules below are imported,
# so this call intentionally sits between the import groups.
load_dotenv()

import gradio as gr
from gen_api_answer import (
    get_model_response,
    parse_model_response,
    get_random_human_ai_pair,
)
from db import add_vote, create_db_connection, get_votes
from utils import Vote
from common import (
    POLICY_CONTENT,
    ACKNOWLEDGEMENTS,
    DEFAULT_EVAL_PROMPT,
    DEFAULT_INPUT,
    DEFAULT_RESPONSE,
    CSS_STYLES,
    MAIN_TITLE,
    HOW_IT_WORKS,
    BATTLE_RULES,
    EVAL_DESCRIPTION,
    VOTING_HEADER,
)
from example_metrics import EXAMPLE_METRICS


# Model and ELO score data
DEFAULT_ELO = 1200  # Starting ELO for new models
K_FACTOR = 32  # Standard chess K-factor, adjust as needed

# In-memory, per-process running totals updated by `vote`. The leaderboard
# itself is always recomputed from the database (see `get_leaderboard`).
elo_scores = defaultdict(lambda: DEFAULT_ELO)
vote_counts = defaultdict(int)

db = create_db_connection()
votes_collection = get_votes(db)

current_time = datetime.now()

# Matches `{{ name }}` template placeholders; shared by `parse_variables` and
# `get_final_prompt` so both agree on what a placeholder is.
_VARIABLE_PATTERN = re.compile(r"{{(.*?)}}")

# Models with fewer votes than this are hidden when preliminary results are off.
_MIN_ESTABLISHED_VOTES = 500

# Column layout of the leaderboard table. The two lists must stay the same
# length: one datatype per header.
_LEADERBOARD_HEADERS = ["Model", "ELO", "95% CI", "Matches", "Organization", "License"]
_LEADERBOARD_DATATYPES = ["str", "number", "str", "number", "str", "str"]


# Load the model_data from JSONL
def load_model_data():
    """Read ``data/models.jsonl`` into a dict keyed by model name.

    Each non-blank line must be a JSON object with the keys ``name``,
    ``organization``, ``license`` and ``api_model``.

    Returns:
        dict mapping model name -> {"organization", "license", "api_model"}.
        An empty dict (after printing a warning) if the file does not exist.
    """
    model_data = {}
    try:
        with open("data/models.jsonl", "r", encoding="utf-8") as f:
            for line in f:
                # Tolerate blank lines (e.g. a trailing newline at end of file),
                # which would otherwise make json.loads raise.
                if not line.strip():
                    continue
                model = json.loads(line)
                model_data[model["name"]] = {
                    "organization": model["organization"],
                    "license": model["license"],
                    "api_model": model["api_model"],
                }
    except FileNotFoundError:
        print("Warning: models.jsonl not found")
        return {}
    return model_data


model_data = load_model_data()


def store_vote_data(prompt, response_a, response_b, model_a, model_b, winner, judge_id):
    """Build a ``Vote`` record stamped with the current time and persist it via ``add_vote``."""
    vote = Vote(
        timestamp=datetime.now().isoformat(),
        prompt=prompt,
        response_a=response_a,
        response_b=response_b,
        model_a=model_a,
        model_b=model_b,
        winner=winner,
        judge_id=judge_id,
    )
    add_vote(vote, db)


def parse_variables(prompt):
    """Return the distinct ``{{variable}}`` names in *prompt*, in first-seen order.

    Names are stripped of surrounding whitespace, so ``{{ input }}`` and
    ``{{input}}`` both yield ``"input"``.
    """
    names = (name.strip() for name in _VARIABLE_PATTERN.findall(prompt))
    # dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(names))


def get_final_prompt(eval_prompt, variable_values):
    """Fill the ``{{variable}}`` placeholders of *eval_prompt*.

    Args:
        eval_prompt: Template text containing ``{{name}}`` placeholders.
        variable_values: Mapping of variable name -> replacement value.

    Returns:
        The prompt with every known placeholder replaced. Placeholders whose
        name is not in *variable_values* are left untouched.

    Whitespace inside the braces is ignored when looking up a name, matching
    what `parse_variables` returns. Substitution is done in a single pass, so
    placeholder-looking text inside a replacement value is never re-expanded.
    """

    def _substitute(match):
        raw_name = match.group(1)
        # Try the exact text first so keys that contain whitespace still work.
        for name in (raw_name, raw_name.strip()):
            if name in variable_values:
                value = variable_values[name]
                return "" if value is None else str(value)
        return match.group(0)

    return _VARIABLE_PATTERN.sub(_substitute, eval_prompt)


def submit_prompt(eval_prompt, *variable_values):
    """Send the filled-in prompt to two randomly chosen, distinct models.

    Args:
        eval_prompt: Template text containing ``{{name}}`` placeholders.
        *variable_values: Values for the placeholders, in the order
            `parse_variables` reports them.

    Returns:
        A 7-tuple: (response_a, response_b, two visibility updates,
        model_a, model_b, final_prompt). On any failure the responses are
        replaced by an error string, both updates hide their component and
        the last three items are ``None``.
    """
    try:
        variables = parse_variables(eval_prompt)
        variable_values_dict = dict(zip(variables, variable_values))
        final_prompt = get_final_prompt(eval_prompt, variable_values_dict)

        # random.sample returns its picks in random order, so no extra shuffle
        # is needed to randomize which model is shown as A and which as B.
        # It raises ValueError when fewer than two models are configured,
        # which is reported through the handler below.
        model_a, model_b = random.sample(list(model_data), 2)

        response_a = get_model_response(model_a, model_data.get(model_a), final_prompt)
        response_b = get_model_response(model_b, model_data.get(model_b), final_prompt)

        return (
            response_a,
            response_b,
            gr.update(visible=True),
            gr.update(visible=True),
            model_a,
            model_b,
            final_prompt,
        )
    except Exception as e:
        # Deliberately broad: this is the UI boundary, and a failed model call
        # should surface as an error message rather than crash the app.
        print(f"Error in submit_prompt: {str(e)}")
        return (
            "Error generating response",
            "Error generating response",
            gr.update(visible=False),
            gr.update(visible=False),
            None,
            None,
            None,
        )


def get_ip(request: gr.Request) -> str:
    """Get and hash the IP address from the request.

    Looks at ``cf-connecting-ip`` first, then the first entry of
    ``x-forwarded-for``, then the direct client host.

    Returns:
        The first 16 hex characters of the SHA-256 digest of the address.
    """
    headers = request.headers
    if "cf-connecting-ip" in headers:
        ip = headers["cf-connecting-ip"]
    elif "x-forwarded-for" in headers:
        # The header may hold a comma-separated chain; the first entry is used.
        # Strip it so spacing differences do not produce different hashes.
        ip = headers["x-forwarded-for"].split(",")[0].strip()
    elif request.client is not None:
        ip = request.client.host
    else:
        ip = "unknown"

    # Hash the IP address for privacy
    return hashlib.sha256(ip.encode()).hexdigest()[:16]


def calculate_elo_change(rating_a, rating_b, winner):
    """Calculate ELO rating changes for both players.

    Args:
        rating_a: Current rating of player A.
        rating_b: Current rating of player B.
        winner: ``"A"``, ``"B"``, or anything else for a tie.

    Returns:
        ``(change_a, change_b)``: the amounts to add to each rating.
    """
    expected_a = 1 / (1 + 10 ** ((rating_b - rating_a) / 400))
    expected_b = 1 - expected_a

    if winner == "A":
        score_a, score_b = 1, 0
    elif winner == "B":
        score_a, score_b = 0, 1
    else:  # Handle ties
        score_a, score_b = 0.5, 0.5

    change_a = K_FACTOR * (score_a - expected_a)
    change_b = K_FACTOR * (score_b - expected_b)
    return change_a, change_b


def vote(
    choice,
    model_a,
    model_b,
    final_prompt,
    score_a,
    critique_a,
    score_b,
    critique_b,
    request: gr.Request,
):
    """Record a vote, update the in-memory ELO scores and reveal the models.

    Args:
        choice: ``"A"``, ``"B"``, or anything else for a tie.
        model_a, model_b: Names of the two models that were compared.
        final_prompt: The prompt both models were given.
        score_a, critique_a, score_b, critique_b: The judges' outputs that the
            user voted on; stored with the vote.
        request: The incoming request, used to derive an anonymous judge id.

    Returns:
        A list of seven component updates, in the order noted on each entry.
    """
    # Get hashed IP as judge_id
    judge_id = get_ip(request)

    # Update ELO scores and vote counts based on user choice
    change_a, change_b = calculate_elo_change(
        elo_scores[model_a], elo_scores[model_b], choice
    )
    elo_scores[model_a] += change_a
    elo_scores[model_b] += change_b
    vote_counts[model_a] += 1
    vote_counts[model_b] += 1

    # Format the full responses with score and critique
    # NOTE(review): the separator between score and critique is assumed to be
    # a single line break - confirm against the stored vote format.
    response_a = f"""{score_a}
{critique_a}"""
    response_b = f"""{score_b}
{critique_b}"""

    # Store the vote data with the final prompt
    store_vote_data(
        final_prompt, response_a, response_b, model_a, model_b, choice, judge_id
    )

    # Return updates for UI components
    return [
        gr.update(visible=False),  # vote_a
        gr.update(visible=False),  # vote_b
        gr.update(visible=False),  # tie_button_row
        gr.update(value=f"*Model: {model_a}*"),  # model_name_a
        gr.update(value=f"*Model: {model_b}*"),  # model_name_b
        gr.update(interactive=True, value="Run the evaluators", variant="primary"),  # send_btn
        gr.update(visible=True),  # spacing_div
    ]


def get_current_votes():
    """Get current votes from database."""
    return get_votes(db)


def get_leaderboard(show_preliminary=True):
    """Generate leaderboard data using fresh votes from the database.

    Ratings are rebuilt from scratch by replaying every stored vote in the
    order the database returns them. Votes with a missing field, or that
    reference a model not in ``model_data``, are ignored.

    Args:
        show_preliminary: When False, models with fewer than 500 votes are
            left out.

    Returns:
        A list of dicts with the keys ``Model``, ``ELO Score``, ``95% CI``,
        ``# Votes``, ``Organization`` and ``License``, sorted by rating,
        highest first.
    """
    # Get fresh voting data
    voting_data = get_current_votes()
    print(f"Fetched {len(voting_data)} votes from database")  # Debug log

    # Initialize dictionaries for tracking
    ratings = defaultdict(lambda: DEFAULT_ELO)
    matches = defaultdict(int)

    # Process each vote
    for vote in voting_data:
        try:
            model_a = vote.get("model_a")
            model_b = vote.get("model_b")
            winner = vote.get("winner")

            # Skip if a field is missing or models aren't in current model_data
            if (
                not all([model_a, model_b, winner])
                or model_a not in model_data
                or model_b not in model_data
            ):
                continue

            # Update match counts
            matches[model_a] += 1
            matches[model_b] += 1

            # Update ratings
            change_a, change_b = calculate_elo_change(
                ratings[model_a], ratings[model_b], winner
            )
            ratings[model_a] += change_a
            ratings[model_b] += change_b
        except Exception as e:
            # Best effort: one malformed record must not hide the leaderboard.
            print(f"Error processing vote: {e}")
            continue

    # Generate leaderboard data
    ranked = []
    for model, info in model_data.items():
        votes = matches[model]
        # Skip models with too few votes if show_preliminary is False
        if not show_preliminary and votes < _MIN_ESTABLISHED_VOTES:
            continue

        elo = ratings[model]
        ci = 1.96 * (400 / (votes + 1) ** 0.5) if votes > 0 else 0
        ranked.append(
            (
                elo,
                {
                    "Model": model,
                    "ELO Score": f"{int(elo)}",
                    "95% CI": f"±{int(ci)}",
                    "# Votes": votes,
                    "Organization": info["organization"],
                    "License": info["license"],
                },
            )
        )

    # Sort by the raw rating (not the truncated display string), descending
    ranked.sort(key=lambda entry: entry[0], reverse=True)
    return [data for _, data in ranked]


def update_leaderboard():
    """Generate the leaderboard table rows using fresh votes from the database.

    Returns:
        A list of rows, one per model, sorted by rating (highest first). Each
        row is ``[model, elo, ci, matches, organization, license]``, the same
        column order as the leaderboard table headers.
    """
    return [
        [
            row["Model"],
            int(row["ELO Score"]),  # numeric, to match the "number" column type
            row["95% CI"],
            row["# Votes"],
            row["Organization"],
            row["License"],
        ]
        for row in get_leaderboard()
    ]


# Update the display_leaderboard function
def display_leaderboard():
    """Build a leaderboard table component filled with freshly computed rows."""
    df = update_leaderboard()
    return gr.DataFrame(
        value=df,
        headers=_LEADERBOARD_HEADERS,
        datatype=_LEADERBOARD_DATATYPES,
        row_count=(len(df) + 1, "dynamic"),
    )


# Update the leaderboard table definition in the UI
leaderboard_table = gr.Dataframe(
    headers=_LEADERBOARD_HEADERS,
    datatype=_LEADERBOARD_DATATYPES,
)


def get_leaderboard_stats():
    """Get summary statistics for the leaderboard.

    Returns:
        A Markdown snippet with the number of models, the number of stored
        votes, and the current UTC time truncated to the hour.
    """
    now = datetime.now(timezone.utc)
    total_votes = len(get_current_votes())
    total_models = len(model_data)
    last_updated = now.replace(minute=0, second=0, microsecond=0).strftime(
        "%B %d, %Y at %H:00 UTC"
    )

    return f"""
### Leaderboard Stats
- **Total Models**: {total_models}
- **Total Votes**: {total_votes}
- **Last Updated**: {last_updated}
"""


#def set_example_metric(metric_name):
#    if metric_name == "Custom":
#        variables = parse_variables(DEFAULT_EVAL_PROMPT)
#        variable_values = []
#        for var in variables:
#            if var == "input":
#                variable_values.append(DEFAULT_INPUT)
#            elif var == "response":
#                variable_values.append(DEFAULT_RESPONSE)
#            else:
#                variable_values.append("")  # Default empty value
#        # Pad variable_values to match the length of variable_rows
#        while len(variable_values) < len(variable_rows):
#            variable_values.append("")
#        return [DEFAULT_EVAL_PROMPT] + variable_values

#    metric_data = EXAMPLE_METRICS[metric_name]
#    variables = parse_variables(metric_data["prompt"])
#    variable_values = []
#    for var in variables:
#        value = metric_data.get(var, "")  # Default to empty string if not found
#
variable_values.append(value) # Pad variable_values to match the length of variable_rows # while len(variable_values) < len(variable_rows): # variable_values.append("") # return [metric_data["prompt"]] + variable_values # Select random metric at startup # def get_random_metric(): # metrics = list(EXAMPLE_METRICS.keys()) # return set_example_metric(random.choice(metrics)) def populate_random_example(request: gr.Request): """Generate a random human-AI conversation example.""" human_msg, ai_msg = get_random_human_ai_pair() return [ gr.update(value=human_msg), gr.update(value=ai_msg) ] with gr.Blocks(theme="default", css=CSS_STYLES) as demo: gr.Markdown(MAIN_TITLE) gr.Markdown(HOW_IT_WORKS) # Hidden eval prompt that will always contain DEFAULT_EVAL_PROMPT eval_prompt = gr.Textbox( value=DEFAULT_EVAL_PROMPT, visible=False ) with gr.Tabs(): with gr.TabItem("Judge Arena"): random_btn = gr.Button("🎲", scale=0) with gr.Row(): # Left side - Input section with gr.Column(scale=1): with gr.Group(): human_input = gr.TextArea( label="👩 Human Input", lines=12, placeholder="Enter the human message here..." ) ai_response = gr.TextArea( label="🤖 AI Response", lines=12, placeholder="Enter the AI response here..." ) send_btn = gr.Button( value="Run the evaluators", variant="primary", size="lg" ) # Right side - Model outputs with gr.Column(scale=1): gr.Markdown("### 👩⚖️ Judge A") with gr.Group(): model_name_a = gr.Markdown("*Model: Hidden*") with gr.Row(): with gr.Column(scale=1, min_width=100): # Fixed narrow width for score score_a = gr.Textbox(label="Score", lines=5, interactive=False) vote_a = gr.Button("Vote A", variant="primary", visible=False) with gr.Column(scale=9, min_width=400): # Wider width for critique critique_a = gr.TextArea(label="Critique", lines=7, interactive=False) # Spacing div that's visible only when tie button is hidden spacing_div = gr.HTML('
', visible=True, elem_id="spacing-div") # Tie button row with gr.Row(visible=False) as tie_button_row: with gr.Column(): vote_tie = gr.Button("Tie", variant="secondary") gr.Markdown("### 🧑⚖️ Judge B") with gr.Group(): model_name_b = gr.Markdown("*Model: Hidden*") with gr.Row(): with gr.Column(scale=1, min_width=100): # Fixed narrow width for score score_b = gr.Textbox(label="Score", lines=5, interactive=False) vote_b = gr.Button("Vote B", variant="primary", visible=False) with gr.Column(scale=9, min_width=400): # Wider width for critique critique_b = gr.TextArea(label="Critique", lines=7, interactive=False) # Place Vote B button directly under Judge B gr.Markdown("