""" | |
Learn to classify the manually annotated CDA attributes (frames, 'riferimento', orientation) | |
""" | |
import sys
import torch
from allennlp.data.vocabulary import Vocabulary
from allennlp.data import DatasetReader, TokenIndexer, Instance, Token
from allennlp.data.fields import TextField, LabelField
from allennlp.data.token_indexers.pretrained_transformer_indexer import (
    PretrainedTransformerIndexer,
)
from allennlp.data.tokenizers.pretrained_transformer_tokenizer import (
    PretrainedTransformerTokenizer,
)
from allennlp.models import BasicClassifier
from allennlp.modules.text_field_embedders.basic_text_field_embedder import (
    BasicTextFieldEmbedder,
)
from allennlp.modules.token_embedders.pretrained_transformer_embedder import (
    PretrainedTransformerEmbedder,
)
from allennlp.modules.seq2vec_encoders.bert_pooler import BertPooler
from allennlp.training.checkpointer import Checkpointer
from allennlp.training.gradient_descent_trainer import GradientDescentTrainer
from allennlp.data.data_loaders.simple_data_loader import SimpleDataLoader
from allennlp.training.optimizers import AdamOptimizer
from allennlp.predictors.text_classifier import TextClassifierPredictor
from sklearn.svm import SVC
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import precision_recall_fscore_support
from sklearn.tree import DecisionTreeClassifier
from sklearn.dummy import DummyClassifier
import pandas as pd
import numpy as np
import spacy
import json
import os
from typing import Dict, Iterable
class MigrationReader(DatasetReader):
    """Turns annotated headlines into AllenNLP instances."""

    def __init__(self, token_indexers: Dict[str, TokenIndexer], tokenizer):
        super().__init__()  # initialize the DatasetReader base class
        self.token_indexers = token_indexers
        self.tokenizer = tokenizer

    def text_to_instance(self, sentence: str, label: str = None) -> Instance:
        text_field = TextField(self.tokenizer.tokenize(sentence), self.token_indexers)
        fields = {"tokens": text_field}
        if label is not None:
            fields["label"] = LabelField(label)
        return Instance(fields)

    def read_instances(self, text: pd.Series, labels: pd.Series) -> Iterable[Instance]:
        for sentence, label in zip(text, labels):
            yield self.text_to_instance(sentence, label)
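
# A minimal sketch of how the reader is used (hypothetical headline and label,
# shown for illustration only; real labels come from the annotation CSVs):
#   tokenizer = PretrainedTransformerTokenizer("Musixmatch/umberto-commoncrawl-cased-v1")
#   indexers = {"tokens": PretrainedTransformerIndexer("Musixmatch/umberto-commoncrawl-cased-v1")}
#   reader = MigrationReader(indexers, tokenizer)
#   instance = reader.text_to_instance("Sbarchi a Lampedusa", label="arrivo")
#   # -> Instance with a "tokens" TextField and a "label" LabelField
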
def train(attrib, use_gpu=False):
    assert attrib in ["cda_frame", "riferimento", "orientation", "fake"]

    # load data
    print("Loading data...")
    x_train, y_train, x_dev, y_dev = load_data(attrib)
    print(f"\t\ttrain size: {len(x_train)}")
    print(f"\t\tdev size: {len(x_dev)}")

    # try different setups
    print("Running training setups...")
    scores = []
    setups = [
        # each entry: (text_options, vect_options, model_info)
        # text_options defaults: remove_punct=True, lowercase=True, lemmatize=False, remove_stop=False
        # ({}, {}, {"type": "svm", "options": {"kernel": "linear", "C": 1.0}}),
        (
            {},
            {},
            {
                "type": "bert",
                "options": {"transformer": "Musixmatch/umberto-commoncrawl-cased-v1"},
            },
        ),
        # ({"lemmatize": True, "remove_stop": True}, {}, {"type": "svm", "options": {"kernel": "linear", "C": 0.8}}),
        # ({"lemmatize": True, "remove_stop": True}, {"embed": False}, {"type": "svm", "options": {"kernel": "linear", "C": 0.8}}),
        # ({"lemmatize": True, "remove_stop": True}, {"embed": False}, {"type": "dummy", "options": {}}),
        # ({"lemmatize": True, "remove_stop": True}, {"embed": False}, {"type": "tree", "options": {}}),
        # ({"lemmatize": True, "remove_stop": True}, {}, SVC(kernel='linear')),
        # ({"lemmatize": True, "remove_stop": True}, {"min_freq": 5}, SVC(kernel='linear')),
        # ({"lemmatize": True, "remove_stop": True}, {"min_freq": 5, "max_freq": .70}, SVC(kernel='linear')),
        # ({"lemmatize": True, "remove_stop": True}, {}, SVC(kernel='linear', C=0.6)),
        # ({"lemmatize": True, "remove_stop": True}, {}, SVC(kernel='linear', C=0.7)),
        # ({"lemmatize": True, "remove_stop": True}, {}, SVC(kernel='linear', C=0.8)),
        # ({"lemmatize": True, "remove_stop": True}, {"ngram_range": (1,2)}, SVC(kernel='linear', C=0.8)),
        # ({"lemmatize": True, "remove_stop": True}, {}, SVC(kernel="rbf")),
    ]
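
    # model_info["type"] selects a branch below: "bert" fine-tunes the transformer
    # with AllenNLP, while "svm"/"tree"/"dummy" train sklearn models on the count
    # or embedding features produced by extract_features().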
    nlp = spacy.load("it_core_news_md")
    for s_idx, (text_options, vect_options, model_info) in enumerate(setups):
        if model_info["type"] == "bert":
            print("\t\tPreparing BERT model...")
            # device 0 when a GPU is requested and available; -1 keeps everything on CPU
            cuda_device = 0 if use_gpu and torch.cuda.is_available() else -1
            transformer = model_info["options"]["transformer"]
            token_indexers = {"tokens": PretrainedTransformerIndexer(transformer)}
            tokenizer = PretrainedTransformerTokenizer(transformer)
            reader = MigrationReader(token_indexers, tokenizer)
            train_instances = list(reader.read_instances(x_train, y_train))
            dev_instances = list(reader.read_instances(x_dev, y_dev))
            vocab = Vocabulary.from_instances(train_instances + dev_instances)
            print(f"\t\t'tags' vocab size: {vocab.get_vocab_size('tags')}")

            embedder = BasicTextFieldEmbedder(
                {"tokens": PretrainedTransformerEmbedder(transformer)}
            )
            seq2vec = BertPooler(transformer)
            # "tags" is the token namespace PretrainedTransformerIndexer uses by default
            model = BasicClassifier(vocab, embedder, seq2vec, namespace="tags")
            if cuda_device >= 0:
                model = model.cuda(cuda_device)

            checkpoint_dir = f"/scratch/p289731/cda_classify/model_{attrib}/checkpoints/"
            serialization_dir = f"/scratch/p289731/cda_classify/model_{attrib}/serialize/"
            os.makedirs(checkpoint_dir, exist_ok=True)
            os.makedirs(serialization_dir, exist_ok=True)
            checkpointer = Checkpointer(checkpoint_dir)

            # fine-tune all trainable parameters with a small learning rate
            optimizer = AdamOptimizer(
                [(n, p) for n, p in model.named_parameters() if p.requires_grad],
                lr=1e-6,
            )
            train_loader = SimpleDataLoader(train_instances, batch_size=8, shuffle=True)
            dev_loader = SimpleDataLoader(dev_instances, batch_size=8, shuffle=False)
            train_loader.index_with(vocab)
            dev_loader.index_with(vocab)

            print("\t\tTraining BERT model")
            trainer = GradientDescentTrainer(
                model,
                optimizer,
                train_loader,
                validation_data_loader=dev_loader,
                patience=32,
                checkpointer=checkpointer,
                cuda_device=cuda_device,
                serialization_dir=serialization_dir,
            )
            trainer.train()

            print("\t\tProducing predictions...")
            predictor = TextClassifierPredictor(model, reader)
            predictions = [predictor.predict(sentence) for sentence in x_dev]
            y_dev_pred = [p["label"] for p in predictions]
            class_labels = list(vocab.get_token_to_index_vocabulary("labels").keys())
        elif model_info["type"] in ["svm", "tree", "dummy"]:
            # extract features
            print("\t\tExtracting features...")
            x_train_fts, vectorizer = extract_features(
                x_train, nlp, text_options, **vect_options
            )
            x_dev_fts, _ = extract_features(
                x_dev, nlp, text_options, **vect_options, vectorizer=vectorizer
            )
            # "embed" may be absent from vect_options, so use .get() with a default
            if not vect_options.get("embed", False):
                print(f"\t\t\tnum features: {len(vectorizer.vocabulary_)}")
            else:
                assert (
                    model_info["type"] != "tree"
                ), "Decision tree does not support embedding input"

            print("\t\tTraining the model...")
            if model_info["type"] == "svm":
                model = SVC(**model_info["options"])
            elif model_info["type"] == "tree":
                model = DecisionTreeClassifier()
            else:
                model = DummyClassifier()
            model.fit(x_train_fts, y_train)

            # evaluate on dev
            print("\t\tValidating the model...")
            y_dev_pred = model.predict(x_dev_fts)
            class_labels = model.classes_
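
        # both branches leave y_dev_pred and class_labels defined for the
        # shared evaluation that follows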
        p_micro, r_micro, f_micro, _ = precision_recall_fscore_support(
            y_dev, y_dev_pred, average="micro"
        )
        p_classes, r_classes, f_classes, _ = precision_recall_fscore_support(
            y_dev, y_dev_pred, average=None, labels=class_labels, zero_division=0
        )
        print(
            f"\t\t\tOverall scores (micro-averaged):\tP={p_micro}\tR={r_micro}\tF={f_micro}"
        )
        scores.append(
            {
                "micro": {"p": p_micro, "r": r_micro, "f": f_micro},
                "classes": {
                    "p": list(zip(class_labels, p_classes)),
                    "r": list(zip(class_labels, r_classes)),
                    "f": list(zip(class_labels, f_classes)),
                },
            }
        )
        prediction_df = pd.DataFrame(
            zip(x_dev, y_dev, y_dev_pred), columns=["headline", "gold", "prediction"]
        )
        prediction_df.to_csv(
            f"output/migration/cda_classify/predictions_{attrib}_{s_idx:02}.csv"
        )

    with open(
        f"output/migration/cda_classify/scores_{attrib}.json", "w", encoding="utf-8"
    ) as f_scores:
        json.dump(scores, f_scores, indent=4)
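
# Each call to train() writes one predictions CSV per setup
# (output/migration/cda_classify/predictions_<attrib>_<setup>.csv) and a single
# scores_<attrib>.json with micro-averaged and per-class precision/recall/F1.
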
def load_data(attrib):
    train_data = pd.read_csv("output/migration/preprocess/annotations_train.csv")
    dev_data = pd.read_csv("output/migration/preprocess/annotations_dev.csv")

    x_train = train_data["Titolo"]
    x_dev = dev_data["Titolo"]

    if attrib == "cda_frame":
        y_train = train_data["frame"]
        y_dev = dev_data["frame"]
    elif attrib == "riferimento":
        y_train = train_data["riferimento"]
        y_dev = dev_data["riferimento"]
    elif attrib == "orientation":
        y_train = train_data["orientation"]
        y_dev = dev_data["orientation"]
    else:
        # fake task to sanity-check the setup
        y_train = pd.Series(["true" if "rifugiato" in exa else "false" for exa in x_train])
        y_dev = pd.Series(["true" if "rifugiato" in exa else "false" for exa in x_dev])

    return x_train, y_train, x_dev, y_dev
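
# The annotation CSVs are expected to contain a "Titolo" (headline) column plus
# one gold column per attribute: "frame", "riferimento", and "orientation".
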
def extract_features(
    headlines,
    nlp,
    text_options,
    embed=False,
    min_freq=1,
    max_freq=1.0,
    ngram_range=(1, 1),
    vectorizer=None,
):
    if embed:
        # mean-pooled word vectors; no vectorizer is fitted in this mode
        vectorized = np.array(
            list(process_text(headlines, nlp, embed=True, **text_options))
        )
    else:
        tokenized = [
            " ".join(sent) for sent in process_text(headlines, nlp, **text_options)
        ]
        if vectorizer is None:
            vectorizer = CountVectorizer(
                lowercase=False,
                analyzer="word",
                min_df=min_freq,
                max_df=max_freq,
                ngram_range=ngram_range,
            )
            vectorized = vectorizer.fit_transform(tokenized)
        else:
            # reuse the vectorizer fitted on the training set
            vectorized = vectorizer.transform(tokenized)
    return vectorized, vectorizer
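
# A minimal usage sketch (hypothetical headline, shown for illustration):
#   x_fts, vec = extract_features(pd.Series(["Sbarchi a Lampedusa"]), nlp, {})
#   # x_fts is a sparse document-term matrix; pass vectorizer=vec when
#   # transforming the dev set so both splits share one feature space.
#   # With embed=True, mean spaCy word vectors are returned instead.
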
def process_text(
    headlines,
    nlp,
    embed=False,
    remove_punct=True,
    lowercase=True,
    lemmatize=False,
    remove_stop=False,
):
    for sent in headlines:
        doc = nlp(sent)
        # filter stop words and punctuation/symbol tokens as requested
        tokens = (
            t
            for t in doc
            if (not remove_stop or not t.is_stop)
            and (not remove_punct or t.pos_ not in ["PUNCT", "SYM", "X"])
        )
        if embed:
            if lemmatize:
                tokens = (t.vocab[t.lemma].vector for t in tokens)
            else:
                tokens = (t.vector for t in tokens if t.has_vector)
        else:
            if lemmatize:
                tokens = (t.lemma_ for t in tokens)
            else:
                tokens = (t.text for t in tokens)
            if lowercase:
                tokens = (t.lower() for t in tokens)
        if embed:
            token_arr = np.array([t for t in tokens])
            if len(token_arr) == 0:
                # no usable vectors: fall back to a random embedding
                yield np.random.rand(300)
            else:
                # sentence embedding = mean of the token vectors
                yield np.mean(token_arr, axis=0)
        else:
            yield list(tokens)
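
# With the defaults (remove_punct=True, lowercase=True), a hypothetical headline
# like "Sbarchi a Lampedusa!" comes out as ["sbarchi", "a", "lampedusa"]:
# the "!" is dropped as PUNCT and the surviving tokens are lowercased.
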
if __name__ == "__main__":
    # pass "gpu" as the first command-line argument to train on GPU
    use_gpu = len(sys.argv) > 1 and sys.argv[1] == "gpu"
    # train(attrib="fake", use_gpu=use_gpu)
    train(attrib="cda_frame", use_gpu=use_gpu)
    # train(attrib="riferimento")
    # train(attrib="orientation")
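
# Usage (the script name is assumed here for illustration):
#   python cda_classify.py gpu   # fine-tune the UmBERTo classifier on GPU
#   python cda_classify.py       # run on CPU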