import streamlit as st import pandas as pd import numpy as np import torch import evaluate from datasets import load_dataset from evaluate import load as load_metric from transformers import AutoModelForSeq2SeqLM, AutoTokenizer from sklearn.metrics import accuracy_score, f1_score from tqdm.auto import tqdm from torch.utils.data import DataLoader st.set_page_config(layout="wide") select = st.selectbox('Which model would you like to evaluate?', ('Bart', 'mBart')) def get_datasets(): if select == 'Bart': all_datasets = ["Communication Networks: unseen questions", "Communication Networks: unseen answers"] if select == 'mBart': all_datasets = ["Micro Job: unseen questions", "Micro Job: unseen answers", "Legal Domain: unseen questions", "Legal Domain: unseen answers"] return all_datasets all_datasets = get_datasets() #def get_split(dataset_name): # if dataset_name == "Communication Networks: unseen questions": # split = load_dataset("Short-Answer-Feedback/saf_communication_networks_english", split="test_unseen_questions") # if dataset_name == "Communication Networks: unseen answers": # split = load_dataset("Short-Answer-Feedback/saf_communication_networks_english", split="test_unseen_answers") # if dataset_name == "Micro Job: unseen questions": # split = load_dataset("Short-Answer-Feedback/saf_micro_job_german", split="test_unseen_questions") # if dataset_name == "Micro Job: unseen answers": # split = load_dataset("Short-Answer-Feedback/saf_micro_job_german", split="test_unseen_answers") # if dataset_name == "Legal Domain: unseen questions": # split = load_dataset("Short-Answer-Feedback/saf_legal_domain_german", split="test_unseen_questions") # if dataset_name == "Legal Domain: unseen answers": # split = load_dataset("Short-Answer-Feedback/saf_legal_domain_german", split="test_unseen_answers") # return split def get_model(datasetname): if datasetname == "Communication Networks: unseen questions" or datasetname == "Communication Networks: unseen answers": model = "Short-Answer-Feedback/bart-finetuned-saf-communication-networks" if datasetname == "Micro Job: unseen questions" or datasetname == "Micro Job: unseen answers": model = "Short-Answer-Feedback/mbart-finetuned-saf-micro-job" if datasetname == "Legal Domain: unseen questions" or datasetname == "Legal Domain: unseen answers": model = "Short-Answer-Feedback/mbart-finetuned-saf-legal-domain" return model # def get_tokenizer(datasetname): # if datasetname == "Communication Networks: unseen questions" or datasetname == "Communication Networks: unseen answers": # tokenizer = "Short-Answer-Feedback/bart-finetuned-saf-communication-networks" # if datasetname == "Micro Job: unseen questions" or datasetname == "Micro Job: unseen answers": # tokenizer = "Short-Answer-Feedback/mbart-finetuned-saf-micro-job" # if datasetname == "Legal Domain: unseen questions" or datasetname == "Legal Domain: unseen answers": # tokenizer = "Short-Answer-Feedback/mbart-finetuned-saf-legal-domain" # return tokenizer # sacrebleu = load_metric('sacrebleu') # rouge = load_metric('rouge') # meteor = load_metric('meteor') # bertscore = load_metric('bertscore') # # use gpu if it's available # device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu') # MAX_INPUT_LENGTH = 256 # MAX_TARGET_LENGTH = 128 # def preprocess_function(examples, **kwargs): # """ # Preprocess entries of the given dataset # Params: # examples (Dataset): dataset to be preprocessed # Returns: # model_inputs (BatchEncoding): tokenized dataset entries # """ # inputs, targets = [], [] # for i in range(len(examples['question'])): # inputs.append(f"Antwort: {examples['provided_answer'][i]} Lösung: {examples['reference_answer'][i]} Frage: {examples['question'][i]}") # targets.append(f"{examples['verification_feedback'][i]} Feedback: {examples['answer_feedback'][i]}") # # apply tokenization to inputs and labels # tokenizer = kwargs["tokenizer"] # model_inputs = tokenizer(inputs, max_length=MAX_INPUT_LENGTH, padding='max_length', truncation=True) # labels = tokenizer(text_target=targets, max_length=MAX_TARGET_LENGTH, padding='max_length', truncation=True) # model_inputs['labels'] = labels['input_ids'] # return model_inputs # def flatten_list(l): # """ # Utility function to convert a list of lists into a flattened list # Params: # l (list of lists): list to be flattened # Returns: # A flattened list with the elements of the original list # """ # return [item for sublist in l for item in sublist] # def extract_feedback(predictions): # """ # Utility function to extract the feedback from the predictions of the model # Params: # predictions (list): complete model predictions # Returns: # feedback (list): extracted feedback from the model's predictions # """ # feedback = [] # # iterate through predictions and try to extract predicted feedback # for pred in predictions: # try: # fb = pred.split(':', 1)[1] # except IndexError: # try: # if pred.lower().startswith('partially correct'): # fb = pred.split(' ', 1)[2] # else: # fb = pred.split(' ', 1)[1] # except IndexError: # fb = pred # feedback.append(fb.strip()) # return feedback # def extract_labels(predictions): # """ # Utility function to extract the labels from the predictions of the model # Params: # predictions (list): complete model predictions # Returns: # feedback (list): extracted labels from the model's predictions # """ # labels = [] # for pred in predictions: # if pred.lower().startswith('correct'): # label = 'Correct' # elif pred.lower().startswith('partially correct'): # label = 'Partially correct' # elif pred.lower().startswith('incorrect'): # label = 'Incorrect' # else: # label = 'Unknown label' # labels.append(label) # return labels # def get_predictions_labels(model, dataloader, tokenizer): # """ # Evaluate model on the given dataset # Params: # model (PreTrainedModel): seq2seq model # dataloader (torch Dataloader): dataloader of the dataset to be used for evaluation # Returns: # results (dict): dictionary with the computed evaluation metrics # predictions (list): list of the decoded predictions of the model # """ # decoded_preds, decoded_labels = [], [] # model.eval() # # iterate through batchs in the dataloader # for batch in tqdm(dataloader): # with torch.no_grad(): # batch = {k: v.to(device) for k, v in batch.items()} # # generate tokens from batch # generated_tokens = model.generate( # batch['input_ids'], # attention_mask=batch['attention_mask'], # max_length=MAX_TARGET_LENGTH # ) # # get golden labels from batch # labels_batch = batch['labels'] # # decode model predictions and golden labels # decoded_preds_batch = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True) # decoded_labels_batch = tokenizer.batch_decode(labels_batch, skip_special_tokens=True) # decoded_preds.append(decoded_preds_batch) # decoded_labels.append(decoded_labels_batch) # # convert predictions and golden labels into flattened lists # predictions = flatten_list(decoded_preds) # labels = flatten_list(decoded_labels) # return predictions, labels # def load_data(): # df = pd.DataFrame(columns=['Model', 'Dataset', 'SacreBLEU', 'ROUGE-2', 'METEOR', 'BERTScore', 'Accuracy', 'Weighted F1', 'Macro F1']) # for ds in all_datasets: # split = get_split(ds) # model = AutoModelForSeq2SeqLM.from_pretrained(get_model(ds)) # tokenizer = AutoTokenizer.from_pretrained(get_tokenizer(ds)) # processed_dataset = split.map( # preprocess_function, # batched=True, # remove_columns=split.column_names, # fn_kwargs={"tokenizer": tokenizer} # ) # processed_dataset.set_format('torch') # dataloader = DataLoader(processed_dataset, batch_size=4) # predictions, labels = get_predictions_labels(model, dataloader, tokenizer) # predicted_feedback = extract_feedback(predictions) # predicted_labels = extract_labels(predictions) # reference_feedback = [x.split('Feedback:', 1)[1].strip() for x in labels] # reference_labels = [x.split('Feedback:', 1)[0].strip() for x in labels] # rouge_score = rouge.compute(predictions=predicted_feedback, references=reference_feedback)['rouge2'] # bleu_score = sacrebleu.compute(predictions=predicted_feedback, references=[[x] for x in reference_feedback])['score'] # meteor_score = meteor.compute(predictions=predicted_feedback, references=reference_feedback)['meteor'] # bert_score = bertscore.compute(predictions=predicted_feedback, references=reference_feedback, lang='de', model_type='bert-base-multilingual-cased', rescale_with_baseline=True) # reference_labels_np = np.array(reference_labels) # accuracy_value = accuracy_score(reference_labels_np, predicted_labels) # f1_weighted_value = f1_score(reference_labels_np, predicted_labels, average='weighted') # f1_macro_value = f1_score(reference_labels_np, predicted_labels, average='macro', labels=['Incorrect', 'Partially correct', 'Correct']) # new_row_data = {"Model": get_model(ds), "Dataset": ds, "SacreBLEU": bleu_score, "ROUGE-2": rouge_score, "METEOR": meteor_score, "BERTScore": bert_score, "Accuracy": accuracy_value, "Weighted F1": f1_weighted_value, "Macro F1": f1_macro_value} # new_row = pd.DataFrame(new_row_data) # df = pd.concat([df, new_row]) # return df def get_rows(datasetname): if datasetname == "Communication Networks: unseen questions": row = pd.DataFrame( { 'Model': get_model(datasetname), 'Dataset': datasetname, 'SacreBLEU': [2.4], 'ROUGE-2': [20.1], 'METEOR': [28.5], 'BERTScore': [36.6], 'Accuracy': [51.6], 'Weighted F1': [41.0], 'Macro F1': [27.9], } ) if datasetname == "Communication Networks: unseen answers": row = pd.DataFrame( { 'Model': get_model(datasetname), 'Dataset': datasetname, 'SacreBLEU': [36.0], 'ROUGE-2': [49.1], 'METEOR': [60.8], 'BERTScore': [69.5], 'Accuracy': [76.0], 'Weighted F1': [73.0], 'Macro F1': [53.4], } ) if datasetname == "Micro Job: unseen questions": row = pd.DataFrame( { 'Model': get_model(datasetname), 'Dataset': datasetname, 'SacreBLEU': [0.3], 'ROUGE-2': [0.5], 'METEOR': [33.8], 'BERTScore': [31.3], 'Accuracy': [48.7], 'Weighted F1': [46.5], 'Macro F1': [40.6], } ) if datasetname == "Micro Job: unseen answers": row = pd.DataFrame( { 'Model': get_model(datasetname), 'Dataset': datasetname, 'SacreBLEU': [39.5], 'ROUGE-2': [29.8], 'METEOR': [63.3], 'BERTScore': [63.1], 'Accuracy': [80.1], 'Weighted F1': [80.3], 'Macro F1': [80.7], } ) if datasetname == "Legal Domain: unseen questions": row = pd.DataFrame( { 'Model': get_model(datasetname), 'Dataset': datasetname, 'SacreBLEU': [3.2], 'ROUGE-2': [5.0], 'METEOR': [20.0], 'BERTScore': [14.8], 'Accuracy': [60.7], 'Weighted F1': [55.3], 'Macro F1': [55.4], } ) if datasetname == "Legal Domain: unseen answers": row = pd.DataFrame( { 'Model': get_model(datasetname), 'Dataset': datasetname, 'SacreBLEU': [42.8], 'ROUGE-2': [43.7], 'METEOR': [58.2], 'BERTScore': [57.5], 'Accuracy': [81.0], 'Weighted F1': [80.1], 'Macro F1': [74.6], } ) return row def load_data(): df = pd.DataFrame(columns=['Model', 'Dataset', 'SacreBLEU', 'ROUGE-2', 'METEOR', 'BERTScore', 'Accuracy', 'Weighted F1', 'Macro F1']) for ds in all_datasets: new_row = get_rows(ds) df = pd.concat([df, new_row], ignore_index=True) return df dataframe = load_data() st.dataframe(dataframe)