File size: 8,682 Bytes
c095e16
 
 
893d387
 
 
b49e002
53a3c92
c095e16
 
b49e002
c095e16
53a3c92
 
 
c095e16
 
b49e002
c095e16
 
 
 
 
 
 
 
b49e002
c095e16
 
 
 
 
53a3c92
b49e002
53a3c92
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b49e002
53a3c92
d6f8bd2
53a3c92
 
 
 
 
 
b49e002
53a3c92
 
 
 
 
b0ade41
 
53a3c92
b0ade41
 
53a3c92
 
 
 
 
 
 
 
 
c095e16
 
 
 
 
 
 
 
 
 
 
 
 
 
53a3c92
 
 
c095e16
 
 
 
893d387
 
79d1a94
b49e002
79d1a94
 
 
53a3c92
0efd625
f5e465c
 
7bca760
79d1a94
f5e465c
 
 
 
79d1a94
f5e465c
 
79d1a94
f5e465c
 
79d1a94
f5e465c
 
79d1a94
594d6e6
f5e465c
594d6e6
 
 
 
 
 
 
f5e465c
 
 
 
594d6e6
7bca760
1c4f67f
7bca760
4694091
f5e465c
 
53a3c92
d6f8bd2
 
 
 
 
 
b0ade41
 
 
 
b49e002
b0ade41
 
 
 
 
 
 
 
d6f8bd2
 
 
53a3c92
 
 
f5e465c
 
7bca760
f5e465c
 
b49e002
b0ade41
 
f5e465c
 
b0ade41
f5e465c
 
 
 
 
b0ade41
f5e465c
 
53a3c92
594d6e6
f5e465c
594d6e6
 
 
 
 
 
 
f5e465c
 
 
 
594d6e6
7bca760
1c4f67f
7bca760
4694091
f5e465c
 
53a3c92
 
 
 
 
 
 
b49e002
53a3c92
 
 
 
 
276d365
53a3c92
 
 
f5e465c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
from nc_py_api import Nextcloud
import json
from typing import Dict, Any
import time
from datetime import datetime
import threading
import config
import math

# Shared Nextcloud client; the load, save and backup helpers below all go through it
nc = Nextcloud(
    nextcloud_url=config.NEXTCLOUD_URL,
    nc_auth_user=config.NEXTCLOUD_USERNAME,
    nc_auth_pass=config.NEXTCLOUD_PASSWORD,
)

# In-memory ELO rating per model name; starts empty and is filled on first use
elo_ratings: Dict[str, float] = {}

def load_leaderboard() -> Dict[str, Any]:
    """Download the leaderboard JSON from Nextcloud and return it parsed.

    Returns:
        The decoded leaderboard. When the download, decoding or parsing
        fails, the error is printed and an empty dict is returned instead
        of raising.
    """
    try:
        raw_bytes = nc.files.download(config.NEXTCLOUD_LEADERBOARD_PATH)
        text = raw_bytes.decode('utf-8')
        parsed = json.loads(text)
    except Exception as e:
        print(f"Error loading leaderboard: {str(e)}")
        return {}
    return parsed

def save_leaderboard(leaderboard_data: Dict[str, Any]) -> bool:
    """Serialize the leaderboard to JSON and upload it to Nextcloud.

    Args:
        leaderboard_data: The leaderboard to store.

    Returns:
        True when the upload completed, False when serialization or the
        upload raised (the error is printed, not propagated).
    """
    try:
        payload = json.dumps(leaderboard_data, indent=2).encode('utf-8')
        nc.files.upload(config.NEXTCLOUD_LEADERBOARD_PATH, payload)
    except Exception as e:
        print(f"Error saving leaderboard: {str(e)}")
        return False
    return True

def get_model_size(model_name):
    """Return the size advertised in a model's human-readable label.

    The size is the number found between the first "(" of the label and
    the next "B" (for example a label ending in "(7B)" yields 7.0);
    presumably this is the parameter count in billions.

    Args:
        model_name: Model identifier as listed by
            ``config.get_approved_models()``.

    Returns:
        The parsed size as a float, or 1.0 when the model is not listed
        or its label carries no parsable "(<number>B" part.
    """
    default_size = 1.0
    for model, human_readable in config.get_approved_models():
        if model != model_name:
            continue
        try:
            return float(human_readable.split('(')[1].split('B')[0])
        except (IndexError, ValueError):
            # A label without a size must not crash rating updates;
            # treat it like an unlisted model.
            return default_size
    return default_size

