# This file contains some functions I use for automated analysis and evaluation.
# It is not used in the main functionality of the service.
# It is quite messy so far.
# To use the analytics tools you need to install some extra libraries:
# !pip install pandas

from tests.candidate import complete_interview
from tests.grader import grade
import pandas as pd
import numpy as np
from functools import partial
import concurrent.futures
import os
from IPython.display import Markdown, display
from openai import OpenAI
from tests.testing_prompts import feedback_analyzer
from resources.prompts import prompts, base_prompts


def complete_and_grade(interview_params, exp_name="GPT4", grader_model="gpt-4-turbo", candidate_model="gpt-3.5-turbo"):
    """Run one simulated interview and grade it.

    Args:
        interview_params: ``(interview_type, attempt_num)`` pair; ``attempt_num``
            is zero-based and only used for log messages.
        exp_name: Experiment name forwarded to ``complete_interview``.
        grader_model: Model name forwarded to ``grade``.
        candidate_model: Model name forwarded to ``complete_interview``.

    Returns:
        The feedback object produced by ``grade`` on success, or an empty dict
        when the interview, the grading or the format check fails. Callers
        (see ``run_evaluation``) treat a falsy result as a failed attempt.
    """
    interview_type, attempt_num = interview_params

    # Broad catch on purpose: this runs in a worker thread and a single failed
    # attempt must not abort the whole evaluation run.
    try:
        file_path, _ = complete_interview(interview_type, exp_name, model=candidate_model)
        feedback = grade(file_path, grader_model)

        # Just a heuristic check of the JSON format TODO: add a proper check
        if "problem_statement_topic" not in feedback:
            raise ValueError("Grading failed")

        print(f"Attempt {attempt_num + 1} of {interview_type} completed successfully")
        print(f"Overall score: {feedback['overall_score']}")
    except Exception as e:
        print(f"Attempt {attempt_num + 1} of {interview_type} failed with error: {e}")
        # Never hand back a malformed grading result: it would be counted as a
        # successful attempt and written to the results table.
        return {}

    return feedback


def run_evaluation(
    exp_name,
    num=5,
    interview_types=("ml_design", "math", "ml_theory", "system_design", "sql", "coding"),
    grader_model="gpt-4-turbo",
    candidate_model="gpt-3.5-turbo",
    num_workers=3,
):
    """Run ``num`` graded interviews per interview type and store the results.

    Results are written to ``records/<exp_name>_<timestamp>/results.csv``.

    Args:
        exp_name: Prefix of the experiment name; a timestamp is appended.
        num: Number of attempts per interview type.
        interview_types: Iterable of interview type names to evaluate.
        grader_model: Model name used for grading.
        candidate_model: Model name used for the simulated candidate.
        num_workers: Size of the thread pool running the attempts.

    Returns:
        The timestamped experiment name (also the name of the records folder).
    """
    # Materialize once so a one-shot iterable works with the nested loop below.
    interview_types = list(interview_types)

    exp_name = f"{exp_name}_{pd.Timestamp.now().strftime('%Y-%m-%d_%H-%M-%S')}"
    records_dir = os.path.join("records", exp_name)
    os.makedirs(records_dir, exist_ok=True)

    tasks = [(interview_type, i) for i in range(num) for interview_type in interview_types]
    complete_f = partial(complete_and_grade, exp_name=exp_name, grader_model=grader_model, candidate_model=candidate_model)

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as executor:
        results = list(executor.map(complete_f, tasks))

    # Filter out empty results and count them
    non_empty_results = [res for res in results if res]
    empty_count = len(results) - len(non_empty_results)
    print(f"Number of empty results (errors or failed grading): {empty_count}")

    # Store non-empty results in a DataFrame
    df = pd.DataFrame(non_empty_results)
    df.to_csv(os.path.join(records_dir, "results.csv"), index=False)

    return exp_name


def highlight_color(val):
    """Return the CSS color rule used to style a single score cell.

    Thresholds: below 0.7 red, below 0.9 orange, below 0.95 lightgreen,
    otherwise green. Missing values (None / NaN) get no styling instead of
    being painted as a top score.
    """
    if pd.isna(val):
        return ""

    if val < 0.7:
        color = "red"
    elif val < 0.9:
        color = "orange"
    elif val < 0.95:
        color = "lightgreen"
    else:
        color = "green"
    return f"color: {color}"


