"""Gradio front end for the LLM battle arena.

Two models answer the same prompt anonymously, the user votes for the better
answer, and the result is recorded on the leaderboards.
"""

import json
import logging
import os
import random
import re
import sys
import threading
import time
from collections import Counter
from functools import lru_cache
from typing import Dict

import gradio as gr
import openai
import plotly.graph_objects as go
import requests

import config
from leaderboard import (
    get_current_leaderboard,
    update_leaderboard,
    start_backup_thread,
    get_leaderboard,
    get_elo_leaderboard,
    ensure_elo_ratings_initialized
)
from release_notes import get_release_notes_html

# Update the logging format to redact URLs
logging.basicConfig(
    level=logging.WARNING,  # Only show warnings and errors
    format='%(asctime)s - %(levelname)s - %(message)s'
)

# Suppress verbose HTTP request logging
logging.getLogger("urllib3").setLevel(logging.CRITICAL)
logging.getLogger("httpx").setLevel(logging.CRITICAL)
logging.getLogger("openai").setLevel(logging.CRITICAL)


class RedactURLsFilter(logging.Filter):
    """Logging filter that strips URLs and HTTP status lines from messages."""

    _URL_PATTERN = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    )
    _HTTP_STATUS_PATTERN = re.compile(r'HTTP/\d\.\d \d+ \w+')
    # Matches a pair of quotes with nothing (or only whitespace) between them.
    _EMPTY_QUOTES_PATTERN = re.compile(r'"\s*"')
    _WHITESPACE_PATTERN = re.compile(r'\s+')

    def filter(self, record):
        """Rewrite ``record`` in place with a redacted message.

        Always returns True: records are cleaned, never dropped.
        """
        # Redact the fully formatted message so that URLs passed as lazy
        # %-style arguments are covered too, not just the format string.
        try:
            message = record.getMessage()
            record.args = None
        except (TypeError, ValueError, KeyError):
            # Malformed format string/args: leave args alone and let the
            # logging machinery report the formatting problem as usual.
            message = str(record.msg)

        message = self._URL_PATTERN.sub('[REDACTED_URL]', message)
        message = self._HTTP_STATUS_PATTERN.sub('', message)

        # Remove sensitive API references. Empty values are skipped because
        # str.replace('', x) would insert x between every character.
        for sensitive, label in (
            (config.API_URL, '[API]'),
            (config.NEXTCLOUD_URL, '[CLOUD]'),
        ):
            if sensitive:
                message = message.replace(str(sensitive), label)

        # Clean up residual artifacts: empty quotes first, then whitespace.
        message = self._EMPTY_QUOTES_PATTERN.sub('', message)
        message = self._WHITESPACE_PATTERN.sub(' ', message).strip()

        record.msg = message
        return True


# Apply the filter to all handlers
logger = logging.getLogger(__name__)
for handler in logging.root.handlers:
    handler.addFilter(RedactURLsFilter())

# Start the backup thread
start_backup_thread()

# Seconds to wait for a single model completion.
REQUEST_TIMEOUT_SECONDS = 180

SYSTEM_PROMPT = (
    "You are a helpful assistant. At no point should you reveal your name, "
    "identity or team affiliation to the user, especially if asked directly!"
)

# Replies shown in place of a model answer when something goes wrong.
ERROR_EMPTY_RESPONSE = "Error: Empty response from the model"
ERROR_EMPTY_CONTENT = "Error: Empty content from the model"
ERROR_TIMEOUT = f"Error: Model response timed out after {REQUEST_TIMEOUT_SECONDS} seconds"
ERROR_UNAVAILABLE = "Error: Unable to get response from the model"
ERROR_NOT_ENOUGH_MODELS = "Error: Not enough models available"
_ERROR_REPLIES = frozenset({
    ERROR_EMPTY_RESPONSE,
    ERROR_EMPTY_CONTENT,
    ERROR_TIMEOUT,
    ERROR_UNAVAILABLE,
    ERROR_NOT_ENOUGH_MODELS,
})

# NOTE(review): the tag names here and the <details>/<summary> wrapper below
# were reconstructed; the pattern as found had no tags and matched the empty
# string on every input, so the "no thinking tags" path could never run.
_THINK_BLOCK = re.compile(r'<think>(.*?)</think>', flags=re.DOTALL)


