Spaces:
Running
Running
File size: 3,543 Bytes
9e62f85 13a9008 9e62f85 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
import boto3
import uuid
import datetime
import os
from dotenv import load_dotenv
# Best-effort load of a local .env file for development; production
# environments are expected to provide real environment variables.
try:
    load_dotenv()
except Exception:
    # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit still
    # propagate; any other failure to read .env is deliberately non-fatal.
    pass
# Load AWS credentials from environment variables.
# os.environ.get returns None for unset variables; boto3 then presumably
# falls back to its default credential chain -- TODO confirm in deployment.
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
aws_region = os.environ.get('AWS_REGION')
# Initialize the DynamoDB resource.
# NOTE(review): constructed at import time -- importing this module has this
# side effect even if no table function is ever called.
dynamodb = boto3.resource('dynamodb',
region_name=aws_region,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key)
# Handles to the two tables used by the functions below.
requests_table = dynamodb.Table('reviewer_arena_requests')
leaderboards_table = dynamodb.Table('reviewer_arena_leaderboard')
# Function to write a request to the Requests table
def write_request(user_id, paper_id, model_a, model_b, vote):
    """Record one vote event in the reviewer_arena_requests table.

    Args:
        user_id: identifier of the voting user.
        paper_id: identifier of the paper being reviewed.
        model_a: identifier of the first model in the comparison.
        model_b: identifier of the second model in the comparison.
        vote: the vote string (e.g. "A is better", "B is better", "Tie").

    Returns:
        The raw DynamoDB put_item response dict.
    """
    request_id = str(uuid.uuid4())
    # Use an explicit UTC timestamp: naive datetime.now() depends on the
    # host's local timezone, making stored timestamps ambiguous and
    # non-comparable across machines.
    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()
    response = requests_table.put_item(
        Item={
            'RequestID': request_id,
            'Timestamp': timestamp,
            'UserID': user_id,
            'PaperID': paper_id,
            'ModelA': model_a,
            'ModelB': model_b,
            'Vote': vote
        }
    )
    return response
# Function to update leaderboard after a vote
def update_leaderboard(model_a, model_b, vote):
    """Apply one vote's outcome to both models' rows in the leaderboard table.

    Loads (or initializes) each model's stats, tallies the win/loss/tie,
    recomputes both Elo scores, and writes both rows back.
    """
    def _load_stats(model_id):
        # Fetch the model's current row; brand-new models get fresh defaults.
        row = leaderboards_table.get_item(Key={'ModelID': model_id}).get('Item', {})
        return row or {'ModelID': model_id, 'Wins': 0, 'Losses': 0,
                       'Ties': 0, 'EloScore': 1200, 'Votes': 0}

    stats_a = _load_stats(model_a)
    stats_b = _load_stats(model_b)

    # Tally this outcome's win/loss/tie counters. Unrecognized vote strings
    # fall through and only increment the Votes counters below.
    if vote == "A is better":
        stats_a['Wins'] += 1
        stats_b['Losses'] += 1
    elif vote == "B is better":
        stats_a['Losses'] += 1
        stats_b['Wins'] += 1
    elif vote == "Tie":
        stats_a['Ties'] += 1
        stats_b['Ties'] += 1

    stats_a['Votes'] += 1
    stats_b['Votes'] += 1

    # Re-rate both models from this single pairwise result.
    stats_a['EloScore'], stats_b['EloScore'] = calculate_elo(
        stats_a['EloScore'], stats_b['EloScore'], vote)

    # Persist the updated rows.
    # NOTE(review): read-modify-write without a condition expression is not
    # atomic under concurrent voters -- confirm acceptable for this workload.
    leaderboards_table.put_item(Item=stats_a)
    leaderboards_table.put_item(Item=stats_b)
# Function to calculate new Elo scores
def calculate_elo(elo_a, elo_b, vote, k=32):
    """Return the pair of updated Elo ratings after one head-to-head result.

    Standard Elo update: each side's expected score comes from the 400-point
    logistic curve, and its rating moves by k * (actual - expected). Any vote
    other than "A is better" / "B is better" is scored as a tie.

    Returns:
        (new_elo_a, new_elo_b), each rounded to the nearest integer.
    """
    # Expected scores from the logistic rating-difference curve.
    expected_a = 1 / (1 + 10 ** ((elo_b - elo_a) / 400))
    expected_b = 1 / (1 + 10 ** ((elo_a - elo_b) / 400))

    # Actual scores (A, B): win = 1, loss = 0, tie = 0.5 each.
    outcomes = {"A is better": (1, 0), "B is better": (0, 1)}
    actual_a, actual_b = outcomes.get(vote, (0.5, 0.5))

    updated_a = elo_a + k * (actual_a - expected_a)
    updated_b = elo_b + k * (actual_b - expected_b)
    return round(updated_a), round(updated_b)
# Function to query leaderboard
def get_leaderboard():
response = leaderboards_table.scan()
leaderboard = response.get('Items', [])
# Sort by EloScore in descending order
leaderboard.sort(key=lambda x: x['EloScore'], reverse=True)
return leaderboard
|