# -*- coding: utf-8 -*-
"""
@author: XuMing([email protected])
@description: adapted from https://github.com/ThilinaRajapakse/simpletransformers
"""
import math
import os
import random
import warnings
from dataclasses import asdict
from multiprocessing import Pool

import numpy as np
import pandas as pd
import torch
from loguru import logger
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from torch.utils.tensorboard import SummaryWriter
from tqdm.auto import tqdm, trange
from transformers import ByT5Tokenizer
from transformers import MT5Config, MT5ForConditionalGeneration
from transformers import T5Config, T5ForConditionalGeneration, T5Tokenizer, TextStreamer
from transformers.optimization import AdamW, Adafactor
from transformers.optimization import (
    get_constant_schedule,
    get_constant_schedule_with_warmup,
    get_linear_schedule_with_warmup,
    get_cosine_schedule_with_warmup,
    get_cosine_with_hard_restarts_schedule_with_warmup,
    get_polynomial_decay_schedule_with_warmup,
)

from t5.config.model_args import T5Args
from t5.t5_utils import T5Dataset, load_hf_dataset

try:
    import wandb

    wandb_available = True
except ImportError:
    wandb_available = False

has_cuda = torch.cuda.is_available()
os.environ["TOKENIZERS_PARALLELISM"] = "FALSE"
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i: i + n]
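
# Illustrative example (not executed): chunks() yields consecutive batches, e.g.
#   list(chunks([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]
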
MODEL_CLASSES = {
    "t5": (T5Config, T5ForConditionalGeneration),
    "mt5": (MT5Config, MT5ForConditionalGeneration),
    "byt5": (T5Config, T5ForConditionalGeneration),
}
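# Note: "byt5" reuses the T5 config and model classes; only the tokenizer differs
# (ByT5Tokenizer is selected in T5Model.__init__ below).
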
class T5Model:
    def __init__(
        self,
        model_type,
        model_name,
        args=None,
        tokenizer=None,
        use_cuda=has_cuda,
        cuda_device=-1,
        evaluate=False,
        **kwargs,
    ):
        """
        Initializes a T5Model.

        Args:
            model_type: The type of model (t5, mt5, byt5).
            model_name: The exact architecture and trained weights to use. This may be a Hugging Face
                Transformers compatible pre-trained model, a community model, or the path to a directory
                containing model files.
            args (optional): Default args will be used if this parameter is not provided. If provided, it
                should be a dict containing the args that should be changed in the default args, or a
                T5Args object.
            tokenizer (optional): A pre-initialized T5Tokenizer to use instead of loading one from model_name.
            use_cuda (optional): Use GPU if available. Setting to False will force model to use CPU only.
            cuda_device (optional): Specific GPU that should be used. Will use the first available GPU by default.
            evaluate (optional): If True, skip adding the confusion-set and vocabulary tokens to the tokenizer.
            **kwargs (optional): For providing proxies, force_download, resume_download, cache_dir and other
                options specific to the 'from_pretrained' implementation where this will be supplied.
        """  # noqa: ignore flake8
self.args = self._load_model_args(model_name) | |
if isinstance(args, dict): | |
self.args.update_from_dict(args) | |
elif isinstance(args, T5Args): | |
self.args = args | |
self.is_sweeping = False | |
if self.args.manual_seed: | |
random.seed(self.args.manual_seed) | |
np.random.seed(self.args.manual_seed) | |
torch.manual_seed(self.args.manual_seed) | |
if self.args.n_gpu > 0: | |
torch.cuda.manual_seed_all(self.args.manual_seed) | |
if use_cuda: | |
if torch.cuda.is_available(): | |
if cuda_device == -1: | |
self.device = torch.device("cuda") | |
else: | |
self.device = torch.device(f"cuda:{cuda_device}") | |
else: | |
raise ValueError( | |
"'use_cuda' set to True when cuda is unavailable." | |
"Make sure CUDA is available or set `use_cuda=False`." | |
) | |
else: | |
if torch.backends.mps.is_available(): | |
self.device = torch.device("mps") | |
else: | |
self.device = "cpu" | |
logger.debug(f"Device: {self.device}") | |
self.results = {} | |
config_class, model_class = MODEL_CLASSES[model_type] | |
if model_name is None: | |
self.config = self.args.config | |
self.model = model_class(config=self.config) | |
else: | |
self.config = config_class.from_pretrained(model_name, **self.args.config) | |
self.model = model_class.from_pretrained(model_name, config=self.config) | |
        if isinstance(tokenizer, T5Tokenizer):
            self.tokenizer = tokenizer
            self.model.resize_token_embeddings(len(self.tokenizer))
        elif model_type == "byt5":
            self.tokenizer = ByT5Tokenizer.from_pretrained(model_name, truncate=True)
        else:
            self.tokenizer = T5Tokenizer.from_pretrained(model_name, truncate=True)
        logger.debug(f"Tokenizer size before adding tokens: {len(self.tokenizer)}")
        if not evaluate:
            # Extend the tokenizer with task-specific tokens for Chinese spelling correction:
            # characters from the phonetic confusion sets, full-width ASCII variants
            # (chr(c + 65248) maps ASCII 33-126 to their full-width forms), a word list,
            # and an extra vocabulary file.
            n = 0
            with open('./data/字音混淆集_s13.txt', 'r', encoding='utf-8') as confusion:
                for line in confusion.readlines() + [str(chr(c + 65248)) for c in range(33, 127)]:
                    token = line.split(' ')[0]
                    n += 1
                    self.tokenizer.add_tokens([token])
            with open('./data/字音混淆集.txt', 'r', encoding='utf-8') as confusion:
                for line in confusion.readlines():
                    token = line.split(' ')[0]
                    n += 1
                    self.tokenizer.add_tokens([token])
            with open('./data/wordtest4.txt', 'r', encoding='utf-8') as confusion:
                for line in confusion.readlines():
                    token = line.split(',')[0]
                    n += 1
                    self.tokenizer.add_tokens([token])
            with open('./data/vocab.txt', 'r', encoding='utf-8') as confusion:
                for line in confusion.readlines():
                    n += 1
                    self.tokenizer.add_tokens([line.replace('\n', '')])
            logger.debug(f"Added {n} tokens from confusion-set and vocab files")
        self.streamer = TextStreamer(self.tokenizer)
        logger.debug(f"Tokenizer size after adding tokens: {len(self.tokenizer)}")
        self.model.resize_token_embeddings(len(self.tokenizer))
        if self.args.dynamic_quantize:
            self.model = torch.quantization.quantize_dynamic(
                self.model, {torch.nn.Linear}, dtype=torch.qint8
            )
        if not use_cuda:
            self.args.fp16 = False
        if self.args.special_tokens_list:
            self.tokenizer.add_tokens(
                self.args.special_tokens_list, special_tokens=True
            )
            self.model.resize_token_embeddings(len(self.tokenizer))
self.args.model_type = model_type | |
if model_name is None: | |
self.args.model_name = "T5_from_scratch" | |
else: | |
self.args.model_name = model_name | |
if self.args.wandb_project and not wandb_available: | |
warnings.warn( | |
"wandb_project specified but wandb is not available. Wandb disabled." | |
) | |
self.args.wandb_project = None | |
def train_model( | |
self, | |
train_data, | |
output_dir=None, | |
show_running_loss=True, | |
args=None, | |
eval_data=None, | |
verbose=True, | |
**kwargs, | |
): | |
""" | |
Trains the model using 'train_data' | |
Args: | |
train_data: Pandas DataFrame containing the 3 columns - `prefix`, `input_text`, `target_text`. | |
- `prefix`: A string indicating the task to perform. (E.g. `"question"`, `"stsb"`) | |
- `input_text`: The input text sequence. `prefix` is automatically prepended to form the full input. (<prefix>: <input_text>) | |
- `target_text`: The target sequence | |
output_dir: The directory where model files will be saved. If not given, self.args.output_dir will be used. | |
show_running_loss (optional): Set to False to prevent running loss from being printed to console. Defaults to True. | |
args (optional): Optional changes to the args dict of the model. Any changes made will persist for the model. | |
eval_data (optional): A DataFrame against which evaluation will be performed when evaluate_during_training is enabled. Is required if evaluate_during_training is enabled. | |
**kwargs: Additional metrics that should be used. Pass in the metrics as keyword arguments (name of metric: function to use). | |
A metric function should take in two parameters. The first parameter will be the true labels, and the second parameter will be the predictions. Both inputs | |
will be lists of strings. Note that this will slow down training significantly as the predicted sequences need to be generated. | |
Returns: | |
global_step: Number of global steps trained | |
training_details: Average training loss if evaluate_during_training is False or full training progress scores if evaluate_during_training is True | |
""" # noqa: ignore flake8" | |
if args: | |
self.args.update_from_dict(args) | |
if self.args.evaluate_during_training and eval_data is None: | |
raise ValueError( | |
"evaluate_during_training is enabled but eval_data is not specified." | |
" Pass eval_data to model.train_model() if using evaluate_during_training." | |
) | |
if not output_dir: | |
output_dir = self.args.output_dir | |
if ( | |
os.path.exists(output_dir) | |
and os.listdir(output_dir) | |
and not self.args.overwrite_output_dir | |
): | |
raise ValueError( | |
"Output directory ({}) already exists and is not empty." | |
" Set args.overwrite_output_dir = True to overcome.".format(output_dir) | |
) | |
self._move_model_to_device() | |
train_dataset = self.load_and_cache_examples(train_data, verbose=verbose) | |
os.makedirs(output_dir, exist_ok=True) | |
global_step, training_details = self.train( | |
train_dataset, | |
output_dir, | |
show_running_loss=show_running_loss, | |
eval_data=eval_data, | |
verbose=verbose, | |
**kwargs, | |
) | |
self.save_model(model=self.model) | |
if verbose: | |
logger.info( | |
" Training of {} model complete. Saved to {}.".format( | |
self.args.model_name, output_dir | |
) | |
) | |
return global_step, training_details | |
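
    # Illustrative sketch (not executed): train_model() expects a DataFrame with the columns
    # `prefix`, `input_text` and `target_text`. The names and values below are made up.
    #
    #   train_df = pd.DataFrame(
    #       [["correct", "he go to school everyday", "he goes to school every day"]],
    #       columns=["prefix", "input_text", "target_text"],
    #   )
    #   model.train_model(train_df, eval_data=eval_df)
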
def train( | |
self, | |
train_dataset, | |
output_dir, | |
show_running_loss=True, | |
eval_data=None, | |
verbose=True, | |
**kwargs, | |
): | |
""" | |
Trains the model on train_dataset. | |
Utility function to be used by the train_model() method. Not intended to be used directly. | |
""" | |
model = self.model | |
args = self.args | |
device = self.device | |
tb_writer = SummaryWriter(log_dir=args.tensorboard_dir) | |
train_sampler = RandomSampler(train_dataset) | |
train_dataloader = DataLoader( | |
train_dataset, | |
sampler=train_sampler, | |
batch_size=args.train_batch_size, | |
num_workers=self.args.dataloader_num_workers, | |
) | |
if args.max_steps > 0: | |
t_total = args.max_steps | |
args.num_train_epochs = ( | |
args.max_steps | |
// (len(train_dataloader) // args.gradient_accumulation_steps) | |
+ 1 | |
) | |
else: | |
t_total = ( | |
len(train_dataloader) | |
// args.gradient_accumulation_steps | |
* args.num_train_epochs | |
) | |
no_decay = ["bias", "LayerNorm.weight"] | |
optimizer_grouped_parameters = [] | |
custom_parameter_names = set() | |
for group in self.args.custom_parameter_groups: | |
params = group.pop("params") | |
custom_parameter_names.update(params) | |
param_group = {**group} | |
param_group["params"] = [ | |
p for n, p in model.named_parameters() if n in params | |
] | |
optimizer_grouped_parameters.append(param_group) | |
for group in self.args.custom_layer_parameters: | |
layer_number = group.pop("layer") | |
layer = f"layer.{layer_number}." | |
group_d = {**group} | |
group_nd = {**group} | |
group_nd["weight_decay"] = 0.0 | |
params_d = [] | |
params_nd = [] | |
for n, p in model.named_parameters(): | |
if n not in custom_parameter_names and layer in n: | |
if any(nd in n for nd in no_decay): | |
params_nd.append(p) | |
else: | |
params_d.append(p) | |
custom_parameter_names.add(n) | |
group_d["params"] = params_d | |
group_nd["params"] = params_nd | |
optimizer_grouped_parameters.append(group_d) | |
optimizer_grouped_parameters.append(group_nd) | |
if not self.args.train_custom_parameters_only: | |
optimizer_grouped_parameters.extend( | |
[ | |
{ | |
"params": [ | |
p | |
for n, p in model.named_parameters() | |
if n not in custom_parameter_names | |
and not any(nd in n for nd in no_decay) | |
], | |
"weight_decay": args.weight_decay, | |
}, | |
{ | |
"params": [ | |
p | |
for n, p in model.named_parameters() | |
if n not in custom_parameter_names | |
and any(nd in n for nd in no_decay) | |
], | |
"weight_decay": 0.0, | |
}, | |
] | |
) | |
warmup_steps = math.ceil(t_total * args.warmup_ratio) | |
args.warmup_steps = ( | |
warmup_steps if args.warmup_steps == 0 else args.warmup_steps | |
) | |
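        # Worked example of the arithmetic above (illustrative numbers): with t_total = 10000
        # optimization steps and warmup_ratio = 0.06, warmup_steps = ceil(10000 * 0.06) = 600;
        # this computed value is used only when args.warmup_steps was left at 0.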
if args.optimizer == "AdamW": | |
optimizer = AdamW( | |
optimizer_grouped_parameters, | |
lr=args.learning_rate, | |
eps=args.adam_epsilon, | |
) | |
elif args.optimizer == "Adafactor": | |
optimizer = Adafactor( | |
optimizer_grouped_parameters, | |
lr=args.learning_rate, | |
eps=args.adafactor_eps, | |
clip_threshold=args.adafactor_clip_threshold, | |
decay_rate=args.adafactor_decay_rate, | |
beta1=args.adafactor_beta1, | |
weight_decay=args.weight_decay, | |
scale_parameter=args.adafactor_scale_parameter, | |
relative_step=args.adafactor_relative_step, | |
warmup_init=args.adafactor_warmup_init, | |
) | |
else: | |
raise ValueError( | |
"{} is not a valid optimizer class. Please use one of ('AdamW', 'Adafactor') instead.".format( | |
args.optimizer | |
) | |
) | |
if args.scheduler == "constant_schedule": | |
scheduler = get_constant_schedule(optimizer) | |
elif args.scheduler == "constant_schedule_with_warmup": | |
scheduler = get_constant_schedule_with_warmup( | |
optimizer, num_warmup_steps=args.warmup_steps | |
) | |
elif args.scheduler == "linear_schedule_with_warmup": | |
scheduler = get_linear_schedule_with_warmup( | |
optimizer, | |
num_warmup_steps=args.warmup_steps, | |
num_training_steps=t_total, | |
) | |
elif args.scheduler == "cosine_schedule_with_warmup": | |
scheduler = get_cosine_schedule_with_warmup( | |
optimizer, | |
num_warmup_steps=args.warmup_steps, | |
num_training_steps=t_total, | |
num_cycles=args.cosine_schedule_num_cycles, | |
) | |
elif args.scheduler == "cosine_with_hard_restarts_schedule_with_warmup": | |
scheduler = get_cosine_with_hard_restarts_schedule_with_warmup( | |
optimizer, | |
num_warmup_steps=args.warmup_steps, | |
num_training_steps=t_total, | |
num_cycles=args.cosine_schedule_num_cycles, | |
) | |
elif args.scheduler == "polynomial_decay_schedule_with_warmup": | |
scheduler = get_polynomial_decay_schedule_with_warmup( | |
optimizer, | |
num_warmup_steps=args.warmup_steps, | |
num_training_steps=t_total, | |
lr_end=args.polynomial_decay_schedule_lr_end, | |
power=args.polynomial_decay_schedule_power, | |
) | |
else: | |
raise ValueError("{} is not a valid scheduler.".format(args.scheduler)) | |
if ( | |
args.model_name | |
and os.path.isfile(os.path.join(args.model_name, "optimizer.pt")) | |
and os.path.isfile(os.path.join(args.model_name, "scheduler.pt")) | |
): | |
# Load in optimizer and scheduler states | |
optimizer.load_state_dict( | |
torch.load(os.path.join(args.model_name, "optimizer.pt")) | |
) | |
scheduler.load_state_dict( | |
torch.load(os.path.join(args.model_name, "scheduler.pt")) | |
) | |
if args.n_gpu > 1: | |
model = torch.nn.DataParallel(model) | |
logger.info(" Training started") | |
global_step = 0 | |
training_progress_scores = None | |
tr_loss, logging_loss = 0.0, 0.0 | |
model.zero_grad() | |
train_iterator = trange( | |
int(args.num_train_epochs), desc="Epoch", disable=args.silent, mininterval=0 | |
) | |
epoch_number = 0 | |
best_eval_metric = None | |
early_stopping_counter = 0 | |
steps_trained_in_current_epoch = 0 | |
epochs_trained = 0 | |
if args.model_name and os.path.exists(args.model_name): | |
try: | |
                # set global_step to global_step of last saved checkpoint from model path
checkpoint_suffix = args.model_name.split("/")[-1].split("-") | |
if len(checkpoint_suffix) > 2: | |
checkpoint_suffix = checkpoint_suffix[1] | |
else: | |
checkpoint_suffix = checkpoint_suffix[-1] | |
global_step = int(checkpoint_suffix) | |
epochs_trained = global_step // ( | |
len(train_dataloader) // args.gradient_accumulation_steps | |
) | |
steps_trained_in_current_epoch = global_step % ( | |
len(train_dataloader) // args.gradient_accumulation_steps | |
) | |
logger.info( | |
" Continuing training from checkpoint, will skip to saved global_step" | |
) | |
logger.info(" Continuing training from epoch %d", epochs_trained) | |
logger.info(" Continuing training from global step %d", global_step) | |
logger.info( | |
" Will skip the first %d steps in the current epoch", | |
steps_trained_in_current_epoch, | |
) | |
except ValueError: | |
logger.info(" Starting fine-tuning.") | |
if args.evaluate_during_training: | |
training_progress_scores = self._create_training_progress_scores(**kwargs) | |
if args.wandb_project: | |
wandb.init( | |
project=args.wandb_project, | |
config={**asdict(args)}, | |
**args.wandb_kwargs, | |
) | |
wandb.run._label(repo="textgen") | |
wandb.watch(self.model) | |
self.wandb_run_id = wandb.run.id | |
if args.fp16: | |
from torch.cuda import amp | |
scaler = amp.GradScaler() | |
for current_epoch in train_iterator: | |
model.train() | |
if epochs_trained > 0: | |
epochs_trained -= 1 | |
continue | |
train_iterator.set_description( | |
f"Epoch {epoch_number + 1} of {args.num_train_epochs}" | |
) | |
batch_iterator = tqdm( | |
train_dataloader, | |
desc=f"Running Epoch {epoch_number} of {args.num_train_epochs}", | |
disable=args.silent, | |
mininterval=0, | |
) | |
for step, batch in enumerate(batch_iterator): | |
if steps_trained_in_current_epoch > 0: | |
steps_trained_in_current_epoch -= 1 | |
continue | |
inputs = self._get_inputs_dict(batch) | |
if args.fp16: | |
with amp.autocast(): | |
outputs = model(**inputs) | |
# model outputs are always tuple in pytorch-transformers (see doc) | |
loss = outputs[0] | |
else: | |
outputs = model(**inputs) | |
# model outputs are always tuple in pytorch-transformers (see doc) | |
loss = outputs[0] | |
if args.n_gpu > 1: | |
loss = ( | |
loss.mean() | |
) # mean() to average on multi-gpu parallel training | |
current_loss = loss.item() | |
if show_running_loss: | |
batch_iterator.set_description( | |
f"Epochs {epoch_number}/{args.num_train_epochs}. Running Loss: {current_loss:9.4f}" | |
) | |
if args.gradient_accumulation_steps > 1: | |
loss = loss / args.gradient_accumulation_steps | |
if args.fp16: | |
scaler.scale(loss).backward() | |
else: | |
loss.backward() | |
tr_loss += loss.item() | |
if (step + 1) % args.gradient_accumulation_steps == 0: | |
if args.fp16: | |
scaler.unscale_(optimizer) | |
if args.optimizer == "AdamW": | |
torch.nn.utils.clip_grad_norm_( | |
model.parameters(), args.max_grad_norm | |
) | |
if args.fp16: | |
scaler.step(optimizer) | |
scaler.update() | |
else: | |
optimizer.step() | |
scheduler.step() # Update learning rate schedule | |
model.zero_grad() | |
global_step += 1 | |
if args.logging_steps > 0 and global_step % args.logging_steps == 0: | |
# Log metrics | |
tb_writer.add_scalar( | |
"lr", scheduler.get_last_lr()[0], global_step | |
) | |
tb_writer.add_scalar( | |
"loss", | |
(tr_loss - logging_loss) / args.logging_steps, | |
global_step, | |
) | |
logging_loss = tr_loss | |
if args.wandb_project or self.is_sweeping: | |
wandb.log( | |
{ | |
"Training loss": current_loss, | |
"lr": scheduler.get_last_lr()[0], | |
"global_step": global_step, | |
} | |
) | |
if args.save_steps > 0 and global_step % args.save_steps == 0: | |
# Save model checkpoint | |
output_dir_current = os.path.join( | |
output_dir, "checkpoint-{}".format(global_step) | |
) | |
self.save_model( | |
output_dir_current, optimizer, scheduler, model=model | |
) | |
if args.evaluate_during_training and ( | |
args.evaluate_during_training_steps > 0 | |
and global_step % args.evaluate_during_training_steps == 0 | |
): | |
# Only evaluate when single GPU otherwise metrics may not average well | |
results = self.eval_model( | |
eval_data, | |
verbose=verbose and args.evaluate_during_training_verbose, | |
silent=args.evaluate_during_training_silent, | |
**kwargs, | |
) | |
for key, value in results.items(): | |
try: | |
tb_writer.add_scalar( | |
"eval_{}".format(key), value, global_step | |
) | |
except (NotImplementedError, AssertionError): | |
pass | |
output_dir_current = os.path.join( | |
output_dir, "checkpoint-{}".format(global_step) | |
) | |
if args.save_eval_checkpoints: | |
self.save_model( | |
output_dir_current, | |
optimizer, | |
scheduler, | |
model=model, | |
results=results, | |
) | |
training_progress_scores["global_step"].append(global_step) | |
training_progress_scores["train_loss"].append(current_loss) | |
for key in results: | |
training_progress_scores[key].append(results[key]) | |
report = pd.DataFrame(training_progress_scores) | |
report.to_csv( | |
os.path.join( | |
args.output_dir, "training_progress_scores.csv" | |
), | |
index=False, | |
) | |
if args.wandb_project or self.is_sweeping: | |
wandb.log(self._get_last_metrics(training_progress_scores)) | |
if not best_eval_metric: | |
best_eval_metric = results[args.early_stopping_metric] | |
self.save_model( | |
args.best_model_dir, | |
optimizer, | |
scheduler, | |
model=model, | |
results=results, | |
) | |
if best_eval_metric and args.early_stopping_metric_minimize: | |
if ( | |
results[args.early_stopping_metric] - best_eval_metric | |
< args.early_stopping_delta | |
): | |
best_eval_metric = results[args.early_stopping_metric] | |
self.save_model( | |
args.best_model_dir, | |
optimizer, | |
scheduler, | |
model=model, | |
results=results, | |
) | |
early_stopping_counter = 0 | |
else: | |
if args.use_early_stopping: | |
if ( | |
early_stopping_counter | |
< args.early_stopping_patience | |
): | |
early_stopping_counter += 1 | |
if verbose: | |
logger.info( | |
f" No improvement in {args.early_stopping_metric}" | |
) | |
logger.info( | |
f" Current step: {early_stopping_counter}" | |
) | |
logger.info( | |
f" Early stopping patience: {args.early_stopping_patience}" | |
) | |
else: | |
if verbose: | |
logger.info( | |
f" Patience of {args.early_stopping_patience} steps reached" | |
) | |
logger.info(" Training terminated.") | |
train_iterator.close() | |
return ( | |
global_step, | |
tr_loss / global_step | |
if not self.args.evaluate_during_training | |
else training_progress_scores, | |
) | |
else: | |
if ( | |
results[args.early_stopping_metric] - best_eval_metric | |
> args.early_stopping_delta | |
): | |
best_eval_metric = results[args.early_stopping_metric] | |
self.save_model( | |
args.best_model_dir, | |
optimizer, | |
scheduler, | |
model=model, | |
results=results, | |
) | |
early_stopping_counter = 0 | |
else: | |
if args.use_early_stopping: | |
if ( | |
early_stopping_counter | |
< args.early_stopping_patience | |
): | |
early_stopping_counter += 1 | |
if verbose: | |
logger.info( | |
f" No improvement in {args.early_stopping_metric}" | |
) | |
logger.info( | |
f" Current step: {early_stopping_counter}" | |
) | |
logger.info( | |
f" Early stopping patience: {args.early_stopping_patience}" | |
) | |
else: | |
if verbose: | |
logger.info( | |
f" Patience of {args.early_stopping_patience} steps reached" | |
) | |
logger.info(" Training terminated.") | |
train_iterator.close() | |
return ( | |
global_step, | |
tr_loss / global_step | |
if not self.args.evaluate_during_training | |
else training_progress_scores, | |
) | |
model.train() | |
epoch_number += 1 | |
output_dir_current = os.path.join( | |
output_dir, "checkpoint-{}-epoch-{}".format(global_step, epoch_number) | |
) | |
if args.save_model_every_epoch: | |
self.save_model(output_dir_current, optimizer, scheduler, model=model) | |
if args.evaluate_during_training and args.evaluate_each_epoch: | |
results = self.eval_model( | |
eval_data, | |
verbose=verbose and args.evaluate_during_training_verbose, | |
silent=args.evaluate_during_training_silent, | |
**kwargs, | |
) | |
if args.save_eval_checkpoints: | |
self.save_model( | |
output_dir_current, optimizer, scheduler, results=results | |
) | |
training_progress_scores["global_step"].append(global_step) | |
training_progress_scores["train_loss"].append(current_loss) | |
for key in results: | |
training_progress_scores[key].append(results[key]) | |
report = pd.DataFrame(training_progress_scores) | |
report.to_csv( | |
os.path.join(args.output_dir, "training_progress_scores.csv"), | |
index=False, | |
) | |
if args.wandb_project or self.is_sweeping: | |
wandb.log(self._get_last_metrics(training_progress_scores)) | |
if not best_eval_metric: | |
best_eval_metric = results[args.early_stopping_metric] | |
self.save_model( | |
args.best_model_dir, | |
optimizer, | |
scheduler, | |
model=model, | |
results=results, | |
) | |
if best_eval_metric and args.early_stopping_metric_minimize: | |
if ( | |
results[args.early_stopping_metric] - best_eval_metric | |
< args.early_stopping_delta | |
): | |
best_eval_metric = results[args.early_stopping_metric] | |
self.save_model( | |
args.best_model_dir, | |
optimizer, | |
scheduler, | |
model=model, | |
results=results, | |
) | |
early_stopping_counter = 0 | |
else: | |
if ( | |
args.use_early_stopping | |
and args.early_stopping_consider_epochs | |
): | |
if early_stopping_counter < args.early_stopping_patience: | |
early_stopping_counter += 1 | |
if verbose: | |
logger.info( | |
f" No improvement in {args.early_stopping_metric}" | |
) | |
logger.info( | |
f" Current step: {early_stopping_counter}" | |
) | |
logger.info( | |
f" Early stopping patience: {args.early_stopping_patience}" | |
) | |
else: | |
if verbose: | |
logger.info( | |
f" Patience of {args.early_stopping_patience} steps reached" | |
) | |
logger.info(" Training terminated.") | |
train_iterator.close() | |
return ( | |
global_step, | |
tr_loss / global_step | |
if not self.args.evaluate_during_training | |
else training_progress_scores, | |
) | |
else: | |
if ( | |
results[args.early_stopping_metric] - best_eval_metric | |
> args.early_stopping_delta | |
): | |
best_eval_metric = results[args.early_stopping_metric] | |
self.save_model( | |
args.best_model_dir, | |
optimizer, | |
scheduler, | |
model=model, | |
results=results, | |
) | |
early_stopping_counter = 0 | |
else: | |
if ( | |
args.use_early_stopping | |
and args.early_stopping_consider_epochs | |
): | |
if early_stopping_counter < args.early_stopping_patience: | |
early_stopping_counter += 1 | |
if verbose: | |
logger.info( | |
f" No improvement in {args.early_stopping_metric}" | |
) | |
logger.info( | |
f" Current step: {early_stopping_counter}" | |
) | |
logger.info( | |
f" Early stopping patience: {args.early_stopping_patience}" | |
) | |
else: | |
if verbose: | |
logger.info( | |
f" Patience of {args.early_stopping_patience} steps reached" | |
) | |
logger.info(" Training terminated.") | |
train_iterator.close() | |
return ( | |
global_step, | |
tr_loss / global_step | |
if not self.args.evaluate_during_training | |
else training_progress_scores, | |
) | |
return ( | |
global_step, | |
tr_loss / global_step | |
if not self.args.evaluate_during_training | |
else training_progress_scores, | |
) | |
def eval_model( | |
self, eval_data, output_dir=None, verbose=True, silent=False, **kwargs | |
): | |
""" | |
Evaluates the model on eval_data. Saves results to output_dir. | |
Args: | |
eval_data: Pandas DataFrame containing the 3 columns - `prefix`, `input_text`, `target_text`. | |
- `prefix`: A string indicating the task to perform. (E.g. `"question"`, `"stsb"`) | |
- `input_text`: The input text sequence. `prefix` is automatically prepended to form the full input. (<prefix>: <input_text>) | |
- `target_text`: The target sequence | |
output_dir: The directory where model files will be saved. If not given, self.args.output_dir will be used. | |
verbose: If verbose, results will be printed to the console on completion of evaluation. | |
silent: If silent, tqdm progress bars will be hidden. | |
**kwargs: Additional metrics that should be used. Pass in the metrics as keyword arguments (name of metric: function to use). | |
A metric function should take in two parameters. The first parameter will be the true labels, and the second parameter will be the predictions. Both inputs | |
will be lists of strings. Note that this will slow down evaluation significantly as the predicted sequences need to be generated. | |
Returns: | |
results: Dictionary containing evaluation results. | |
""" # noqa: ignore flake8" | |
if not output_dir: | |
output_dir = self.args.output_dir | |
self._move_model_to_device() | |
eval_dataset = self.load_and_cache_examples( | |
eval_data, evaluate=True, verbose=verbose, silent=silent | |
) | |
os.makedirs(output_dir, exist_ok=True) | |
result = self.evaluate( | |
eval_dataset, output_dir, verbose=verbose, silent=silent, **kwargs | |
) | |
self.results.update(result) | |
if self.args.evaluate_generated_text: | |
if self.args.preprocess_inputs: | |
to_predict = [ | |
input_text | |
for prefix, input_text in zip( | |
eval_data["prefix"], eval_data["input_text"] | |
) | |
] | |
else: | |
to_predict = [ | |
prefix + input_text | |
for prefix, input_text in zip( | |
eval_data["prefix"], eval_data["input_text"] | |
) | |
] | |
            # Note: generated-text metrics are computed on the first eval_batch_size * 3 examples only.
            preds = self.predict(to_predict[: self.args.eval_batch_size * 3])
            result = self.compute_metrics(
                eval_data["target_text"].tolist()[: self.args.eval_batch_size * 3], preds, **kwargs
            )
self.results.update(result) | |
if verbose: | |
logger.info(self.results) | |
return self.results | |
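
    # Illustrative sketch (not executed): extra metrics are passed to eval_model() as keyword
    # arguments mapping a metric name to a function of (labels, preds). The `exact_match`
    # function and `eval_df` below are made up.
    #
    #   def exact_match(labels, preds):
    #       return sum(l == p for l, p in zip(labels, preds)) / len(labels)
    #
    #   results = model.eval_model(eval_df, exact_match=exact_match)
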
def evaluate(self, eval_dataset, output_dir, verbose=True, silent=False, **kwargs): | |
""" | |
Evaluates the model on eval_dataset. | |
Utility function to be used by the eval_model() method. Not intended to be used directly. | |
""" | |
model = self.model | |
args = self.args | |
eval_output_dir = output_dir | |
device = self.device | |
results = {} | |
eval_sampler = SequentialSampler(eval_dataset) | |
eval_dataloader = DataLoader( | |
eval_dataset, sampler=eval_sampler, batch_size=args.eval_batch_size | |
) | |
if args.n_gpu > 1: | |
model = torch.nn.DataParallel(model) | |
eval_loss = 0.0 | |
nb_eval_steps = 0 | |
model.eval() | |
if self.args.fp16: | |
from torch.cuda import amp | |
for batch in tqdm( | |
eval_dataloader, disable=args.silent or silent, desc="Running Evaluation" | |
): | |
inputs = self._get_inputs_dict(batch) | |
with torch.no_grad(): | |
if self.args.fp16: | |
with amp.autocast(): | |
outputs = model(**inputs) | |
loss = outputs[0] | |
else: | |
outputs = model(**inputs) | |
loss = outputs[0] | |
if self.args.n_gpu > 1: | |
loss = loss.mean() | |
eval_loss += loss.item() | |
nb_eval_steps += 1 | |
eval_loss = eval_loss / nb_eval_steps | |
results["eval_loss"] = eval_loss | |
output_eval_file = os.path.join(eval_output_dir, "eval_results.txt") | |
with open(output_eval_file, "w") as writer: | |
for key in sorted(results.keys()): | |
writer.write("{} = {}\n".format(key, str(results[key]))) | |
return results | |
def predict(self, to_predict, split_on_space=False): | |
""" | |
Performs predictions on a list of text. | |
Args: | |
to_predict: A python list of text (str) to be sent to the model for prediction. Note that the prefix should be prepended to the text. | |
            split_on_space (optional): Set True for space-separated (e.g. English) input; set False for text without spaces (e.g. Chinese), in which case spaces are stripped from the generated output.
Returns: | |
preds: A python list of the generated sequences. | |
""" # noqa: ignore flake8" | |
self._move_model_to_device() | |
all_outputs = [] | |
# Batching | |
for batch in tqdm( | |
[ | |
to_predict[i: i + self.args.eval_batch_size] | |
for i in range(0, len(to_predict), self.args.eval_batch_size) | |
], | |
desc="Generating outputs", | |
disable=self.args.silent, | |
): | |
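            # Note: tokenizer.prepare_seq2seq_batch() is deprecated in recent transformers
            # releases; calling the tokenizer directly, e.g. self.tokenizer(batch, ...), is the
            # forward-compatible equivalent should this need updating.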
input_batch = self.tokenizer.prepare_seq2seq_batch( | |
src_texts=batch, | |
max_length=self.args.max_seq_length, | |
padding="max_length", | |
return_tensors="pt", | |
truncation=True, | |
) | |
input_ids = input_batch["input_ids"] | |
attention_mask = input_batch["attention_mask"] | |
input_ids = input_ids.to(self.device) | |
attention_mask = attention_mask.to(self.device) | |
outputs = self.model.generate( | |
input_ids=input_ids, | |
attention_mask=attention_mask, | |
num_beams=self.args.num_beams, | |
max_length=self.args.max_length, | |
length_penalty=self.args.length_penalty, | |
early_stopping=self.args.early_stopping, | |
repetition_penalty=self.args.repetition_penalty, | |
do_sample=self.args.do_sample, | |
top_k=self.args.top_k, | |
top_p=self.args.top_p, | |
num_return_sequences=self.args.num_return_sequences, | |
#streamer=self.streamer, | |
) | |
all_outputs.extend(outputs.cpu().numpy()) | |
if self.args.use_multiprocessed_decoding: | |
self.model.to("cpu") | |
with Pool(self.args.process_count) as p: | |
if self.args.multiprocessing_chunksize == -1: | |
chunksize = max( | |
len(all_outputs) // (self.args.process_count * 2), 500 | |
) | |
else: | |
chunksize = self.args.multiprocessing_chunksize | |
outputs = list( | |
tqdm( | |
p.imap(self._decode, all_outputs, chunksize=chunksize), | |
total=len(all_outputs), | |
desc="Decoding outputs", | |
disable=self.args.silent, | |
) | |
) | |
self._move_model_to_device() | |
else: | |
outputs = [ | |
self.tokenizer.decode( | |
output_id, | |
skip_special_tokens=self.args.skip_special_tokens, | |
clean_up_tokenization_spaces=True, | |
) | |
for output_id in all_outputs | |
] | |
if not split_on_space: | |
outputs = [''.join(gen_text.split(' ')) for gen_text in outputs] | |
if self.args.num_return_sequences > 1: | |
return [ | |
outputs[i: i + self.args.num_return_sequences] | |
for i in range(0, len(outputs), self.args.num_return_sequences) | |
] | |
else: | |
return outputs | |
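
    # Illustrative sketch (not executed): the task prefix must already be prepended to each
    # input passed to predict(). The prefix and sentence below are made up.
    #
    #   preds = model.predict(["correct: he go to school everyday"], split_on_space=True)
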
def _decode(self, output_id): | |
return self.tokenizer.decode( | |
output_id, | |
skip_special_tokens=self.args.skip_special_tokens, | |
clean_up_tokenization_spaces=True, | |
) | |
def compute_metrics(self, labels, preds, **kwargs): | |
""" | |
Computes the evaluation metrics for the model predictions. | |
Args: | |
labels: List of target sequences | |
preds: List of model generated outputs | |
**kwargs: Custom metrics that should be used. Pass in the metrics as keyword arguments (name of metric: function to use). | |
A metric function should take in two parameters. The first parameter will be the true labels, and the second parameter will be the predictions. Both inputs | |
will be lists of strings. Note that this will slow down evaluation significantly as the predicted sequences need to be generated. | |
Returns: | |
result: Dictionary containing evaluation results. | |
""" # noqa: ignore flake8" | |
assert len(labels) == len(preds) | |
results = {} | |
for metric, func in kwargs.items(): | |
results[metric] = func(labels, preds) | |
return results | |
def _move_model_to_device(self): | |
self.model.to(self.device) | |
def _get_inputs_dict(self, batch): | |
if self.args.use_hf_datasets: | |
inputs = {**batch, "labels": batch["input_ids"]} | |
return {key: value.to(self.device) for key, value in inputs.items()} | |
else: | |
batch = tuple(t.to(self.device) for t in batch) | |
input_ids = batch[0] | |
attention_mask = batch[1] | |
labels = batch[2] | |
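            # Replace pad token ids in the labels with -100 so the loss ignores padding
            # (Hugging Face models use CrossEntropyLoss with ignore_index=-100).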
labels[labels == self.tokenizer.pad_token_id] = -100 | |
inputs = { | |
"input_ids": input_ids, | |
"attention_mask": attention_mask, | |
"labels": labels, | |
} | |
return inputs | |
def load_and_cache_examples( | |
self, data, evaluate=False, no_cache=False, verbose=True, silent=False | |
): | |
""" | |
Creates a T5Dataset from data. | |
Utility function for train() and eval() methods. Not intended to be used directly. | |
""" | |
tokenizer = self.tokenizer | |
args = self.args | |
if not no_cache: | |
no_cache = args.no_cache | |
if not no_cache: | |
os.makedirs(self.args.cache_dir, exist_ok=True) | |
mode = "dev" if evaluate else "train" | |
if self.args.use_hf_datasets: | |
dataset = load_hf_dataset(data, tokenizer, self.args) | |
return dataset | |
elif args.dataset_class: | |
CustomDataset = args.dataset_class | |
return CustomDataset(tokenizer, args, data, mode) | |
else: | |
return T5Dataset( | |
tokenizer, | |
self.args, | |
data, | |
mode, | |
) | |
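
    # Note: a custom dataset class supplied via args.dataset_class is instantiated above as
    # CustomDataset(tokenizer, args, data, mode), so it must accept that signature.
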
def _create_training_progress_scores(self, **kwargs): | |
extra_metrics = {key: [] for key in kwargs} | |
training_progress_scores = { | |
"global_step": [], | |
"eval_loss": [], | |
"train_loss": [], | |
**extra_metrics, | |
} | |
return training_progress_scores | |
def _get_last_metrics(self, metric_values): | |
return {metric: values[-1] for metric, values in metric_values.items()} | |
def save_model( | |
self, output_dir=None, optimizer=None, scheduler=None, model=None, results=None | |
): | |
if not output_dir: | |
output_dir = self.args.output_dir | |
os.makedirs(output_dir, exist_ok=True) | |
if model and not self.args.no_save: | |
# Take care of distributed/parallel training | |
model_to_save = model.module if hasattr(model, "module") else model | |
model_to_save.save_pretrained(output_dir) | |
self.tokenizer.save_pretrained(output_dir) | |
torch.save(self.args, os.path.join(output_dir, "training_args.bin")) | |
if optimizer and scheduler and self.args.save_optimizer_and_scheduler: | |
torch.save( | |
optimizer.state_dict(), os.path.join(output_dir, "optimizer.pt") | |
) | |
torch.save( | |
scheduler.state_dict(), os.path.join(output_dir, "scheduler.pt") | |
) | |
self.save_model_args(output_dir) | |
if results: | |
output_eval_file = os.path.join(output_dir, "eval_results.txt") | |
with open(output_eval_file, "w") as writer: | |
for key in sorted(results.keys()): | |
writer.write("{} = {}\n".format(key, str(results[key]))) | |
def save_model_args(self, output_dir): | |
os.makedirs(output_dir, exist_ok=True) | |
self.args.save(output_dir) | |
def _load_model_args(self, input_dir): | |
args = T5Args() | |
args.load(input_dir) | |
return args | |
def get_named_parameters(self): | |
return [n for n, p in self.model.named_parameters()] | |
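

if __name__ == "__main__":
    # Minimal usage sketch. Everything below is illustrative: the model name, args values and
    # DataFrame contents are assumptions. Running with evaluate=False additionally requires the
    # confusion-set and vocab files under ./data/ that __init__ reads.
    demo_df = pd.DataFrame(
        [["correct", "he go to school everyday", "he goes to school every day"]],
        columns=["prefix", "input_text", "target_text"],
    )
    demo_model = T5Model(
        "t5",
        "t5-small",
        args={"output_dir": "outputs/", "overwrite_output_dir": True, "num_train_epochs": 1},
        evaluate=True,  # skip adding the ./data/ token files for this sketch
    )
    demo_model.train_model(demo_df, eval_data=demo_df)
    print(demo_model.predict(["correct: he go to school everyday"], split_on_space=True))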