def calculate_expected_score(rating_a, rating_b):
    """Return the expected score of A against B on the 400-point logistic curve.

    The result lies between 0 and 1 and equals 0.5 when both ratings
    are the same; a 400-point lead for A gives 10/11.
    """
    rating_gap = rating_b - rating_a
    odds_against_a = math.pow(10, rating_gap / 400)
    return 1 / (1 + odds_against_a)

def update_elo_ratings(winner, loser):
    """Apply the outcome of one battle to the in-memory ELO table.

    The K-factor starts at 32 and is scaled by the size gap between the
    two models relative to the largest known size, so beating a larger
    model moves ratings more than beating a smaller one. It is kept
    within [0, 64].

    Args:
        winner: Name of the model that won.
        loser: Name of the model that lost.
    """
    if winner not in elo_ratings or loser not in elo_ratings:
        initialize_elo_ratings()

    winner_size = get_model_size(winner)
    loser_size = get_model_size(loser)

    # A model that is neither approved nor present in the stored
    # leaderboard is still missing after initialization; seed it with the
    # usual size-based starting rating instead of raising KeyError.
    winner_rating = elo_ratings.setdefault(winner, 1000 + (winner_size * 100))
    loser_rating = elo_ratings.setdefault(loser, 1000 + (loser_size * 100))

    expected_winner = calculate_expected_score(winner_rating, loser_rating)
    expected_loser = 1 - expected_winner

    # Include both contestants so an empty approved list does not raise
    # and the relative gap below always stays within [-1, 1].
    max_size = max(
        (get_model_size(model) for model, _ in config.get_approved_models()),
        default=0,
    )
    max_size = max(max_size, winner_size, loser_size)

    size_gap = (loser_size - winner_size) / max_size if max_size > 0 else 0.0
    k_factor = max(0.0, min(64, 32 * (1 + size_gap)))

    elo_ratings[winner] += k_factor * (1 - expected_winner)
    elo_ratings[loser] += k_factor * (0 - expected_loser)

def initialize_elo_ratings():
    """Rebuild the in-memory ELO table from the stored leaderboard.

    Every approved model, every model with a leaderboard record and
    every recorded opponent is reset to its starting rating of
    ``1000 + size * 100``. The recorded battles are then replayed
    through ``update_elo_ratings``.

    The leaderboard stores each battle twice: as a win in the winner's
    record and as a loss in the loser's. Each pair of models is
    therefore replayed only once, from whichever record is met first.
    Only aggregate counts are stored, so the replay follows iteration
    order, not the order in which battles were played.
    """
    leaderboard = load_leaderboard()

    def _starting_rating(model):
        return 1000 + (get_model_size(model) * 100)

    # Seed every known model first, so the replay below never finds a
    # missing entry and re-initialization always starts from a clean state.
    for model, _ in config.get_approved_models():
        elo_ratings[model] = _starting_rating(model)
    for model, data in leaderboard.items():
        elo_ratings[model] = _starting_rating(model)
        for opponent in data.get('opponents', {}):
            elo_ratings[opponent] = _starting_rating(opponent)

    # Replay all battles, counting each pair of models only once
    replayed_pairs = set()
    for model, data in leaderboard.items():
        for opponent, results in data.get('opponents', {}).items():
            pair = frozenset((model, opponent))
            if pair in replayed_pairs:
                continue  # mirrored record of a pair already replayed
            replayed_pairs.add(pair)
            for _ in range(results.get('wins', 0)):
                update_elo_ratings(model, opponent)
            for _ in range(results.get('losses', 0)):
                update_elo_ratings(opponent, model)

def ensure_elo_ratings_initialized():
    """Fill the ELO table if it is still empty; otherwise do nothing."""
    if elo_ratings:
        return
    initialize_elo_ratings()