# Function to get available models (using predefined list)
def get_available_models():
    """Return the identifiers (first field) of all approved models."""
    return [model[0] for model in config.get_approved_models()]


# Function to get recent opponents for a model
recent_opponents = {}


def update_recent_opponents(model_a, model_b):
    """Record that the two models just met, keeping the last 5 opponents each."""
    for model, opponent in ((model_a, model_b), (model_b, model_a)):
        history = recent_opponents.setdefault(model, [])
        history.append(opponent)
        # Limit history to last 5 opponents
        recent_opponents[model] = history[-5:]


def _pick_nicknames():
    """Return two display nicknames, distinct whenever at least two exist."""
    names = list(config.model_nicknames)
    if len(names) >= 2:
        first, second = random.sample(names, 2)
        return first, second
    return random.choice(names), random.choice(names)


def _as_chat(prompt, reply):
    """Build a fresh two-message chat (user prompt, assistant reply)."""
    return [
        {"role": "user", "content": prompt},
        {"role": "assistant", "content": reply},
    ]


def _find_error_reply(*responses):
    """Return the first assistant message that is a known error reply, else None."""
    for response in responses:
        for msg in response:
            if msg["role"] == "assistant" and msg["content"] in _ERROR_REPLIES:
                return msg["content"]
    return None


class _ModelReplyError(Exception):
    """The API answered, but the answer carried no usable content."""


def _format_reply(content, model):
    """Strip the reply and fold any thinking section into a collapsible block."""
    thinking_match = _THINK_BLOCK.search(content)
    if not thinking_match:
        # If no thinking tags, return normal content
        logger.info("No thinking tags found for model: %s", model)
        return content.strip()

    thinking_content = thinking_match.group(1).strip()
    main_content = _THINK_BLOCK.sub('', content).strip()
    logger.info("Found thinking content for model: %s", model)
    return (
        f"{main_content}\n\n"
        f"<details><summary>🤔 View thinking process</summary>"
        f"\n\n{thinking_content}\n\n</details>"
    )


@lru_cache(maxsize=100)
def _fetch_reply_text(model, prompt):
    """Query the model and return its formatted reply text.

    Failures are raised rather than returned so that ``lru_cache`` only ever
    stores successful replies; a transient error is retried on the next call.

    Raises:
        _ModelReplyError: the response had no choices or empty content.
        Exception: whatever the OpenAI client raises (timeouts, bad
            requests, connection errors, ...).
    """
    client = openai.OpenAI(
        api_key=config.API_KEY,
        base_url=config.API_URL
    )
    logger.info("Starting API call for model: %s", model)
    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
        timeout=REQUEST_TIMEOUT_SECONDS
    )
    logger.info("Received response for model: %s", model)

    if not response or not response.choices:
        logger.error("Empty response received for model: %s", model)
        raise _ModelReplyError(ERROR_EMPTY_RESPONSE)

    content = response.choices[0].message.content
    if not content:
        logger.error("Empty content received for model: %s", model)
        raise _ModelReplyError(ERROR_EMPTY_CONTENT)

    return _format_reply(content, model)


# Function to call Ollama API with caching
def call_ollama_api(model, prompt):
    """Return the chat exchange for ``prompt`` answered by ``model``.

    Successful replies are cached per (model, prompt). The return value is
    always a new list of two message dicts (user, assistant), so callers may
    mutate it freely. On failure the assistant message holds one of the
    ``Error: ...`` replies instead of raising.
    """
    try:
        reply = _fetch_reply_text(model, prompt)
    except _ModelReplyError as e:
        reply = str(e)
    except (openai.APITimeoutError, requests.exceptions.Timeout):
        # The OpenAI client reports timeouts as APITimeoutError; the requests
        # exception is kept for backward compatibility.
        logger.error(
            "Timeout error after %s seconds for model: %s",
            REQUEST_TIMEOUT_SECONDS, model
        )
        reply = ERROR_TIMEOUT
    except openai.BadRequestError as e:
        logger.error("Bad request error for model: %s. Error: %s", model, e)
        reply = ERROR_UNAVAILABLE
    except Exception as e:
        logger.error(
            "Error calling Ollama API for model: %s. Error: %s",
            model, e, exc_info=True
        )
        reply = ERROR_UNAVAILABLE
    return _as_chat(prompt, reply)


