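"""ALBERT question-answering heads with answer verification.

Defines two extractive QA models that, besides start/end span logits, predict
from the [CLS] token whether the question is answerable (Internal Front
Verification): AlbertForQuestionAnsweringAVPool and
AlbertForQuestionAnsweringAVPoolBCEv3.
"""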
import torch
from torch import nn
from torch.nn import CrossEntropyLoss

from transformers import (
    AlbertForSequenceClassification as SeqClassification,
    AlbertPreTrainedModel,
    AlbertModel,
    AlbertConfig,
)

from .modeling_outputs import (
    QuestionAnsweringModelOutput,
    QuestionAnsweringNaModelOutput,
)
class AlbertForSequenceClassification(SeqClassification):
    model_type = "albert"


class AlbertForQuestionAnsweringAVPool(AlbertPreTrainedModel):
    _keys_to_ignore_on_load_unexpected = [r"pooler"]
    model_type = "albert"

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels

        # The backbone must be named `albert` (not `electra`) so pretrained ALBERT
        # weights load onto it and forward() can call self.albert.
        self.albert = AlbertModel(config)
        self.qa_outputs = nn.Linear(config.hidden_size, config.num_labels)
        self.has_ans = nn.Sequential(
            nn.Dropout(p=config.hidden_dropout_prob),
            nn.Linear(config.hidden_size, self.num_labels),
        )

        # Initialize weights and apply final processing
        self.post_init()
    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        start_positions=None,
        end_positions=None,
        is_impossibles=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.albert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]

        logits = self.qa_outputs(sequence_output)
        start_logits, end_logits = logits.split(1, dim=-1)
        start_logits = start_logits.squeeze(-1).contiguous()
        end_logits = end_logits.squeeze(-1).contiguous()

        # Answerability head: classify from the [CLS] (first token) representation.
        first_word = sequence_output[:, 0, :]
        has_logits = self.has_ans(first_word)

        total_loss = None
        if (
            start_positions is not None
            and end_positions is not None
            and is_impossibles is not None
        ):
            # If we are on multi-GPU, the labels may carry an extra dimension; squeeze it.
            if len(start_positions.size()) > 1:
                start_positions = start_positions.squeeze(-1)
            if len(end_positions.size()) > 1:
                end_positions = end_positions.squeeze(-1)
            if len(is_impossibles.size()) > 1:
                is_impossibles = is_impossibles.squeeze(-1)

            # Sometimes the start/end positions fall outside our model inputs; clamp
            # them to ignored_index so the loss ignores those terms.
            ignored_index = start_logits.size(1)
            start_positions.clamp_(0, ignored_index)
            end_positions.clamp_(0, ignored_index)
            is_impossibles.clamp_(0, ignored_index)

            loss_fct = CrossEntropyLoss(ignore_index=ignored_index)
            start_loss = loss_fct(start_logits, start_positions)
            end_loss = loss_fct(end_logits, end_positions)
            span_loss = start_loss + end_loss

            # Internal Front Verification (I-FV)
            # alpha1 == 1.0, alpha2 == 0.5
            choice_loss = loss_fct(has_logits, is_impossibles.long())
            total_loss = (span_loss + choice_loss) / 3
        if not return_dict:
            output = (
                start_logits,
                end_logits,
                has_logits,
            ) + outputs[2:]  # hidden_states, attentions
            return ((total_loss,) + output) if total_loss is not None else output

        return QuestionAnsweringNaModelOutput(
            loss=total_loss,
            start_logits=start_logits,
            end_logits=end_logits,
            has_logits=has_logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
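# The BCEv3 variant below keeps the same span head but verifies answerability with
# three combined losses: cross-entropy over a 2-way head plus binary cross-entropy
# and MSE over a 1-way head.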
class AlbertForQuestionAnsweringAVPoolBCEv3(AlbertPreTrainedModel):
    _keys_to_ignore_on_load_unexpected = [r"pooler"]
    model_type = "albert"

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels

        # As above, the backbone must be named `albert` (not `electra`) for weight
        # loading and for the self.albert call in forward().
        self.albert = AlbertModel(config)
        self.qa_outputs = nn.Linear(config.hidden_size, config.num_labels)
        self.has_ans1 = nn.Sequential(
            nn.Dropout(p=config.hidden_dropout_prob),
            nn.Linear(config.hidden_size, 2),
        )
        self.has_ans2 = nn.Sequential(
            nn.Dropout(p=config.hidden_dropout_prob),
            nn.Linear(config.hidden_size, 1),
        )

        # Initialize weights and apply final processing
        self.post_init()
    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        inputs_embeds=None,
        start_positions=None,
        end_positions=None,
        is_impossibles=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.albert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = outputs[0]

        logits = self.qa_outputs(sequence_output)
        start_logits, end_logits = logits.split(1, dim=-1)
        start_logits = start_logits.squeeze(-1).contiguous()
        end_logits = end_logits.squeeze(-1).contiguous()

        # Answerability heads: a 2-way classifier and a 1-way scorer over the
        # [CLS] (first token) representation.
        first_word = sequence_output[:, 0, :]
        has_logits1 = self.has_ans1(first_word).squeeze(-1)
        has_logits2 = self.has_ans2(first_word).squeeze(-1)

        total_loss = None
        if (
            start_positions is not None
            and end_positions is not None
            and is_impossibles is not None
        ):
            # If we are on multi-GPU, the labels may carry an extra dimension; squeeze it.
            if len(start_positions.size()) > 1:
                start_positions = start_positions.squeeze(-1)
            if len(end_positions.size()) > 1:
                end_positions = end_positions.squeeze(-1)
            if len(is_impossibles.size()) > 1:
                is_impossibles = is_impossibles.squeeze(-1)

            # Sometimes the start/end positions fall outside our model inputs; clamp
            # them to ignored_index so the loss ignores those terms.
            ignored_index = start_logits.size(1)
            start_positions.clamp_(0, ignored_index)
            end_positions.clamp_(0, ignored_index)
            is_impossibles.clamp_(0, ignored_index)
            is_impossibles = is_impossibles.to(
                dtype=next(self.parameters()).dtype
            )  # fp16 compatibility

            loss_fct = CrossEntropyLoss(ignore_index=ignored_index)
            start_loss = loss_fct(start_logits, start_positions)
            end_loss = loss_fct(end_logits, end_positions)
            span_loss = start_loss + end_loss

            # Internal Front Verification (I-FV): combine CE, BCE, and MSE
            # answerability losses.
            choice_fct = nn.BCEWithLogitsLoss()
            mse_loss_fct = nn.MSELoss()
            choice_loss1 = loss_fct(has_logits1, is_impossibles.long())
            choice_loss2 = choice_fct(has_logits2, is_impossibles)
            choice_loss3 = mse_loss_fct(has_logits2.view(-1), is_impossibles.view(-1))
            choice_loss = choice_loss1 + choice_loss2 + choice_loss3

            total_loss = (span_loss + choice_loss) / 5
        if not return_dict:
            output = (
                start_logits,
                end_logits,
                has_logits1,
            ) + outputs[2:]  # hidden_states, attentions
            return ((total_loss,) + output) if total_loss is not None else output

        return QuestionAnsweringNaModelOutput(
            loss=total_loss,
            start_logits=start_logits,
            end_logits=end_logits,
            has_logits=has_logits1,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
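if __name__ == "__main__":
    # Minimal smoke-test sketch, assuming this file lives in a package that provides
    # `.modeling_outputs`. The tiny config below is illustrative only, not a pretrained
    # checkpoint; for real use, load weights with `from_pretrained(...)` instead.
    config = AlbertConfig(
        vocab_size=100,
        embedding_size=16,
        hidden_size=32,
        num_hidden_layers=2,
        num_attention_heads=2,
        intermediate_size=64,
        num_labels=2,
    )
    model = AlbertForQuestionAnsweringAVPool(config)
    model.eval()

    batch = {
        "input_ids": torch.randint(0, config.vocab_size, (2, 8)),
        "attention_mask": torch.ones(2, 8, dtype=torch.long),
    }
    with torch.no_grad():
        out = model(**batch, return_dict=True)
    # Expect (2, 8) start/end logits and (2, 2) answerability logits.
    print(out.start_logits.shape, out.end_logits.shape, out.has_logits.shape)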