def update_leaderboard(winner: str, loser: str) -> Dict[str, Any]:
    """Record one battle result, refresh the ELO table and store the leaderboard.

    Args:
        winner: Name of the model that won the battle.
        loser: Name of the model that lost the battle.

    Returns:
        The updated leaderboard (returned whether or not the save succeeded).
    """
    board = load_leaderboard()

    # Give both participants an empty record if they have none yet
    for name in (winner, loser):
        if name not in board:
            board[name] = {"wins": 0, "losses": 0, "opponents": {}}

    # Winner: overall tally plus head-to-head tally against the loser
    winner_entry = board[winner]
    winner_entry["wins"] += 1
    versus_loser = winner_entry["opponents"].setdefault(loser, {"wins": 0, "losses": 0})
    versus_loser["wins"] += 1

    # Loser: overall tally plus head-to-head tally against the winner
    loser_entry = board[loser]
    loser_entry["losses"] += 1
    versus_winner = loser_entry["opponents"].setdefault(winner, {"wins": 0, "losses": 0})
    versus_winner["losses"] += 1

    # ELO is updated before the save: the update may rebuild the ratings
    # from the stored leaderboard, which at this point does not yet
    # contain this battle.
    update_elo_ratings(winner, loser)

    save_leaderboard(board)
    return board

def get_current_leaderboard() -> Dict[str, Any]:
    """Return the leaderboard as loaded by ``load_leaderboard``."""
    current = load_leaderboard()
    return current

def get_human_readable_name(model_name: str) -> str:
    """Return the display label of a model, or the name itself if it is not approved.

    Args:
        model_name: Model identifier to look up among the approved models.
    """
    display_names = dict(config.get_approved_models())
    if model_name in display_names:
        return display_names[model_name]
    return model_name

def get_leaderboard():
    """Build the win-rate leaderboard as rows for the Gradio table.

    Each row is ``[position, display name, score, wins, losses,
    total battles, win rate]``. Score is
    ``win_rate * (1 - 1 / (total_battles + 1))``, which discounts models
    with few battles; it is shown with three decimals and the win rate
    as a percentage.

    Returns:
        The rows sorted by score, highest first, with 1-based positions.
        Ranking uses the exact score, not the rounded display string, so
        models whose scores differ by less than 0.001 are still ordered
        correctly. Models with equal scores keep their stored order.
    """
    leaderboard = load_leaderboard()

    scored_rows = []
    for model, results in leaderboard.items():
        wins = results.get('wins', 0)
        losses = results.get('losses', 0)
        total_battles = wins + losses

        if total_battles > 0:
            win_rate = wins / total_battles
            score = win_rate * (1 - 1 / (total_battles + 1))
        else:
            win_rate = 0
            score = 0

        row = [
            0,                               # Position placeholder, filled after sorting
            get_human_readable_name(model),  # Display name
            f"{score:.3f}",                  # Score formatted to 3 decimal places
            wins,
            losses,
            total_battles,
            f"{win_rate:.1%}",               # Win rate as percentage
        ]
        scored_rows.append((score, row))

    # Stable sort on the numeric score (descending)
    scored_rows.sort(key=lambda item: item[0], reverse=True)
    table_data = [row for _, row in scored_rows]

    # Add position numbers after sorting
    for position, row in enumerate(table_data, 1):
        row[0] = position

    return table_data