# Keep the cache-management handles callers of an lru_cache'd function expect.
call_ollama_api.cache_clear = _fetch_reply_text.cache_clear
call_ollama_api.cache_info = _fetch_reply_text.cache_info


# Generate responses using two randomly selected models
def get_battle_counts():
    """Return a Counter mapping each leaderboard model to wins + losses."""
    return Counter({
        model: data['wins'] + data['losses']
        for model, data in get_current_leaderboard().items()
    })


def generate_responses(prompt):
    """Pick two models and collect their answers to ``prompt``.

    The model with the fewest battles always plays; its opponent is drawn at
    random, favouring models with fewer battles and skipping recent opponents.

    Returns:
        (response_a, response_b, model_a, model_b). When fewer than two
        models are available both responses are error chats and both model
        names are None.
    """
    available_models = get_available_models()
    if len(available_models) < 2:
        return (
            _as_chat(prompt, ERROR_NOT_ENOUGH_MODELS),
            _as_chat(prompt, ERROR_NOT_ENOUGH_MODELS),
            None,
            None,
        )

    # Counter yields 0 for models that have no leaderboard entry yet.
    battle_counts = get_battle_counts()

    # Sort models by battle count (ascending)
    sorted_models = sorted(available_models, key=lambda m: battle_counts[m])

    # Select the first model (least battles)
    model_a = sorted_models[0]
    candidates = sorted_models[1:]

    # Filter out recent opponents for model_a
    recently_met = set(recent_opponents.get(model_a, []))
    potential_opponents = [m for m in candidates if m not in recently_met]

    # If no potential opponents left, reset recent opponents for model_a
    if not potential_opponents:
        recent_opponents[model_a] = []
        potential_opponents = candidates

    # For the second model, use weighted random selection
    weights = [1 / (battle_counts[m] + 1) for m in potential_opponents]
    model_b = random.choices(potential_opponents, weights=weights, k=1)[0]

    # Update recent opponents
    update_recent_opponents(model_a, model_b)

    # Get responses from both models
    response_a = call_ollama_api(model_a, prompt)
    response_b = call_ollama_api(model_b, prompt)

    # Return responses directly (already formatted correctly)
    return response_a, response_b, model_a, model_b


def battle_arena(prompt):
    """Run one battle and return the 13 values bound to ``submit_btn.click``.

    Order: left chat, right chat, left model, right model, left chatbot
    update, right chatbot update, left vote button, right vote button, tie
    button, previous prompt, tie count, model-names row, status box.
    """
    response_a, response_b, model_a, model_b = generate_responses(prompt)

    # Any error reply disables voting: a vote on an error message would
    # corrupt the leaderboard.
    error = _find_error_reply(response_a, response_b)
    if error is not None:
        return (
            [], [], None, None,
            gr.update(value=[]),
            gr.update(value=[]),
            gr.update(interactive=False, value="Voting Disabled - API Error"),
            gr.update(interactive=False, value="Voting Disabled - API Error"),
            gr.update(interactive=False, visible=False),
            prompt,
            0,
            gr.update(visible=False),
            gr.update(value=error, visible=True)
        )

    nickname_a, nickname_b = _pick_nicknames()

    # Randomise which model lands on which side so position gives no hint.
    left = (response_a, model_a)
    right = (response_b, model_b)
    if not random.choice([True, False]):
        left, right = right, left
    left_response, left_name = left
    right_response, right_name = right

    # The responses are already in the correct format, no need to reformat
    return (
        left_response, right_response, left_name, right_name,
        gr.update(label=nickname_a, value=left_response),
        gr.update(label=nickname_b, value=right_response),
        gr.update(interactive=True, value=f"Vote for {nickname_a}"),
        gr.update(interactive=True, value=f"Vote for {nickname_b}"),
        gr.update(interactive=True, visible=True),
        prompt,
        0,
        gr.update(visible=False),
        gr.update(value="Ready for your vote! 🗳️", visible=True)
    )


