Spaces:
Running
Running
import json | |
import re | |
from collections import defaultdict | |
import evaluate | |
import nltk | |
import numpy as np | |
from nervaluate import Evaluator | |
# from rouge_score import rouge_scorer | |
from sacrebleu.metrics import BLEU, CHRF | |
from sklearn.metrics import f1_score | |
from tqdm import tqdm | |
from transformers import AutoTokenizer | |
import rouge | |
import bert_score | |
import string | |
def load_json(file_path): | |
with open(file_path, "r") as f: | |
return json.load(f) | |
def get_micro_at_k(gold, pred, k): | |
gold_set = set(gold) | |
pred_set = set(pred[:k]) | |
return len(gold_set & pred_set), len(gold_set), len(pred_set) | |
def evaluate_bail(gold_data, pred_data): | |
gold_labels = [] | |
pred_labels = [] | |
for id, label in gold_data.items(): | |
gold_labels.append(label) | |
pred_labels.append(pred_data.get(id, 0)) | |
f1 = f1_score(gold_labels, pred_labels, average="macro") | |
print("Macro-F1 on HLDC-all-districts test set:", f1) | |
return {"mF1": f1} | |
def get_BLEU_score(ref_text_all, machine_text_all): | |
sc_all = [] | |
for i in range(len(ref_text_all)): | |
ref_text = ref_text_all[i] | |
machine_text = machine_text_all[i] | |
tok_ref_text = nltk.word_tokenize(ref_text) | |
tok_machine_text = nltk.word_tokenize(machine_text) | |
sc = nltk.translate.bleu_score.sentence_bleu([tok_ref_text], tok_machine_text, weights = (0.5,0.5)) | |
sc_all.append(sc) | |
return sum(sc_all)/len(sc_all) | |
def evaluate_cjpe(gold_data, pred_data): | |
# Evaluate prediction | |
gold_labels = [] | |
pred_labels = [] | |
for id, label in gold_data["prediction"].items(): | |
gold_labels.append(label) | |
pred_labels.append(pred_data["prediction"].get(id, 0)) | |
f1 = f1_score(gold_labels, pred_labels, average="macro") | |
prediction_result = {"cjpe-eval": f1} | |
print("Macro-F1 on ILDC test:", prediction_result) | |
R = [] | |
B = [] | |
rl_evaluator = rouge.Rouge(metrics=['rouge-l'], max_n=2, limit_length=False, apply_avg=True) | |
for x in range(1, 6): | |
gold_explanations = [] | |
pred_explanations = [] | |
for k,v in gold_data['explanation'].items(): | |
gold_explanations.append(v[f'expert_{x}']) | |
pred_explanations.append(pred_data['explanation'][k]) | |
print("Metrics for expert", x, "...", end=' ') | |
rougex = rl_evaluator.get_scores(pred_explanations, gold_explanations)['rouge-l']['f'] | |
bleux = get_BLEU_score(gold_explanations, pred_explanations) | |
R.append(rougex) | |
B.append(bleux) | |
print("Done.") | |
rouge_score = sum(R)/len(R) | |
bleu_score = sum(B)/len(B) | |
explanation_result = { | |
"cjpe-exp-eval": { | |
"rouge": rouge_score, | |
"bleu": bleu_score, | |
} | |
} | |
print("Explanability for ILDC Expert:", explanation_result) | |
#return {**prediction_result, **explanation_result} | |
return {"mF1": f1, "ROUGE-L": rouge_score, "BLEU": bleu_score} | |
def span2bio(txt, roles): | |
roles = sorted(roles, key = lambda x:x['start']) | |
roles_left = [r['start'] for r in roles] | |
ttxt = re.findall(r'[{}]|\w+'.format(string.punctuation), txt) | |
c = 0 | |
cr = -1 | |
prev = 'O' | |
troles = [] | |
for tok in ttxt: | |
if c >= len(txt): | |
break | |
while txt[c] == ' ': | |
c += 1 | |
else: | |
if c in roles_left: # Start of a new role | |
ind = roles_left.index(c) | |
cr = roles[ind]['end'] | |
prev = 'I-' + roles[ind]['label'] | |
troles.append('B-' + roles[ind]['label']) | |
else: | |
if c < cr: # Assign previous role | |
troles.append(prev) | |
else: # Assign 'O' | |
troles.append('O') | |
c += len(tok) | |
if len(ttxt) != len(troles): | |
troles += ['O'] * (len(ttxt) - len(troles)) | |
assert len(ttxt) == len(troles) | |
return ttxt, troles | |
def evaluate_lner(gold_data, pred_data, text_data): | |
with open("ner_labels.txt") as f: | |
labels = f.read().strip().split("\n") | |
results_per_fold = {} | |
for fold in range(1, len(gold_data) + 1): | |
gold = gold_data[f"fold_{fold}"] | |
pred = pred_data[f"fold_{fold}"] | |
text = text_data[f"fold_{fold}"] | |
texts, gold_labels, pred_labels = [], [], [] | |
for id, gold_label in tqdm(gold.items()): | |
txt = text[id] | |
pred_label = pred.get(id, []) | |
txt_seg, gold_bio = span2bio(txt, gold_label) | |
_, pred_bio = span2bio(txt, pred_label) | |
texts.append(txt_seg) | |
gold_labels.append(gold_bio) | |
pred_labels.append(pred_bio) | |
evaluator = Evaluator(gold_labels, pred_labels, tags=labels, loader="list") | |
results, results_per_tag, _, _ = evaluator.evaluate() | |
f1_scores = [results_per_tag[l]["strict"]["f1"] for l in results_per_tag] | |
avg_f1 = sum(f1_scores) / len(f1_scores) | |
print(f"Strict Macro-F1 on Fold {fold}:", avg_f1) | |
results_per_fold[f"fold_{fold}"] = avg_f1 | |
print("Strict macro-F1 on L-NER Dataset:", results_per_fold) | |
return {"strict mF1": sum(results_per_fold.values())/len(results_per_fold)} | |
def evaluate_rr(gold_data, pred_data): | |
all_gold_labels = [] | |
all_pred_labels = [] | |
with open("rr_label_vocab.json") as f: | |
label_vocab = json.load(f) | |
for id, gold_labels in gold_data.items(): | |
pred_labels = pred_data.get(id, ["None"] * len(gold_labels)) | |
for i in range(len(gold_labels)): | |
g = gold_labels[i] | |
p = pred_labels[i] | |
if g not in label_vocab: continue | |
for pp in p.split(): | |
if pp in label_vocab: | |
p = pp | |
break | |
if p not in label_vocab: continue | |
all_gold_labels.append([label_vocab[g]]) | |
all_pred_labels.append([label_vocab[p]]) | |
f1 = f1_score(all_gold_labels, all_pred_labels, average="macro") | |
print(f"Macro-F1 on combined test set:", f1) | |
return {"mF1": f1} | |
def evaluate_lsi(gold_data, pred_data): | |
with open("lsi_label_vocab.json") as f: | |
label_vocab = json.load(f) | |
gold_matrix = np.zeros((len(gold_data), len(label_vocab))) | |
pred_matrix = np.zeros((len(gold_data), len(label_vocab))) | |
for i, (id, gold_labels) in enumerate(gold_data.items()): | |
pred_labels = pred_data.get(id, []) | |
for label in gold_labels: | |
if label in label_vocab: | |
gold_matrix[i, label_vocab[label]] = 1 | |
for label in pred_labels: | |
if label in label_vocab: | |
pred_matrix[i, label_vocab[label]] = 1 | |
f1 = f1_score(gold_matrix, pred_matrix, average="macro") | |
print("Macro-F1 on ILSI test set:", f1) | |
return {"mF1": f1} | |
def evaluate_pcr(gold_data, pred_data): | |
f1_scores = [] | |
for k in range(1, 21): | |
correct, gold_total, pred_total = 0, 0, 0 | |
for id, gold_candidates in tqdm(gold_data.items(), desc="pcr"): | |
pred_candidates = pred_data.get(id, []) | |
gold_candidates = [c for c in gold_candidates if c != id] | |
pred_candidates = [c for c in pred_candidates if c != id] | |
c, g, p = get_micro_at_k(gold_candidates, pred_candidates, k) | |
correct += c | |
gold_total += g | |
pred_total += p | |
precision = correct / pred_total if pred_total > 0 else 0 | |
recall = correct / gold_total if gold_total > 0 else 0 | |
f1 = ( | |
2 * precision * recall / (precision + recall) | |
if precision + recall > 0 | |
else 0 | |
) | |
f1_scores.append(f1) | |
print(f"Micro-F1@{k} on IL-PCR test set:", f1) | |
max_f1 = max(f1_scores) | |
index_max = f1_scores.index(max_f1) + 1 | |
return {"muF1@K": f"{max_f1:.2f}@{index_max}"} | |
def evaluate_summ(gold_data, pred_data): | |
gold_summaries = [] | |
pred_summaries = [] | |
for id, gold_summary in gold_data.items(): | |
if id in pred_data: | |
gold_summary = re.sub(r"\s+", " ", gold_summary.replace("\n", " ")).strip() | |
pred_summary = re.sub(r"\s+", " ", pred_data[id].replace("\n", " ")).strip() | |
gold_summaries.append(gold_summary) | |
pred_summaries.append(pred_summary) | |
# rl_evaluator = rouge.Rouge(metrics=['rouge-n','rouge-l'], max_n=2, limit_length=False, apply_avg=True) | |
# rl_scores = rl_evaluator.get_scores(pred_summaries, gold_summaries) | |
# print("Rouge:", {k:v['f'] for k,v in rl_scores.items()}, flush=True) | |
_, _, bs = bert_score.score(pred_summaries, gold_summaries, lang="en", verbose=True) | |
print("BERTSCORE:", bs.mean().item()) | |
# return {'ROUGE-L': rl_scores['rouge-l']['f'], 'BERTSCORE': bs.mean().item()} | |
return {'ROUGE-L': '-', 'BERTSCORE': bs.mean().item()} | |
def evaluate_lmt(gold_data, pred_data): | |
tokenizer = AutoTokenizer.from_pretrained("ai4bharat/indic-bert", use_fast=False) | |
bleu = BLEU() | |
chrfp = CHRF(word_order=2) | |
gleu = evaluate.load("google_bleu") | |
G = defaultdict(lambda: defaultdict(list)) | |
P = defaultdict(lambda: defaultdict(list)) | |
for dataset in gold_data: | |
for id, gold_text in gold_data[dataset].items(): | |
lang = id.split("/")[1].strip() | |
gold_tokens = " ".join(tokenizer.tokenize(gold_text)) | |
pred_tokens = " ".join(tokenizer.tokenize(pred_data[dataset][id])) | |
G[dataset][lang].append(gold_tokens) | |
P[dataset][lang].append(pred_tokens) | |
bleu_scores, chrfpp_scores, gleu_scores = [], [], [] | |
for dataset in G: | |
print("Dataset", dataset) | |
dataset_bleu, dataset_chrfpp, dataset_gleu = [], [], [] | |
for lang in G[dataset]: | |
gold = G[dataset][lang] | |
pred = P[dataset][lang] | |
bleu_score = bleu.corpus_score(pred, [gold]).score | |
chrfpp_score = chrfp.corpus_score(pred, [gold]).score | |
gleu_score = gleu.compute(predictions=pred, references=gold)["google_bleu"] | |
dataset_bleu.append(bleu_score) | |
dataset_chrfpp.append(chrfpp_score) | |
dataset_gleu.append(gleu_score) | |
bleu_scores.append(sum(dataset_bleu) / len(dataset_bleu)) | |
chrfpp_scores.append(sum(dataset_chrfpp) / len(dataset_chrfpp)) | |
gleu_scores.append(sum(dataset_gleu) / len(dataset_gleu)) | |
return { | |
"BLEU": sum(bleu_scores) / len(bleu_scores), | |
"GLEU": sum(gleu_scores) / len(gleu_scores), | |
"chrF++": sum(chrfpp_scores) / len(chrfpp_scores), | |
} | |
def create_output_json(evaluation_results): | |
output = { | |
"Method": "Dummy Ideal", | |
"Submitted By": "IL-TUR", | |
"Github Link": "dummy submission", | |
"L-NER": {"strict mF1": evaluation_results["lner"]["strict mF1"]}, | |
"RR": {"mF1": evaluation_results["rr"]["mF1"]}, | |
"CJPE": { | |
"mF1": evaluation_results["cjpe"]["mF1"], | |
"ROUGE-L": evaluation_results["cjpe"]["ROUGE-L"], | |
"BLEU": evaluation_results["cjpe"]["BLEU"], | |
}, | |
"BAIL": {"mF1": evaluation_results["bail"]["mF1"]}, | |
"LSI": {"mF1": evaluation_results["lsi"]["mF1"]}, | |
"PCR": {"muF1@K": evaluation_results["pcr"]["muF1@K"]}, | |
"SUMM": { | |
"ROUGE-L": evaluation_results["summ"]["ROUGE-L"], | |
"BERTSCORE": evaluation_results["summ"]["BERTSCORE"] #"-", # Placeholder BERTSCORE | |
}, | |
"L-MT": { | |
"BLEU": evaluation_results["lmt"]["BLEU"], | |
"GLEU": evaluation_results["lmt"]["GLEU"], | |
"chrF++": evaluation_results["lmt"]["chrF++"], | |
}, | |
} | |
return [output] # Wrap in a list to match the desired format | |
def main(): | |
# gold_data = load_json("IL_TUR_eval_gold.json") | |
# pred_data = load_json("IL_TUR_eval_submission2.json") | |
gold_data = load_json("submissions/baseline/IL_TUR_eval_gold.json") | |
pred_data = load_json("submissions/baseline/IL_TUR_eval_submission_dummy.json") | |
pred_data = gold_data | |
evaluation_results = {} | |
for task in pred_data.keys(): | |
print(f"Task: {task}") | |
if task == "bail": | |
evaluation_results[task] = evaluate_bail(gold_data[task], pred_data[task]) | |
elif task == "cjpe": | |
nltk.download('punkt') | |
evaluation_results.update(evaluate_cjpe(gold_data[task], pred_data[task])) | |
elif task == "lner": | |
text_data = load_json("lner-text.json") | |
evaluation_results[task] = evaluate_lner( | |
gold_data[task], pred_data[task], text_data | |
) | |
elif task == "rr": | |
evaluation_results[task] = evaluate_rr(gold_data[task], pred_data[task]) | |
elif task == "lsi": | |
evaluation_results[task] = evaluate_lsi(gold_data[task], pred_data[task]) | |
elif task == "pcr": | |
evaluation_results[task] = evaluate_pcr(gold_data[task], pred_data[task]) | |
elif task == "summ": | |
nltk.download('punkt') | |
evaluation_results[task] = evaluate_summ(gold_data[task], pred_data[task]) | |
elif task == "lmt": | |
evaluation_results[task] = evaluate_lmt(gold_data[task], pred_data[task]) | |
# convert the evaluation results to the required format | |
for task, result in evaluation_results.items(): | |
if isinstance(result, dict): | |
for subtask, subresult in result.items(): | |
if isinstance(subresult, dict): | |
for subsubtask, subsubresult in subresult.items(): | |
evaluation_results[task][subtask][ | |
subsubtask | |
] = f"{subsubresult:.2f}" | |
else: | |
if isinstance(subresult, str): | |
evaluation_results[task][subtask] = subresult | |
else: | |
evaluation_results[task][subtask] = f"{subresult:.2f}" | |
else: | |
if isinstance(result, str): | |
evaluation_results[task] = result | |
else: | |
evaluation_results[task] = f"{result:.2f}" | |
blank_scores = { | |
"lner": {"strict mF1": "-"}, | |
"rr": {"mF1": "-"}, | |
"cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"}, | |
"bail": {"mF1": "-"}, | |
"lsi": {"mF1": "-"}, | |
"pcr": {"muF1@K": "-"}, | |
"summ": {"ROUGE-L": "-", "BERTSCORE": "-"}, | |
"lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"}, | |
} | |
print("--------------------------Evaluation Summary--------------------------") | |
for task, result in evaluation_results.items(): | |
print(f"{task}: {result}") | |
print("---------------------------------------------------------------------") | |
# for tasks that were not present in the submission, add blank scores | |
for task in gold_data.keys(): | |
if task not in pred_data: | |
evaluation_results[task] = blank_scores[task] | |
# Generate the output JSON | |
output_json = create_output_json(evaluation_results) | |
with open("evaluation_results.json", "w") as f: | |
json.dump(output_json, f, indent=2) | |
print("Evaluation results saved to evaluation_results.json") | |
def get_evaluation_scores(gold_data, submission_data): | |
evaluation_results = {} | |
for task in submission_data.keys(): | |
print(f"Task: {task}") | |
if task == "bail": | |
evaluation_results[task] = evaluate_bail( | |
gold_data[task], submission_data[task] | |
) | |
elif task == "cjpe": | |
nltk.download('punkt') | |
evaluation_results.update( | |
evaluate_cjpe(gold_data[task], submission_data[task]) | |
) | |
elif task == "lner": | |
text_data = load_json("lner-text.json") | |
evaluation_results[task] = evaluate_lner( | |
gold_data[task], submission_data[task], text_data | |
) | |
elif task == "rr": | |
evaluation_results[task] = evaluate_rr( | |
gold_data[task], submission_data[task] | |
) | |
elif task == "lsi": | |
evaluation_results[task] = evaluate_lsi( | |
gold_data[task], submission_data[task] | |
) | |
elif task == "pcr": | |
evaluation_results[task] = evaluate_pcr( | |
gold_data[task], submission_data[task] | |
) | |
elif task == "summ": | |
nltk.download('punkt') | |
evaluation_results[task] = evaluate_summ( | |
gold_data[task], submission_data[task] | |
) | |
elif task == "lmt": | |
evaluation_results[task] = evaluate_lmt( | |
gold_data[task], submission_data[task] | |
) | |
# convert the evaluation results to the required format | |
for task, result in evaluation_results.items(): | |
if isinstance(result, dict): | |
for subtask, subresult in result.items(): | |
if isinstance(subresult, dict): | |
for subsubtask, subsubresult in subresult.items(): | |
evaluation_results[task][subtask][ | |
subsubtask | |
] = f"{subsubresult:.2f}" | |
else: | |
if isinstance(subresult, str): | |
evaluation_results[task][subtask] = subresult | |
else: | |
evaluation_results[task][subtask] = f"{subresult:.2f}" | |
else: | |
if isinstance(result, str): | |
evaluation_results[task] = result | |
else: | |
evaluation_results[task] = f"{result:.2f}" | |
blank_scores = { | |
"lner": {"strict mF1": "-"}, | |
"rr": {"mF1": "-"}, | |
"cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"}, | |
"bail": {"mF1": "-"}, | |
"lsi": {"mF1": "-"}, | |
"pcr": {"muF1@K": "-"}, | |
"summ": {"ROUGE-L": "-", "BERTSCORE": "-"}, | |
"lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"}, | |
} | |
# for tasks that were not present in the submission, add blank scores | |
for task in gold_data.keys(): | |
if task not in submission_data: | |
evaluation_results[task] = blank_scores[task] | |
print("--------------------------Evaluation Summary--------------------------") | |
for task, result in evaluation_results.items(): | |
print(f"{task}: {result}") | |
print("---------------------------------------------------------------------") | |
output_json = create_output_json(evaluation_results) | |
return output_json | |
if __name__ == "__main__": | |
main() | |