# IL-TUR-Leaderboard / eval_utils.py
# shounakpaul95's picture
# Update eval_utils.py
# 9c54061 verified
import json
import re
from collections import defaultdict
import evaluate
import nltk
import numpy as np
from nervaluate import Evaluator
from sacrebleu.metrics import BLEU, CHRF
from sklearn.metrics import f1_score
from tqdm import tqdm
from transformers import AutoTokenizer
import rouge
import bert_score
import string
def load_json(file_path):
    """Read the JSON document stored at ``file_path`` and return the parsed object.

    The file is decoded explicitly as UTF-8 so that non-ASCII text is read
    the same way regardless of the platform's default locale encoding.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        Whatever ``json.load`` produces for the file contents.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        return json.load(f)
def get_micro_at_k(gold, pred, k):
    """Return the counts needed for micro-averaged precision/recall at ``k``.

    Args:
        gold: Sequence of relevant items.
        pred: Ranked sequence of predicted items; only the first ``k`` are used.
        k: Cut-off applied to ``pred``.

    Returns:
        A tuple ``(hits, n_gold, n_pred)``: the number of distinct items found
        in both ``gold`` and the top-``k`` predictions, the number of distinct
        gold items, and the number of distinct top-``k`` predictions.
    """
    relevant = set(gold)
    top_k = set(pred[:k])
    hits = relevant.intersection(top_k)
    return len(hits), len(relevant), len(top_k)
def evaluate_bail(gold_data, pred_data):
    """Compute the macro-F1 of bail predictions against the gold labels.

    Args:
        gold_data: Mapping from example id to gold label.
        pred_data: Mapping from example id to predicted label. Ids missing
            from this mapping are scored as label ``0``.

    Returns:
        ``{"mF1": <macro-F1 scaled to 0-100>}``.
    """
    example_ids = list(gold_data)
    gold_labels = [gold_data[example_id] for example_id in example_ids]
    pred_labels = [pred_data.get(example_id, 0) for example_id in example_ids]

    f1 = f1_score(gold_labels, pred_labels, average="macro")
    print("Macro-F1 on HLDC-all-districts test set:", f1)
    return {"mF1": f1*100}
def get_BLEU_score(ref_text_all, machine_text_all):
    """Average sentence-level BLEU over aligned reference/candidate texts.

    Each pair is tokenised with ``nltk.word_tokenize`` and scored with
    ``sentence_bleu`` using equal weights on the first two n-gram orders.

    Args:
        ref_text_all: Sequence of reference strings.
        machine_text_all: Sequence of candidate strings, indexed in parallel
            with ``ref_text_all``.

    Returns:
        The arithmetic mean of the per-pair scores.
    """
    sentence_scores = []
    for idx, reference in enumerate(ref_text_all):
        candidate = machine_text_all[idx]
        reference_tokens = nltk.word_tokenize(reference)
        candidate_tokens = nltk.word_tokenize(candidate)
        sentence_scores.append(
            nltk.translate.bleu_score.sentence_bleu(
                [reference_tokens], candidate_tokens, weights=(0.5, 0.5)
            )
        )
    return sum(sentence_scores) / len(sentence_scores)
def evaluate_cjpe(gold_data, pred_data):
    """Score both halves of the CJPE task: prediction and explanation.

    Args:
        gold_data: Dict with a ``"prediction"`` mapping (id -> label) and an
            ``"explanation"`` mapping (id -> dict holding ``expert_1`` ..
            ``expert_5`` reference texts).
        pred_data: Dict with the same two keys; ``"prediction"`` maps id ->
            label (ids missing there are scored as ``0``) and
            ``"explanation"`` maps id -> explanation text.

    Returns:
        ``{"cjpe-prediction": {"mF1": ...}, "cjpe-explanation":
        {"ROUGE-L": ..., "BLEU": ...}}`` with every value scaled to 0-100.
    """
    # Prediction: macro-F1 over the gold ids.
    example_ids = list(gold_data["prediction"])
    gold_labels = [gold_data["prediction"][example_id] for example_id in example_ids]
    pred_labels = [
        pred_data["prediction"].get(example_id, 0) for example_id in example_ids
    ]
    f1 = f1_score(gold_labels, pred_labels, average="macro")
    prediction_result = {"cjpe-eval": f1}
    print("Macro-F1 on ILDC test:", prediction_result)

    # Explanation: ROUGE-L and BLEU against each of the five experts in turn,
    # then averaged across experts.
    rouge_per_expert = []
    bleu_per_expert = []
    rl_evaluator = rouge.Rouge(metrics=['rouge-l'], max_n=2, limit_length=False, apply_avg=True)
    for expert in range(1, 6):
        expert_key = f'expert_{expert}'
        gold_explanations = []
        pred_explanations = []
        for example_id, annotations in gold_data['explanation'].items():
            gold_explanations.append(annotations[expert_key])
            pred_explanations.append(pred_data['explanation'][example_id])

        print("Metrics for expert", expert, "...", end=' ')
        rouge_scores = rl_evaluator.get_scores(pred_explanations, gold_explanations)
        rouge_per_expert.append(rouge_scores['rouge-l']['f'])
        bleu_per_expert.append(get_BLEU_score(gold_explanations, pred_explanations))
        print("Done.")

    rouge_score = sum(rouge_per_expert)/len(rouge_per_expert)
    bleu_score = sum(bleu_per_expert)/len(bleu_per_expert)
    explanation_result = {
        "cjpe-exp-eval": {
            "rouge": rouge_score,
            "bleu": bleu_score,
        }
    }
    print("Explanability for ILDC Expert:", explanation_result)

    return {
        "cjpe-prediction": {"mF1": f1*100},
        "cjpe-explanation": {"ROUGE-L": rouge_score*100, "BLEU": bleu_score*100},
    }
def span2bio(txt, roles):
    """Tokenise ``txt`` and convert character-offset spans into BIO tags.

    Tokens are runs of word characters or single punctuation characters, as
    matched by the pattern built from ``string.punctuation``. A token whose
    start offset equals a span's ``start`` opens that span (``B-<label>``);
    following tokens that start before the span's ``end`` continue it
    (``I-<label>``); every other token is tagged ``O``.

    Token offsets are taken directly from the regex matches, so characters
    that are not part of any token (tabs, newlines, symbols the pattern does
    not match) cannot shift the alignment of later spans.

    Args:
        txt: The raw text.
        roles: Iterable of dicts, each with ``'start'`` and ``'end'``
            character offsets into ``txt`` and a ``'label'`` string.

    Returns:
        A tuple ``(tokens, tags)`` of two lists of equal length.
    """
    roles = sorted(roles, key=lambda role: role['start'])

    # Map start offset -> span. When several spans share a start offset the
    # first one in sorted order wins.
    role_by_start = {}
    for role in roles:
        role_by_start.setdefault(role['start'], role)

    token_pattern = r'[{}]|\w+'.format(string.punctuation)

    tokens = []
    tags = []
    span_end = -1       # end offset of the most recently opened span
    inside_tag = 'O'    # tag given to tokens that continue that span
    for match in re.finditer(token_pattern, txt):
        offset = match.start()
        tokens.append(match.group())

        role = role_by_start.get(offset)
        if role is not None:
            # Start of a new span.
            span_end = role['end']
            inside_tag = 'I-' + role['label']
            tags.append('B-' + role['label'])
        elif offset < span_end:
            # Still inside the most recently opened span.
            tags.append(inside_tag)
        else:
            tags.append('O')

    return tokens, tags
def evaluate_lner(gold_data, pred_data, text_data):
    """Compute the strict macro-F1 for L-NER, averaged over folds.

    The tag inventory is read from ``ner_labels.txt`` (one label per line).
    For each fold the gold and predicted spans are converted to BIO
    sequences with ``span2bio`` and scored with ``nervaluate``'s
    ``Evaluator``; the fold score is the mean of the per-tag strict F1
    values.

    Args:
        gold_data: Dict keyed ``fold_1`` .. ``fold_N`` mapping id -> gold spans.
        pred_data: Dict with the same fold keys mapping id -> predicted
            spans. Ids missing from a fold are treated as having no spans.
        text_data: Dict with the same fold keys mapping id -> raw text.

    Returns:
        ``{"strict mF1": <mean fold score scaled to 0-100>}``.
    """
    with open("ner_labels.txt") as f:
        labels = f.read().strip().split("\n")

    results_per_fold = {}
    for fold in range(1, len(gold_data) + 1):
        fold_key = f"fold_{fold}"
        gold = gold_data[fold_key]
        pred = pred_data[fold_key]
        text = text_data[fold_key]

        gold_labels = []
        pred_labels = []
        for doc_id, gold_spans in tqdm(gold.items()):
            txt = text[doc_id]
            pred_spans = pred.get(doc_id, [])
            _, gold_bio = span2bio(txt, gold_spans)
            _, pred_bio = span2bio(txt, pred_spans)
            gold_labels.append(gold_bio)
            pred_labels.append(pred_bio)

        evaluator = Evaluator(gold_labels, pred_labels, tags=labels, loader="list")
        _overall, results_per_tag, _, _ = evaluator.evaluate()

        f1_scores = [tag_result["strict"]["f1"] for tag_result in results_per_tag.values()]
        avg_f1 = sum(f1_scores) / len(f1_scores)
        print(f"Strict Macro-F1 on Fold {fold}:", avg_f1)
        results_per_fold[fold_key] = avg_f1

    print("Strict macro-F1 on L-NER Dataset:", results_per_fold)
    return {"strict mF1": sum(results_per_fold.values())/len(results_per_fold)*100}
def evaluate_rr(gold_data, pred_data):
    """Compute the macro-F1 for rhetorical-role labelling.

    The label vocabulary is read from ``rr_label_vocab.json``. Gold and
    predicted labels are aligned position by position within each document.
    A predicted string is reduced to its first whitespace-separated word
    found in the vocabulary. Positions whose gold label, or whose
    (reduced) prediction, is not in the vocabulary are left out of the score.

    Args:
        gold_data: Mapping from document id to a list of gold labels.
        pred_data: Mapping from document id to a list of predicted label
            strings. A missing document is treated as all ``"None"``, and a
            prediction list shorter than the gold list is padded with
            ``"None"`` instead of raising ``IndexError``.

    Returns:
        ``{"mF1": <macro-F1 scaled to 0-100>}``.
    """
    with open("rr_label_vocab.json", encoding="utf-8") as f:
        label_vocab = json.load(f)

    all_gold_labels = []
    all_pred_labels = []
    for doc_id, gold_labels in gold_data.items():
        pred_labels = list(pred_data.get(doc_id, []))
        # Pad short (or absent) predictions so every gold position has a
        # counterpart; a non-positive multiplier yields no padding.
        pred_labels += ["None"] * (len(gold_labels) - len(pred_labels))

        # zip stops at the gold length, so surplus predictions are ignored.
        for gold_label, raw_pred in zip(gold_labels, pred_labels):
            if gold_label not in label_vocab:
                continue
            # Use the first word that is a known label; otherwise keep the
            # raw string, which is then checked against the vocabulary.
            pred_label = next(
                (word for word in raw_pred.split() if word in label_vocab),
                raw_pred,
            )
            if pred_label not in label_vocab:
                continue
            all_gold_labels.append([label_vocab[gold_label]])
            all_pred_labels.append([label_vocab[pred_label]])

    f1 = f1_score(all_gold_labels, all_pred_labels, average="macro")
    print("Macro-F1 on combined test set:", f1)
    return {"mF1": f1*100}
def evaluate_lsi(gold_data, pred_data):
    """Compute the macro-F1 for the multi-label LSI task.

    Labels are mapped to column indices through ``lsi_label_vocab.json``,
    and gold and predicted label sets are encoded as 0/1 indicator matrices
    with one row per gold example. Labels absent from the vocabulary are
    ignored.

    Args:
        gold_data: Mapping from example id to an iterable of gold labels.
        pred_data: Mapping from example id to an iterable of predicted
            labels. Ids missing here contribute an all-zero row.

    Returns:
        ``{"mF1": <macro-F1 scaled to 0-100>}``.
    """
    with open("lsi_label_vocab.json") as f:
        label_vocab = json.load(f)

    matrix_shape = (len(gold_data), len(label_vocab))
    gold_matrix = np.zeros(matrix_shape)
    pred_matrix = np.zeros(matrix_shape)

    for row, (example_id, gold_labels) in enumerate(gold_data.items()):
        pred_labels = pred_data.get(example_id, [])
        for matrix, labels in ((gold_matrix, gold_labels), (pred_matrix, pred_labels)):
            for label in labels:
                if label in label_vocab:
                    matrix[row, label_vocab[label]] = 1

    f1 = f1_score(gold_matrix, pred_matrix, average="macro")
    print("Macro-F1 on ILSI test set:", f1)
    return {"mF1": f1*100}
def evaluate_pcr(gold_data, pred_data):
    """Compute the best micro-F1@k for k = 1..20 on the PCR task.

    For every cut-off ``k`` the hit, gold and prediction counts from
    ``get_micro_at_k`` are summed over all queries and turned into a
    micro-averaged F1. A query's own id is removed from both its gold and
    predicted candidate lists before counting.

    Args:
        gold_data: Mapping from query id to a list of gold candidate ids.
        pred_data: Mapping from query id to a ranked list of predicted
            candidate ids. Missing queries are treated as no predictions.

    Returns:
        ``{"muF1@K": <maximum F1 over k, scaled to 0-100, as a 2-decimal string>}``.
    """
    f1_scores = []
    for k in range(1, 21):
        hits = 0
        gold_total = 0
        pred_total = 0
        for query_id, gold_candidates in tqdm(gold_data.items(), desc="pcr"):
            retrieved = pred_data.get(query_id, [])
            relevant = [cand for cand in gold_candidates if cand != query_id]
            retrieved = [cand for cand in retrieved if cand != query_id]
            n_hits, n_gold, n_pred = get_micro_at_k(relevant, retrieved, k)
            hits += n_hits
            gold_total += n_gold
            pred_total += n_pred

        precision = hits / pred_total if pred_total > 0 else 0
        recall = hits / gold_total if gold_total > 0 else 0
        if precision + recall > 0:
            f1 = 2 * precision * recall / (precision + recall)
        else:
            f1 = 0
        f1_scores.append(f1)
        print(f"Micro-F1@{k} on IL-PCR test set:", f1)

    max_f1 = max(f1_scores)
    return {"muF1@K": f"{max_f1*100:.2f}"}
def evaluate_summ(gold_data, pred_data):
    """Score predicted summaries against gold summaries with BERTScore.

    Only ids present in both mappings are scored. Whitespace in every
    summary is collapsed to single spaces and trimmed before scoring.
    ROUGE-L is not computed here and is reported as ``'-'``.

    Args:
        gold_data: Mapping from id to gold summary text.
        pred_data: Mapping from id to predicted summary text.

    Returns:
        ``{'ROUGE-L': '-', 'BERTSCORE': <mean of the third value returned by
        bert_score.score, scaled to 0-100>}``.
    """
    def _collapse_whitespace(text):
        return re.sub(r"\s+", " ", text.replace("\n", " ")).strip()

    pairs = [
        (_collapse_whitespace(gold_summary), _collapse_whitespace(pred_data[doc_id]))
        for doc_id, gold_summary in gold_data.items()
        if doc_id in pred_data
    ]
    gold_summaries = [gold for gold, _ in pairs]
    pred_summaries = [pred for _, pred in pairs]

    _, _, bs = bert_score.score(pred_summaries, gold_summaries, lang="en", verbose=True)
    mean_score = bs.mean().item()
    print("BERTSCORE:", mean_score)
    return {'ROUGE-L': '-', 'BERTSCORE': mean_score * 100}
def evaluate_lmt(gold_data, pred_data):
    """Compute BLEU, GLEU and chrF++ for the translation task.

    Gold and predicted texts are tokenised with the ``ai4bharat/indic-bert``
    tokenizer and re-joined with spaces, then grouped by dataset and by
    language (the second ``/``-separated field of the example id). Each
    metric is computed per language, averaged over languages within a
    dataset, and then averaged over datasets.

    Args:
        gold_data: Mapping dataset -> (id -> reference text).
        pred_data: Mapping dataset -> (id -> predicted text); every gold id
            is looked up directly.

    Returns:
        Dict with keys ``"BLEU"``, ``"GLEU"`` (multiplied by 100) and
        ``"chrF++"``.
    """
    tokenizer = AutoTokenizer.from_pretrained("ai4bharat/indic-bert", use_fast=False)
    bleu = BLEU()
    chrfp = CHRF(word_order=2)
    gleu = evaluate.load("google_bleu")

    def _tokenized(text):
        return " ".join(tokenizer.tokenize(text))

    def _mean(values):
        return sum(values) / len(values)

    references = defaultdict(lambda: defaultdict(list))
    hypotheses = defaultdict(lambda: defaultdict(list))
    for dataset, gold_items in gold_data.items():
        for example_id, gold_text in gold_items.items():
            lang = example_id.split("/")[1].strip()
            gold_tokens = _tokenized(gold_text)
            pred_tokens = _tokenized(pred_data[dataset][example_id])
            references[dataset][lang].append(gold_tokens)
            hypotheses[dataset][lang].append(pred_tokens)

    bleu_scores = []
    chrfpp_scores = []
    gleu_scores = []
    for dataset, refs_by_lang in references.items():
        print("Dataset", dataset)
        dataset_bleu = []
        dataset_chrfpp = []
        dataset_gleu = []
        for lang, gold in refs_by_lang.items():
            pred = hypotheses[dataset][lang]
            dataset_bleu.append(bleu.corpus_score(pred, [gold]).score)
            dataset_chrfpp.append(chrfp.corpus_score(pred, [gold]).score)
            dataset_gleu.append(
                gleu.compute(predictions=pred, references=gold)["google_bleu"]
            )
        bleu_scores.append(_mean(dataset_bleu))
        chrfpp_scores.append(_mean(dataset_chrfpp))
        gleu_scores.append(_mean(dataset_gleu))

    return {
        "BLEU": _mean(bleu_scores),
        "GLEU": _mean(gleu_scores) * 100,
        "chrF++": _mean(chrfpp_scores),
    }
def create_output_json(evaluation_results):
    """Arrange per-task scores into the leaderboard submission record.

    CJPE scores are read from the ``"cjpe-prediction"`` and
    ``"cjpe-explanation"`` entries; when those are absent the ``"cjpe"``
    entry (where the blank placeholder scores are stored for a submission
    without CJPE) is used instead. Any task or metric that cannot be found
    is reported as ``"-"`` rather than raising ``KeyError``.

    Args:
        evaluation_results: Mapping from task key to a dict of metric name ->
            score.

    Returns:
        A one-element list containing the output record.
    """
    def _score(metric, *task_keys):
        # Return the metric from the first task entry that provides it.
        for task_key in task_keys:
            scores = evaluation_results.get(task_key)
            if isinstance(scores, dict) and metric in scores:
                return scores[metric]
        return "-"

    output = {
        "Method": "Final Testing Random",
        "Submitted By": "IL-TUR",
        "Github Link": "dummy submission",
        "L-NER": {"strict mF1": _score("strict mF1", "lner")},
        "RR": {"mF1": _score("mF1", "rr")},
        "CJPE": {
            "mF1": _score("mF1", "cjpe-prediction", "cjpe"),
            "ROUGE-L": _score("ROUGE-L", "cjpe-explanation", "cjpe"),
            "BLEU": _score("BLEU", "cjpe-explanation", "cjpe"),
        },
        "BAIL": {"mF1": _score("mF1", "bail")},
        "LSI": {"mF1": _score("mF1", "lsi")},
        "PCR": {"muF1@K": _score("muF1@K", "pcr")},
        "SUMM": {
            "ROUGE-L": _score("ROUGE-L", "summ"),
            "BERTSCORE": _score("BERTSCORE", "summ"),
        },
        "L-MT": {
            "BLEU": _score("BLEU", "lmt"),
            "GLEU": _score("GLEU", "lmt"),
            "chrF++": _score("chrF++", "lmt"),
        },
    }
    return [output]  # Wrap in a list to match the desired format
def main():
    """Evaluate the baseline dummy submission and write the results to disk.

    Loads the gold data and the dummy submission from
    ``submissions/baseline/``, scores the submission with
    ``get_evaluation_scores`` (which also prints the evaluation summary) and
    saves the resulting record list to ``evaluation_results.json``.

    The loaded submission is what gets scored. Previously it was overwritten
    with the gold data right after loading, so the script always compared
    the gold data with itself.
    """
    gold_data = load_json("submissions/baseline/IL_TUR_eval_gold.json")
    pred_data = load_json("submissions/baseline/IL_TUR_eval_submission_dummy.json")

    # Task dispatch, score formatting and blank-score filling live in
    # get_evaluation_scores; reuse it instead of keeping a second copy here.
    output_json = get_evaluation_scores(gold_data, pred_data)

    with open("evaluation_results.json", "w", encoding="utf-8") as f:
        json.dump(output_json, f, indent=2)
    print("Evaluation results saved to evaluation_results.json")
def get_evaluation_scores(gold_data, submission_data):
    """Evaluate every task in a submission and build the output record.

    Each recognised task key in ``submission_data`` is scored against the
    matching entry of ``gold_data``. Numeric scores are then rendered as
    two-decimal strings, tasks present in the gold data but absent from the
    submission receive ``"-"`` placeholders, a summary is printed, and the
    result is passed through ``create_output_json``.

    Args:
        gold_data: Mapping from task key to that task's gold data.
        submission_data: Mapping from task key to that task's predictions.

    Returns:
        The value returned by ``create_output_json``.
    """
    # Tasks whose evaluator takes just (gold, prediction) and whose result is
    # stored under the task's own key.
    simple_evaluators = {
        "bail": evaluate_bail,
        "rr": evaluate_rr,
        "lsi": evaluate_lsi,
        "pcr": evaluate_pcr,
        "lmt": evaluate_lmt,
    }

    evaluation_results = {}
    for task in submission_data.keys():
        print(f"Task: {task}")
        if task in simple_evaluators:
            evaluation_results[task] = simple_evaluators[task](
                gold_data[task], submission_data[task]
            )
        elif task == "cjpe":
            nltk.download('punkt')
            # evaluate_cjpe returns two top-level entries; merge both.
            evaluation_results.update(
                evaluate_cjpe(gold_data[task], submission_data[task])
            )
        elif task == "lner":
            text_data = load_json("lner-text.json")
            evaluation_results[task] = evaluate_lner(
                gold_data[task], submission_data[task], text_data
            )
        elif task == "summ":
            nltk.download('punkt')
            evaluation_results[task] = evaluate_summ(
                gold_data[task], submission_data[task]
            )

    def _as_text(value):
        # Strings (placeholders or already formatted scores) pass through.
        return value if isinstance(value, str) else f"{value:.2f}"

    # convert the evaluation results to the required format
    for task, result in evaluation_results.items():
        if not isinstance(result, dict):
            evaluation_results[task] = _as_text(result)
            continue
        for subtask, subresult in result.items():
            if isinstance(subresult, dict):
                for metric, value in subresult.items():
                    subresult[metric] = f"{value:.2f}"
            else:
                result[subtask] = _as_text(subresult)

    blank_scores = {
        "lner": {"strict mF1": "-"},
        "rr": {"mF1": "-"},
        "cjpe": {"mF1": "-", "ROUGE-L": "-", "BLEU": "-"},
        "bail": {"mF1": "-"},
        "lsi": {"mF1": "-"},
        "pcr": {"muF1@K": "-"},
        "summ": {"ROUGE-L": "-", "BERTSCORE": "-"},
        "lmt": {"BLEU": "-", "GLEU": "-", "chrF++": "-"},
    }
    # for tasks that were not present in the submission, add blank scores
    for task in gold_data.keys():
        if task not in submission_data:
            evaluation_results[task] = blank_scores[task]

    print("--------------------------Evaluation Summary--------------------------")
    for task, result in evaluation_results.items():
        print(f"{task}: {result}")
    print("---------------------------------------------------------------------")

    return create_output_json(evaluation_results)
# Run the baseline evaluation only when this file is executed as a script,
# not when it is imported for its evaluation functions.
if __name__ == "__main__":
    main()