""" TF 2.0 CamemBERT model.""" |
|
|
|
|
|
from __future__ import annotations |
|
|
|
import math |
|
import warnings |
|
from typing import Optional, Tuple, Union |
|
|
|
import numpy as np |
|
import tensorflow as tf |
|
|
|
from ...activations_tf import get_tf_activation |
|
from ...modeling_tf_outputs import ( |
|
TFBaseModelOutputWithPastAndCrossAttentions, |
|
TFBaseModelOutputWithPoolingAndCrossAttentions, |
|
TFCausalLMOutputWithCrossAttentions, |
|
TFMaskedLMOutput, |
|
TFMultipleChoiceModelOutput, |
|
TFQuestionAnsweringModelOutput, |
|
TFSequenceClassifierOutput, |
|
TFTokenClassifierOutput, |
|
) |
|
from ...modeling_tf_utils import ( |
|
TFCausalLanguageModelingLoss, |
|
TFMaskedLanguageModelingLoss, |
|
TFModelInputType, |
|
TFMultipleChoiceLoss, |
|
TFPreTrainedModel, |
|
TFQuestionAnsweringLoss, |
|
TFSequenceClassificationLoss, |
|
TFTokenClassificationLoss, |
|
get_initializer, |
|
keras_serializable, |
|
unpack_inputs, |
|
) |
|
from ...tf_utils import check_embeddings_within_bounds, shape_list, stable_softmax |
|
from ...utils import ( |
|
add_code_sample_docstrings, |
|
add_start_docstrings, |
|
add_start_docstrings_to_model_forward, |
|
logging, |
|
) |
|
from .configuration_camembert import CamembertConfig |
|
|
|
|
|
logger = logging.get_logger(__name__) |
|
|
|
_CHECKPOINT_FOR_DOC = "camembert-base" |
|
_CONFIG_FOR_DOC = "CamembertConfig" |
|
|
|
TF_CAMEMBERT_PRETRAINED_MODEL_ARCHIVE_LIST = [ |
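    # See all CamemBERT models at https://huggingface.co/models?filter=camembert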
|
|
|
] |
|
|
|
|
|
CAMEMBERT_START_DOCSTRING = r""" |
|
|
|
This model inherits from [`TFPreTrainedModel`]. Check the superclass documentation for the generic methods the |
|
    library implements for all its models (such as downloading or saving, resizing the input embeddings, pruning heads
|
etc.) |
|
|
|
This model is also a [tf.keras.Model](https://www.tensorflow.org/api_docs/python/tf/keras/Model) subclass. Use it |
|
    as a regular TF 2.0 Keras Model and refer to the TF 2.0 documentation for all matters related to general usage and
|
behavior. |
|
|
|
<Tip> |
|
|
|
TensorFlow models and layers in `transformers` accept two formats as input: |
|
|
|
- having all inputs as keyword arguments (like PyTorch models), or |
|
- having all inputs as a list, tuple or dict in the first positional argument. |
|
|
|
The reason the second format is supported is that Keras methods prefer this format when passing inputs to models |
|
and layers. Because of this support, when using methods like `model.fit()` things should "just work" for you - just |
|
pass your inputs and labels in any format that `model.fit()` supports! If, however, you want to use the second |
|
format outside of Keras methods like `fit()` and `predict()`, such as when creating your own layers or models with |
|
the Keras `Functional` API, there are three possibilities you can use to gather all the input Tensors in the first |
|
positional argument: |
|
|
|
- a single Tensor with `input_ids` only and nothing else: `model(input_ids)` |
|
- a list of varying length with one or several input Tensors IN THE ORDER given in the docstring: |
|
`model([input_ids, attention_mask])` or `model([input_ids, attention_mask, token_type_ids])` |
|
- a dictionary with one or several input Tensors associated to the input names given in the docstring: |
|
`model({"input_ids": input_ids, "token_type_ids": token_type_ids})` |
|
|
|
Note that when creating models and layers with |
|
[subclassing](https://keras.io/guides/making_new_layers_and_models_via_subclassing/) then you don't need to worry |
|
about any of this, as you can just pass inputs like you would to any other Python function! |
|
|
|
</Tip> |
|
|
|
Parameters: |
|
config ([`CamembertConfig`]): Model configuration class with all the parameters of the |
|
model. Initializing with a config file does not load the weights associated with the model, only the |
|
configuration. Check out the [`~PreTrainedModel.from_pretrained`] method to load the model weights. |
|
""" |
|
|
|
CAMEMBERT_INPUTS_DOCSTRING = r""" |
|
Args: |
|
input_ids (`Numpy array` or `tf.Tensor` of shape `({0})`): |
|
Indices of input sequence tokens in the vocabulary. |
|
|
|
Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.__call__`] and |
|
[`PreTrainedTokenizer.encode`] for details. |
|
|
|
[What are input IDs?](../glossary#input-ids) |
|
attention_mask (`Numpy array` or `tf.Tensor` of shape `({0})`, *optional*): |
|
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`: |
|
|
|
- 1 for tokens that are **not masked**, |
|
- 0 for tokens that are **masked**. |
|
|
|
[What are attention masks?](../glossary#attention-mask) |
|
token_type_ids (`Numpy array` or `tf.Tensor` of shape `({0})`, *optional*): |
|
Segment token indices to indicate first and second portions of the inputs. Indices are selected in `[0, |
|
1]`: |
|
|
|
- 0 corresponds to a *sentence A* token, |
|
- 1 corresponds to a *sentence B* token. |
|
|
|
[What are token type IDs?](../glossary#token-type-ids) |
|
position_ids (`Numpy array` or `tf.Tensor` of shape `({0})`, *optional*): |
|
Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0, |
|
config.max_position_embeddings - 1]`. |
|
|
|
[What are position IDs?](../glossary#position-ids) |
|
head_mask (`Numpy array` or `tf.Tensor` of shape `(num_heads,)` or `(num_layers, num_heads)`, *optional*): |
|
Mask to nullify selected heads of the self-attention modules. Mask values selected in `[0, 1]`: |
|
|
|
- 1 indicates the head is **not masked**, |
|
- 0 indicates the head is **masked**. |
|
|
|
inputs_embeds (`tf.Tensor` of shape `({0}, hidden_size)`, *optional*): |
|
Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. This |
|
is useful if you want more control over how to convert `input_ids` indices into associated vectors than the |
|
model's internal embedding lookup matrix. |
|
output_attentions (`bool`, *optional*): |
|
Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned |
|
            tensors for more detail. This argument can be used only in eager mode; in graph mode the value in the
|
config will be used instead. |
|
output_hidden_states (`bool`, *optional*): |
|
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for |
|
            more detail. This argument can be used only in eager mode; in graph mode the value in the config will be
|
used instead. |
|
return_dict (`bool`, *optional*): |
|
Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple. This argument can be used in |
|
            eager mode; in graph mode the value will always be set to `True`.
|
training (`bool`, *optional*, defaults to `False`): |
|
Whether or not to use the model in training mode (some modules like dropout modules have different |
|
behaviors between training and evaluation). |
|
""" |
|
|
|
|
|
|
|
class TFCamembertEmbeddings(tf.keras.layers.Layer): |
|
""" |
|
Same as BertEmbeddings with a tiny tweak for positional embeddings indexing. |
|
""" |
|
|
|
def __init__(self, config, **kwargs): |
|
super().__init__(**kwargs) |
|
|
|
self.padding_idx = 1 |
|
self.config = config |
|
self.hidden_size = config.hidden_size |
|
self.max_position_embeddings = config.max_position_embeddings |
|
self.initializer_range = config.initializer_range |
|
self.LayerNorm = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="LayerNorm") |
|
self.dropout = tf.keras.layers.Dropout(rate=config.hidden_dropout_prob) |
|
|
|
def build(self, input_shape: tf.TensorShape): |
|
with tf.name_scope("word_embeddings"): |
|
self.weight = self.add_weight( |
|
name="weight", |
|
shape=[self.config.vocab_size, self.hidden_size], |
|
initializer=get_initializer(self.initializer_range), |
|
) |
|
|
|
with tf.name_scope("token_type_embeddings"): |
|
self.token_type_embeddings = self.add_weight( |
|
name="embeddings", |
|
shape=[self.config.type_vocab_size, self.hidden_size], |
|
initializer=get_initializer(self.initializer_range), |
|
) |
|
|
|
with tf.name_scope("position_embeddings"): |
|
self.position_embeddings = self.add_weight( |
|
name="embeddings", |
|
shape=[self.max_position_embeddings, self.hidden_size], |
|
initializer=get_initializer(self.initializer_range), |
|
) |
|
|
|
super().build(input_shape) |
|
|
|
def create_position_ids_from_input_ids(self, input_ids, past_key_values_length=0): |
|
""" |
|
Replace non-padding symbols with their position numbers. Position numbers begin at padding_idx+1. Padding |
|
symbols are ignored. This is modified from fairseq's `utils.make_positions`. |
|
|
|
Args: |
|
input_ids: tf.Tensor |
|
Returns: tf.Tensor |
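
        Example (illustrative; the token ids are arbitrary and the padding id is `1`):
            input_ids    = [[0, 314, 23, 2, 1, 1]]
            position_ids = [[2, 3, 4, 5, 1, 1]]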
|
""" |
|
mask = tf.cast(tf.math.not_equal(input_ids, self.padding_idx), dtype=input_ids.dtype) |
|
incremental_indices = (tf.math.cumsum(mask, axis=1) + past_key_values_length) * mask |
|
|
|
return incremental_indices + self.padding_idx |
|
|
|
def call( |
|
self, |
|
input_ids=None, |
|
position_ids=None, |
|
token_type_ids=None, |
|
inputs_embeds=None, |
|
past_key_values_length=0, |
|
training=False, |
|
): |
|
""" |
|
        Applies the embedding lookup (word, position and token type embeddings) to the input tensors.
|
|
|
Returns: |
|
final_embeddings (`tf.Tensor`): output embedding tensor. |
|
""" |
|
assert not (input_ids is None and inputs_embeds is None) |
|
|
|
if input_ids is not None: |
|
check_embeddings_within_bounds(input_ids, self.config.vocab_size) |
|
inputs_embeds = tf.gather(params=self.weight, indices=input_ids) |
|
|
|
input_shape = shape_list(inputs_embeds)[:-1] |
|
|
|
if token_type_ids is None: |
|
token_type_ids = tf.fill(dims=input_shape, value=0) |
|
|
|
if position_ids is None: |
|
if input_ids is not None: |
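                # Create the position ids from the input token ids; padded positions keep the padding index.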
|
|
|
position_ids = self.create_position_ids_from_input_ids( |
|
input_ids=input_ids, past_key_values_length=past_key_values_length |
|
) |
|
else: |
|
position_ids = tf.expand_dims( |
|
tf.range(start=self.padding_idx + 1, limit=input_shape[-1] + self.padding_idx + 1), axis=0 |
|
) |
|
|
|
position_embeds = tf.gather(params=self.position_embeddings, indices=position_ids) |
|
token_type_embeds = tf.gather(params=self.token_type_embeddings, indices=token_type_ids) |
|
final_embeddings = inputs_embeds + position_embeds + token_type_embeds |
|
final_embeddings = self.LayerNorm(inputs=final_embeddings) |
|
final_embeddings = self.dropout(inputs=final_embeddings, training=training) |
|
|
|
return final_embeddings |
|
|
|
|
|
|
|
class TFCamembertPooler(tf.keras.layers.Layer): |
|
def __init__(self, config: CamembertConfig, **kwargs): |
|
super().__init__(**kwargs) |
|
|
|
self.dense = tf.keras.layers.Dense( |
|
units=config.hidden_size, |
|
kernel_initializer=get_initializer(config.initializer_range), |
|
activation="tanh", |
|
name="dense", |
|
) |
|
|
|
def call(self, hidden_states: tf.Tensor) -> tf.Tensor: |
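        # "Pool" the model by simply taking the hidden state corresponding to the first token.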
|
|
|
|
|
first_token_tensor = hidden_states[:, 0] |
|
pooled_output = self.dense(inputs=first_token_tensor) |
|
|
|
return pooled_output |
|
|
|
|
|
|
|
class TFCamembertSelfAttention(tf.keras.layers.Layer): |
|
def __init__(self, config: CamembertConfig, **kwargs): |
|
super().__init__(**kwargs) |
|
|
|
if config.hidden_size % config.num_attention_heads != 0: |
|
raise ValueError( |
|
f"The hidden size ({config.hidden_size}) is not a multiple of the number " |
|
f"of attention heads ({config.num_attention_heads})" |
|
) |
|
|
|
self.num_attention_heads = config.num_attention_heads |
|
self.attention_head_size = int(config.hidden_size / config.num_attention_heads) |
|
self.all_head_size = self.num_attention_heads * self.attention_head_size |
|
self.sqrt_att_head_size = math.sqrt(self.attention_head_size) |
|
|
|
self.query = tf.keras.layers.Dense( |
|
units=self.all_head_size, kernel_initializer=get_initializer(config.initializer_range), name="query" |
|
) |
|
self.key = tf.keras.layers.Dense( |
|
units=self.all_head_size, kernel_initializer=get_initializer(config.initializer_range), name="key" |
|
) |
|
self.value = tf.keras.layers.Dense( |
|
units=self.all_head_size, kernel_initializer=get_initializer(config.initializer_range), name="value" |
|
) |
|
self.dropout = tf.keras.layers.Dropout(rate=config.attention_probs_dropout_prob) |
|
|
|
self.is_decoder = config.is_decoder |
|
|
|
def transpose_for_scores(self, tensor: tf.Tensor, batch_size: int) -> tf.Tensor: |
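        # Reshape from [batch_size, seq_length, all_head_size] to [batch_size, seq_length, num_attention_heads, attention_head_size]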
|
|
|
tensor = tf.reshape(tensor=tensor, shape=(batch_size, -1, self.num_attention_heads, self.attention_head_size)) |
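        # Transpose to [batch_size, num_attention_heads, seq_length, attention_head_size]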
|
|
|
|
|
return tf.transpose(tensor, perm=[0, 2, 1, 3]) |
|
|
|
def call( |
|
self, |
|
hidden_states: tf.Tensor, |
|
attention_mask: tf.Tensor, |
|
head_mask: tf.Tensor, |
|
encoder_hidden_states: tf.Tensor, |
|
encoder_attention_mask: tf.Tensor, |
|
past_key_value: Tuple[tf.Tensor], |
|
output_attentions: bool, |
|
training: bool = False, |
|
) -> Tuple[tf.Tensor]: |
|
batch_size = shape_list(hidden_states)[0] |
|
mixed_query_layer = self.query(inputs=hidden_states) |
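        # If this layer is used for cross-attention, the keys and values come from the
        # encoder, and the attention mask must hide the encoder's padding tokens instead.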
|
|
|
|
|
|
|
|
|
is_cross_attention = encoder_hidden_states is not None |
|
|
|
if is_cross_attention and past_key_value is not None: |
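            # Reuse the cached cross-attention key/value states.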
|
|
|
key_layer = past_key_value[0] |
|
value_layer = past_key_value[1] |
|
attention_mask = encoder_attention_mask |
|
elif is_cross_attention: |
|
key_layer = self.transpose_for_scores(self.key(inputs=encoder_hidden_states), batch_size) |
|
value_layer = self.transpose_for_scores(self.value(inputs=encoder_hidden_states), batch_size) |
|
attention_mask = encoder_attention_mask |
|
elif past_key_value is not None: |
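            # Uni-directional decoder self-attention: extend the cached key/value states with the current projections.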
|
key_layer = self.transpose_for_scores(self.key(inputs=hidden_states), batch_size) |
|
value_layer = self.transpose_for_scores(self.value(inputs=hidden_states), batch_size) |
|
key_layer = tf.concat([past_key_value[0], key_layer], axis=2) |
|
value_layer = tf.concat([past_key_value[1], value_layer], axis=2) |
|
else: |
|
key_layer = self.transpose_for_scores(self.key(inputs=hidden_states), batch_size) |
|
value_layer = self.transpose_for_scores(self.value(inputs=hidden_states), batch_size) |
|
|
|
query_layer = self.transpose_for_scores(mixed_query_layer, batch_size) |
|
|
|
if self.is_decoder: |
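            # Cache the key/value states so that later decoding steps can reuse them: cross-attention
            # caches the encoder projections once, while uni-directional self-attention keeps growing
            # its cache with every newly generated token.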
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
past_key_value = (key_layer, value_layer) |
|
|
|
|
|
|
|
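        # Take the dot product between "query" and "key" to get the raw attention scores,
        # then scale by sqrt(attention_head_size).
        # (batch_size, num_heads, seq_len_q, seq_len_k)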
attention_scores = tf.matmul(query_layer, key_layer, transpose_b=True) |
|
dk = tf.cast(self.sqrt_att_head_size, dtype=attention_scores.dtype) |
|
attention_scores = tf.divide(attention_scores, dk) |
|
|
|
if attention_mask is not None: |
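            # Apply the attention mask (precomputed for all layers in the main layer's call() function).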
|
|
|
attention_scores = tf.add(attention_scores, attention_mask) |
|
|
|
|
|
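        # Normalize the attention scores to probabilities.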
attention_probs = stable_softmax(logits=attention_scores, axis=-1) |
|
|
|
|
|
|
|
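        # This is actually dropping out entire tokens to attend to, which might seem a bit
        # unusual, but it follows the original Transformer implementation.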
attention_probs = self.dropout(inputs=attention_probs, training=training) |
|
|
|
|
|
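        # Mask attention heads if a head mask is provided.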
if head_mask is not None: |
|
attention_probs = tf.multiply(attention_probs, head_mask) |
|
|
|
attention_output = tf.matmul(attention_probs, value_layer) |
|
attention_output = tf.transpose(attention_output, perm=[0, 2, 1, 3]) |
|
|
|
|
|
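        # Merge the attention heads back: (batch_size, seq_len_q, all_head_size)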
attention_output = tf.reshape(tensor=attention_output, shape=(batch_size, -1, self.all_head_size)) |
|
outputs = (attention_output, attention_probs) if output_attentions else (attention_output,) |
|
|
|
if self.is_decoder: |
|
outputs = outputs + (past_key_value,) |
|
return outputs |
|
|
|
|
|
|
|
class TFCamembertSelfOutput(tf.keras.layers.Layer): |
|
def __init__(self, config: CamembertConfig, **kwargs): |
|
super().__init__(**kwargs) |
|
|
|
self.dense = tf.keras.layers.Dense( |
|
units=config.hidden_size, kernel_initializer=get_initializer(config.initializer_range), name="dense" |
|
) |
|
self.LayerNorm = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="LayerNorm") |
|
self.dropout = tf.keras.layers.Dropout(rate=config.hidden_dropout_prob) |
|
|
|
def call(self, hidden_states: tf.Tensor, input_tensor: tf.Tensor, training: bool = False) -> tf.Tensor: |
|
hidden_states = self.dense(inputs=hidden_states) |
|
hidden_states = self.dropout(inputs=hidden_states, training=training) |
|
hidden_states = self.LayerNorm(inputs=hidden_states + input_tensor) |
|
|
|
return hidden_states |
|
|
|
|
|
|
|
class TFCamembertAttention(tf.keras.layers.Layer): |
|
def __init__(self, config: CamembertConfig, **kwargs): |
|
super().__init__(**kwargs) |
|
|
|
self.self_attention = TFCamembertSelfAttention(config, name="self") |
|
self.dense_output = TFCamembertSelfOutput(config, name="output") |
|
|
|
def prune_heads(self, heads): |
|
raise NotImplementedError |
|
|
|
def call( |
|
self, |
|
input_tensor: tf.Tensor, |
|
attention_mask: tf.Tensor, |
|
head_mask: tf.Tensor, |
|
encoder_hidden_states: tf.Tensor, |
|
encoder_attention_mask: tf.Tensor, |
|
past_key_value: Tuple[tf.Tensor], |
|
output_attentions: bool, |
|
training: bool = False, |
|
) -> Tuple[tf.Tensor]: |
|
self_outputs = self.self_attention( |
|
hidden_states=input_tensor, |
|
attention_mask=attention_mask, |
|
head_mask=head_mask, |
|
encoder_hidden_states=encoder_hidden_states, |
|
encoder_attention_mask=encoder_attention_mask, |
|
past_key_value=past_key_value, |
|
output_attentions=output_attentions, |
|
training=training, |
|
) |
|
attention_output = self.dense_output( |
|
hidden_states=self_outputs[0], input_tensor=input_tensor, training=training |
|
) |
|
|
|
outputs = (attention_output,) + self_outputs[1:] |
|
|
|
return outputs |
|
|
|
|
|
|
|
class TFCamembertIntermediate(tf.keras.layers.Layer): |
|
def __init__(self, config: CamembertConfig, **kwargs): |
|
super().__init__(**kwargs) |
|
|
|
self.dense = tf.keras.layers.Dense( |
|
units=config.intermediate_size, kernel_initializer=get_initializer(config.initializer_range), name="dense" |
|
) |
|
|
|
if isinstance(config.hidden_act, str): |
|
self.intermediate_act_fn = get_tf_activation(config.hidden_act) |
|
else: |
|
self.intermediate_act_fn = config.hidden_act |
|
|
|
def call(self, hidden_states: tf.Tensor) -> tf.Tensor: |
|
hidden_states = self.dense(inputs=hidden_states) |
|
hidden_states = self.intermediate_act_fn(hidden_states) |
|
|
|
return hidden_states |
|
|
|
|
|
|
|
class TFCamembertOutput(tf.keras.layers.Layer): |
|
def __init__(self, config: CamembertConfig, **kwargs): |
|
super().__init__(**kwargs) |
|
|
|
self.dense = tf.keras.layers.Dense( |
|
units=config.hidden_size, kernel_initializer=get_initializer(config.initializer_range), name="dense" |
|
) |
|
self.LayerNorm = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="LayerNorm") |
|
self.dropout = tf.keras.layers.Dropout(rate=config.hidden_dropout_prob) |
|
|
|
def call(self, hidden_states: tf.Tensor, input_tensor: tf.Tensor, training: bool = False) -> tf.Tensor: |
|
hidden_states = self.dense(inputs=hidden_states) |
|
hidden_states = self.dropout(inputs=hidden_states, training=training) |
|
hidden_states = self.LayerNorm(inputs=hidden_states + input_tensor) |
|
|
|
return hidden_states |
|
|
|
|
|
|
|
class TFCamembertLayer(tf.keras.layers.Layer): |
|
def __init__(self, config: CamembertConfig, **kwargs): |
|
super().__init__(**kwargs) |
|
|
|
self.attention = TFCamembertAttention(config, name="attention") |
|
self.is_decoder = config.is_decoder |
|
self.add_cross_attention = config.add_cross_attention |
|
if self.add_cross_attention: |
|
if not self.is_decoder: |
|
raise ValueError(f"{self} should be used as a decoder model if cross attention is added") |
|
self.crossattention = TFCamembertAttention(config, name="crossattention") |
|
self.intermediate = TFCamembertIntermediate(config, name="intermediate") |
|
self.bert_output = TFCamembertOutput(config, name="output") |
|
|
|
def call( |
|
self, |
|
hidden_states: tf.Tensor, |
|
attention_mask: tf.Tensor, |
|
head_mask: tf.Tensor, |
|
encoder_hidden_states: tf.Tensor | None, |
|
encoder_attention_mask: tf.Tensor | None, |
|
past_key_value: Tuple[tf.Tensor] | None, |
|
output_attentions: bool, |
|
training: bool = False, |
|
) -> Tuple[tf.Tensor]: |
|
|
|
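        # The decoder's cached uni-directional self-attention key/value states are the first two entries of `past_key_value`.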
self_attn_past_key_value = past_key_value[:2] if past_key_value is not None else None |
|
self_attention_outputs = self.attention( |
|
input_tensor=hidden_states, |
|
attention_mask=attention_mask, |
|
head_mask=head_mask, |
|
encoder_hidden_states=None, |
|
encoder_attention_mask=None, |
|
past_key_value=self_attn_past_key_value, |
|
output_attentions=output_attentions, |
|
training=training, |
|
) |
|
attention_output = self_attention_outputs[0] |
|
|
|
|
|
if self.is_decoder: |
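            # For a decoder, the last element of the self-attention outputs is the present key/value cache.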
|
outputs = self_attention_outputs[1:-1] |
|
present_key_value = self_attention_outputs[-1] |
|
else: |
|
outputs = self_attention_outputs[1:] |
|
|
|
cross_attn_present_key_value = None |
|
if self.is_decoder and encoder_hidden_states is not None: |
|
if not hasattr(self, "crossattention"): |
|
raise ValueError( |
|
f"If `encoder_hidden_states` are passed, {self} has to be instantiated with cross-attention layers" |
|
" by setting `config.add_cross_attention=True`" |
|
) |
|
|
|
|
|
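            # The cached cross-attention key/value states are the last two entries of `past_key_value`.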
cross_attn_past_key_value = past_key_value[-2:] if past_key_value is not None else None |
|
cross_attention_outputs = self.crossattention( |
|
input_tensor=attention_output, |
|
attention_mask=attention_mask, |
|
head_mask=head_mask, |
|
encoder_hidden_states=encoder_hidden_states, |
|
encoder_attention_mask=encoder_attention_mask, |
|
past_key_value=cross_attn_past_key_value, |
|
output_attentions=output_attentions, |
|
training=training, |
|
) |
|
attention_output = cross_attention_outputs[0] |
|
outputs = outputs + cross_attention_outputs[1:-1] |
|
|
|
|
|
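            # Append the cross-attention key/value cache so it is returned together with the self-attention cache.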
cross_attn_present_key_value = cross_attention_outputs[-1] |
|
present_key_value = present_key_value + cross_attn_present_key_value |
|
|
|
intermediate_output = self.intermediate(hidden_states=attention_output) |
|
layer_output = self.bert_output( |
|
hidden_states=intermediate_output, input_tensor=attention_output, training=training |
|
) |
|
outputs = (layer_output,) + outputs |
|
|
|
|
|
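        # If this is a decoder layer, return the attention key/value cache as the last output.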
if self.is_decoder: |
|
outputs = outputs + (present_key_value,) |
|
|
|
return outputs |
|
|
|
|
|
|
|
class TFCamembertEncoder(tf.keras.layers.Layer): |
|
def __init__(self, config: CamembertConfig, **kwargs): |
|
super().__init__(**kwargs) |
|
self.config = config |
|
self.layer = [TFCamembertLayer(config, name=f"layer_._{i}") for i in range(config.num_hidden_layers)] |
|
|
|
def call( |
|
self, |
|
hidden_states: tf.Tensor, |
|
attention_mask: tf.Tensor, |
|
head_mask: tf.Tensor, |
|
encoder_hidden_states: tf.Tensor | None, |
|
encoder_attention_mask: tf.Tensor | None, |
|
past_key_values: Tuple[Tuple[tf.Tensor]] | None, |
|
use_cache: Optional[bool], |
|
output_attentions: bool, |
|
output_hidden_states: bool, |
|
return_dict: bool, |
|
training: bool = False, |
|
) -> Union[TFBaseModelOutputWithPastAndCrossAttentions, Tuple[tf.Tensor]]: |
|
all_hidden_states = () if output_hidden_states else None |
|
all_attentions = () if output_attentions else None |
|
all_cross_attentions = () if output_attentions and self.config.add_cross_attention else None |
|
|
|
next_decoder_cache = () if use_cache else None |
|
for i, layer_module in enumerate(self.layer): |
|
if output_hidden_states: |
|
all_hidden_states = all_hidden_states + (hidden_states,) |
|
|
|
past_key_value = past_key_values[i] if past_key_values is not None else None |
|
|
|
layer_outputs = layer_module( |
|
hidden_states=hidden_states, |
|
attention_mask=attention_mask, |
|
head_mask=head_mask[i], |
|
encoder_hidden_states=encoder_hidden_states, |
|
encoder_attention_mask=encoder_attention_mask, |
|
past_key_value=past_key_value, |
|
output_attentions=output_attentions, |
|
training=training, |
|
) |
|
hidden_states = layer_outputs[0] |
|
|
|
if use_cache: |
|
next_decoder_cache += (layer_outputs[-1],) |
|
|
|
if output_attentions: |
|
all_attentions = all_attentions + (layer_outputs[1],) |
|
if self.config.add_cross_attention and encoder_hidden_states is not None: |
|
all_cross_attentions = all_cross_attentions + (layer_outputs[2],) |
|
|
|
|
|
if output_hidden_states: |
|
all_hidden_states = all_hidden_states + (hidden_states,) |
|
|
|
if not return_dict: |
|
return tuple( |
|
v for v in [hidden_states, all_hidden_states, all_attentions, all_cross_attentions] if v is not None |
|
) |
|
|
|
return TFBaseModelOutputWithPastAndCrossAttentions( |
|
last_hidden_state=hidden_states, |
|
past_key_values=next_decoder_cache, |
|
hidden_states=all_hidden_states, |
|
attentions=all_attentions, |
|
cross_attentions=all_cross_attentions, |
|
) |
|
|
|
|
|
@keras_serializable |
|
|
|
class TFCamembertMainLayer(tf.keras.layers.Layer): |
|
config_class = CamembertConfig |
|
|
|
def __init__(self, config, add_pooling_layer=True, **kwargs): |
|
super().__init__(**kwargs) |
|
|
|
self.config = config |
|
self.is_decoder = config.is_decoder |
|
|
|
self.num_hidden_layers = config.num_hidden_layers |
|
self.initializer_range = config.initializer_range |
|
self.output_attentions = config.output_attentions |
|
self.output_hidden_states = config.output_hidden_states |
|
self.return_dict = config.use_return_dict |
|
self.encoder = TFCamembertEncoder(config, name="encoder") |
|
self.pooler = TFCamembertPooler(config, name="pooler") if add_pooling_layer else None |
|
|
|
self.embeddings = TFCamembertEmbeddings(config, name="embeddings") |
|
|
|
|
|
def get_input_embeddings(self) -> tf.keras.layers.Layer: |
|
return self.embeddings |
|
|
|
|
|
def set_input_embeddings(self, value: tf.Variable): |
|
self.embeddings.weight = value |
|
self.embeddings.vocab_size = shape_list(value)[0] |
|
|
|
|
|
def _prune_heads(self, heads_to_prune): |
|
""" |
|
        Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer}. See

        the base class `PreTrainedModel`.
|
""" |
|
raise NotImplementedError |
|
|
|
@unpack_inputs |
|
|
|
def call( |
|
self, |
|
input_ids: TFModelInputType | None = None, |
|
attention_mask: np.ndarray | tf.Tensor | None = None, |
|
token_type_ids: np.ndarray | tf.Tensor | None = None, |
|
position_ids: np.ndarray | tf.Tensor | None = None, |
|
head_mask: np.ndarray | tf.Tensor | None = None, |
|
inputs_embeds: np.ndarray | tf.Tensor | None = None, |
|
encoder_hidden_states: np.ndarray | tf.Tensor | None = None, |
|
encoder_attention_mask: np.ndarray | tf.Tensor | None = None, |
|
past_key_values: Optional[Tuple[Tuple[Union[np.ndarray, tf.Tensor]]]] = None, |
|
use_cache: Optional[bool] = None, |
|
output_attentions: Optional[bool] = None, |
|
output_hidden_states: Optional[bool] = None, |
|
return_dict: Optional[bool] = None, |
|
training: bool = False, |
|
) -> Union[TFBaseModelOutputWithPoolingAndCrossAttentions, Tuple[tf.Tensor]]: |
|
if not self.config.is_decoder: |
|
use_cache = False |
|
|
|
if input_ids is not None and inputs_embeds is not None: |
|
raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time") |
|
elif input_ids is not None: |
|
input_shape = shape_list(input_ids) |
|
elif inputs_embeds is not None: |
|
input_shape = shape_list(inputs_embeds)[:-1] |
|
else: |
|
raise ValueError("You have to specify either input_ids or inputs_embeds") |
|
|
|
batch_size, seq_length = input_shape |
|
|
|
if past_key_values is None: |
|
past_key_values_length = 0 |
|
past_key_values = [None] * len(self.encoder.layer) |
|
else: |
|
past_key_values_length = shape_list(past_key_values[0][0])[-2] |
|
|
|
if attention_mask is None: |
|
attention_mask = tf.fill(dims=(batch_size, seq_length + past_key_values_length), value=1) |
|
|
|
if token_type_ids is None: |
|
token_type_ids = tf.fill(dims=input_shape, value=0) |
|
|
|
embedding_output = self.embeddings( |
|
input_ids=input_ids, |
|
position_ids=position_ids, |
|
token_type_ids=token_type_ids, |
|
inputs_embeds=inputs_embeds, |
|
past_key_values_length=past_key_values_length, |
|
training=training, |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
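        # We create a broadcastable attention mask from the 2D padding mask so it can be added to the
        # raw attention scores in every layer: [batch_size, 1, 1, to_seq_length] for the encoder case,
        # and [batch_size, 1, from_seq_length, to_seq_length] for the causal case below.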
attention_mask_shape = shape_list(attention_mask) |
|
|
|
mask_seq_length = seq_length + past_key_values_length |
|
|
|
|
|
|
|
|
|
if self.is_decoder: |
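            # Combine the padding mask with a lower-triangular causal mask so that each position
            # can only attend to itself, earlier positions, and cached positions.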
|
seq_ids = tf.range(mask_seq_length) |
|
causal_mask = tf.less_equal( |
|
tf.tile(seq_ids[None, None, :], (batch_size, mask_seq_length, 1)), |
|
seq_ids[None, :, None], |
|
) |
|
causal_mask = tf.cast(causal_mask, dtype=attention_mask.dtype) |
|
extended_attention_mask = causal_mask * attention_mask[:, None, :] |
|
attention_mask_shape = shape_list(extended_attention_mask) |
|
extended_attention_mask = tf.reshape( |
|
extended_attention_mask, (attention_mask_shape[0], 1, attention_mask_shape[1], attention_mask_shape[2]) |
|
) |
|
if past_key_values[0] is not None: |
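                # With cached key/value states only the new tokens are fed in, so keep just their rows of the mask.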
|
|
|
extended_attention_mask = extended_attention_mask[:, :, -seq_length:, :] |
|
else: |
|
extended_attention_mask = tf.reshape( |
|
attention_mask, (attention_mask_shape[0], 1, 1, attention_mask_shape[1]) |
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
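        # Since attention_mask is 1.0 for positions we want to attend and 0.0 for masked positions,
        # this creates a tensor that is 0.0 for positions to attend and -10000.0 for masked positions.
        # As it is added to the raw scores before the softmax, this effectively removes the masked
        # positions from the attention entirely.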
extended_attention_mask = tf.cast(extended_attention_mask, dtype=embedding_output.dtype) |
|
one_cst = tf.constant(1.0, dtype=embedding_output.dtype) |
|
ten_thousand_cst = tf.constant(-10000.0, dtype=embedding_output.dtype) |
|
extended_attention_mask = tf.multiply(tf.subtract(one_cst, extended_attention_mask), ten_thousand_cst) |
|
|
|
|
|
if self.is_decoder and encoder_attention_mask is not None: |
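            # If a 2D or 3D attention mask is provided for the cross-attention, make it
            # broadcastable to [batch_size, num_heads, seq_length, seq_length].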
|
|
|
|
|
|
|
encoder_attention_mask = tf.cast(encoder_attention_mask, dtype=extended_attention_mask.dtype) |
|
num_dims_encoder_attention_mask = len(shape_list(encoder_attention_mask)) |
|
if num_dims_encoder_attention_mask == 3: |
|
encoder_extended_attention_mask = encoder_attention_mask[:, None, :, :] |
|
if num_dims_encoder_attention_mask == 2: |
|
encoder_extended_attention_mask = encoder_attention_mask[:, None, None, :] |
|
|
|
|
|
|
|
|
|
|
|
|
|
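            # Convert to an additive mask: 0.0 for tokens to attend to, -10000.0 for masked tokens.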
encoder_extended_attention_mask = (1.0 - encoder_extended_attention_mask) * -10000.0 |
|
else: |
|
encoder_extended_attention_mask = None |
|
|
|
|
|
|
|
|
|
|
|
|
|
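        # Prepare the head mask if needed. Per-head masking is not implemented for this model,
        # so a list with one `None` entry per layer is passed to the encoder instead.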
if head_mask is not None: |
|
raise NotImplementedError |
|
else: |
|
head_mask = [None] * self.config.num_hidden_layers |
|
|
|
encoder_outputs = self.encoder( |
|
hidden_states=embedding_output, |
|
attention_mask=extended_attention_mask, |
|
head_mask=head_mask, |
|
encoder_hidden_states=encoder_hidden_states, |
|
encoder_attention_mask=encoder_extended_attention_mask, |
|
past_key_values=past_key_values, |
|
use_cache=use_cache, |
|
output_attentions=output_attentions, |
|
output_hidden_states=output_hidden_states, |
|
return_dict=return_dict, |
|
training=training, |
|
) |
|
|
|
sequence_output = encoder_outputs[0] |
|
pooled_output = self.pooler(hidden_states=sequence_output) if self.pooler is not None else None |
|
|
|
if not return_dict: |
|
return ( |
|
sequence_output, |
|
pooled_output, |
|
) + encoder_outputs[1:] |
|
|
|
return TFBaseModelOutputWithPoolingAndCrossAttentions( |
|
last_hidden_state=sequence_output, |
|
pooler_output=pooled_output, |
|
past_key_values=encoder_outputs.past_key_values, |
|
hidden_states=encoder_outputs.hidden_states, |
|
attentions=encoder_outputs.attentions, |
|
cross_attentions=encoder_outputs.cross_attentions, |
|
) |
|
|
|
|
|
class TFCamembertPreTrainedModel(TFPreTrainedModel): |
|
""" |
|
An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained |
|
models. |
|
""" |
|
|
|
config_class = CamembertConfig |
|
base_model_prefix = "roberta" |
|
|
|
|
|
@add_start_docstrings( |
|
"The bare CamemBERT Model transformer outputting raw hidden-states without any specific head on top.", |
|
CAMEMBERT_START_DOCSTRING, |
|
) |
|
|
|
class TFCamembertModel(TFCamembertPreTrainedModel): |
|
def __init__(self, config, *inputs, **kwargs): |
|
super().__init__(config, *inputs, **kwargs) |
|
self.roberta = TFCamembertMainLayer(config, name="roberta") |
|
|
|
@unpack_inputs |
|
@add_start_docstrings_to_model_forward(CAMEMBERT_INPUTS_DOCSTRING.format("batch_size, sequence_length")) |
|
@add_code_sample_docstrings( |
|
checkpoint=_CHECKPOINT_FOR_DOC, |
|
output_type=TFBaseModelOutputWithPoolingAndCrossAttentions, |
|
config_class=_CONFIG_FOR_DOC, |
|
) |
|
def call( |
|
self, |
|
input_ids: TFModelInputType | None = None, |
|
attention_mask: np.ndarray | tf.Tensor | None = None, |
|
token_type_ids: np.ndarray | tf.Tensor | None = None, |
|
position_ids: np.ndarray | tf.Tensor | None = None, |
|
head_mask: np.ndarray | tf.Tensor | None = None, |
|
inputs_embeds: np.ndarray | tf.Tensor | None = None, |
|
encoder_hidden_states: np.ndarray | tf.Tensor | None = None, |
|
encoder_attention_mask: np.ndarray | tf.Tensor | None = None, |
|
past_key_values: Optional[Tuple[Tuple[Union[np.ndarray, tf.Tensor]]]] = None, |
|
use_cache: Optional[bool] = None, |
|
output_attentions: Optional[bool] = None, |
|
output_hidden_states: Optional[bool] = None, |
|
return_dict: Optional[bool] = None, |
|
training: Optional[bool] = False, |
|
) -> Union[Tuple, TFBaseModelOutputWithPoolingAndCrossAttentions]: |
|
r""" |
|
encoder_hidden_states (`tf.Tensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*): |
|
Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention if |
|
the model is configured as a decoder. |
|
encoder_attention_mask (`tf.Tensor` of shape `(batch_size, sequence_length)`, *optional*): |
|
Mask to avoid performing attention on the padding token indices of the encoder input. This mask is used in |
|
the cross-attention if the model is configured as a decoder. Mask values selected in `[0, 1]`: |
|
|
|
- 1 for tokens that are **not masked**, |
|
- 0 for tokens that are **masked**. |
|
|
|
        past_key_values (`Tuple[Tuple[tf.Tensor]]` of length `config.n_layers`):

            Contains precomputed key and value hidden states of the attention blocks. Can be used to speed up decoding.
|
If `past_key_values` are used, the user can optionally input only the last `decoder_input_ids` (those that |
|
don't have their past key value states given to this model) of shape `(batch_size, 1)` instead of all |
|
`decoder_input_ids` of shape `(batch_size, sequence_length)`. |
|
use_cache (`bool`, *optional*, defaults to `True`): |
|
If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding (see |
|
            `past_key_values`). Set to `False` during training and to `True` during generation.
|
""" |
|
outputs = self.roberta( |
|
input_ids=input_ids, |
|
attention_mask=attention_mask, |
|
token_type_ids=token_type_ids, |
|
position_ids=position_ids, |
|
head_mask=head_mask, |
|
inputs_embeds=inputs_embeds, |
|
encoder_hidden_states=encoder_hidden_states, |
|
encoder_attention_mask=encoder_attention_mask, |
|
past_key_values=past_key_values, |
|
use_cache=use_cache, |
|
output_attentions=output_attentions, |
|
output_hidden_states=output_hidden_states, |
|
return_dict=return_dict, |
|
training=training, |
|
) |
|
|
|
return outputs |
|
|
|
|
|
|
|
class TFCamembertLMHead(tf.keras.layers.Layer): |
|
"""Camembert Head for masked language modeling.""" |
|
|
|
def __init__(self, config, input_embeddings, **kwargs): |
|
super().__init__(**kwargs) |
|
|
|
self.config = config |
|
self.hidden_size = config.hidden_size |
|
self.dense = tf.keras.layers.Dense( |
|
config.hidden_size, kernel_initializer=get_initializer(config.initializer_range), name="dense" |
|
) |
|
self.layer_norm = tf.keras.layers.LayerNormalization(epsilon=config.layer_norm_eps, name="layer_norm") |
|
self.act = get_tf_activation("gelu") |
|
|
|
|
|
|
|
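        # The output weights are the same as the input embeddings, but there is
        # an output-only bias for each token (added in `build`).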
self.decoder = input_embeddings |
|
|
|
def build(self, input_shape): |
|
self.bias = self.add_weight(shape=(self.config.vocab_size,), initializer="zeros", trainable=True, name="bias") |
|
|
|
super().build(input_shape) |
|
|
|
def get_output_embeddings(self): |
|
return self.decoder |
|
|
|
def set_output_embeddings(self, value): |
|
self.decoder.weight = value |
|
self.decoder.vocab_size = shape_list(value)[0] |
|
|
|
def get_bias(self): |
|
return {"bias": self.bias} |
|
|
|
def set_bias(self, value): |
|
self.bias = value["bias"] |
|
self.config.vocab_size = shape_list(value["bias"])[0] |
|
|
|
def call(self, hidden_states): |
|
hidden_states = self.dense(hidden_states) |
|
hidden_states = self.act(hidden_states) |
|
hidden_states = self.layer_norm(hidden_states) |
|
|
|
|
|
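        # Project back to the size of the vocabulary with a per-token output bias.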
seq_length = shape_list(tensor=hidden_states)[1] |
|
hidden_states = tf.reshape(tensor=hidden_states, shape=[-1, self.hidden_size]) |
|
hidden_states = tf.matmul(a=hidden_states, b=self.decoder.weight, transpose_b=True) |
|
hidden_states = tf.reshape(tensor=hidden_states, shape=[-1, seq_length, self.config.vocab_size]) |
|
hidden_states = tf.nn.bias_add(value=hidden_states, bias=self.bias) |
|
|
|
return hidden_states |
|
|
|
|
|
@add_start_docstrings( |
|
"""CamemBERT Model with a `language modeling` head on top.""", |
|
CAMEMBERT_START_DOCSTRING, |
|
) |
|
|
|
class TFCamembertForMaskedLM(TFCamembertPreTrainedModel, TFMaskedLanguageModelingLoss): |
|
|
|
_keys_to_ignore_on_load_unexpected = [r"pooler", r"lm_head.decoder.weight"] |
|
|
|
def __init__(self, config, *inputs, **kwargs): |
|
super().__init__(config, *inputs, **kwargs) |
|
|
|
self.roberta = TFCamembertMainLayer(config, add_pooling_layer=False, name="roberta") |
|
self.lm_head = TFCamembertLMHead(config, self.roberta.embeddings, name="lm_head") |
|
|
|
def get_lm_head(self): |
|
return self.lm_head |
|
|
|
def get_prefix_bias_name(self): |
|
warnings.warn("The method get_prefix_bias_name is deprecated. Please use `get_bias` instead.", FutureWarning) |
|
return self.name + "/" + self.lm_head.name |
|
|
|
@unpack_inputs |
|
@add_start_docstrings_to_model_forward(CAMEMBERT_INPUTS_DOCSTRING.format("batch_size, sequence_length")) |
|
@add_code_sample_docstrings( |
|
checkpoint=_CHECKPOINT_FOR_DOC, |
|
output_type=TFMaskedLMOutput, |
|
config_class=_CONFIG_FOR_DOC, |
|
mask="<mask>", |
|
expected_output="' Paris'", |
|
expected_loss=0.1, |
|
) |
|
def call( |
|
self, |
|
input_ids: TFModelInputType | None = None, |
|
attention_mask: np.ndarray | tf.Tensor | None = None, |
|
token_type_ids: np.ndarray | tf.Tensor | None = None, |
|
position_ids: np.ndarray | tf.Tensor | None = None, |
|
head_mask: np.ndarray | tf.Tensor | None = None, |
|
inputs_embeds: np.ndarray | tf.Tensor | None = None, |
|
output_attentions: Optional[bool] = None, |
|
output_hidden_states: Optional[bool] = None, |
|
return_dict: Optional[bool] = None, |
|
labels: np.ndarray | tf.Tensor | None = None, |
|
training: Optional[bool] = False, |
|
) -> Union[TFMaskedLMOutput, Tuple[tf.Tensor]]: |
|
r""" |
|
labels (`tf.Tensor` of shape `(batch_size, sequence_length)`, *optional*): |
|
            Labels for computing the masked language modeling loss. Indices should be in `[-100, 0, ...,

            config.vocab_size - 1]` (see the `input_ids` docstring). Tokens with indices set to `-100` are ignored

            (masked); the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size - 1]`.
|
""" |
|
outputs = self.roberta( |
|
input_ids, |
|
attention_mask=attention_mask, |
|
token_type_ids=token_type_ids, |
|
position_ids=position_ids, |
|
head_mask=head_mask, |
|
inputs_embeds=inputs_embeds, |
|
output_attentions=output_attentions, |
|
output_hidden_states=output_hidden_states, |
|
return_dict=return_dict, |
|
training=training, |
|
) |
|
|
|
sequence_output = outputs[0] |
|
prediction_scores = self.lm_head(sequence_output) |
|
|
|
loss = None if labels is None else self.hf_compute_loss(labels, prediction_scores) |
|
|
|
if not return_dict: |
|
output = (prediction_scores,) + outputs[2:] |
|
return ((loss,) + output) if loss is not None else output |
|
|
|
return TFMaskedLMOutput( |
|
loss=loss, |
|
logits=prediction_scores, |
|
hidden_states=outputs.hidden_states, |
|
attentions=outputs.attentions, |
|
) |
|
|
|
|
|
|
|
class TFCamembertClassificationHead(tf.keras.layers.Layer): |
|
"""Head for sentence-level classification tasks.""" |
|
|
|
def __init__(self, config, **kwargs): |
|
super().__init__(**kwargs) |
|
self.dense = tf.keras.layers.Dense( |
|
config.hidden_size, |
|
kernel_initializer=get_initializer(config.initializer_range), |
|
activation="tanh", |
|
name="dense", |
|
) |
|
classifier_dropout = ( |
|
config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob |
|
) |
|
self.dropout = tf.keras.layers.Dropout(classifier_dropout) |
|
self.out_proj = tf.keras.layers.Dense( |
|
config.num_labels, kernel_initializer=get_initializer(config.initializer_range), name="out_proj" |
|
) |
|
|
|
def call(self, features, training=False): |
|
x = features[:, 0, :] |
|
x = self.dropout(x, training=training) |
|
x = self.dense(x) |
|
x = self.dropout(x, training=training) |
|
x = self.out_proj(x) |
|
return x |
|
|
|
|
|
@add_start_docstrings( |
|
""" |
|
CamemBERT Model transformer with a sequence classification/regression head on top (a linear layer on top of the |
|
pooled output) e.g. for GLUE tasks. |
|
""", |
|
CAMEMBERT_START_DOCSTRING, |
|
) |
|
|
|
class TFCamembertForSequenceClassification(TFCamembertPreTrainedModel, TFSequenceClassificationLoss): |
|
|
|
_keys_to_ignore_on_load_unexpected = [r"pooler", r"lm_head"] |
|
|
|
def __init__(self, config, *inputs, **kwargs): |
|
super().__init__(config, *inputs, **kwargs) |
|
self.num_labels = config.num_labels |
|
|
|
self.roberta = TFCamembertMainLayer(config, add_pooling_layer=False, name="roberta") |
|
self.classifier = TFCamembertClassificationHead(config, name="classifier") |
|
|
|
@unpack_inputs |
|
@add_start_docstrings_to_model_forward(CAMEMBERT_INPUTS_DOCSTRING.format("batch_size, sequence_length")) |
|
@add_code_sample_docstrings( |
|
checkpoint="cardiffnlp/twitter-roberta-base-emotion", |
|
output_type=TFSequenceClassifierOutput, |
|
config_class=_CONFIG_FOR_DOC, |
|
expected_output="'optimism'", |
|
expected_loss=0.08, |
|
) |
|
def call( |
|
self, |
|
input_ids: TFModelInputType | None = None, |
|
attention_mask: np.ndarray | tf.Tensor | None = None, |
|
token_type_ids: np.ndarray | tf.Tensor | None = None, |
|
position_ids: np.ndarray | tf.Tensor | None = None, |
|
head_mask: np.ndarray | tf.Tensor | None = None, |
|
inputs_embeds: np.ndarray | tf.Tensor | None = None, |
|
output_attentions: Optional[bool] = None, |
|
output_hidden_states: Optional[bool] = None, |
|
return_dict: Optional[bool] = None, |
|
labels: np.ndarray | tf.Tensor | None = None, |
|
training: Optional[bool] = False, |
|
) -> Union[TFSequenceClassifierOutput, Tuple[tf.Tensor]]: |
|
r""" |
|
labels (`tf.Tensor` of shape `(batch_size,)`, *optional*): |
|
Labels for computing the sequence classification/regression loss. Indices should be in `[0, ..., |
|
            config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss). If
|
`config.num_labels > 1` a classification loss is computed (Cross-Entropy). |
|
""" |
|
outputs = self.roberta( |
|
input_ids, |
|
attention_mask=attention_mask, |
|
token_type_ids=token_type_ids, |
|
position_ids=position_ids, |
|
head_mask=head_mask, |
|
inputs_embeds=inputs_embeds, |
|
output_attentions=output_attentions, |
|
output_hidden_states=output_hidden_states, |
|
return_dict=return_dict, |
|
training=training, |
|
) |
|
sequence_output = outputs[0] |
|
logits = self.classifier(sequence_output, training=training) |
|
|
|
loss = None if labels is None else self.hf_compute_loss(labels, logits) |
|
|
|
if not return_dict: |
|
output = (logits,) + outputs[2:] |
|
return ((loss,) + output) if loss is not None else output |
|
|
|
return TFSequenceClassifierOutput( |
|
loss=loss, |
|
logits=logits, |
|
hidden_states=outputs.hidden_states, |
|
attentions=outputs.attentions, |
|
) |
|
|
|
|
|
@add_start_docstrings( |
|
""" |
|
CamemBERT Model with a token classification head on top (a linear layer on top of the hidden-states output) e.g. |
|
for Named-Entity-Recognition (NER) tasks. |
|
""", |
|
CAMEMBERT_START_DOCSTRING, |
|
) |
|
|
|
class TFCamembertForTokenClassification(TFCamembertPreTrainedModel, TFTokenClassificationLoss): |
|
|
|
_keys_to_ignore_on_load_unexpected = [r"pooler", r"lm_head"] |
|
_keys_to_ignore_on_load_missing = [r"dropout"] |
|
|
|
def __init__(self, config, *inputs, **kwargs): |
|
super().__init__(config, *inputs, **kwargs) |
|
self.num_labels = config.num_labels |
|
|
|
self.roberta = TFCamembertMainLayer(config, add_pooling_layer=False, name="roberta") |
|
classifier_dropout = ( |
|
config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob |
|
) |
|
self.dropout = tf.keras.layers.Dropout(classifier_dropout) |
|
self.classifier = tf.keras.layers.Dense( |
|
config.num_labels, kernel_initializer=get_initializer(config.initializer_range), name="classifier" |
|
) |
|
|
|
@unpack_inputs |
|
@add_start_docstrings_to_model_forward(CAMEMBERT_INPUTS_DOCSTRING.format("batch_size, sequence_length")) |
|
@add_code_sample_docstrings( |
|
checkpoint="ydshieh/roberta-large-ner-english", |
|
output_type=TFTokenClassifierOutput, |
|
config_class=_CONFIG_FOR_DOC, |
|
expected_output="['O', 'ORG', 'ORG', 'O', 'O', 'O', 'O', 'O', 'LOC', 'O', 'LOC', 'LOC']", |
|
expected_loss=0.01, |
|
) |
|
def call( |
|
self, |
|
input_ids: TFModelInputType | None = None, |
|
attention_mask: np.ndarray | tf.Tensor | None = None, |
|
token_type_ids: np.ndarray | tf.Tensor | None = None, |
|
position_ids: np.ndarray | tf.Tensor | None = None, |
|
head_mask: np.ndarray | tf.Tensor | None = None, |
|
inputs_embeds: np.ndarray | tf.Tensor | None = None, |
|
output_attentions: Optional[bool] = None, |
|
output_hidden_states: Optional[bool] = None, |
|
return_dict: Optional[bool] = None, |
|
labels: np.ndarray | tf.Tensor | None = None, |
|
training: Optional[bool] = False, |
|
) -> Union[TFTokenClassifierOutput, Tuple[tf.Tensor]]: |
|
r""" |
|
labels (`tf.Tensor` of shape `(batch_size, sequence_length)`, *optional*): |
|
Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`. |
|
""" |
|
outputs = self.roberta( |
|
input_ids, |
|
attention_mask=attention_mask, |
|
token_type_ids=token_type_ids, |
|
position_ids=position_ids, |
|
head_mask=head_mask, |
|
inputs_embeds=inputs_embeds, |
|
output_attentions=output_attentions, |
|
output_hidden_states=output_hidden_states, |
|
return_dict=return_dict, |
|
training=training, |
|
) |
|
sequence_output = outputs[0] |
|
|
|
sequence_output = self.dropout(sequence_output, training=training) |
|
logits = self.classifier(sequence_output) |
|
|
|
loss = None if labels is None else self.hf_compute_loss(labels, logits) |
|
|
|
if not return_dict: |
|
output = (logits,) + outputs[2:] |
|
return ((loss,) + output) if loss is not None else output |
|
|
|
return TFTokenClassifierOutput( |
|
loss=loss, |
|
logits=logits, |
|
hidden_states=outputs.hidden_states, |
|
attentions=outputs.attentions, |
|
) |
|
|
|
|
|
@add_start_docstrings( |
|
""" |
|
CamemBERT Model with a multiple choice classification head on top (a linear layer on top of the pooled output and a |
|
softmax) e.g. for RocStories/SWAG tasks. |
|
""", |
|
CAMEMBERT_START_DOCSTRING, |
|
) |
|
|
|
class TFCamembertForMultipleChoice(TFCamembertPreTrainedModel, TFMultipleChoiceLoss): |
|
|
|
_keys_to_ignore_on_load_unexpected = [r"lm_head"] |
|
_keys_to_ignore_on_load_missing = [r"dropout"] |
|
|
|
def __init__(self, config, *inputs, **kwargs): |
|
super().__init__(config, *inputs, **kwargs) |
|
|
|
self.roberta = TFCamembertMainLayer(config, name="roberta") |
|
self.dropout = tf.keras.layers.Dropout(config.hidden_dropout_prob) |
|
self.classifier = tf.keras.layers.Dense( |
|
1, kernel_initializer=get_initializer(config.initializer_range), name="classifier" |
|
) |
|
|
|
@unpack_inputs |
|
@add_start_docstrings_to_model_forward( |
|
CAMEMBERT_INPUTS_DOCSTRING.format("batch_size, num_choices, sequence_length") |
|
) |
|
@add_code_sample_docstrings( |
|
checkpoint=_CHECKPOINT_FOR_DOC, |
|
output_type=TFMultipleChoiceModelOutput, |
|
config_class=_CONFIG_FOR_DOC, |
|
) |
|
def call( |
|
self, |
|
input_ids: TFModelInputType | None = None, |
|
attention_mask: np.ndarray | tf.Tensor | None = None, |
|
token_type_ids: np.ndarray | tf.Tensor | None = None, |
|
position_ids: np.ndarray | tf.Tensor | None = None, |
|
head_mask: np.ndarray | tf.Tensor | None = None, |
|
inputs_embeds: np.ndarray | tf.Tensor | None = None, |
|
output_attentions: Optional[bool] = None, |
|
output_hidden_states: Optional[bool] = None, |
|
return_dict: Optional[bool] = None, |
|
labels: np.ndarray | tf.Tensor | None = None, |
|
training: Optional[bool] = False, |
|
) -> Union[TFMultipleChoiceModelOutput, Tuple[tf.Tensor]]: |
|
r""" |
|
labels (`tf.Tensor` of shape `(batch_size,)`, *optional*): |
|
            Labels for computing the multiple choice classification loss. Indices should be in `[0, ..., num_choices - 1]`
|
where `num_choices` is the size of the second dimension of the input tensors. (See `input_ids` above) |
|
""" |
|
|
|
if input_ids is not None: |
|
num_choices = shape_list(input_ids)[1] |
|
seq_length = shape_list(input_ids)[2] |
|
else: |
|
num_choices = shape_list(inputs_embeds)[1] |
|
seq_length = shape_list(inputs_embeds)[2] |
|
|
|
flat_input_ids = tf.reshape(input_ids, (-1, seq_length)) if input_ids is not None else None |
|
flat_attention_mask = tf.reshape(attention_mask, (-1, seq_length)) if attention_mask is not None else None |
|
flat_token_type_ids = tf.reshape(token_type_ids, (-1, seq_length)) if token_type_ids is not None else None |
|
flat_position_ids = tf.reshape(position_ids, (-1, seq_length)) if position_ids is not None else None |
|
outputs = self.roberta( |
|
flat_input_ids, |
|
flat_attention_mask, |
|
flat_token_type_ids, |
|
flat_position_ids, |
|
head_mask, |
|
inputs_embeds, |
|
output_attentions, |
|
output_hidden_states, |
|
return_dict=return_dict, |
|
training=training, |
|
) |
|
pooled_output = outputs[1] |
|
pooled_output = self.dropout(pooled_output, training=training) |
|
logits = self.classifier(pooled_output) |
|
reshaped_logits = tf.reshape(logits, (-1, num_choices)) |
|
|
|
loss = None if labels is None else self.hf_compute_loss(labels, reshaped_logits) |
|
|
|
if not return_dict: |
|
output = (reshaped_logits,) + outputs[2:] |
|
return ((loss,) + output) if loss is not None else output |
|
|
|
return TFMultipleChoiceModelOutput( |
|
loss=loss, |
|
logits=reshaped_logits, |
|
hidden_states=outputs.hidden_states, |
|
attentions=outputs.attentions, |
|
) |
|
|
|
|
|
@add_start_docstrings( |
|
""" |
|
CamemBERT Model with a span classification head on top for extractive question-answering tasks like SQuAD (a linear |
|
    layer on top of the hidden-states output to compute `span start logits` and `span end logits`).
|
""", |
|
CAMEMBERT_START_DOCSTRING, |
|
) |
|
|
|
class TFCamembertForQuestionAnswering(TFCamembertPreTrainedModel, TFQuestionAnsweringLoss): |
|
|
|
_keys_to_ignore_on_load_unexpected = [r"pooler", r"lm_head"] |
|
|
|
def __init__(self, config, *inputs, **kwargs): |
|
super().__init__(config, *inputs, **kwargs) |
|
self.num_labels = config.num_labels |
|
|
|
self.roberta = TFCamembertMainLayer(config, add_pooling_layer=False, name="roberta") |
|
self.qa_outputs = tf.keras.layers.Dense( |
|
config.num_labels, kernel_initializer=get_initializer(config.initializer_range), name="qa_outputs" |
|
) |
|
|
|
@unpack_inputs |
|
@add_start_docstrings_to_model_forward(CAMEMBERT_INPUTS_DOCSTRING.format("batch_size, sequence_length")) |
|
@add_code_sample_docstrings( |
|
checkpoint="ydshieh/roberta-base-squad2", |
|
output_type=TFQuestionAnsweringModelOutput, |
|
config_class=_CONFIG_FOR_DOC, |
|
expected_output="' puppet'", |
|
expected_loss=0.86, |
|
) |
|
def call( |
|
self, |
|
input_ids: TFModelInputType | None = None, |
|
attention_mask: np.ndarray | tf.Tensor | None = None, |
|
token_type_ids: np.ndarray | tf.Tensor | None = None, |
|
position_ids: np.ndarray | tf.Tensor | None = None, |
|
head_mask: np.ndarray | tf.Tensor | None = None, |
|
inputs_embeds: np.ndarray | tf.Tensor | None = None, |
|
output_attentions: Optional[bool] = None, |
|
output_hidden_states: Optional[bool] = None, |
|
return_dict: Optional[bool] = None, |
|
start_positions: np.ndarray | tf.Tensor | None = None, |
|
end_positions: np.ndarray | tf.Tensor | None = None, |
|
training: Optional[bool] = False, |
|
) -> Union[TFQuestionAnsweringModelOutput, Tuple[tf.Tensor]]: |
|
r""" |
|
start_positions (`tf.Tensor` of shape `(batch_size,)`, *optional*): |
|
Labels for position (index) of the start of the labelled span for computing the token classification loss. |
|
            Positions are clamped to the length of the sequence (`sequence_length`). Positions outside of the sequence
|
are not taken into account for computing the loss. |
|
end_positions (`tf.Tensor` of shape `(batch_size,)`, *optional*): |
|
Labels for position (index) of the end of the labelled span for computing the token classification loss. |
|
            Positions are clamped to the length of the sequence (`sequence_length`). Positions outside of the sequence
|
are not taken into account for computing the loss. |
|
""" |
|
outputs = self.roberta( |
|
input_ids, |
|
attention_mask=attention_mask, |
|
token_type_ids=token_type_ids, |
|
position_ids=position_ids, |
|
head_mask=head_mask, |
|
inputs_embeds=inputs_embeds, |
|
output_attentions=output_attentions, |
|
output_hidden_states=output_hidden_states, |
|
return_dict=return_dict, |
|
training=training, |
|
) |
|
sequence_output = outputs[0] |
|
|
|
logits = self.qa_outputs(sequence_output) |
|
start_logits, end_logits = tf.split(logits, 2, axis=-1) |
|
start_logits = tf.squeeze(start_logits, axis=-1) |
|
end_logits = tf.squeeze(end_logits, axis=-1) |
|
|
|
loss = None |
|
if start_positions is not None and end_positions is not None: |
|
labels = {"start_position": start_positions} |
|
labels["end_position"] = end_positions |
|
loss = self.hf_compute_loss(labels, (start_logits, end_logits)) |
|
|
|
if not return_dict: |
|
output = (start_logits, end_logits) + outputs[2:] |
|
return ((loss,) + output) if loss is not None else output |
|
|
|
return TFQuestionAnsweringModelOutput( |
|
loss=loss, |
|
start_logits=start_logits, |
|
end_logits=end_logits, |
|
hidden_states=outputs.hidden_states, |
|
attentions=outputs.attentions, |
|
) |
|
|
|
|
|
@add_start_docstrings( |
|
"""CamemBERT Model with a `language modeling` head on top for CLM fine-tuning.""", CAMEMBERT_START_DOCSTRING |
|
) |
|
|
|
class TFCamembertForCausalLM(TFCamembertPreTrainedModel, TFCausalLanguageModelingLoss): |
|
|
|
_keys_to_ignore_on_load_unexpected = [r"pooler", r"lm_head.decoder.weight"] |
|
|
|
def __init__(self, config: CamembertConfig, *inputs, **kwargs): |
|
super().__init__(config, *inputs, **kwargs) |
|
|
|
if not config.is_decoder: |
|
            logger.warning("If you want to use `TFCamembertForCausalLM` as a standalone, add `is_decoder=True`.")
|
|
|
self.roberta = TFCamembertMainLayer(config, add_pooling_layer=False, name="roberta") |
|
self.lm_head = TFCamembertLMHead(config, input_embeddings=self.roberta.embeddings, name="lm_head") |
|
|
|
def get_lm_head(self): |
|
return self.lm_head |
|
|
|
def get_prefix_bias_name(self): |
|
warnings.warn("The method get_prefix_bias_name is deprecated. Please use `get_bias` instead.", FutureWarning) |
|
return self.name + "/" + self.lm_head.name |
|
|
|
|
|
def prepare_inputs_for_generation(self, input_ids, past_key_values=None, attention_mask=None, **model_kwargs): |
|
input_shape = input_ids.shape |
|
|
|
if attention_mask is None: |
|
attention_mask = tf.ones(input_shape) |
|
|
|
|
|
if past_key_values is not None: |
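            # Only the last token needs to be passed when cached key/value states are used.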
|
input_ids = input_ids[:, -1:] |
|
|
|
return {"input_ids": input_ids, "attention_mask": attention_mask, "past_key_values": past_key_values} |
|
|
|
@unpack_inputs |
|
@add_start_docstrings_to_model_forward(CAMEMBERT_INPUTS_DOCSTRING.format("batch_size, sequence_length")) |
|
@add_code_sample_docstrings( |
|
checkpoint=_CHECKPOINT_FOR_DOC, |
|
output_type=TFCausalLMOutputWithCrossAttentions, |
|
config_class=_CONFIG_FOR_DOC, |
|
) |
|
def call( |
|
self, |
|
input_ids: TFModelInputType | None = None, |
|
attention_mask: np.ndarray | tf.Tensor | None = None, |
|
token_type_ids: np.ndarray | tf.Tensor | None = None, |
|
position_ids: np.ndarray | tf.Tensor | None = None, |
|
head_mask: np.ndarray | tf.Tensor | None = None, |
|
inputs_embeds: np.ndarray | tf.Tensor | None = None, |
|
encoder_hidden_states: np.ndarray | tf.Tensor | None = None, |
|
encoder_attention_mask: np.ndarray | tf.Tensor | None = None, |
|
past_key_values: Optional[Tuple[Tuple[Union[np.ndarray, tf.Tensor]]]] = None, |
|
use_cache: Optional[bool] = None, |
|
output_attentions: Optional[bool] = None, |
|
output_hidden_states: Optional[bool] = None, |
|
return_dict: Optional[bool] = None, |
|
labels: np.ndarray | tf.Tensor | None = None, |
|
training: Optional[bool] = False, |
|
) -> Union[TFCausalLMOutputWithCrossAttentions, Tuple[tf.Tensor]]: |
|
r""" |
|
encoder_hidden_states (`tf.Tensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*): |
|
Sequence of hidden-states at the output of the last layer of the encoder. Used in the cross-attention if |
|
the model is configured as a decoder. |
|
encoder_attention_mask (`tf.Tensor` of shape `(batch_size, sequence_length)`, *optional*): |
|
Mask to avoid performing attention on the padding token indices of the encoder input. This mask is used in |
|
the cross-attention if the model is configured as a decoder. Mask values selected in `[0, 1]`: |
|
|
|
- 1 for tokens that are **not masked**, |
|
- 0 for tokens that are **masked**. |
|
|
|
        past_key_values (`Tuple[Tuple[tf.Tensor]]` of length `config.n_layers`):

            Contains precomputed key and value hidden states of the attention blocks. Can be used to speed up decoding.
|
If `past_key_values` are used, the user can optionally input only the last `decoder_input_ids` (those that |
|
don't have their past key value states given to this model) of shape `(batch_size, 1)` instead of all |
|
`decoder_input_ids` of shape `(batch_size, sequence_length)`. |
|
use_cache (`bool`, *optional*, defaults to `True`): |
|
If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding (see |
|
            `past_key_values`). Set to `False` during training and to `True` during generation.
|
labels (`tf.Tensor` or `np.ndarray` of shape `(batch_size, sequence_length)`, *optional*): |
|
Labels for computing the cross entropy classification loss. Indices should be in `[0, ..., |
|
config.vocab_size - 1]`. |
|
""" |
|
outputs = self.roberta( |
|
input_ids=input_ids, |
|
attention_mask=attention_mask, |
|
token_type_ids=token_type_ids, |
|
position_ids=position_ids, |
|
head_mask=head_mask, |
|
inputs_embeds=inputs_embeds, |
|
encoder_hidden_states=encoder_hidden_states, |
|
encoder_attention_mask=encoder_attention_mask, |
|
past_key_values=past_key_values, |
|
use_cache=use_cache, |
|
output_attentions=output_attentions, |
|
output_hidden_states=output_hidden_states, |
|
return_dict=return_dict, |
|
training=training, |
|
) |
|
|
|
sequence_output = outputs[0] |
|
logits = self.lm_head(hidden_states=sequence_output, training=training) |
|
loss = None |
|
|
|
if labels is not None: |
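            # Shift the labels to the left and drop the last logit so that position i predicts token i + 1.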
|
|
|
shifted_logits = logits[:, :-1] |
|
labels = labels[:, 1:] |
|
loss = self.hf_compute_loss(labels=labels, logits=shifted_logits) |
|
|
|
if not return_dict: |
|
output = (logits,) + outputs[2:] |
|
return ((loss,) + output) if loss is not None else output |
|
|
|
return TFCausalLMOutputWithCrossAttentions( |
|
loss=loss, |
|
logits=logits, |
|
past_key_values=outputs.past_key_values, |
|
hidden_states=outputs.hidden_states, |
|
attentions=outputs.attentions, |
|
cross_attentions=outputs.cross_attentions, |
|
) |
|
|