|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import csv |
|
import dataclasses |
|
import json |
|
from dataclasses import dataclass |
|
from typing import List, Optional, Union |
|
|
|
from ...utils import is_tf_available, is_torch_available, logging |
|
|
|
|
|
logger = logging.get_logger(__name__) |
|
|
|
|
|
@dataclass
class InputExample:
    """
    One raw (untokenized) example for a simple sequence classification task.

    Args:
        guid: Unique id for the example.
        text_a: string. The untokenized text of the first sequence. Single-sequence tasks only need
            this field.
        text_b: (Optional) string. The untokenized text of the second sequence. Only needed for
            sequence-pair tasks.
        label: (Optional) string. The label of the example. Expected for train and dev examples,
            but not for test examples.
    """

    guid: str
    text_a: str
    text_b: Optional[str] = None
    label: Optional[str] = None

    def to_json_string(self):
        """Return the example as a JSON object (2-space indent) terminated by a newline."""
        field_values = dataclasses.asdict(self)
        serialized = json.dumps(field_values, indent=2)
        return f"{serialized}\n"
|
|
|
|
|
@dataclass(frozen=True)
class InputFeatures:
    """
    Immutable set of model-ready features for one example. Field names match the names of the
    corresponding model inputs.

    Args:
        input_ids: Indices of input sequence tokens in the vocabulary.
        attention_mask: Mask to avoid performing attention on padding token indices.
            Mask values selected in `[0, 1]`: usually `1` for tokens that are NOT MASKED, `0` for
            MASKED (padded) tokens.
        token_type_ids: (Optional) Segment token indices to indicate first and second
            portions of the inputs. Only some models use them.
        label: (Optional) Label corresponding to the input. Int for classification problems,
            float for regression problems.
    """

    input_ids: List[int]
    attention_mask: Optional[List[int]] = None
    token_type_ids: Optional[List[int]] = None
    label: Optional[Union[int, float]] = None

    def to_json_string(self):
        """Return the features as a single-line JSON object terminated by a newline."""
        field_values = dataclasses.asdict(self)
        serialized = json.dumps(field_values)
        return f"{serialized}\n"
|
|
|
|
|
class DataProcessor:
    """Base class for data converters for sequence classification data sets.

    The `get_*` methods are placeholders that raise `NotImplementedError`; concrete processors
    are expected to override the ones they support.
    """

    def get_example_from_tensor_dict(self, tensor_dict):
        """
        Gets an example from a dict with tensorflow tensors.

        Args:
            tensor_dict: Keys and values should match the corresponding Glue
                tensorflow_dataset examples.
        """
        raise NotImplementedError()

    def get_train_examples(self, data_dir):
        """Gets a collection of [`InputExample`] for the train set."""
        raise NotImplementedError()

    def get_dev_examples(self, data_dir):
        """Gets a collection of [`InputExample`] for the dev set."""
        raise NotImplementedError()

    def get_test_examples(self, data_dir):
        """Gets a collection of [`InputExample`] for the test set."""
        raise NotImplementedError()

    def get_labels(self):
        """Gets the list of labels for this data set."""
        raise NotImplementedError()

    def tfds_map(self, example):
        """
        Some tensorflow_datasets datasets are not formatted the same way the GLUE datasets are. This method converts
        examples to the correct format.

        When the processor defines more than one label, `example.label` is treated as an index into
        `get_labels()` and replaced (in place) by the label found at that index.

        Returns:
            The same `example` object that was passed in.
        """
        # Fetch the label list once instead of once for the check and once for the lookup.
        labels = self.get_labels()
        if len(labels) > 1:
            example.label = labels[int(example.label)]
        return example

    @classmethod
    def _read_tsv(cls, input_file, quotechar=None):
        """Reads a tab separated value file.

        Args:
            input_file: Path of the file to read. It is decoded as UTF-8 and a leading BOM, if
                present, is skipped (`utf-8-sig`).
            quotechar: Forwarded to `csv.reader`.

        Returns:
            A list with one list of string fields per record.
        """
        # newline="" is what the csv module asks for: it lets the reader do its own line-ending
        # handling, so a line break embedded in a quoted field is kept as written.
        with open(input_file, "r", encoding="utf-8-sig", newline="") as f:
            return list(csv.reader(f, delimiter="\t", quotechar=quotechar))