def calculate_elo_impact(model):
    """Summarize how a model's recorded battles weigh on its rating.

    Each win counts as 1 plus a bonus when the opponent is larger; each
    loss counts as 1 plus a penalty when the opponent is smaller. The
    bonus and penalty are the size gap relative to the largest approved
    model size.

    Args:
        model: Name of the model to summarize.

    Returns:
        A tuple ``(positive_impact, negative_impact, initial_rating)``,
        each rounded to an integer. ``initial_rating`` is
        ``1000 + size * 100``.
    """
    positive_impact = 0
    negative_impact = 0
    leaderboard = load_leaderboard()
    model_size = get_model_size(model)
    initial_rating = 1000 + (model_size * 100)

    opponents = leaderboard.get(model, {}).get('opponents', {})
    if opponents:
        # Loop-invariant: the largest approved size does not depend on the opponent
        max_size = max(
            (get_model_size(m) for m, _ in config.get_approved_models()),
            default=0,
        )

        for opponent, results in opponents.items():
            opponent_size = get_model_size(opponent)

            # Without a usable reference size, treat all models as equal
            if max_size:
                size_difference = (opponent_size - model_size) / max_size
            else:
                size_difference = 0

            win_impact = 1 + max(0, size_difference)
            loss_impact = 1 + max(0, -size_difference)

            positive_impact += results.get('wins', 0) * win_impact
            negative_impact += results.get('losses', 0) * loss_impact

    return round(positive_impact), round(negative_impact), round(initial_rating)

def get_elo_leaderboard():
    """Build the ELO leaderboard as rows for the Gradio table.

    Covers every approved model and every model with a leaderboard
    record. Each row is ``[position, display name, ELO rating, wins,
    losses, total battles, win rate]``; the rating is shown with one
    decimal and the win rate as a percentage. A model with no entry in
    the ELO table is shown at its starting rating of
    ``1000 + size * 100``.

    Returns:
        The rows sorted by rating, highest first, with 1-based positions.
        Ranking uses the exact rating, and ties are broken by display
        name and then model name, so the order is the same on every run
        even though the models are collected in a set.
    """
    ensure_elo_ratings_initialized()

    leaderboard = load_leaderboard()
    all_models = set(dict(config.get_approved_models())) | set(leaderboard)

    rated_rows = []
    for model in all_models:
        # Compute the starting rating only when the model has no rating yet
        rating = elo_ratings.get(model)
        if rating is None:
            rating = 1000 + (get_model_size(model) * 100)

        # Get battle data
        record = leaderboard.get(model, {})
        wins = record.get('wins', 0)
        losses = record.get('losses', 0)
        total_battles = wins + losses
        win_rate = wins / total_battles if total_battles > 0 else 0

        human_readable = get_human_readable_name(model)

        row = [
            0,                   # Position placeholder, filled after sorting
            human_readable,      # Display name
            f"{rating:.1f}",     # ELO rating formatted to 1 decimal place
            wins,
            losses,
            total_battles,
            f"{win_rate:.1%}",   # Win rate as percentage
        ]
        rated_rows.append((rating, str(human_readable), str(model), row))

    # Highest rating first; names give a deterministic order among ties
    rated_rows.sort(key=lambda item: (-item[0], item[1], item[2]))
    table_data = [item[3] for item in rated_rows]

    # Add position numbers after sorting
    for position, row in enumerate(table_data, 1):
        row[0] = position

    return table_data

def create_backup():
    """Upload a timestamped copy of the leaderboard to Nextcloud every 12 hours.

    Runs forever and is meant to be the target of a background thread.
    Each cycle loads the leaderboard and uploads it to
    ``config.NEXTCLOUD_BACKUP_FOLDER`` as
    ``leaderboard_backup_<YYYYmmdd_HHMMSS>.json``. Errors are printed and
    the loop continues with the next cycle.
    """
    while True:
        try:
            leaderboard_data = load_leaderboard()
            if not leaderboard_data:
                # load_leaderboard() returns {} when the download fails;
                # uploading that would only create a misleading empty backup.
                print("Skipping backup: leaderboard is empty or could not be loaded")
            else:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                backup_file_name = f"leaderboard_backup_{timestamp}.json"
                backup_path = f"{config.NEXTCLOUD_BACKUP_FOLDER}/{backup_file_name}"
                json_data = json.dumps(leaderboard_data, indent=2)
                nc.files.upload(backup_path, json_data.encode('utf-8'))
                print(f"Backup created on Nextcloud: {backup_path}")
        except Exception as e:
            print(f"Error creating backup: {e}")
        time.sleep(43200)  # Sleep for 12 HOURS

def start_backup_thread():
    """Start ``create_backup`` in a daemon thread so it never blocks interpreter exit."""
    worker = threading.Thread(daemon=True, target=create_backup)
    worker.start()