def _styled_with_caption(frame, caption):
    """Apply ``highlight_color`` to every cell of ``frame`` and set the caption."""
    styler = frame.style.map(highlight_color)
    styler.set_caption(caption)
    return styler


def generate_and_display_tables(df):
    """Build, display and return the styled summary tables for graded interviews.

    Reads the columns ``type``, ``agent_llm``, ``overall_score``,
    ``average_response_time_seconds`` and ``number_of_messages`` plus every
    column whose name starts with ``problem``, ``interviewer`` or ``feedback``.

    Side effect: adds (or overwrites) the per-stage columns ``problem``,
    ``interviewer`` and ``feedback`` on ``df`` in place; they hold the row-wise
    mean of the matching criteria columns.

    Returns:
        Dict mapping table names to the pandas Styler objects that were displayed.
    """
    # Grouping by prefix
    prefixes = ["problem", "interviewer", "feedback"]
    prefix_columns = [col for col in df.columns if any(col.startswith(prefix) for prefix in prefixes)]

    criteria_summary_df = pd.DataFrame(df[prefix_columns].mean(), columns=["avg score"])
    criteria_summary_df_styled = _styled_with_caption(criteria_summary_df, "Aggregated Scores per Criteria")

    # Aggregated scores per stage
    grouped_scores = {}
    for prefix in prefixes:
        stage_cols = [col for col in df.columns if col.startswith(prefix)]
        grouped_scores[prefix] = df[stage_cols].mean(axis=1).mean()

    grouped_scores_df = pd.DataFrame([grouped_scores]).T
    grouped_scores_df.columns = ["avg score"]
    grouped_scores_styled = _styled_with_caption(grouped_scores_df, "Aggregated Scores per Stage")

    # Grouped by unique type
    grouped_by_type = pd.DataFrame(df.groupby("type")[prefix_columns].mean().mean(axis=1), columns=["avg score"])
    grouped_by_type_styled = _styled_with_caption(grouped_by_type, "Scores Grouped by Unique Type")

    # Total score per interviewer model; its order drives the row order of the pivots below
    total_llm_scores = df.groupby("agent_llm")[prefix_columns].mean().mean(axis=1).sort_values(ascending=False)

    # Grouped by unique interviewer model and sorted by descending total score
    grouped_by_interviewer = df.groupby("agent_llm")[["overall_score", "average_response_time_seconds", "number_of_messages"]].mean()
    grouped_by_interviewer_styled = _styled_with_caption(grouped_by_interviewer, "Scores Grouped by Unique Interviewer Model")

    # Per-stage score of every row; written onto df itself because the pivots
    # below (and later analysis steps) read these columns by name.
    for prefix in prefixes:
        stage_cols = [col for col in prefix_columns if col.startswith(prefix)]
        df[prefix] = df[stage_cols].mean(axis=1)

    # Pivot table: Agent model vs Stage
    pivot1 = pd.pivot_table(df, values=prefixes, index="agent_llm", aggfunc="mean").reindex(total_llm_scores.index)
    pivot1_styled = _styled_with_caption(pivot1, "Pivot Table: Agent Model vs Stage")

    # Pivot table: Agent model vs Type (Single aggregated score per type)
    pivot2 = pd.pivot_table(df, values="overall_score", index="agent_llm", columns="type", aggfunc="mean").reindex(total_llm_scores.index)
    pivot2_styled = _styled_with_caption(pivot2, "Pivot Table: Agent Model vs Type")

    # Pivot table: Type vs Stage
    pivot3 = pd.pivot_table(df, values=prefixes, index="type", aggfunc="mean")
    pivot3_styled = _styled_with_caption(pivot3, "Pivot Table: Type vs Stage")

    # Pivot table: Agent Model x Stage vs Type (MultiIndex)
    multi_index_data = [(llm, stage) for llm in total_llm_scores.index for stage in prefixes]
    multi_index = pd.MultiIndex.from_tuples(multi_index_data, names=["agent_llm", "stage"])
    types = df["type"].unique()
    pivot4_df = pd.DataFrame(index=multi_index, columns=types)

    # Fill the DataFrame with the aggregated scores grouped by type
    for llm in total_llm_scores.index:
        mask = df["agent_llm"] == llm
        for stage in prefixes:
            stage_values = df.loc[mask, ["type", stage]].groupby("type").mean()[stage]
            pivot4_df.loc[(llm, stage), :] = stage_values

    pivot4_styled = _styled_with_caption(pivot4_df, "Pivot Table: Agent Model x Stage vs Type")

    tables_dict = {
        "criteria_summary_df_styled": criteria_summary_df_styled,
        "grouped_scores_styled": grouped_scores_styled,
        "grouped_by_type_styled": grouped_by_type_styled,
        "grouped_by_interviewer_styled": grouped_by_interviewer_styled,
        "pivot1_styled": pivot1_styled,
        "pivot2_styled": pivot2_styled,
        "pivot3_styled": pivot3_styled,
        "pivot4_styled": pivot4_styled,
    }

    for table in tables_dict.values():
        display(table)

    return tables_dict