|
|
|
|
|
class SingleSentenceClassificationProcessor(DataProcessor):
    """Generic processor for a single sentence classification data set.

    Holds a list of `InputExample` objects together with the list of known labels, and converts
    them into padded `InputFeatures` (optionally wrapped in a TensorFlow or PyTorch dataset).

    Args:
        labels: Initial list of labels. In classification mode the position of a label in this
            list is the integer id assigned to it by `get_features`.
        examples: Initial list of `InputExample`.
        mode: Either `"classification"` or `"regression"`.
        verbose: If `True`, `get_features` logs the first five converted examples.
    """

    def __init__(self, labels=None, examples=None, mode="classification", verbose=False):
        self.labels = [] if labels is None else labels
        self.examples = [] if examples is None else examples
        self.mode = mode
        self.verbose = verbose

    def __len__(self):
        """Number of examples currently held."""
        return len(self.examples)

    def __getitem__(self, idx):
        """Return one example for an integer index, or a new processor for a slice."""
        if isinstance(idx, slice):
            # Carry over mode/verbose so a sliced regression processor stays a regression processor.
            return SingleSentenceClassificationProcessor(
                labels=self.labels,
                examples=self.examples[idx],
                mode=self.mode,
                verbose=self.verbose,
            )
        return self.examples[idx]

    @classmethod
    def create_from_csv(
        cls, file_name, split_name="", column_label=0, column_text=1, column_id=None, skip_first_row=False, **kwargs
    ):
        """Build a processor from a tab-separated file.

        `kwargs` are forwarded to the constructor. Labels and examples are then replaced by what is
        read from the file (`overwrite_labels=True`, `overwrite_examples=True`). See
        `add_examples_from_csv` for the meaning of the column arguments.
        """
        processor = cls(**kwargs)
        processor.add_examples_from_csv(
            file_name,
            split_name=split_name,
            column_label=column_label,
            column_text=column_text,
            column_id=column_id,
            skip_first_row=skip_first_row,
            overwrite_labels=True,
            overwrite_examples=True,
        )
        return processor

    @classmethod
    def create_from_examples(cls, texts_or_text_and_labels, labels=None, **kwargs):
        """Build a processor from in-memory texts.

        `labels` here are the per-example labels handed to `add_examples`, not the processor's
        label list; `kwargs` are forwarded to the constructor.
        """
        processor = cls(**kwargs)
        processor.add_examples(texts_or_text_and_labels, labels=labels)
        return processor

    def add_examples_from_csv(
        self,
        file_name,
        split_name="",
        column_label=0,
        column_text=1,
        column_id=None,
        skip_first_row=False,
        overwrite_labels=False,
        overwrite_examples=False,
    ):
        """Read examples from a tab-separated file (via `_read_tsv`) and add them.

        Args:
            file_name: Path of the file to read.
            split_name: If non-empty, generated ids look like `"{split_name}-{row}"`, otherwise
                they are the row number as a string. Only used when `column_id` is `None`.
            column_label: Index of the column holding the label.
            column_text: Index of the column holding the text.
            column_id: Index of the column holding the example id, or `None` to generate ids.
            skip_first_row: If `True`, the first row (e.g. a header) is dropped.
            overwrite_labels: Forwarded to `add_examples`.
            overwrite_examples: Forwarded to `add_examples`.

        Returns:
            The processor's full list of examples, as returned by `add_examples`.
        """
        lines = self._read_tsv(file_name)
        if skip_first_row:
            lines = lines[1:]

        texts = [line[column_text] for line in lines]
        labels = [line[column_label] for line in lines]
        if column_id is not None:
            ids = [line[column_id] for line in lines]
        else:
            # Row numbers are counted after the optional header row has been removed.
            ids = [f"{split_name}-{i}" if split_name else str(i) for i in range(len(lines))]

        return self.add_examples(
            texts, labels, ids, overwrite_labels=overwrite_labels, overwrite_examples=overwrite_examples
        )

    @staticmethod
    def _stable_label_order(labels):
        """Deduplicate `labels`; sort them when possible, else keep first-seen order."""
        unique = list(dict.fromkeys(labels))
        try:
            return sorted(unique)
        except TypeError:
            # Labels that cannot be compared with each other (e.g. None mixed with str).
            return unique

    def add_examples(
        self, texts_or_text_and_labels, labels=None, ids=None, overwrite_labels=False, overwrite_examples=False
    ):
        """Add examples (and the labels they use) to the processor.

        Args:
            texts_or_text_and_labels: Sequence of texts. An item may also be a `(text, label)`
                tuple/list, which is unpacked when no label was supplied for it through `labels`.
            labels: Optional sequence of labels, same length as the texts.
            ids: Optional sequence of example ids, same length as the texts.
            overwrite_labels: If `True`, the label list is replaced by the labels seen in this
                call; otherwise new labels are appended after the existing ones.
            overwrite_examples: If `True`, the example list is replaced; otherwise it is extended.

        Returns:
            The processor's full list of examples after the update.

        Raises:
            ValueError: If `labels` or `ids` is given with a length different from the texts.
        """
        if labels is not None and len(texts_or_text_and_labels) != len(labels):
            raise ValueError(
                f"Text and labels have mismatched lengths {len(texts_or_text_and_labels)} and {len(labels)}"
            )
        if ids is not None and len(texts_or_text_and_labels) != len(ids):
            raise ValueError(f"Text and ids have mismatched lengths {len(texts_or_text_and_labels)} and {len(ids)}")
        if ids is None:
            ids = [None] * len(texts_or_text_and_labels)
        if labels is None:
            labels = [None] * len(texts_or_text_and_labels)

        examples = []
        seen_labels = []
        for text_or_text_and_label, label, guid in zip(texts_or_text_and_labels, labels, ids):
            if isinstance(text_or_text_and_label, (tuple, list)) and label is None:
                text, label = text_or_text_and_label
            else:
                text = text_or_text_and_label
            seen_labels.append(label)
            examples.append(InputExample(guid=guid, text_a=text, text_b=None, label=label))

        if overwrite_examples:
            self.examples = examples
        else:
            self.examples.extend(examples)

        # Label ids are positions in self.labels, so the order must be reproducible: going through
        # a plain set would shuffle caller-provided labels and vary between runs for strings.
        new_labels = self._stable_label_order(seen_labels)
        if overwrite_labels:
            self.labels = new_labels
        else:
            known = list(dict.fromkeys(self.labels))
            known_lookup = set(known)
            self.labels = known + [label for label in new_labels if label not in known_lookup]

        return self.examples

    @staticmethod
    def _tokenizer_max_length(tokenizer):
        """Return the tokenizer's length limit: `max_len` if it has one, else `model_max_length`."""
        limit = getattr(tokenizer, "max_len", None)
        if limit is None:
            # NOTE(review): newer tokenizers are expected to expose `model_max_length` instead of
            # `max_len` - confirm against the tokenizer classes used by callers.
            limit = tokenizer.model_max_length
        return limit

    def _features_to_tf_dataset(self, features):
        """Wrap `features` in a `tf.data.Dataset` of `({"input_ids", "attention_mask"}, label)`."""
        if not is_tf_available():
            raise RuntimeError("return_tensors set to 'tf' but TensorFlow 2.0 can't be imported")
        import tensorflow as tf

        # Regression labels are floats; declaring them as int64 would not match the generator.
        label_dtype = tf.int64 if self.mode == "classification" else tf.float32

        def gen():
            for ex in features:
                yield ({"input_ids": ex.input_ids, "attention_mask": ex.attention_mask}, ex.label)

        return tf.data.Dataset.from_generator(
            gen,
            ({"input_ids": tf.int32, "attention_mask": tf.int32}, label_dtype),
            ({"input_ids": tf.TensorShape([None]), "attention_mask": tf.TensorShape([None])}, tf.TensorShape([])),
        )

    def _features_to_torch_dataset(self, features):
        """Wrap `features` in a `TensorDataset(input_ids, attention_mask, labels)`."""
        if not is_torch_available():
            raise RuntimeError("return_tensors set to 'pt' but PyTorch can't be imported")
        import torch
        from torch.utils.data import TensorDataset

        label_dtype = torch.long if self.mode == "classification" else torch.float
        all_input_ids = torch.tensor([f.input_ids for f in features], dtype=torch.long)
        all_attention_mask = torch.tensor([f.attention_mask for f in features], dtype=torch.long)
        all_labels = torch.tensor([f.label for f in features], dtype=label_dtype)
        return TensorDataset(all_input_ids, all_attention_mask, all_labels)

    def get_features(
        self,
        tokenizer,
        max_length=None,
        pad_on_left=False,
        pad_token=0,
        mask_padding_with_zero=True,
        return_tensors=None,
    ):
        """
        Convert the stored examples into a list of `InputFeatures`.

        Every example's `text_a` is encoded with `tokenizer.encode`, then all sequences are padded
        to the length of the longest encoded sequence.

        Args:
            tokenizer: Instance of a tokenizer that will tokenize the examples
            max_length: Maximum example length passed to the tokenizer. Defaults to, and is capped
                by, the tokenizer's own maximum length.
            pad_on_left: If set to `True`, the examples will be padded on the left rather than on the right (default)
            pad_token: Padding token
            mask_padding_with_zero: If set to `True`, the attention mask will be filled by `1` for actual values
                and by `0` for padded values. If set to `False`, inverts it (`1` for padded values, `0` for actual
                values)
            return_tensors: `None` to get a plain list, `"tf"` for a `tf.data.Dataset`, `"pt"` for a
                PyTorch `TensorDataset`.

        Returns:
            A list of `InputFeatures` when `return_tensors` is `None`; a `tf.data.Dataset` yielding
            `({"input_ids", "attention_mask"}, label)` when it is `"tf"`; a `TensorDataset` of
            `(input_ids, attention_mask, labels)` when it is `"pt"`.

        Raises:
            ValueError: If `return_tensors` or `self.mode` is not a supported value.
            RuntimeError: If the framework requested through `return_tensors` is not available.
        """
        # Fail fast on bad arguments instead of after tokenizing the whole data set.
        if return_tensors not in (None, "tf", "pt"):
            raise ValueError("return_tensors should be one of 'tf' or 'pt'")
        if self.mode not in ("classification", "regression"):
            raise ValueError(self.mode)

        tokenizer_limit = self._tokenizer_max_length(tokenizer)
        if max_length is None:
            max_length = tokenizer_limit
        encode_limit = min(max_length, tokenizer_limit)

        label_map = {label: i for i, label in enumerate(self.labels)}

        all_input_ids = []
        for ex_index, example in enumerate(self.examples):
            if ex_index % 10000 == 0:
                logger.info(f"Tokenizing example {ex_index}")

            input_ids = tokenizer.encode(
                example.text_a,
                add_special_tokens=True,
                max_length=encode_limit,
            )
            all_input_ids.append(input_ids)

        # default=0 keeps an empty processor from raising; it simply produces no features.
        batch_length = max((len(input_ids) for input_ids in all_input_ids), default=0)
        real_value, pad_value = (1, 0) if mask_padding_with_zero else (0, 1)

        features = []
        for ex_index, (input_ids, example) in enumerate(zip(all_input_ids, self.examples)):
            if ex_index % 10000 == 0:
                logger.info(f"Writing example {ex_index}/{len(self.examples)}")

            attention_mask = [real_value] * len(input_ids)

            padding_length = batch_length - len(input_ids)
            ids_padding = [pad_token] * padding_length
            mask_padding = [pad_value] * padding_length
            if pad_on_left:
                input_ids = ids_padding + input_ids
                attention_mask = mask_padding + attention_mask
            else:
                input_ids = input_ids + ids_padding
                attention_mask = attention_mask + mask_padding

            # Sanity checks on the padding arithmetic.
            if len(input_ids) != batch_length:
                raise ValueError(f"Error with input length {len(input_ids)} vs {batch_length}")
            if len(attention_mask) != batch_length:
                raise ValueError(f"Error with input length {len(attention_mask)} vs {batch_length}")

            if self.mode == "classification":
                label = label_map[example.label]
            else:
                label = float(example.label)

            if ex_index < 5 and self.verbose:
                logger.info("*** Example ***")
                logger.info(f"guid: {example.guid}")
                logger.info(f"input_ids: {' '.join([str(x) for x in input_ids])}")
                logger.info(f"attention_mask: {' '.join([str(x) for x in attention_mask])}")
                logger.info(f"label: {example.label} (id = {label})")

            features.append(InputFeatures(input_ids=input_ids, attention_mask=attention_mask, label=label))

        if return_tensors is None:
            return features
        if return_tensors == "tf":
            return self._features_to_tf_dataset(features)
        return self._features_to_torch_dataset(features)
|
|