import gc
import numpy as np
import pandas as pd
import torch
from torch import nn
import transformers
from transformers import AutoModel, AutoTokenizer, AutoConfig
config = dict(
    # basic
    seed=3407,
    num_jobs=1,
    num_labels=2,
    # model info
    tokenizer_path='roberta-large',  # 'allenai/biomed_roberta_base',
    model_checkpoint='roberta-large',  # 'allenai/biomed_roberta_base',
    device='cuda' if torch.cuda.is_available() else 'cpu',
    # training parameters
    max_length=512,
    batch_size=16,
    # for this notebook
    debug=False,
)
def create_sample_test():
    # Load the NBME features, patient notes, and test files, then join them on their shared keys.
    feats = pd.read_csv("../input/nbme-score-clinical-patient-notes/features.csv")
    feats.loc[27, 'feature_text'] = "Last-Pap-smear-1-year-ago"
    notes = pd.read_csv("../input/nbme-score-clinical-patient-notes/patient_notes.csv")
    test = pd.read_csv("../input/nbme-score-clinical-patient-notes/test.csv")
    merged = test.merge(notes, how="left")
    merged = merged.merge(feats, how="left")

    def process_feature_text(text):
        return text.replace("-OR-", ";-").replace("-", " ")

    merged["feature_text"] = [process_feature_text(x) for x in merged["feature_text"]]
    return merged.sample(1).reset_index(drop=True)
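# Usage sketch (not executed here): create_sample_test assumes the NBME Kaggle dataset
# is mounted under ../input/nbme-score-clinical-patient-notes/, which this Space's runtime
# may not provide, so it is illustrative only.
# sample = create_sample_test()
# print(sample[["feature_text", "pn_history"]].iloc[0])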
class NBMETestData(torch.utils.data.Dataset):
    def __init__(self, feature_text, pn_history, tokenizer):
        self.feature_text = feature_text
        self.pn_history = pn_history
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.feature_text)

    def __getitem__(self, idx):
        # Encode the (feature, note) pair; only the note (second sequence) is truncated.
        tokenized = self.tokenizer(
            self.feature_text[idx],
            self.pn_history[idx],
            truncation="only_second",
            max_length=config['max_length'],
            padding="max_length",
            return_offsets_mapping=True,
        )
        tokenized["sequence_ids"] = tokenized.sequence_ids()

        input_ids = np.array(tokenized["input_ids"])
        attention_mask = np.array(tokenized["attention_mask"])
        offset_mapping = np.array(tokenized["offset_mapping"])
        # sequence_ids contains None for special tokens; casting to float turns those into NaN.
        sequence_ids = np.array(tokenized["sequence_ids"]).astype("float16")

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'offset_mapping': offset_mapping,
            'sequence_ids': sequence_ids,
        }
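# Quick shape check (illustrative only; `tokenizer` is created further down in this file):
# ds = NBMETestData(["family history of MI"], ["mother had a heart attack at 50"], tokenizer)
# item = ds[0]
# print(item['input_ids'].shape, item['offset_mapping'].shape)  # (512,), (512, 2)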
# class NBMEModel(nn.Module):
#     def __init__(self, num_labels=1, path=None):
#         super().__init__()
#         layer_norm_eps: float = 1e-6
#         self.path = path
#         self.num_labels = num_labels
#         self.transformer = transformers.AutoModel.from_pretrained(config['model_checkpoint'])
#         self.dropout = nn.Dropout(0.2)
#         self.output = nn.Linear(768, 1)
#         if self.path is not None:
#             self.load_state_dict(torch.load(self.path)['model'])
#
#     def forward(self, data):
#         ids = data['input_ids']
#         mask = data['attention_mask']
#         try:
#             target = data['targets']
#         except KeyError:
#             target = None
#         transformer_out = self.transformer(ids, mask)
#         sequence_output = transformer_out[0]
#         sequence_output = self.dropout(sequence_output)
#         logits = self.output(sequence_output)
#         ret = {
#             "logits": torch.sigmoid(logits),
#         }
#         if target is not None:
#             loss = self.get_loss(logits, target)
#             ret['loss'] = loss
#             ret['targets'] = target
#         return ret
#
#     def get_optimizer(self, learning_rate, weight_decay):
#         optimizer = torch.optim.AdamW(
#             self.parameters(),
#             lr=learning_rate,
#             weight_decay=weight_decay,
#         )
#         if self.path is not None:
#             optimizer.load_state_dict(torch.load(self.path)['optimizer'])
#         return optimizer
#
#     def get_scheduler(self, optimizer, num_warmup_steps, num_training_steps):
#         scheduler = transformers.get_linear_schedule_with_warmup(
#             optimizer,
#             num_warmup_steps=num_warmup_steps,
#             num_training_steps=num_training_steps,
#         )
#         if self.path is not None:
#             scheduler.load_state_dict(torch.load(self.path)['scheduler'])
#         return scheduler
#
#     def get_loss(self, output, target):
#         loss_fn = nn.BCEWithLogitsLoss(reduction="none")
#         loss = loss_fn(output.view(-1, 1), target.view(-1, 1))
#         loss = torch.masked_select(loss, target.view(-1, 1) != -100).mean()
#         return loss
class NBMEModel(nn.Module):
    def __init__(self, num_labels=2, path=None):
        super().__init__()
        layer_norm_eps: float = 1e-6
        self.path = path
        self.num_labels = num_labels
        self.transformer = transformers.AutoModel.from_pretrained(config['model_checkpoint'])
        # Multi-sample dropout: five dropout rates applied to the same encoder output,
        # each passed through the shared linear head, then averaged.
        self.dropout = nn.Dropout(0.1)
        self.dropout1 = nn.Dropout(0.1)
        self.dropout2 = nn.Dropout(0.2)
        self.dropout3 = nn.Dropout(0.3)
        self.dropout4 = nn.Dropout(0.4)
        self.dropout5 = nn.Dropout(0.5)
        self.output = nn.Linear(1024, 1)  # roberta-large hidden size
        if self.path is not None:
            self.load_state_dict(torch.load(self.path)['model'])

    def forward(self, data):
        ids = data['input_ids']
        mask = data['attention_mask']
        try:
            target = data['targets']
        except KeyError:
            target = None
        transformer_out = self.transformer(ids, mask)
        sequence_output = transformer_out[0]
        sequence_output = self.dropout(sequence_output)
        logits1 = self.output(self.dropout1(sequence_output))
        logits2 = self.output(self.dropout2(sequence_output))
        logits3 = self.output(self.dropout3(sequence_output))
        logits4 = self.output(self.dropout4(sequence_output))
        logits5 = self.output(self.dropout5(sequence_output))
        logits = (logits1 + logits2 + logits3 + logits4 + logits5) / 5
        ret = {
            'logits': torch.sigmoid(logits),
        }
        loss = 0
        if target is not None:
            # The loss is averaged over the five dropout branches rather than
            # computed once on the averaged logits.
            loss1 = self.get_loss(logits1, target)
            loss2 = self.get_loss(logits2, target)
            loss3 = self.get_loss(logits3, target)
            loss4 = self.get_loss(logits4, target)
            loss5 = self.get_loss(logits5, target)
            loss = (loss1 + loss2 + loss3 + loss4 + loss5) / 5
            ret['loss'] = loss
            ret['target'] = target
        return ret

    def get_optimizer(self, learning_rate, weight_decay):
        optimizer = torch.optim.AdamW(
            self.parameters(),
            lr=learning_rate,
            weight_decay=weight_decay,
        )
        if self.path is not None:
            optimizer.load_state_dict(torch.load(self.path)['optimizer'])
        return optimizer

    def get_scheduler(self, optimizer, num_warmup_steps, num_training_steps):
        scheduler = transformers.get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=num_warmup_steps,
            num_training_steps=num_training_steps,
        )
        if self.path is not None:
            scheduler.load_state_dict(torch.load(self.path)['scheduler'])
        return scheduler

    def get_loss(self, output, target):
        # Token-level BCE; positions labelled -100 (padding / non-note tokens) are masked out.
        loss_fn = nn.BCEWithLogitsLoss(reduction="none")
        loss = loss_fn(output.view(-1, 1), target.view(-1, 1))
        loss = torch.masked_select(loss, target.view(-1, 1) != -100).mean()
        return loss
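# Resume sketch (illustrative only; 'checkpoint.pth' is a hypothetical file saved during
# training as {'model': ..., 'optimizer': ..., 'scheduler': ...}, which is the layout the
# `path` logic above expects):
# model = NBMEModel(path='checkpoint.pth').to(config['device'])
# optimizer = model.get_optimizer(learning_rate=2e-5, weight_decay=0.01)
# scheduler = model.get_scheduler(optimizer, num_warmup_steps=0, num_training_steps=1000)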
def get_location_predictions(preds, offset_mapping, sequence_ids, test=False):
    # Convert per-token probabilities into character-level spans of the note text.
    all_predictions = []
    for pred, offsets, seq_ids in zip(preds, offset_mapping, sequence_ids):
        start_idx = None
        end_idx = None
        current_preds = []
        for p, o, s_id in zip(pred, offsets, seq_ids):
            # Skip the feature text (sequence id 0) and special tokens
            # (sequence id None, or NaN after the float cast in NBMETestData).
            if s_id is None or s_id == 0 or np.isnan(s_id):
                continue
            if p > 0.5:
                if start_idx is None:
                    start_idx = o[0]
                end_idx = o[1]
            elif start_idx is not None:
                if test:
                    current_preds.append(f"{start_idx} {end_idx}")
                else:
                    current_preds.append((start_idx, end_idx))
                start_idx = None
        # Flush a span that is still open when the sequence ends.
        if start_idx is not None:
            if test:
                current_preds.append(f"{start_idx} {end_idx}")
            else:
                current_preds.append((start_idx, end_idx))
        if test:
            all_predictions.append("; ".join(current_preds))
        else:
            all_predictions.append(current_preds)
    return all_predictions
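# Toy decoding example (values invented for illustration): one special token followed by
# three note tokens covering characters 0-5, 6-10, and 11-15; the first two note tokens
# cross the 0.5 threshold, so they merge into the single span "0 10".
# probs   = [np.array([0.1, 0.7, 0.8, 0.2])]
# offsets = [np.array([(0, 0), (0, 5), (6, 10), (11, 15)])]
# seq_ids = [np.array([np.nan, 1.0, 1.0, 1.0], dtype="float16")]
# get_location_predictions(probs, offsets, seq_ids, test=True)  # -> ['0 10']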
def predict_location_preds(tokenizer, model, feature_text, pn_history, pn_history_lower):
    test_ds = NBMETestData(feature_text, pn_history_lower, tokenizer)
    test_dl = torch.utils.data.DataLoader(
        test_ds,
        batch_size=config['batch_size'],
        pin_memory=True,
        shuffle=False,
        drop_last=False
    )
    all_preds = None
    offsets = []
    seq_ids = []
    preds = []
    with torch.no_grad():
        for batch in test_dl:
            # Move model inputs to the device; offsets and sequence ids stay on CPU.
            for k, v in batch.items():
                if k not in ['offset_mapping', 'sequence_ids']:
                    batch[k] = v.to(config['device'])
            logits = model(batch)['logits']
            preds.append(logits.cpu().numpy())
            offset_mapping = batch['offset_mapping']
            sequence_ids = batch['sequence_ids']
            offsets.append(offset_mapping.cpu().numpy())
            seq_ids.append(sequence_ids.cpu().numpy())
    preds = np.concatenate(preds, axis=0)
    # The accumulation below supports summing predictions over several models/folds;
    # with the single pass here it simply copies preds.
    if all_preds is None:
        all_preds = np.array(preds).astype(np.float32)
    else:
        all_preds += np.array(preds).astype(np.float32)
    torch.cuda.empty_cache()
    all_preds = all_preds.squeeze()
    offsets = np.concatenate(offsets, axis=0)
    seq_ids = np.concatenate(seq_ids, axis=0)
    # print(all_preds.shape, offsets.shape, seq_ids.shape)
    location_preds = get_location_predictions([all_preds], offsets, seq_ids, test=False)[0]
    x = []
    for location in location_preds:
        x.append(pn_history[0][location[0]: location[1]])
    return location_preds, ', '.join(x)
def get_predictions(feature_text, pn_history):
    # Normalise the feature text the same way as process_feature_text ("-OR-" -> ";-", "-" -> " "),
    # then lowercase; the replacements must run before lower() so "-OR-" still matches.
    feature_text = feature_text.replace("-OR-", ";-").replace("-", " ").lower()
    pn_history_lower = pn_history.lower()
    location_preds, pred_string = predict_location_preds(tokenizer, model, [feature_text], [pn_history], [pn_history_lower])
    if pred_string == "":
        pred_string = 'Feature not present!'
    else:
        pred_string = 'Feature is present!' + '\nText Span - ' + pred_string
    return pred_string
tokenizer = AutoTokenizer.from_pretrained(config['tokenizer_path'])

path = 'model_large_pseudo_label.pth'

model = NBMEModel().to(config['device'])
# The checkpoint file is a plain state_dict, so it is loaded directly here rather than
# through the {'model': ...} resume logic in NBMEModel.__init__.
model.load_state_dict(
    torch.load(
        path,
        map_location=torch.device(config['device'])
    )
)
model.eval()
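# Example query (inputs invented for illustration; the Space presumably wires get_predictions
# into its UI elsewhere):
# print(get_predictions(
#     "Family-history-of-MI-OR-Family-history-of-myocardial-infarction",
#     "17 yo male with recurrent palpitations. Father had an MI at age 52."
# ))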