def record_vote(prompt, left_response, right_response, left_model, right_model, choice):
    """Record the vote and return the 7 values bound to the vote buttons.

    Order: status box, leaderboard, ELO leaderboard, left vote button, right
    vote button, tie button, model-names row.

    ``choice`` equal to "Left is better" makes the left model the winner; any
    other value makes the right model the winner.
    """
    # Check if outputs are generated
    if not left_response or not right_response or not left_model or not right_model:
        return (
            "Please generate responses before voting.",
            gr.update(),                   # Leaderboard unchanged
            gr.update(),                   # ELO leaderboard unchanged
            gr.update(interactive=False),  # Left vote button
            gr.update(interactive=False),  # Right vote button
            gr.update(visible=False),      # Tie button
            gr.update()                    # Model names row unchanged
        )

    if choice == "Left is better":
        winner, loser = left_model, right_model
    else:
        winner, loser = right_model, left_model

    # Update the leaderboard
    update_leaderboard(winner, loser)

    result_message = "\n".join([
        "🎉 Vote recorded! You're awesome! 🌟",
        f"🔵 In the left corner: {get_human_readable_name(left_model)}",
        f"🔴 In the right corner: {get_human_readable_name(right_model)}",
        f"🏆 And the champion you picked is... {get_human_readable_name(winner)}! 🥇",
    ])

    return (
        gr.update(value=result_message, visible=True),  # Show result
        get_leaderboard(),              # Update leaderboard
        get_elo_leaderboard(),          # Update ELO leaderboard
        gr.update(interactive=False),   # Disable left vote button
        gr.update(interactive=False),   # Disable right vote button
        gr.update(interactive=False),   # Disable tie button
        gr.update(visible=True)         # Show model names
    )


def _battle_score(wins, losses):
    """Return win rate * (1 - 1 / (total battles + 1)), or 0 with no battles."""
    total_battles = wins + losses
    if total_battles <= 0:
        return 0
    return (wins / total_battles) * (1 - 1 / (total_battles + 1))


def get_leaderboard_chart():
    """Build the Plotly figure with stacked wins/losses bars and a score line."""
    battle_results = get_current_leaderboard()

    # Scores live in a local dict so the leaderboard data is left untouched.
    scores_by_model = {
        model: _battle_score(results["wins"], results["losses"])
        for model, results in battle_results.items()
    }

    sorted_results = sorted(
        battle_results.items(),
        key=lambda item: (
            scores_by_model[item[0]],
            item[1]["wins"] + item[1]["losses"],
        ),
        reverse=True
    )

    models = [get_human_readable_name(model) for model, _ in sorted_results]
    wins = [results["wins"] for _, results in sorted_results]
    losses = [results["losses"] for _, results in sorted_results]
    scores = [scores_by_model[model] for model, _ in sorted_results]

    fig = go.Figure()

    # Stacked Bar chart for Wins and Losses
    fig.add_trace(go.Bar(
        x=models,
        y=wins,
        name='Wins',
        marker_color='#22577a'
    ))
    fig.add_trace(go.Bar(
        x=models,
        y=losses,
        name='Losses',
        marker_color='#38a3a5'
    ))

    # Line chart for Scores
    fig.add_trace(go.Scatter(
        x=models,
        y=scores,
        name='Score',
        yaxis='y2',
        line=dict(color='#ff7f0e', width=2)
    ))

    # Update layout for full-width, increased height, and secondary y-axis
    fig.update_layout(
        title='Model Performance',
        xaxis_title='Models',
        yaxis_title='Number of Battles',
        yaxis2=dict(
            title='Score',
            overlaying='y',
            side='right'
        ),
        barmode='stack',
        height=800,
        width=1450,
        autosize=True,
        legend=dict(
            orientation='h',
            yanchor='bottom',
            y=1.02,
            xanchor='right',
            x=1
        )
    )

    return fig


def new_battle():
    """Reset the arena; returns the 12 values bound to ``new_battle_btn.click``.

    Order: prompt input, left chatbot, right chatbot, left model, right
    model, left vote button, right vote button, tie button, status box,
    leaderboard, model-names row, tie count.
    """
    nickname_a, nickname_b = _pick_nicknames()
    return (
        "",  # Reset prompt_input
        gr.update(value=[], label=nickname_a),  # Reset left Chatbot
        gr.update(value=[], label=nickname_b),  # Reset right Chatbot
        None,  # Reset left model
        None,  # Reset right model
        gr.update(interactive=False, value=f"Vote for {nickname_a}"),
        gr.update(interactive=False, value=f"Vote for {nickname_b}"),
        gr.update(interactive=False, visible=False),  # Reset Tie button
        gr.update(value="", visible=False),  # Clear status box
        gr.update(),  # Leaderboard unchanged
        gr.update(visible=False),  # Hide model names
        0  # Reset tie_count
    )


