""" Common data structures and utilities. """ import ast import dataclasses import glob import json import os import re import time from typing import Optional import openai import anthropic from mistralai.client import MistralClient from fastchat.model.model_adapter import get_conversation_template, ANTHROPIC_MODEL_LIST, MISTRAL_MODEL_LIST # API setting constants API_MAX_RETRY = 16 API_RETRY_SLEEP = 10 API_ERROR_OUTPUT = "$ERROR$" TIE_DELTA = 0.1 # Categories that need reference answers NEED_REF_CATS = ["math", "reasoning", "coding", "arena-hard-200"] # Extract scores from judgments two_score_pattern = re.compile("\[\[(\d+\.?\d*),\s?(\d+\.?\d*)\]\]") two_score_pattern_backup = re.compile("\[(\d+\.?\d*),\s?(\d+\.?\d*)\]") one_score_pattern = re.compile("\[\[(\d+\.?\d*)\]\]") one_score_pattern_backup = re.compile("\[(\d+\.?\d*)\]") # Sampling temperature configs for temperature_config = { "writing": 0.7, "roleplay": 0.7, "extraction": 0.0, "math": 0.0, "coding": 0.0, "reasoning": 0.0, "stem": 0.1, "humanities": 0.1, "arena-hard-200": 0.0, } reverse_model_map = { "model_1": "model_2", "model_2": "model_1", } @dataclasses.dataclass class Judge: model_name: str prompt_template: dict ref_based: bool = False multi_turn: bool = False @dataclasses.dataclass class MatchSingle: question: dict model: str answer: dict judge: Judge ref_answer: dict = None multi_turn: bool = False @dataclasses.dataclass class MatchPair: question: dict model_1: str model_2: str answer_1: dict answer_2: dict judge: Judge ref_answer: dict = None multi_turn: bool = False def load_questions(question_file: str, begin: Optional[int], end: Optional[int]): """Load questions from a file.""" questions = [] with open(question_file, "r") as ques_file: for line in ques_file: if line: questions.append(json.loads(line)) questions = questions[begin:end] return questions def load_model_answers(answer_dir: str): """Load model answers. The return value is a python dict of type: Dict[model_name: str -> Dict[question_id: int -> answer: dict]] """ filenames = glob.glob(os.path.join(answer_dir, "*.jsonl")) filenames.sort() model_answers = {} for filename in filenames: model_name = os.path.basename(filename)[:-6] answer = {} with open(filename) as fin: for line in fin: line = json.loads(line) answer[line["question_id"]] = line model_answers[model_name] = answer return model_answers def load_judge_prompts(prompt_file: str): """Load judge prompts. The return value is a python dict of type: Dict[judge_name: str -> dict] """ prompts = {} with open(prompt_file) as fin: for line in fin: line = json.loads(line) prompts[line["name"]] = line return prompts def run_judge_single(question, answer, judge, ref_answer, multi_turn=False): kwargs = {} model = judge.model_name if ref_answer is not None: kwargs["ref_answer_1"] = ref_answer["choices"][0]["turns"][0] if multi_turn: kwargs["ref_answer_2"] = ref_answer["choices"][0]["turns"][1] if multi_turn: user_prompt = judge.prompt_template["prompt_template"].format( question_1=question["turns"][0], question_2=question["turns"][1], answer_1=answer["choices"][0]["turns"][0], answer_2=answer["choices"][0]["turns"][1], **kwargs, ) else: user_prompt = judge.prompt_template["prompt_template"].format( question=question["turns"][0], answer=answer["choices"][0]["turns"][0], **kwargs, ) rating = -1 system_prompt = judge.prompt_template["system_prompt"] conv = get_conversation_template(model) conv.set_system_message(system_prompt) conv.append_message(conv.roles[0], user_prompt) conv.append_message(conv.roles[1], None) if model in ["gpt-3.5-turbo", "gpt-4"]: judgment = chat_compeletion_openai(model, conv, temperature=0, max_tokens=2048) elif model in ANTHROPIC_MODEL_LIST: judgment = chat_compeletion_anthropic( model, conv, temperature=0, max_tokens=1024 ) elif model in MISTRAL_MODEL_LIST: judgment = chat_compeletion_mistral( model, conv, temperature=0, max_tokens=1024 ) else: raise ValueError(f"Invalid judge model name: {model}") if judge.prompt_template["output_format"] == "[[rating]]": match = re.search(one_score_pattern, judgment) if not match: match = re.search(one_score_pattern_backup, judgment) if match: rating = ast.literal_eval(match.groups()[0]) else: rating = -1 else: raise ValueError( f"invalid output format: {judge.prompt_template['output_format']}" ) return rating, user_prompt, judgment def play_a_match_single(match: MatchPair, output_file: str): question, model, answer, judge, ref_answer, multi_turn = ( match.question, match.model, match.answer, match.judge, match.ref_answer, match.multi_turn, ) if judge.prompt_template["type"] == "single": score, user_prompt, judgment = run_judge_single( question, answer, judge, ref_answer, multi_turn=multi_turn ) question_id = question["question_id"] turn = 1 if not multi_turn else 2 result = { "question_id": question_id, "model": model, "judge": (judge.model_name, judge.prompt_template["name"]), "user_prompt": user_prompt, "judgment": judgment, "score": score, "turn": turn, "tstamp": time.time(), } print( f"question: {question_id}, turn: {turn}, model: {model}, " f"score: {score}, " f"judge: {(judge.model_name, judge.prompt_template['name'])}" ) else: raise ValueError(f"invalid judge type: {judge['type']}") if output_file: os.makedirs(os.path.dirname(output_file), exist_ok=True) with open(output_file, "a") as fout: fout.write(json.dumps(result) + "\n") return result def run_judge_pair(question, answer_a, answer_b, judge, ref_answer, multi_turn=False): kwargs = {} model = judge.model_name if ref_answer is not None: kwargs["ref_answer_1"] = ref_answer["choices"][0]["turns"][0] if multi_turn: kwargs["ref_answer_2"] = ref_answer["choices"][0]["turns"][1] if multi_turn: system_prompt = judge.prompt_template["system_prompt"] user_prompt = judge.prompt_template["prompt_template"].format( question_1=question["turns"][0], question_2=question["turns"][1], answer_a_1=answer_a["choices"][0]["turns"][0], answer_b_1=answer_b["choices"][0]["turns"][0], answer_a_2=answer_a["choices"][0]["turns"][1], answer_b_2=answer_b["choices"][0]["turns"][1], **kwargs, ) else: system_prompt = judge.prompt_template["system_prompt"] user_prompt = judge.prompt_template["prompt_template"].format( question=question["turns"][0], answer_a=answer_a["choices"][0]["turns"][0], answer_b=answer_b["choices"][0]["turns"][0], **kwargs, ) winner = "error" conv = get_conversation_template(model) conv.append_message(conv.roles[0], user_prompt) conv.append_message(conv.roles[1], None) if model in ["gpt-3.5-turbo", "gpt-4"]: conv.set_system_message(system_prompt) judgment = chat_compeletion_openai(model, conv, temperature=0, max_tokens=2048) elif model in ANTHROPIC_MODEL_LIST: if system_prompt != "You are a helpful assistant.": user_prompt = "[Instruction]\n" + system_prompt + "\n\n" + user_prompt conv.messages[0][1] = user_prompt judgment = chat_compeletion_anthropic( model, conv, temperature=0, max_tokens=1024 ) else: raise ValueError(f"Invalid judge model name: {model}") if judge.prompt_template["output_format"] == "[[A]]": if "[[A]]" in judgment: winner = "A" elif "[[B]]" in judgment: winner = "B" elif "[[C]]" in judgment: winner = "tie" else: winner = "error" elif judge.prompt_template["output_format"] == "[[rating_a,rating_b]]": match = re.search(two_score_pattern, judgment) if not match: match = re.search(two_score_pattern_backup, judgment) if match: scores = [ast.literal_eval(s.strip()) for s in match.groups()] if abs(scores[0] - scores[1]) <= TIE_DELTA: winner = "tie" elif scores[0] > scores[1]: winner = "A" else: winner = "B" else: winner = "error" else: raise ValueError( f"invalid output format: {judge.prompt_template['output_format']}" ) return winner, user_prompt, judgment def play_a_match_pair(match: MatchPair, output_file: str): question, model_1, model_2, answer_1, answer_2, judge, ref_answer, multi_turn = ( match.question, match.model_1, match.model_2, match.answer_1, match.answer_2, match.judge, match.ref_answer, match.multi_turn, ) if judge.prompt_template["type"] == "pairwise": g1_winner, g1_user_prompt, g1_judgment = run_judge_pair( question, answer_1, answer_2, judge, ref_answer, multi_turn=multi_turn ) g2_winner, g2_user_prompt, g2_judgment = run_judge_pair( question, answer_2, answer_1, judge, ref_answer, multi_turn=multi_turn ) g1_map = {"A": "model_1", "B": "model_2"} g2_map = {"A": "model_2", "B": "model_1"} g1_winner = g1_map.get(g1_winner, g1_winner) g2_winner = g2_map.get(g2_winner, g2_winner) question_id = question["question_id"] turn = 1 if not multi_turn else 2 result = { "question_id": question_id, "model_1": model_1, "model_2": model_2, "g1_winner": g1_winner, "g2_winner": g2_winner, "judge": (judge.model_name, judge.prompt_template["name"]), "g1_user_prompt": g1_user_prompt, "g1_judgment": g1_judgment, "g2_user_prompt": g2_user_prompt, "g2_judgment": g2_judgment, "turn": turn, "tstamp": time.time(), } print( f"question: {question_id}, turn: {turn}, model_1: {model_1}, model_2: {model_2}, " f"g1_winner: {g1_winner}, g2_winner: {g2_winner}, " f"judge: {(judge.model_name, judge.prompt_template['name'])}" ) elif judge.prompt_template["type"] == "single": m1_score, m1_user_prompt, m1_judgment = run_judge_single( question, answer_1, judge ) m2_score, m2_user_prompt, m2_judgment = run_judge_single( question, answer_2, judge ) if abs(m1_score - m2_score) <= TIE_DELTA: winner = "tie" elif m1_score > m2_score: winner = "model_1" else: winner = "model_2" question_id = question["question_id"] result = { "question_id": question_id, "model_1": model_1, "model_2": model_2, "g1_winner": winner, "g2_winner": winner, "judge": (judge.model_name, judge.prompt_template["name"]), "g1_user_prompt": m1_user_prompt, "g1_judgment": m1_judgment, "g2_user_prompt": m2_user_prompt, "g2_judgment": m2_judgment, "m1_score": m1_score, "m2_score": m2_score, "tstamp": time.time(), } print( f"question: {question_id}, model_1: {model_1}, model_2: {model_2}, " f"winner: {winner}, m1_score: {m1_score}, m2_score: {m2_score}, " f"judge: {(judge.model_name, judge.prompt_template['name'])}" ) else: raise ValueError(f"invalid judge type: {judge['type']}") if output_file: os.makedirs(os.path.dirname(output_file), exist_ok=True) with open(output_file, "a") as fout: fout.write(json.dumps(result) + "\n") return result def chat_compeletion_openai(model, conv, temperature, max_tokens, api_dict=None): if api_dict is not None: openai.api_base = api_dict["api_base"] openai.api_key = api_dict["api_key"] output = API_ERROR_OUTPUT for _ in range(API_MAX_RETRY): try: messages = conv.to_openai_api_messages() response = openai.ChatCompletion.create( model=model, messages=messages, n=1, temperature=temperature, max_tokens=max_tokens, ) output = response["choices"][0]["message"]["content"] break except openai.error.OpenAIError as e: print(type(e), e) time.sleep(API_RETRY_SLEEP) return output def chat_compeletion_openai_azure(model, conv, temperature, max_tokens, api_dict=None): openai.api_type = "azure" openai.api_version = "2023-07-01-preview" if api_dict is not None: openai.api_base = api_dict["api_base"] openai.api_key = api_dict["api_key"] else: openai.api_base = os.environ["AZURE_OPENAI_ENDPOINT"] openai.api_key = os.environ["AZURE_OPENAI_KEY"] if "azure-" in model: model = model[6:] output = API_ERROR_OUTPUT for _ in range(API_MAX_RETRY): try: messages = conv.to_openai_api_messages() response = openai.ChatCompletion.create( engine=model, messages=messages, n=1, temperature=temperature, max_tokens=max_tokens, ) output = response["choices"][0]["message"]["content"] break except openai.error.OpenAIError as e: print(type(e), e) time.sleep(API_RETRY_SLEEP) except openai.error.InvalidRequestError as e: print(type(e), e) break except KeyError: print(response) break return output def chat_compeletion_anthropic(model, conv, temperature, max_tokens): output = API_ERROR_OUTPUT for _ in range(API_MAX_RETRY): try: c = anthropic.Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"]) prompt = conv.get_prompt() response = c.completions.create( model=model, prompt=prompt, stop_sequences=[anthropic.HUMAN_PROMPT], max_tokens_to_sample=max_tokens, temperature=temperature, ) output = response.completion break except anthropic.APIError as e: print(type(e), e) time.sleep(API_RETRY_SLEEP) return output.strip() def chat_compeletion_palm(chat_state, model, conv, temperature, max_tokens): from fastchat.serve.api_provider import init_palm_chat assert model == "palm-2-chat-bison-001" if chat_state is None: chat_state = init_palm_chat("chat-bison@001") parameters = { "temperature": temperature, "top_p": 0.8, "top_k": 40, "max_output_tokens": max_tokens, } output = API_ERROR_OUTPUT for _ in range(API_MAX_RETRY): try: response = chat_state.send_message(conv.messages[-2][1], **parameters) output = response.text break except Exception as e: print(type(e), e) time.sleep(API_RETRY_SLEEP) return chat_state, output def chat_compeletion_mistral(model, conv, temperature, max_tokens): output = API_ERROR_OUTPUT for _ in range(API_MAX_RETRY): try: c = MistralClient(api_key=os.environ["MISTRAL_API_KEY"]) messages = conv.to_mistralai_api_messages() response = c.chat( model=model, messages=messages, temperature=temperature, max_tokens=max_tokens, ) output = response.choices[0].message.content break except Exception as e: print(type(e), e) time.sleep(API_RETRY_SLEEP) return output def normalize_game_key_single(gamekey, result): """Make the model names sorted in a game key.""" qid, model_1, model_2 = gamekey if model_1 < model_2: return gamekey, result else: new_gamekey = (qid, model_2, model_1) new_result = { "winners": tuple(reverse_model_map.get(x, x) for x in result["winners"]), "g1_judgment": result["g2_judgment"], "g2_judgment": result["g1_judgment"], } return new_gamekey, new_result def normalize_game_key_dict(judgment_dict): """Make the model names sorted in the game keys.""" ret = {} for key, value in judgment_dict.items(): new_key, new_value = normalize_game_key_single(key, value) ret[new_key] = new_value return ret def load_pairwise_model_judgments(filename: str): """Load model judgments. The return value is a dict of type: Dict[judge: Tuple -> Dict[game_key: tuple -> game_result: dict] """ judge_dict = {} for line in open(filename): obj = json.loads(line) judge = tuple(obj["judge"]) qid, model_1, model_2 = obj["question_id"], obj["model_1"], obj["model_2"] if judge not in judge_dict: judge_dict[judge] = {} if "winner" in obj: winner = obj["winner"] elif "g1_winner" in obj and "g2_winner" in obj: g1_winner, g2_winner = obj["g1_winner"], obj["g2_winner"] if g1_winner == g2_winner: winner = g1_winner else: winner = "inconsistent" else: raise ValueError(f"Invalid keys: {list(obj.keys())}") gamekey = (qid, model_1, model_2) winners = (winner,) judge_dict[judge][gamekey] = { "winners": winners, "g1_judgment": obj["g1_judgment"], "g2_judgment": obj["g2_judgment"], } # Make the model names sorted in the game keys normalized = {} for judge, value in judge_dict.items(): normalized[judge] = normalize_game_key_dict(value) return normalized def load_single_model_judgments(filename: str): """Load model judgments. The return value is a dict of type: Dict[judge: Tuple -> Dict[game_key: tuple -> game_result: dict] """ judge_dict = {} for line in open(filename): obj = json.loads(line) judge = tuple(obj["judge"]) qid, model = obj["question_id"], obj["model"] if judge not in judge_dict: judge_dict[judge] = {} gamekey = (qid, model) judge_dict[judge][gamekey] = { "score": obj["score"], "judgment": obj["judgment"], } return judge_dict def resolve_pairwise_judgment_dict( question, model_judgments_normal, model_judgments_math, multi_turn=False ): """Return the correct pairwise judge.""" if multi_turn: if question["category"] in NEED_REF_CATS: return model_judgments_math[("gpt-4", "pair-math-v1-multi-turn")] return model_judgments_normal[("gpt-4", "pair-v2-multi-turn")] if question["category"] in NEED_REF_CATS: return model_judgments_math[("gpt-4", "pair-math-v1")] else: return model_judgments_normal[("gpt-4", "pair-v2")] def resolve_single_judgment_dict( question, model_judgments_normal, model_judgments_math, multi_turn=False ): """Return the correct single answer grading judge.""" if multi_turn: if question["category"] in NEED_REF_CATS: return model_judgments_math[("gpt-4", "single-math-v1-multi-turn")] return model_judgments_normal[("gpt-4", "single-v1-multi-turn")] if question["category"] in NEED_REF_CATS: return model_judgments_math[("gpt-4", "single-math-v1")] else: return model_judgments_normal[("gpt-4", "single-v1")] def get_pairwise_judge_explanation(gamekey, judgment_dict): """Get model judge explanation.""" try: qid, model_1, model_2 = gamekey if model_1 < model_2: res = judgment_dict[gamekey] g1_judgment, g2_judgment = res["g1_judgment"], res["g2_judgment"] else: new_gamekey = (qid, model_2, model_1) res = judgment_dict[new_gamekey] model_1, model_2 = model_1, model_2 g1_judgment, g2_judgment = res["g2_judgment"], res["g1_judgment"] return ( f"**Game 1**. **A**: {model_1}, **B**: {model_2}\n\n" f"**Judgment**: {g1_judgment}" + f"\n\n`--------------------------`\n\n" + f"**Game 2**. **A**: {model_2}, **B**: {model_1}\n\n" f"**Judgment**: {g2_judgment}" ) except KeyError: return "N/A" def get_single_judge_explanation(gamekey, judgment_dict): """Get model judge explanation.""" try: qid, model = gamekey res = judgment_dict[gamekey] g1_judgment = res["judgment"] g1_score = res["score"] return ( f"**Game 1**. **A**: {model}, **Score**: {g1_score}\n\n" f"**Judgment**: {g1_judgment}" ) except KeyError: return "N/A" def check_data(questions, model_answers, ref_answers, models, judges): # check model answers for m in models: assert m in model_answers, f"Missing model answer for {m}" m_answer = model_answers[m] for q in questions: assert ( q["question_id"] in m_answer ), f"Missing model {m}'s answer to Question {q['question_id']}" # check ref answers for jg in judges.values(): if not jg.ref_based: continue for q in questions: if q["category"] not in NEED_REF_CATS: continue assert ( q["question_id"] in ref_answers[jg.model_name] ), f"Missing reference answer to Question {q['question_id']} for judge {jg.model_name}" def get_model_list(answer_dir): file_paths = glob.glob(f"{answer_dir}/*.jsonl") file_names = [os.path.splitext(os.path.basename(f))[0] for f in file_paths] return file_names