# reviewer-arena / aws_utils.py -- uploaded by "openreviewer" via huggingface_hub (revision 13a9008, 3.54 kB)
import boto3
import uuid
import datetime
import os
from dotenv import load_dotenv
# Best-effort: populate os.environ from a local .env file if one can be read.
# Only ordinary errors are ignored; a bare `except:` would also swallow
# KeyboardInterrupt and SystemExit raised during import.
try:
    load_dotenv()
except Exception:
    pass
# AWS credentials and region are read from the process environment.
# Each value is None when the corresponding variable is not set.
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
aws_region = os.getenv('AWS_REGION')

# Module-wide DynamoDB resource built from the values above.
dynamodb = boto3.resource(
    'dynamodb',
    region_name=aws_region,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
)

# Table handles used by the functions in this module.
requests_table = dynamodb.Table('reviewer_arena_requests')
leaderboards_table = dynamodb.Table('reviewer_arena_leaderboard')
# Function to write a request to the Requests table
def write_request(user_id, paper_id, model_a, model_b, vote):
    """Store one vote record in the requests table.

    A fresh UUID4 string is used as ``RequestID`` and the current time in
    ISO-8601 form as ``Timestamp``; the remaining arguments are stored
    unchanged under ``UserID``, ``PaperID``, ``ModelA``, ``ModelB`` and
    ``Vote``.

    Returns:
        Whatever ``requests_table.put_item`` returns.
    """
    # NOTE(review): datetime.now() without a tz yields a naive local-time
    # stamp -- confirm that this is what downstream consumers expect.
    record = {
        'RequestID': str(uuid.uuid4()),
        'Timestamp': datetime.datetime.now().isoformat(),
        'UserID': user_id,
        'PaperID': paper_id,
        'ModelA': model_a,
        'ModelB': model_b,
        'Vote': vote,
    }
    return requests_table.put_item(Item=record)
# Function to update leaderboard after a vote
def update_leaderboard(model_a, model_b, vote):
    """Apply a single vote to the leaderboard rows of both models.

    Each model's row is fetched by ``ModelID``; a missing or empty row is
    replaced by zeroed counters and an Elo score of 1200. The win/loss/tie
    counters are bumped according to ``vote``, ``Votes`` is incremented for
    both models, the Elo scores are recomputed with ``calculate_elo``, and
    both rows are written back (model A first, then model B).

    A ``vote`` other than "A is better", "B is better" or "Tie" leaves the
    win/loss/tie counters untouched but still increments ``Votes`` and still
    goes through ``calculate_elo``.
    """

    def _load_stats(model_id):
        # Fall back to a fresh row when the table has nothing for this model.
        stored = leaderboards_table.get_item(Key={'ModelID': model_id}).get('Item', {})
        if stored:
            return stored
        return {'ModelID': model_id, 'Wins': 0, 'Losses': 0, 'Ties': 0, 'EloScore': 1200, 'Votes': 0}

    stats_a = _load_stats(model_a)
    stats_b = _load_stats(model_b)

    # (vote label, counter bumped for model A, counter bumped for model B)
    outcomes = (
        ("A is better", 'Wins', 'Losses'),
        ("B is better", 'Losses', 'Wins'),
        ("Tie", 'Ties', 'Ties'),
    )
    for label, counter_a, counter_b in outcomes:
        if vote == label:
            stats_a[counter_a] += 1
            stats_b[counter_b] += 1
            break

    for stats in (stats_a, stats_b):
        stats['Votes'] += 1

    # Both new ratings are computed from the pre-vote ratings in one call.
    new_elo_a, new_elo_b = calculate_elo(stats_a['EloScore'], stats_b['EloScore'], vote)
    stats_a['EloScore'] = new_elo_a
    stats_b['EloScore'] = new_elo_b

    for stats in (stats_a, stats_b):
        leaderboards_table.put_item(Item=stats)
# Function to calculate new Elo scores
def calculate_elo(elo_a, elo_b, vote, k=32):
    """Return the updated Elo ratings of two models after one vote.

    Args:
        elo_a: Current rating of model A (int, float or ``decimal.Decimal``).
        elo_b: Current rating of model B (same accepted types).
        vote: "A is better" scores a win for A, "B is better" a win for B;
            any other value is scored as a tie (0.5 each).
        k: K-factor scaling how far a single result moves the ratings.

    Returns:
        A ``(new_elo_a, new_elo_b)`` tuple of ints (rounded with ``round``).
    """
    # Ratings may arrive as decimal.Decimal (the boto3 DynamoDB resource
    # deserializes stored numbers that way). Decimal cannot be combined with
    # the float tie score 0.5 -- that raises TypeError -- so do all the
    # arithmetic in plain floats.
    elo_a = float(elo_a)
    elo_b = float(elo_b)
    k = float(k)

    # Standard logistic expectation on a 400-point scale.
    expected_a = 1 / (1 + 10 ** ((elo_b - elo_a) / 400))
    expected_b = 1 / (1 + 10 ** ((elo_a - elo_b) / 400))

    if vote == "A is better":
        actual_a, actual_b = 1.0, 0.0
    elif vote == "B is better":
        actual_a, actual_b = 0.0, 1.0
    else:  # Tie (and any unrecognised vote)
        actual_a, actual_b = 0.5, 0.5

    new_elo_a = elo_a + k * (actual_a - expected_a)
    new_elo_b = elo_b + k * (actual_b - expected_b)
    return round(new_elo_a), round(new_elo_b)
# Function to query leaderboard
def get_leaderboard():
    """Return every leaderboard row, sorted by ``EloScore`` descending.

    The table is read with ``scan``. A single scan call returns at most one
    page of results, so the scan is repeated from ``LastEvaluatedKey`` until
    the response no longer carries one; otherwise rows beyond the first page
    would silently be missing from the leaderboard.

    Returns:
        A list of item dicts, highest ``EloScore`` first.

    Raises:
        KeyError: If a stored row has no ``EloScore`` attribute.
    """
    leaderboard = []
    scan_kwargs = {}
    while True:
        response = leaderboards_table.scan(**scan_kwargs)
        leaderboard.extend(response.get('Items', []))
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
        # Resume the scan where the previous page stopped.
        scan_kwargs['ExclusiveStartKey'] = last_key

    # Sort by EloScore in descending order
    leaderboard.sort(key=lambda x: x['EloScore'], reverse=True)
    return leaderboard