def get_human_readable_name(model_name: str) -> str:
    """Return the display name for ``model_name``, or the name itself if unknown."""
    model_dict = dict(config.get_approved_models())
    return model_dict.get(model_name, model_name)


def random_prompt():
    """Return a randomly chosen example prompt."""
    return random.choice(config.example_prompts)


def continue_conversation(prompt, left_chat, right_chat, left_model, right_model, previous_prompt, tie_count):
    """Extend both chats with answers to a follow-up prompt after a tie.

    An empty prompt, or one identical to the previous prompt, is replaced by
    a random example prompt. After the third tie the tie button is disabled.

    Returns the 6 values bound to ``tie_btn.click``: left chatbot, right
    chatbot, prompt input, tie button, previous prompt, tie count.
    """
    # Check if the prompt is empty or the same as the previous one
    if not prompt or prompt == previous_prompt:
        prompt = random.choice(config.example_prompts)

    # Work on copies: the incoming histories are not mutated, and a missing
    # history (None) is treated as empty.
    left_chat = list(left_chat or [])
    right_chat = list(right_chat or [])

    # Append messages from the response lists
    left_chat.extend(call_ollama_api(left_model, prompt))
    right_chat.extend(call_ollama_api(right_model, prompt))

    tie_count += 1
    if tie_count < 3:
        tie_button_state = gr.update(interactive=True)
    else:
        tie_button_state = gr.update(
            interactive=False,
            value="Max ties reached. Please vote!"
        )

    return (
        gr.update(value=left_chat),
        gr.update(value=right_chat),
        gr.update(value=""),  # Clear the prompt input
        tie_button_state,
        prompt,  # Return the new prompt
        tie_count
    )


def normalize_parameter_size(param_size: str) -> str:
    """Convert parameter size to billions (B) format.

    Accepts values such as "7.6B", "350M" or a raw parameter count; spaces,
    thousands separators and letter case are ignored. Returns ``param_size``
    unchanged if it cannot be parsed.
    """
    try:
        cleaned = param_size.replace(" ", "").replace(",", "").upper()
        if 'M' in cleaned:
            # Convert millions to billions
            return f"{float(cleaned.replace('M', '')) / 1000:.2f}B"
        if 'B' in cleaned:
            # Already in billions, just format consistently
            return f"{float(cleaned.replace('B', '')):.2f}B"
        # No unit: treat the value as a raw parameter count
        return f"{float(cleaned) / 1_000_000_000:.2f}B"
    except (AttributeError, TypeError, ValueError):
        return param_size  # Return original if conversion fails


def load_latest_model_stats():
    """Load model stats from the model_stats.json file.

    Returns:
        ``(headers, rows)`` on success, with rows sorted by tokens/sec
        descending, or ``(None, error_message)`` if nothing could be loaded.
    """
    headers = [
        "Model", "VRAM (GB)", "Size", "Parameters", "Quantization",
        "Tokens/sec", "Gen Tokens/sec", "Total Tokens", "Response Time (s)"
    ]
    try:
        # Read directly from model_stats.json in root directory
        with open('model_stats.json', 'r', encoding='utf-8') as f:
            stats = json.load(f)

        # Convert stats to table format
        table_data = []
        for model in stats:
            if not model.get("success", False):  # Skip failed tests
                continue

            perf = model.get("performance", {})
            info = model.get("model_info", {})

            try:
                model_size = float(info.get("size", 0))  # Raw size
                row = [
                    model.get("model_name", "Unknown"),
                    round(model_size / 1024 / 1024 / 1024, 2),  # Size / 1024**3
                    model_size,
                    # Normalize parameter size to billions format
                    normalize_parameter_size(info.get("parameter_size", "Unknown")),
                    info.get("quantization_level", "Unknown"),
                    round(float(perf.get("tokens_per_second", 0)), 2),
                    round(float(perf.get("generation_tokens_per_second", 0)), 2),
                    perf.get("total_tokens", 0),
                    round(float(perf.get("response_time", 0)), 2),
                ]
            except Exception as row_error:
                # Best effort: one malformed entry must not hide the rest.
                logger.warning(
                    "Skipping model %s: %s",
                    model.get('model_name', 'Unknown'), row_error
                )
                continue
            table_data.append(row)

        if not table_data:
            return None, "No valid model stats found"

        # Sort by tokens per second (column 5), fastest first
        table_data.sort(key=lambda row: row[5], reverse=True)

        return headers, table_data
    except Exception as e:
        logger.error("Error in load_latest_model_stats: %s", e)
        return None, f"Error loading model stats: {str(e)}"