def filter_df(df, prefixes=("problem", "interviewer", "feedback")):
    """Keep only rows whose criteria columns hold boolean-like values.

    A criteria column is any column whose name starts with one of ``prefixes``.
    A value is accepted when it is a bool, None, a float NaN, or one of the
    strings "True", "False", "None", "NaN". The strings "True", "False" and
    "None" are converted to the corresponding Python objects in the result.

    Prints the number of removed rows and, when the frame has a ``file_name``
    column, the file name of each removed row.

    Returns:
        A filtered copy of ``df``; the input frame is not modified.
    """
    prefixes = tuple(prefixes)
    string_flags = {"True", "False", "None", "NaN"}
    string_to_value = {"True": True, "False": False, "None": None}

    # Identify all columns starting with any of the prefixes
    columns_to_check = [col for col in df.columns if col.startswith(prefixes)]

    # Check if a value is a boolean, None, NaN, or a string representation of those
    def is_valid_value(val):
        if isinstance(val, bool) or val is None:
            return True
        # NaN values coming out of pandas are not guaranteed to be the np.nan
        # singleton, so test the value rather than the identity.
        if isinstance(val, float) and np.isnan(val):
            return True
        # isinstance guard keeps unhashable cells (lists, dicts) from raising.
        return isinstance(val, str) and val in string_flags

    # Convert string representations to actual values
    # NOTE(review): the string "NaN" is accepted but left unconverted, as before.
    def to_bool(val):
        if isinstance(val, str) and val in string_to_value:
            return string_to_value[val]
        return val

    # Row is valid only when every checked column holds a valid value
    valid_mask = pd.Series(True, index=df.index)
    for col in columns_to_check:
        valid_mask &= df[col].map(is_valid_value).astype(bool)

    # Apply filtering to keep only rows with valid values
    valid_df = df[valid_mask].copy()

    # Convert string representations to booleans
    for col in columns_to_check:
        valid_df[col] = valid_df[col].apply(to_bool)

    # Identify removed rows (mask-based, so duplicate index labels are handled)
    removed_rows = df[~valid_mask]

    # Print the number of rows removed
    num_removed = len(removed_rows)
    print(f"Number of rows removed: {num_removed}")

    # Print the value from the "file_name" column for each removed row, or `None` if not present
    if "file_name" in removed_rows.columns:
        for value in removed_rows["file_name"].tolist():
            print(f"Removed row file_name: {value}")
    else:
        print("Removed row file_name: None")

    return valid_df


