Spaces:
Running
Running
import boto3 | |
import uuid | |
import datetime | |
import os | |
from dotenv import load_dotenv | |
try: | |
load_dotenv() | |
except: | |
pass | |
# Load AWS credentials from environment variables | |
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID') | |
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY') | |
aws_region = os.environ.get('AWS_REGION') | |
# Initialize the DynamoDB client | |
dynamodb = boto3.resource('dynamodb', | |
region_name=aws_region, | |
aws_access_key_id=aws_access_key_id, | |
aws_secret_access_key=aws_secret_access_key) | |
# Define the tables | |
requests_table = dynamodb.Table('reviewer_arena_requests') | |
leaderboards_table = dynamodb.Table('reviewer_arena_leaderboard') | |
# Function to write a request to the Requests table | |
def write_request(user_id, paper_id, model_a, model_b, vote): | |
request_id = str(uuid.uuid4()) | |
timestamp = datetime.datetime.now().isoformat() | |
response = requests_table.put_item( | |
Item={ | |
'RequestID': request_id, | |
'Timestamp': timestamp, | |
'UserID': user_id, | |
'PaperID': paper_id, | |
'ModelA': model_a, | |
'ModelB': model_b, | |
'Vote': vote | |
} | |
) | |
return response | |
# Function to update leaderboard after a vote | |
def update_leaderboard(model_a, model_b, vote): | |
# Retrieve current stats for ModelA and ModelB | |
model_a_stats = leaderboards_table.get_item(Key={'ModelID': model_a}).get('Item', {}) | |
model_b_stats = leaderboards_table.get_item(Key={'ModelID': model_b}).get('Item', {}) | |
# Initialize stats if they don't exist | |
if not model_a_stats: | |
model_a_stats = {'ModelID': model_a, 'Wins': 0, 'Losses': 0, 'Ties': 0, 'EloScore': 1200, 'Votes': 0} | |
if not model_b_stats: | |
model_b_stats = {'ModelID': model_b, 'Wins': 0, 'Losses': 0, 'Ties': 0, 'EloScore': 1200, 'Votes': 0} | |
# Update stats based on the vote | |
if vote == "A is better": | |
model_a_stats['Wins'] += 1 | |
model_b_stats['Losses'] += 1 | |
elif vote == "B is better": | |
model_a_stats['Losses'] += 1 | |
model_b_stats['Wins'] += 1 | |
elif vote == "Tie": | |
model_a_stats['Ties'] += 1 | |
model_b_stats['Ties'] += 1 | |
model_a_stats['Votes'] += 1 | |
model_b_stats['Votes'] += 1 | |
# Calculate new Elo scores (simple Elo calculation for illustration) | |
model_a_stats['EloScore'], model_b_stats['EloScore'] = calculate_elo(model_a_stats['EloScore'], model_b_stats['EloScore'], vote) | |
# Write updated stats back to the Leaderboards table | |
leaderboards_table.put_item(Item=model_a_stats) | |
leaderboards_table.put_item(Item=model_b_stats) | |
# Function to calculate new Elo scores | |
def calculate_elo(elo_a, elo_b, vote, k=32): | |
expected_a = 1 / (1 + 10 ** ((elo_b - elo_a) / 400)) | |
expected_b = 1 / (1 + 10 ** ((elo_a - elo_b) / 400)) | |
if vote == "A is better": | |
actual_a = 1 | |
actual_b = 0 | |
elif vote == "B is better": | |
actual_a = 0 | |
actual_b = 1 | |
else: # Tie | |
actual_a = 0.5 | |
actual_b = 0.5 | |
new_elo_a = elo_a + k * (actual_a - expected_a) | |
new_elo_b = elo_b + k * (actual_b - expected_b) | |
return round(new_elo_a), round(new_elo_b) | |
# Function to query leaderboard | |
def get_leaderboard(): | |
response = leaderboards_table.scan() | |
leaderboard = response.get('Items', []) | |
# Sort by EloScore in descending order | |
leaderboard.sort(key=lambda x: x['EloScore'], reverse=True) | |
return leaderboard | |