# Initialize Gradio Blocks
with gr.Blocks(css="""
    #dice-button {
        min-height: 90px;
        font-size: 35px;
    }
    .sponsor-button {
        background-color: #30363D;
        color: white;
        border: none;
        padding: 10px 20px;
        border-radius: 6px;
        cursor: pointer;
        display: inline-flex;
        align-items: center;
        gap: 8px;
        font-weight: bold;
    }
    .sponsor-button:hover {
        background-color: #2D333B;
    }
""") as demo:
    gr.Markdown(config.ARENA_NAME)

    # Main description with sponsor button
    with gr.Row():
        with gr.Column(scale=8):
            gr.Markdown("""
            **Step right up to the arena where frugal meets fabulous in the world of AI!**
            Watch as our compact contenders (maxing out at 14B parameters) duke it out in a battle of wits and words.

            What started as a simple experiment has grown into a popular platform for evaluating compact language models.
            As the arena continues to expand with more models, features, and battles, it requires computational resources to maintain and improve.
            If you find this project valuable and would like to support its development, consider sponsoring:
            """)
        with gr.Column(scale=2):
            gr.Button(
                "Sponsor on GitHub",
                link="https://github.com/sponsors/k-mktr",
                elem_classes="sponsor-button"
            )

    # Instructions in an accordion
    with gr.Accordion("📖 How to Use", open=False):
        gr.Markdown("""
        1. To start the battle, go to the 'Battle Arena' tab.
        2. Type your prompt into the text box. Alternatively, click the "🎲" button to receive a random prompt.
        3. Click the "Generate Responses" button to view the models' responses.
        4. Cast your vote for the model that provided the better response. In the event of a Tie, enter a new prompt before continuing the battle.
        5. Check out the Leaderboard to see how models rank against each other.

        More info: [README.md](https://huggingface.co/spaces/k-mktr/gpu-poor-llm-arena/blob/main/README.md)
        """)

    # Leaderboard Tab (now first)
    with gr.Tab("Leaderboard"):
        gr.Markdown("""
        ### Main Leaderboard
        This leaderboard uses a scoring system that balances win rate and total battles. The score is calculated using the formula:

        **Score = Win Rate * (1 - 1 / (Total Battles + 1))**

        This formula rewards models with higher win rates and more battles. As the number of battles increases, the score approaches the win rate.
        """)
        leaderboard = gr.Dataframe(
            headers=["#", "Model", "Score", "Wins", "Losses", "Total Battles", "Win Rate"],
            row_count=10,
            col_count=7,
            interactive=True,
            label="Leaderboard"
        )

    # Battle Arena Tab (now second)
    with gr.Tab("Battle Arena"):
        with gr.Row():
            prompt_input = gr.Textbox(
                label="Enter your prompt",
                placeholder="Type your prompt here...",
                scale=20
            )
            random_prompt_btn = gr.Button("🎲", scale=1, elem_id="dice-button")

        gr.Markdown("<br>")

        # Add the random prompt button functionality
        random_prompt_btn.click(
            random_prompt,
            outputs=prompt_input
        )

        submit_btn = gr.Button("Generate Responses", variant="primary")

        # Distinct starting labels so the two sides can be told apart.
        initial_left_nickname, initial_right_nickname = _pick_nicknames()
        with gr.Row():
            left_output = gr.Chatbot(label=initial_left_nickname, type="messages")
            right_output = gr.Chatbot(label=initial_right_nickname, type="messages")

        with gr.Row():
            left_vote_btn = gr.Button(f"Vote for {left_output.label}", interactive=False)
            tie_btn = gr.Button("Tie 🙈 Continue with a new prompt", interactive=False, visible=False)
            right_vote_btn = gr.Button(f"Vote for {right_output.label}", interactive=False)

        result = gr.Textbox(
            label="Status",
            interactive=False,
            value="Generate responses to start the battle! 🚀",
            visible=True
        )

        with gr.Row(visible=False) as model_names_row:
            left_model = gr.Textbox(label="🔵 Left Model", interactive=False)
            right_model = gr.Textbox(label="🔴 Right Model", interactive=False)

        previous_prompt = gr.State("")  # Stores the previous prompt
        tie_count = gr.State(0)  # Keeps track of the tie count

        new_battle_btn = gr.Button("New Battle")

    # ELO Leaderboard Tab
    with gr.Tab("ELO Leaderboard"):
        gr.Markdown("""
        ### ELO Rating System
        This leaderboard uses a modified ELO rating system that takes into account both the performance and size of the models.
        Initial ratings are based on model size, with larger models starting at higher ratings.
        The ELO rating is calculated based on wins and losses, with adjustments made based on the relative strengths of opponents.
        """)
        elo_leaderboard = gr.Dataframe(
            headers=["#", "Model", "ELO Rating", "Wins", "Losses", "Total Battles", "Win Rate"],
            row_count=10,
            col_count=7,
            interactive=True,
            label="ELO Leaderboard"
        )

    # Latest Updates Tab
    with gr.Tab("Latest Updates"):
        release_notes = gr.HTML(get_release_notes_html())
        refresh_notes_btn = gr.Button("Refresh Updates")

        refresh_notes_btn.click(
            get_release_notes_html,
            outputs=[release_notes]
        )

    # Model Stats Tab
    with gr.Tab("Model Stats"):
        gr.Markdown("""
        ### Model Performance Statistics

        This tab shows detailed performance metrics for each model, tested using a creative writing prompt.
        The tests were performed on an **AMD Radeon RX 7600 XT 16GB GPU**.

        For detailed information about the testing methodology, parameters, and hardware setup, please refer to the
        [README_model_stats.md](https://huggingface.co/spaces/k-mktr/gpu-poor-llm-arena/blob/main/README_model_stats.md).
        """)

        headers, table_data = load_latest_model_stats()
        if headers:
            model_stats_table = gr.Dataframe(
                headers=headers,
                value=table_data,
                row_count=len(table_data),
                col_count=len(headers),
                interactive=True,
                label="Model Performance Statistics"
            )
        else:
            gr.Markdown(f"⚠️ {table_data}")  # Show error message if loading failed

    # Define interactions
    submit_btn.click(
        battle_arena,
        inputs=prompt_input,
        outputs=[
            left_output, right_output, left_model, right_model,
            left_output, right_output, left_vote_btn, right_vote_btn,
            tie_btn, previous_prompt, tie_count, model_names_row, result
        ]
    )

    left_vote_btn.click(
        lambda *args: record_vote(*args, "Left is better"),
        inputs=[prompt_input, left_output, right_output, left_model, right_model],
        outputs=[result, leaderboard, elo_leaderboard, left_vote_btn,
                 right_vote_btn, tie_btn, model_names_row]
    )

    right_vote_btn.click(
        lambda *args: record_vote(*args, "Right is better"),
        inputs=[prompt_input, left_output, right_output, left_model, right_model],
        outputs=[result, leaderboard, elo_leaderboard, left_vote_btn,
                 right_vote_btn, tie_btn, model_names_row]
    )

    tie_btn.click(
        continue_conversation,
        inputs=[prompt_input, left_output, right_output, left_model, right_model, previous_prompt, tie_count],
        outputs=[left_output, right_output, prompt_input, tie_btn, previous_prompt, tie_count]
    )

    new_battle_btn.click(
        new_battle,
        outputs=[prompt_input, left_output, right_output, left_model,
                 right_model, left_vote_btn, right_vote_btn, tie_btn,
                 result, leaderboard, model_names_row, tie_count]
    )

    # Update leaderboard on launch
    demo.load(get_leaderboard, outputs=leaderboard)
    demo.load(get_elo_leaderboard, outputs=elo_leaderboard)

if __name__ == "__main__":
    # Initialize ELO ratings before launching the app
    ensure_elo_ratings_initialized()
    # Start the model refresh thread
    config.start_model_refresh_thread()
    demo.launch(show_api=False)