"""Tokenization classes for xgen.""" |

import copy
import json
import os
import warnings
from typing import List, Optional, Tuple, Union

from transformers.dynamic_module_utils import custom_object_save
from transformers.tokenization_utils import SPECIAL_TOKENS_MAP_FILE, TOKENIZER_CONFIG_FILE, AddedToken, PreTrainedTokenizer
from transformers.utils import logging

try:
    import tiktoken
except ModuleNotFoundError as e:
    raise ModuleNotFoundError(
        "XGen requires the installation of tiktoken. Please install it via `pip install tiktoken`."
    ) from e


logger = logging.get_logger(__name__)

MAX_MODEL_INPUT_SIZES = {
    "Salesforce/xgen-7b-4k-base": 4096,
    "Salesforce/xgen-7b-8k-base": 8192,
    "Salesforce/xgen-7b-4k-inst": 4096,
    "Salesforce/xgen-7b-8k-inst": 8192,
}


def tiktoken_tokenizer(base="gpt2", pad_token=None, add_special=True):
    # Without the XGen additions, the plain base encoding is enough.
    if not add_special:
        return tiktoken.get_encoding(base)

    def include_whitespace(n_min=2, n_max=20):
        whitespaces = [" " * n for n in reversed(range(n_min, n_max))]
        return whitespaces

    def include_tabs(n_min=2, n_max=20):
        tabs = ["\t" * n for n in reversed(range(n_min, n_max))]
        return tabs

    def include_fim_tokens():
        fim_tokens = [
            "<fim_prefix>",
            "<fim_middle>",
            "<fim_suffix>",
            "<fim_pad>",
            "<filename>",
            "<gh_stars>",
            "<issue_start>",
            "<issue_comment>",
            "<issue_closed>",
            "<jupyter_start>",
            "<jupyter_text>",
            "<jupyter_code>",
            "<jupyter_output>",
            "<empty_output>",
            "<commit_before>",
            "<commit_msg>",
            "<commit_after>",
            "<reponame>",
        ]
        return fim_tokens

    def include_additional_tokens():
        tokens = []
        tokens += [f"<dummy_{i}>" for i in range(4)]
        tokens.append("<sep>")
        tokens.append("<eom>")
        # 881 mask sentinels; with the default gpt2 base (50257 tokens plus the
        # additions above) these fill the vocabulary up through id 51199.
        tokens += [f"<mask_{i}>" for i in reversed(range(1, 51199 - 50318 + 1))]
        return tokens

    add_whitespaces = include_whitespace(n_min=2, n_max=32)
    add_tabs = include_tabs(n_min=2, n_max=10)
    fim_tokens = include_fim_tokens()
    additional_tokens = include_additional_tokens()

    tokenizer = tiktoken.get_encoding(base)
    idx = tokenizer.n_vocab

    bpe_ranks = tokenizer._mergeable_ranks

    # Whitespace and tab runs become ordinary merge entries, so long indents
    # tokenize as single units without being treated as special tokens.
    for wsp in add_whitespaces:
        bpe_ranks[bytes(wsp, "ascii")] = idx
        idx += 1
    for t in add_tabs:
        bpe_ranks[bytes(t, "ascii")] = idx
        idx += 1

    special_tokens = dict()

    for sp in fim_tokens:
        special_tokens[sp] = idx
        idx += 1
    for sp in additional_tokens:
        special_tokens[sp] = idx
        idx += 1

    if pad_token and pad_token not in tokenizer._special_tokens and pad_token not in special_tokens:
        special_tokens[pad_token] = idx
        idx += 1

    # Layer the extra merge ranks and special tokens on top of the base vocabulary.
    enc = tiktoken.Encoding(
        name=base.replace("base", "im"),
        pat_str=tokenizer._pat_str,
        mergeable_ranks=bpe_ranks,
        special_tokens={
            **tokenizer._special_tokens,
            **special_tokens,
        },
    )
    return enc
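

# A minimal, illustrative round trip through the extended encoding (the pad
# token name here is an assumption, not part of any checkpoint):
#
#     enc = tiktoken_tokenizer(base="gpt2", pad_token="<|pad|>")
#     ids = enc.encode("<fim_prefix>def f():<fim_suffix>", allowed_special="all")
#     assert enc.decode(ids) == "<fim_prefix>def f():<fim_suffix>"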


class XgenTokenizer(PreTrainedTokenizer):
    """
    Construct an XGen tokenizer, based on byte-level Byte-Pair-Encoding (backed by tiktoken).

    Args:
        pad_token (`str`, *optional*):
            The padding token; appended to the vocabulary if not already present.
        eos_token (`str`, *optional*, defaults to `"<|endoftext|>"`):
            The end-of-sequence token.
        add_eos_token (`bool`, *optional*, defaults to `False`):
            Whether to append `eos_token` when building model inputs.
        add_special_tokens (`bool`, *optional*, defaults to `True`):
            Whether to extend the base encoding with XGen's additional tokens.
    """

    max_model_input_sizes = MAX_MODEL_INPUT_SIZES
    model_input_names = ["input_ids", "attention_mask"]

    def __init__(
        self,
        pad_token=None,
        eos_token="<|endoftext|>",
        add_eos_token=False,
        add_special_tokens=True,
        **kwargs,
    ):
        pad_token_added = AddedToken(pad_token, lstrip=False, rstrip=False) if isinstance(pad_token, str) else pad_token
        eos_token_added = AddedToken(eos_token, lstrip=False, rstrip=False) if isinstance(eos_token, str) else eos_token
        self.add_eos_token = add_eos_token
        # Build the tiktoken encoding before calling the parent constructor,
        # which may query vocabulary-dependent properties during init.
        self.encoder = tiktoken_tokenizer(base="gpt2", pad_token=pad_token, add_special=add_special_tokens)
        super().__init__(
            pad_token=pad_token_added,
            eos_token=eos_token_added,
            add_eos_token=add_eos_token,
            add_special_tokens=add_special_tokens,
            **kwargs,
        )

    @property
    def vocab_size(self):
        """Returns vocab size."""
        return self.encoder.n_vocab

    def get_vocab(self):
        """Returns the vocab as a dict (note: keys are `bytes`, not `str`)."""
        vocab = {self.encoder.decode_single_token_bytes(i): i for i in range(self.vocab_size)}
        return vocab

    def _tokenize(self, text, **kwargs):
        """Tokenize a string. tiktoken encodes straight to ids, so the "tokens"
        returned here are already integers rather than strings."""
        return self.encoder.encode(text, allowed_special="all")

    def _convert_token_to_id(self, token):
        """Converts a token (str) into an id using the vocab; integer tokens
        produced by `_tokenize` pass through unchanged."""
        if isinstance(token, str):
            return self.encoder.encode_single_token(token)
        else:
            return token

    def _convert_id_to_token(self, index):
        """Converts an index (integer) into a token (str) using the vocab."""
        return self.encoder.decode_single_token_bytes(index).decode("utf-8")

    def _decode(self, token_ids, skip_special_tokens: bool = False, **kwargs):
        if not isinstance(token_ids, list):
            token_ids = [token_ids]
        if skip_special_tokens:
            token_ids = [t for t in token_ids if t not in self.all_special_ids]
        return self.encoder.decode(token_ids)
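
    # Illustrative round trip (the ids depend on the vocabulary; do not rely
    # on specific values):
    #
    #     ids = tok._tokenize("hello world")
    #     tok._decode(ids)  # -> "hello world"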

    def build_inputs_with_special_tokens(self, token_ids_0, token_ids_1=None) -> List[int]:
        """Build model inputs from a sequence by appending eos_token_id."""
        eos_token_id = [self.eos_token_id] if self.add_eos_token else []

        output = token_ids_0 + eos_token_id

        if token_ids_1 is not None:
            output = output + token_ids_1 + eos_token_id

        return output
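
    # Shape of the result, assuming add_eos_token=True ("eos" stands for the
    # actual eos id):
    #
    #     build_inputs_with_special_tokens([1, 2])       # -> [1, 2, eos]
    #     build_inputs_with_special_tokens([1, 2], [3])  # -> [1, 2, eos, 3, eos]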

    def get_special_tokens_mask(
        self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None,
        already_has_special_tokens: bool = False
    ) -> List[int]:
        """
        Retrieve sequence ids from a token list that has no special tokens added. This method is called when adding
        special tokens using the tokenizer `prepare_for_model` method.

        Args:
            token_ids_0 (`List[int]`):
                List of IDs.
            token_ids_1 (`List[int]`, *optional*):
                Optional second list of IDs for sequence pairs.
            already_has_special_tokens (`bool`, *optional*, defaults to `False`):
                Whether the token list is already formatted with special tokens for the model.

        Returns:
            `List[int]`: A list of integers in the range [0, 1]: 1 for a special token, 0 for a sequence token.
        """
        if already_has_special_tokens:
            return super().get_special_tokens_mask(
                token_ids_0=token_ids_0, token_ids_1=token_ids_1, already_has_special_tokens=True
            )

        eos_token_id = [1] if self.add_eos_token else []

        if token_ids_1 is None:
            return ([0] * len(token_ids_0)) + eos_token_id
        return ([0] * len(token_ids_0)) + eos_token_id + ([0] * len(token_ids_1)) + eos_token_id
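
    # Example, assuming add_eos_token=True:
    #
    #     get_special_tokens_mask([5, 6])       # -> [0, 0, 1]
    #     get_special_tokens_mask([5, 6], [7])  # -> [0, 0, 1, 0, 1]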

    def create_token_type_ids_from_sequences(
        self, token_ids_0: List[int], token_ids_1: Optional[List[int]] = None
    ) -> List[int]:
        """
        Creates a mask from the two sequences passed to be used in a sequence-pair classification task. An XGen
        sequence pair mask has the following format:

        ```
        0 0 0 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1
        | first sequence    | second sequence |
        ```

        If `token_ids_1` is None, only returns the first portion of the mask (0s).

        Args:
            token_ids_0 (`List[int]`):
                List of IDs.
            token_ids_1 (`List[int]`, *optional*):
                Optional second list of IDs for sequence pairs.

        Returns:
            `List[int]`: List of [token type IDs](../glossary#token-type-ids) according to the given sequence(s).
        """
        eos_token_id = [self.eos_token_id] if self.add_eos_token else []

        output = [0] * len(token_ids_0 + eos_token_id)

        if token_ids_1 is not None:
            output += [1] * len(token_ids_1 + eos_token_id)

        return output
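
    # Example, assuming add_eos_token=False:
    #
    #     create_token_type_ids_from_sequences([4, 5], [6, 7])  # -> [0, 0, 1, 1]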

    def save_vocabulary(self, save_directory: str, filename_prefix: Optional[str] = None):
        # The vocabulary is constructed programmatically from tiktoken, so
        # there is no vocabulary file to write to disk.
        return ()

    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        legacy_format: Optional[bool] = None,
        filename_prefix: Optional[str] = None,
        push_to_hub: bool = False,
        **kwargs,
    ) -> Tuple[str]:
        """
        Save the full tokenizer state.

        This method makes sure the full tokenizer can then be re-loaded using the
        [`~tokenization_utils_base.PreTrainedTokenizer.from_pretrained`] class method.

        Warning: this won't save modifications you may have applied to the tokenizer after the instantiation (for
        instance, modifying `tokenizer.do_lower_case` after creation).

        Args:
            save_directory (`str` or `os.PathLike`): The path to a directory where the tokenizer will be saved.
            legacy_format (`bool`, *optional*):
                Only applicable for a fast tokenizer. If unset (default), will save the tokenizer in the unified JSON
                format as well as in legacy format if it exists, i.e. with tokenizer specific vocabulary and a separate
                added_tokens files.

                If `False`, will only save the tokenizer in the unified JSON format. This format is incompatible with
                "slow" tokenizers (not powered by the *tokenizers* library), so the tokenizer will not be able to be
                loaded in the corresponding "slow" tokenizer.

                If `True`, will save the tokenizer in legacy format. If the "slow" tokenizer doesn't exist, a value
                error is raised.
            filename_prefix (`str`, *optional*):
                A prefix to add to the names of the files saved by the tokenizer.
            push_to_hub (`bool`, *optional*, defaults to `False`):
                Whether or not to push your model to the Hugging Face model hub after saving it. You can specify the
                repository you want to push to with `repo_id` (will default to the name of `save_directory` in your
                namespace).
            kwargs (`Dict[str, Any]`, *optional*):
                Additional keyword arguments passed along to the [`~utils.PushToHubMixin.push_to_hub`] method.

        Returns:
            A tuple of `str`: The files saved.
        """
        use_auth_token = kwargs.pop("use_auth_token", None)

        if use_auth_token is not None:
            warnings.warn(
                "The `use_auth_token` argument is deprecated and will be removed in v5 of Transformers. Please use `token` instead.",
                FutureWarning,
            )
            if kwargs.get("token", None) is not None:
                raise ValueError(
                    "`token` and `use_auth_token` are both specified. Please set only the argument `token`."
                )
            kwargs["token"] = use_auth_token

        if os.path.isfile(save_directory):
            logger.error(f"Provided path ({save_directory}) should be a directory, not a file")
            return

        os.makedirs(save_directory, exist_ok=True)

        if push_to_hub:
            commit_message = kwargs.pop("commit_message", None)
            repo_id = kwargs.pop("repo_id", save_directory.split(os.path.sep)[-1])
            repo_id = self._create_repo(repo_id, **kwargs)
            files_timestamps = self._get_files_timestamps(save_directory)

        special_tokens_map_file = os.path.join(
            save_directory, (filename_prefix + "-" if filename_prefix else "") + SPECIAL_TOKENS_MAP_FILE
        )
        tokenizer_config_file = os.path.join(
            save_directory, (filename_prefix + "-" if filename_prefix else "") + TOKENIZER_CONFIG_FILE
        )

        tokenizer_config = copy.deepcopy(self.init_kwargs)

        target_keys = set(self.init_kwargs.keys())
        target_keys.update(["model_max_length", "clean_up_tokenization_spaces"])

        for k in target_keys:
            if hasattr(self, k) and k != "add_special_tokens":
                tokenizer_config[k] = getattr(self, k)

        tokenizer_config.update(self.special_tokens_map)

        if self.chat_template is not None:
            if isinstance(self.chat_template, dict):
                # Chat template dicts are saved as lists of dicts with fixed key names.
                tokenizer_config["chat_template"] = [{"name": k, "template": v} for k, v in self.chat_template.items()]
            else:
                tokenizer_config["chat_template"] = self.chat_template

        if len(self.init_inputs) > 0:
            tokenizer_config["init_inputs"] = copy.deepcopy(self.init_inputs)
        for file_id in self.vocab_files_names.keys():
            tokenizer_config.pop(file_id, None)

        # Serialize AddedToken instances so the config is JSON-serializable.
        tokenizer_config = self.convert_added_tokens(tokenizer_config, add_type_field=True, save=True)

        added_tokens = {}
        for key, value in self.added_tokens_decoder.items():
            added_tokens[key] = value.__getstate__()
        tokenizer_config["added_tokens_decoder"] = added_tokens

        # Record the tokenizer class so from_pretrained can reload it.
        tokenizer_class = self.__class__.__name__

        # Drop the "Fast" suffix unless this is PreTrainedTokenizerFast itself.
        if tokenizer_class.endswith("Fast") and tokenizer_class != "PreTrainedTokenizerFast":
            tokenizer_class = tokenizer_class[:-4]
        tokenizer_config["tokenizer_class"] = tokenizer_class
        if getattr(self, "_auto_map", None) is not None:
            tokenizer_config["auto_map"] = self._auto_map
        if getattr(self, "_processor_class", None) is not None:
            tokenizer_config["processor_class"] = self._processor_class

        # For custom code, copy the defining module into the save folder so the
        # tokenizer can be loaded from the Hub.
        if self._auto_class is not None:
            custom_object_save(self, save_directory, config=tokenizer_config)

        # Remove machine-specific paths from the saved config.
        if "name_or_path" in tokenizer_config:
            tokenizer_config.pop("name_or_path")
        tokenizer_config.pop("special_tokens_map_file", None)
        tokenizer_config.pop("tokenizer_file", None)

        with open(tokenizer_config_file, "w", encoding="utf-8") as f:
            out_str = json.dumps(tokenizer_config, indent=2, sort_keys=True, ensure_ascii=False) + "\n"
            f.write(out_str)
        logger.info(f"tokenizer config file saved in {tokenizer_config_file}")

        # Save the special tokens map (the extended version keeps AddedToken metadata).
        write_dict = self.convert_added_tokens(self.special_tokens_map_extended, save=True, add_type_field=False)
        with open(special_tokens_map_file, "w", encoding="utf-8") as f:
            out_str = json.dumps(write_dict, indent=2, sort_keys=True, ensure_ascii=False) + "\n"
            f.write(out_str)
        logger.info(f"Special tokens file saved in {special_tokens_map_file}")

        file_names = (tokenizer_config_file, special_tokens_map_file)

        save_files = self._save_pretrained(
            save_directory=save_directory,
            file_names=file_names,
            legacy_format=legacy_format,
            filename_prefix=filename_prefix,
        )

        if push_to_hub:
            self._upload_modified_files(
                save_directory,
                repo_id,
                files_timestamps,
                commit_message=commit_message,
                token=kwargs.get("token"),
            )

        return save_files
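

# Hedged usage sketch of the save/reload cycle (the directory name is
# illustrative):
#
#     tok = XgenTokenizer(pad_token="<|pad|>")
#     tok.save_pretrained("./xgen-tokenizer")
#     reloaded = XgenTokenizer.from_pretrained("./xgen-tokenizer")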