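"""Compare two LOME frame-semantic parsing models (0-shot vs. EVALITA-trained) on the RAI femicides data.

The first command-line argument selects what to do:
  * "make":               collect predicate-level prediction differences and print statistics
                          (the sheet-writing calls are currently commented out)
  * "analyze":            merge the completed annotator sheet with the experiment sheet
  * "prep_svm_challenge": export adjudicated annotations as JSONL for an SVM frame-id baseline
"""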
from collections import defaultdict
import glob
import json
import os
import re
import random
import sys
from typing import List, Dict, Tuple

import pandas as pd
import numpy as np

from sociofillmore.common.analyze_text import load_caches, process_fn_sentence, FrameStructure, read_frames_of_interest

RANDOM_SEED = 9718
NUM_EVALUATION_SENTENCES = 150

EVALITA_MODEL = "lome_evalita_plus_fn"
# EVALITA_MODEL = "lome_evalita_plus_fn_0conf"

OUT_FOLDER = f"0shot__vs__{EVALITA_MODEL.split('_', maxsplit=1)[1]}"
print(OUT_FOLDER)

random.seed(RANDOM_SEED)


def map_predicates_to_frames(structures: List[FrameStructure]) -> Dict[str, str]:
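    """Map each predicate (its target tokens joined with "_") to the frame it evokes."""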
    mapping = {}
    for struct in structures:
        pred_key = "_".join(struct.target.tokens_str)
        mapping[pred_key] = struct.frame
    return mapping


def make_evaluation_sample(diffs_df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame]:
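    """Draw NUM_EVALUATION_SENTENCES rows from the differences and randomize model order per row.

    Returns the blinded annotator sheet (with empty answer columns) and the full
    experiment sheet, which records which model produced prediction_1/prediction_2.
    """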
    def make_experimental_columns(row: pd.Series):
        # randomize which model ends up in which column so annotators stay blind
        if random.choice((True, False)):
            left_col = "predicted_frame_0shot"
            right_col = "predicted_frame_evalita"
        else:
            left_col = "predicted_frame_evalita"
            right_col = "predicted_frame_0shot"
        exp_info = pd.Series({
            "prediction_1": row[left_col],
            "prediction_2": row[right_col],
            "model_1": left_col,
            "model_2": right_col
        })
        # Series.append was removed in pandas 2.0; concatenate instead
        return pd.concat([row, exp_info])

    sample = diffs_df.sample(n=NUM_EVALUATION_SENTENCES,
                             random_state=RANDOM_SEED).reset_index(drop=True)
    with_exp_info = sample.apply(make_experimental_columns, axis=1)
    annotator_sheet = with_exp_info[[
        "sentence", "predicate", "prediction_1", "prediction_2"]].copy()

    # add answer columns
    for answer_field in ["1_is_best", "2_is_best", "both_are_good", "both_are_bad", "missing_frame"]:
        annotator_sheet.insert(len(annotator_sheet.columns),
                               f"answer::{answer_field}", np.nan)
        # annotator_sheet[f"answer::{answer_field}"] = np.nan

    return annotator_sheet, with_exp_info


def make_annotation_experiment():
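    """Collect predicate-level differences between the 0-shot and EVALITA LOME predictions.

    Walks the per-event LOME output, counts predictions overall and for frames of
    interest, and records every predicate where the two models disagree. The calls
    that write the CSV sheets are currently commented out at the end of the function.
    """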
    _, deep_frame_cache = load_caches("femicides/rai")
    frames_of_interest = read_frames_of_interest("femicides/rai")

    all_differences = []
    foi_differences = []  # foi = 'frame of interest'

    # number of predicates that have been annotated by at least one model
    num_all_predictions = 0
    num_foi_predictions = 0
    num_z_shot_all_predictions = 0
    num_z_shot_foi_predictions = 0
    num_evalita_all_predictions = 0
    num_evalita_foi_predictions = 0

    for ev_dir in sorted(glob.glob("output/femicides/lome/lome_0shot/multilabel/rai/*")):
        ev_id = os.path.basename(ev_dir.rstrip("/"))
        print(f"event={ev_id}")
        for doc_file in sorted(glob.glob(f"{ev_dir}/*.comm.json")):
            doc_id = re.search(r"/lome_(\d+)\.comm\.json", doc_file).group(1)
            print(f"\tdoc={doc_id}")
            with open(doc_file, encoding="utf-8") as f:
                z_shot_annotations = json.load(f)
            with open(doc_file.replace("/lome_0shot/", f"/{EVALITA_MODEL}/"), encoding="utf-8") as f:
                evalita_annotations = json.load(f)

            for sent_idx, (z_shot_sent, evalita_sent) in enumerate(zip(z_shot_annotations, evalita_annotations)):
                z_shot_structs = process_fn_sentence(z_shot_sent, deep_frame_cache)
                evalita_structs = process_fn_sentence(evalita_sent, deep_frame_cache)

                z_shot_frames = {s.frame for s in z_shot_structs.values()}
                evalita_frames = {s.frame for s in evalita_structs.values()}
                overlapping_frames = z_shot_frames.intersection(evalita_frames)
                print(f"\t\tsent #{sent_idx}: {len(z_shot_frames)}x lome_0shot frames, "
                      f"{len(evalita_frames)}x evalita frames, {len(overlapping_frames)}x overlapping")

                z_shot_preds_to_frames = map_predicates_to_frames(z_shot_structs.values())
                evalita_preds_to_frames = map_predicates_to_frames(evalita_structs.values())
                all_predicates = sorted(set(z_shot_preds_to_frames.keys()).union(evalita_preds_to_frames.keys()))

                for predicate in all_predicates:
                    print(f"\t\t\tpredicate={predicate}")
                    z_shot_frame = z_shot_preds_to_frames.get(predicate)
                    evalita_frame = evalita_preds_to_frames.get(predicate)
                    has_relevant_frame = z_shot_frame in frames_of_interest or evalita_frame in frames_of_interest

                    if z_shot_frame is not None:
                        num_z_shot_all_predictions += 1
                        if z_shot_frame in frames_of_interest:
                            num_z_shot_foi_predictions += 1
                    if evalita_frame is not None:
                        num_evalita_all_predictions += 1
                        if evalita_frame in frames_of_interest:
                            num_evalita_foi_predictions += 1

                    num_all_predictions += 1
                    if has_relevant_frame:
                        num_foi_predictions += 1

                    if z_shot_frame != evalita_frame:
                        diff = {
                            "ev_id": ev_id,
                            "doc_id": doc_id,
                            "sent_idx": sent_idx,
                            "sentence": " ".join(z_shot_sent["tokens"]),
                            "predicate": predicate,
                            "predicted_frame_0shot": z_shot_frame or "_",
                            "predicted_frame_evalita": evalita_frame or "_"
                        }
                        all_differences.append(diff)
                        if has_relevant_frame:
                            foi_differences.append(diff)
            print()
        print()

    print(f"num_z_shot_all_predictions = {num_z_shot_all_predictions}")
    print(f"num_z_shot_foi_predictions = {num_z_shot_foi_predictions}")
    print(f"num_evalita_all_predictions = {num_evalita_all_predictions}")
    print(f"num_evalita_foi_predictions = {num_evalita_foi_predictions}")
    print(f"all_differences: {len(all_differences)}/{num_all_predictions}"
          f"={len(all_differences) / num_all_predictions}")
    print(f"foi_differences: {len(foi_differences)}/{num_foi_predictions}"
          f"={len(foi_differences) / num_foi_predictions}")

    # all_diffs_df = pd.DataFrame(all_differences)
    # foi_diffs_df = pd.DataFrame(foi_differences)
    # all_diffs_df.to_csv("output/femicides/compare_lome_models/all_differences.csv")
    # foi_diffs_df.to_csv("output/femicides/compare_lome_models/foi_differences.csv")
    # annotator_sheet, experiment_sheet = make_evaluation_sample(foi_diffs_df)
    # annotator_sheet.to_csv("output/femicides/compare_lome_models/annotator_sheet.csv")
    # experiment_sheet.to_csv("output/femicides/compare_lome_models/experiment_sheet.csv")


def analyze_annotations():
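    """Join the human annotation sheet with the experiment sheet and resolve the blinded answers."""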
    ann_df = pd.read_excel("resources/sara_lome_annotations.xlsx", index_col=0)
    exp_df = pd.read_csv(
        f"output/femicides/compare_lome_models/{OUT_FOLDER}/experiment_sheet.csv", index_col=0)
    ann_df_ = ann_df.join(exp_df[["model_1", "model_2"]])
    ann_df_proc = ann_df_.apply(combine_labels, axis=1)
    print(ann_df_proc.head())
    ann_df_proc.to_csv(
        f"output/femicides/compare_lome_models/{OUT_FOLDER}/annotator_sheet_processed.csv")


def combine_labels(row: pd.Series) -> pd.Series:
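    """Collapse the one-hot "answer::*" columns of an annotation row into a single "answer" value."""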
    model_1 = row["model_1"].split("_")[-1]
    model_2 = row["model_2"].split("_")[-1]
    if row["answer::1_is_best"] == "X":
        answer = f"{model_1}_is_best"
    elif row["answer::2_is_best"] == "X":
        answer = f"{model_2}_is_best"
    elif row["answer::both_are_good"] == "X":
        answer = "both_are_good"
    elif row["answer::both_are_bad"] == "X":
        answer = "both_are_bad"
    elif row["answer::missing_frame"] == "X":
        answer = "missing_frame"
    else:
        raise ValueError(f"Missing annotation in row {row}")
    row_ = row.drop([k for k in row.keys() if k.startswith("answer::")])
    # Series.append was removed in pandas 2.0; concatenate instead
    return pd.concat([row_, pd.Series({"answer": answer})])


def prep_svm_challenge():
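    """Convert adjudicated annotations into EVALITA-style JSONL for the SVM frame-id baseline.

    Rows are kept only when one model was judged best or both were good; the winning
    frame (or prediction_1 when both are good) becomes the gold label, and labels
    unseen in the EVALITA training set are skipped. Also reports how often each
    model matches the resulting gold label.
    """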
    annotated_df = pd.read_csv(
        "output/femicides/compare_lome_models/0shot__vs__evalita_plus_fn/annotator_sheet_processed.csv", index_col=0)

    evalita_train_data = []
    with open("../stupid-svm-frameid/data/evalita_jsonl/evalita_train.jsonl", encoding="utf-8") as f_in:
        for line in f_in:
            evalita_train_data.append(json.loads(line))

    # evalita_frame_labels = {annotation["label"] for sentence in evalita_train_data for annotation in sentence["annotations"]}
    evalita_frame_labels = defaultdict(int)
    for sentence in evalita_train_data:
        for annotation in sentence["annotations"]:
            evalita_frame_labels[annotation["label"]] += 1
    evalita_train_counts = pd.DataFrame(evalita_frame_labels.items(), columns=["label", "count"]).sort_values(by="count")
    evalita_train_counts.to_csv("output/femicides/compare_lome_models/evalita_trainset_counts.csv")
    print("Evalita frame labels:", sorted(evalita_frame_labels.keys()))

    out = []
    zshot_score = 0
    evalita_score = 0
    for _, row in annotated_df.iterrows():
        answer = row["answer"]
        if answer not in ["0shot_is_best", "evalita_is_best", "both_are_good"]:
            continue

        tokens = row["sentence"].split()
        predicate = row["predicate"].split("_")[0]  # to keep things simple, only look at first token of predicate
        predicate_idx = [i for i, tok in enumerate(tokens) if tok == predicate][0]

        # recover which blinded prediction column belongs to which model and pick the gold label
        if answer == "0shot_is_best":
            if row["model_1"] == "predicted_frame_0shot":
                zshot_label = label = row["prediction_1"]
                evalita_label = row["prediction_2"]
            else:
                zshot_label = label = row["prediction_2"]
                evalita_label = row["prediction_1"]
        elif answer == "evalita_is_best":
            if row["model_1"] == "predicted_frame_evalita":
                evalita_label = label = row["prediction_1"]
                zshot_label = row["prediction_2"]
            else:
                evalita_label = label = row["prediction_2"]
                zshot_label = row["prediction_1"]
        else:
            label = row["prediction_1"]
            if row["model_1"] == "predicted_frame_evalita":
                evalita_label = row["prediction_1"]
                zshot_label = row["prediction_2"]
            else:
                evalita_label = row["prediction_2"]
                zshot_label = row["prediction_1"]

        if label not in evalita_frame_labels:
            print("\tskipping gold frame label not present in EVALITA: ", label)
            continue

        if zshot_label == label:
            zshot_score += 1
        if evalita_label == label:
            evalita_score += 1
        out.append({"tokens": tokens, "annotations": [{"label": label, "span": [predicate_idx, predicate_idx], "lu": None, "children": []}]})

    print(f"Found {len(out)} relevant annotations")
    print("0-shot score: ", zshot_score / len(out))
    print("evalita score: ", evalita_score / len(out))

    # write one JSON object per line ("\n" rather than os.linesep, which would yield "\r\r\n" on Windows in text mode)
    with open("output/femicides/compare_lome_models/svm_challenge.jsonl", "w", encoding="utf-8") as f_out:
        for line in out:
            f_out.write(json.dumps(line) + "\n")
        f_out.write("\n")
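
# Usage (the file name below is an assumption; adjust to the actual script path):
#   python compare_lome_models.py make
#   python compare_lome_models.py analyze
#   python compare_lome_models.py prep_svm_challenge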
if __name__ == '__main__':
    action = sys.argv[1]
    assert action in ["make", "analyze", "prep_svm_challenge"]
    if action == "make":
        make_annotation_experiment()
    elif action == "analyze":
        analyze_annotations()
    else:
        prep_svm_challenge()