def generate_analysis_report(df, folder, focus=None, model="gpt-4-turbo"):
    """Ask an LLM to analyze the interview feedback comments and report on them.

    Args:
        df: Frame with at least the columns ``type`` and ``comments``.
        folder: Directory in which ``analysis.md`` is written, or None to skip
            writing the file.
        focus: Optional interview part the analysis should be limited to.
        model: Chat model name used for the analysis.

    Returns:
        The analysis text returned by the model (also displayed as Markdown).
    """
    client = OpenAI(base_url="https://api.openai.com/v1")

    all_comments = "\n\n".join(
        f"Interview type: {t}. Feedback: {str(f)}" for t, f in zip(df["type"].values, df["comments"].values)
    )

    messages = [
        {"role": "system", "content": feedback_analyzer},
        {"role": "user", "content": f"Interview feedback: {all_comments}"},
    ]

    if focus:
        messages.append({"role": "user", "content": f"Focus only on comments about {focus} part of the interview"})

    response = client.chat.completions.create(model=model, messages=messages, temperature=1)

    comments_analysis = response.choices[0].message.content
    display(Markdown(comments_analysis))

    if folder is not None:
        # Explicit UTF-8: model output regularly contains non-ASCII characters
        # that platform default encodings may not be able to represent.
        with open(os.path.join(folder, "analysis.md"), "w", encoding="utf-8") as f:
            f.write(comments_analysis)
            f.write("\n\n")
            non_comment_columns = [c for c in df.columns if c != "comments"]
            for t in np.unique(df["type"]):
                f.write(f"Type: {t}\n")
                f.write(df[non_comment_columns][df["type"] == t].T.to_markdown())
                f.write("\n\n")
            f.write("Type: all\n")
            f.write("\n\n")
            f.write("Feedback:\n")
            f.write(all_comments)

    return comments_analysis


def analyze_and_improve_segment(df, segment_to_improve=None):
    """Find the weakest interview stage and ask an LLM for prompt additions.

    Expects ``df`` to contain the per-stage columns ``problem``,
    ``interviewer`` and ``feedback`` as well as ``type`` and ``comments``.
    Prints the model's suggestions; nothing is returned.

    Args:
        df: Graded interviews including the per-stage score columns.
        segment_to_improve: Stage to work on; defaults to the stage with the
            lowest mean score.
    """
    sorted_stages = df[["problem", "interviewer", "feedback"]].mean().sort_values()
    if not segment_to_improve:
        segment_to_improve = sorted_stages.index[0]
    # NOTE(review): the threshold is derived from the lowest-scoring stage even
    # when segment_to_improve is passed explicitly - confirm this is intended.
    th_score = sorted_stages.iloc[0] + 0.1

    print(f"Let's try to improve {segment_to_improve}")
    print(f"Quality threshold {th_score}")

    # Identifying types that need improvement
    type_stage_scores = df.groupby("type")[segment_to_improve].mean()
    types_to_improve = [t for t, s in type_stage_scores.items() if s < th_score]

    print(f"We will focus on {types_to_improve}")

    # Filtering DataFrame based on identified types and scoring criteria
    filtered_df = df[df["type"].isin(types_to_improve)]
    prefix_columns = [col for col in df.columns if col.startswith(segment_to_improve)]
    filtered_df = filtered_df[filtered_df[prefix_columns].mean(axis=1) < th_score]

    # Generating an analysis report
    comments_analysis = generate_analysis_report(filtered_df, None, focus=segment_to_improve, model="gpt-4-turbo")

    # Constructing improvement prompt
    improvement_prompt = (
        "You want to improve the prompts for LLM interviewer. "
        "Below you will see some of the prompts that are used right now. "
        "As well as a summary of mistakes that interviewer make. "
        "You can add 1-3 lines to each of prompts if needed, but you can't change or remove anything. "
    )

    # Selecting the base prompt for the segment to improve
    base_prompt = base_prompts.get(f"base_{segment_to_improve}", "Base prompt not found for the segment")

    # Constructing the current prompts display
    prompt_parts = [
        "The current prompts are below. \n",
        "BASE PROMPT (applied to all interview types): \n",
        base_prompt + "\n",
    ]
    for k, v in prompts.items():
        if segment_to_improve in k:
            # Strip the shared base prompt only when it is really a prefix;
            # otherwise (e.g. base prompt not found) keep the prompt intact
            # instead of cutting off an arbitrary number of characters.
            specific_part = v[len(base_prompt):] if v.startswith(base_prompt) else v
            prompt_parts.append(f"{k}: {specific_part} \n\n")
    current_prompts = "".join(prompt_parts)

    # Making API call to OpenAI
    client = OpenAI(base_url="https://api.openai.com/v1")
    model = "gpt-4-turbo"
    messages = [
        {"role": "system", "content": improvement_prompt},
        {"role": "user", "content": current_prompts},
        {"role": "user", "content": f"Interview feedback: {comments_analysis}"},
        {"role": "user", "content": "Please return any additional instructions you would like to add to any of the prompts."},
    ]

    response = client.chat.completions.create(model=model, messages=messages, temperature=1).choices[0].message.content
    print(response)