# Chinese-Grammarly / t5 / t5_model.py
# TedYeh: "add t5 package" (commit da060de)
# -*- coding: utf-8 -*-
"""
@author:XuMing([email protected])
@description: refer https://github.com/ThilinaRajapakse/simpletransformers
"""
import math
import os
import random
import warnings
from dataclasses import asdict
from multiprocessing import Pool
import numpy as np
import pandas as pd
import torch
from loguru import logger
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from torch.utils.tensorboard import SummaryWriter
from tqdm.auto import tqdm, trange
from transformers import ByT5Tokenizer
from transformers import MT5Config, MT5ForConditionalGeneration
from transformers import T5Config, T5ForConditionalGeneration, T5Tokenizer, TextStreamer
from transformers.optimization import AdamW, Adafactor
from transformers.optimization import (
get_constant_schedule,
get_constant_schedule_with_warmup,
get_linear_schedule_with_warmup,
get_cosine_schedule_with_warmup,
get_cosine_with_hard_restarts_schedule_with_warmup,
get_polynomial_decay_schedule_with_warmup,
)
from t5.config.model_args import T5Args
from t5.t5_utils import T5Dataset, load_hf_dataset
# wandb is an optional dependency: remember whether the import succeeded so
# T5Model can switch wandb logging off when the package is missing.
try:
    import wandb

    wandb_available = True
except ImportError:
    wandb_available = False

# Used as the default value of T5Model.__init__'s `use_cuda` argument.
has_cuda = torch.cuda.is_available()

# Process-wide environment flags, set as a side effect of importing this module.
# NOTE(review): presumably these silence the Hugging Face tokenizers fork
# warning and the duplicate OpenMP runtime error respectively - confirm.
os.environ["TOKENIZERS_PARALLELISM"] = "FALSE"
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
def chunks(lst, n):
    """Lazily split ``lst`` into consecutive slices holding at most ``n`` items.

    The final slice is shorter when ``len(lst)`` is not a multiple of ``n``.
    """
    total = len(lst)
    yield from (lst[start: start + n] for start in range(0, total, n))
# Maps the `model_type` string accepted by T5Model to the
# (config class, model class) pair used to build or load the network.
# "byt5" uses the same config/model classes as "t5".
MODEL_CLASSES = {
    "t5": (T5Config, T5ForConditionalGeneration),
    "mt5": (MT5Config, MT5ForConditionalGeneration),
    "byt5": (T5Config, T5ForConditionalGeneration),
}
# Wrapper around the T5 / mT5 / ByT5 conditional-generation models listed in
# MODEL_CLASSES; exposes train_model(), eval_model() and predict().
class T5Model:
def __init__(
    self,
    model_type,
    model_name,
    args=None,
    tokenizer=None,
    use_cuda=has_cuda,
    cuda_device=-1,
    evaluate=False,
    **kwargs,
):
    """
    Initializes a T5Model model.

    Args:
        model_type: The type of model (t5, mt5, byt5)
        model_name: The exact architecture and trained weights to use. This may be a Hugging Face Transformers compatible pre-trained model, a community model, or the path to a directory containing model files.
        args (optional): Default args will be used if this parameter is not provided. If provided, it should be a dict containing the args that should be changed in the default args, or a T5Args instance that replaces them.
        tokenizer (optional): A T5Tokenizer instance to use instead of loading one from `model_name`.
        use_cuda (optional): Use GPU if available. Setting to False will force model to use CPU only (or MPS when that backend is available).
        cuda_device (optional): Specific GPU that should be used. Will use the first available GPU by default.
        evaluate (optional): When False (default) the vocabulary files under ./data are read and their tokens are added to the tokenizer. Set to True to skip that step.
        **kwargs (optional): Accepted for API compatibility. NOTE(review): they are not currently forwarded to any `from_pretrained` call.

    Raises:
        ValueError: If `use_cuda` is True but CUDA is not available.
    """
    self.args = self._load_model_args(model_name)

    if isinstance(args, dict):
        self.args.update_from_dict(args)
    elif isinstance(args, T5Args):
        self.args = args

    self.is_sweeping = False

    if self.args.manual_seed:
        random.seed(self.args.manual_seed)
        np.random.seed(self.args.manual_seed)
        torch.manual_seed(self.args.manual_seed)
        if self.args.n_gpu > 0:
            torch.cuda.manual_seed_all(self.args.manual_seed)

    if use_cuda:
        if not torch.cuda.is_available():
            raise ValueError(
                "'use_cuda' set to True when cuda is unavailable. "
                "Make sure CUDA is available or set `use_cuda=False`."
            )
        if cuda_device == -1:
            self.device = torch.device("cuda")
        else:
            self.device = torch.device(f"cuda:{cuda_device}")
    else:
        # torch.backends.mps does not exist on older torch releases.
        mps_backend = getattr(torch.backends, "mps", None)
        if mps_backend is not None and mps_backend.is_available():
            self.device = torch.device("mps")
        else:
            self.device = "cpu"
    logger.debug(f"Device: {self.device}")

    self.results = {}

    config_class, model_class = MODEL_CLASSES[model_type]

    if model_name is None:
        self.config = self.args.config
        self.model = model_class(config=self.config)
    else:
        self.config = config_class.from_pretrained(model_name, **self.args.config)
        self.model = model_class.from_pretrained(model_name, config=self.config)

    if isinstance(tokenizer, T5Tokenizer):
        self.tokenizer = tokenizer
        self.model.resize_token_embeddings(len(self.tokenizer))
    elif model_type == "byt5":
        self.tokenizer = ByT5Tokenizer.from_pretrained(model_name, truncate=True)
    else:
        self.tokenizer = T5Tokenizer.from_pretrained(model_name, truncate=True)
    logger.debug(f"Tokenizer size before extra vocabulary: {len(self.tokenizer)}")

    if not evaluate:
        entries_read = self._add_extra_vocab_tokens()
        logger.debug(f"Extra vocabulary entries read: {entries_read}")

    self.streamer = TextStreamer(self.tokenizer)
    logger.debug(f"Tokenizer size after extra vocabulary: {len(self.tokenizer)}")
    # Keep the embedding matrix in sync with the (possibly enlarged) vocabulary.
    self.model.resize_token_embeddings(len(self.tokenizer))

    if self.args.dynamic_quantize:
        self.model = torch.quantization.quantize_dynamic(
            self.model, {torch.nn.Linear}, dtype=torch.qint8
        )

    if not use_cuda:
        self.args.fp16 = False

    if self.args.special_tokens_list:
        self.tokenizer.add_tokens(
            self.args.special_tokens_list, special_tokens=True
        )
        self.model.resize_token_embeddings(len(self.tokenizer))

    self.args.model_type = model_type
    if model_name is None:
        self.args.model_name = "T5_from_scratch"
    else:
        self.args.model_name = model_name

    if self.args.wandb_project and not wandb_available:
        warnings.warn(
            "wandb_project specified but wandb is not available. Wandb disabled."
        )
        self.args.wandb_project = None


def _add_extra_vocab_tokens(self):
    """Read the vocabulary files under ./data and add their tokens to the tokenizer.

    Tokens are collected in the same order as before (first confusion set,
    full-width ASCII forms, second confusion set, word list, vocab file) and
    handed to the tokenizer in a single add_tokens() call instead of one call
    per token.

    Returns:
        The number of entries read (duplicates included).

    Raises:
        FileNotFoundError: If one of the ./data files is missing.
    """

    def read_tokens(path, extract):
        with open(path, 'r', encoding='utf-8') as handle:
            return [extract(line) for line in handle]

    def first_space_field(line):
        return line.split(' ')[0]

    tokens = read_tokens('./data/字音混淆集_s13.txt', first_space_field)
    # Full-width counterparts (U+FF01..U+FF5E) of the printable ASCII range '!'..'~'.
    tokens.extend(chr(code + 65248) for code in range(33, 127))
    tokens.extend(read_tokens('./data/字音混淆集.txt', first_space_field))
    tokens.extend(read_tokens('./data/wordtest4.txt', lambda line: line.split(',')[0]))
    tokens.extend(read_tokens('./data/vocab.txt', lambda line: line.replace('\n', '')))

    self.tokenizer.add_tokens(tokens)
    return len(tokens)
def train_model(
    self,
    train_data,
    output_dir=None,
    show_running_loss=True,
    args=None,
    eval_data=None,
    verbose=True,
    **kwargs,
):
    """
    Trains the model using 'train_data'

    Args:
        train_data: Pandas DataFrame containing the 3 columns - `prefix`, `input_text`, `target_text`.
                    - `prefix`: A string indicating the task to perform. (E.g. `"question"`, `"stsb"`)
                    - `input_text`: The input text sequence. `prefix` is automatically prepended to form the full input. (<prefix>: <input_text>)
                    - `target_text`: The target sequence
        output_dir: The directory where model files will be saved. If not given, self.args.output_dir will be used.
        show_running_loss (optional): Set to False to prevent running loss from being printed to console. Defaults to True.
        args (optional): Optional changes to the args dict of the model. Any changes made will persist for the model.
        eval_data (optional): A DataFrame against which evaluation will be performed when evaluate_during_training is enabled. Is required if evaluate_during_training is enabled.
        verbose (optional): Log a completion message and pass verbosity on to dataset loading and training.
        **kwargs: Additional metrics that should be used. Pass in the metrics as keyword arguments (name of metric: function to use).
                    A metric function should take in two parameters. The first parameter will be the true labels, and the second parameter will be the predictions. Both inputs
                    will be lists of strings. Note that this will slow down training significantly as the predicted sequences need to be generated.

    Returns:
        global_step: Number of global steps trained
        training_details: Average training loss if evaluate_during_training is False or full training progress scores if evaluate_during_training is True

    Raises:
        ValueError: If evaluate_during_training is enabled without eval_data, or if
            the output directory is non-empty and overwrite_output_dir is not set.
    """
    if args:
        self.args.update_from_dict(args)

    if self.args.evaluate_during_training and eval_data is None:
        raise ValueError(
            "evaluate_during_training is enabled but eval_data is not specified."
            " Pass eval_data to model.train_model() if using evaluate_during_training."
        )

    if not output_dir:
        output_dir = self.args.output_dir

    if (
        os.path.exists(output_dir)
        and os.listdir(output_dir)
        and not self.args.overwrite_output_dir
    ):
        raise ValueError(
            "Output directory ({}) already exists and is not empty."
            " Set args.overwrite_output_dir = True to overcome.".format(output_dir)
        )

    self._move_model_to_device()

    train_dataset = self.load_and_cache_examples(train_data, verbose=verbose)

    os.makedirs(output_dir, exist_ok=True)

    global_step, training_details = self.train(
        train_dataset,
        output_dir,
        show_running_loss=show_running_loss,
        eval_data=eval_data,
        verbose=verbose,
        **kwargs,
    )

    # Save into the directory that was actually requested (and that the log
    # line below reports), not unconditionally into self.args.output_dir.
    self.save_model(output_dir, model=self.model)

    if verbose:
        logger.info(
            " Training of {} model complete. Saved to {}.".format(
                self.args.model_name, output_dir
            )
        )

    return global_step, training_details
def train(
    self,
    train_dataset,
    output_dir,
    show_running_loss=True,
    eval_data=None,
    verbose=True,
    **kwargs,
):
    """
    Trains the model on train_dataset.

    Utility function to be used by the train_model() method. Not intended to be used directly.

    Args:
        train_dataset: Dataset wrapped in a DataLoader with a RandomSampler.
        output_dir: Directory under which `checkpoint-*` sub-directories are written.
        show_running_loss (optional): Show the current batch loss in the progress bar description.
        eval_data (optional): Passed to eval_model() when evaluate_during_training is enabled.
        verbose (optional): Enables early-stopping log messages and, together with
            args.evaluate_during_training_verbose, verbose evaluation.
        **kwargs: Extra metric functions forwarded to eval_model().

    Returns:
        (global_step, training_details): training_details is `tr_loss / global_step` when
        evaluate_during_training is disabled, otherwise the training progress scores dict.

    Raises:
        ValueError: If args.optimizer or args.scheduler is not one of the supported names.
    """
    model = self.model
    args = self.args

    tb_writer = SummaryWriter(log_dir=args.tensorboard_dir)
    train_sampler = RandomSampler(train_dataset)
    train_dataloader = DataLoader(
        train_dataset,
        sampler=train_sampler,
        batch_size=args.train_batch_size,
        num_workers=self.args.dataloader_num_workers,
    )

    # Number of optimizer updates performed per pass over the data.
    updates_per_epoch = len(train_dataloader) // args.gradient_accumulation_steps

    if args.max_steps > 0:
        t_total = args.max_steps
        args.num_train_epochs = args.max_steps // updates_per_epoch + 1
    else:
        t_total = updates_per_epoch * args.num_train_epochs

    no_decay = ["bias", "LayerNorm.weight"]

    optimizer_grouped_parameters = []
    custom_parameter_names = set()
    for group in self.args.custom_parameter_groups:
        # Work on a copy: popping from the configured dict would consume the
        # setting and make a second training run fail with KeyError.
        param_group = dict(group)
        params = param_group.pop("params")
        custom_parameter_names.update(params)
        param_group["params"] = [
            p for n, p in model.named_parameters() if n in params
        ]
        optimizer_grouped_parameters.append(param_group)

    for group in self.args.custom_layer_parameters:
        group = dict(group)  # same reason as above: keep the configured dict intact
        layer_number = group.pop("layer")
        layer = f"layer.{layer_number}."
        group_d = {**group}
        group_nd = {**group}
        group_nd["weight_decay"] = 0.0
        params_d = []
        params_nd = []
        for n, p in model.named_parameters():
            if n not in custom_parameter_names and layer in n:
                if any(nd in n for nd in no_decay):
                    params_nd.append(p)
                else:
                    params_d.append(p)
                custom_parameter_names.add(n)
        group_d["params"] = params_d
        group_nd["params"] = params_nd

        optimizer_grouped_parameters.append(group_d)
        optimizer_grouped_parameters.append(group_nd)

    if not self.args.train_custom_parameters_only:
        optimizer_grouped_parameters.extend(
            [
                {
                    "params": [
                        p
                        for n, p in model.named_parameters()
                        if n not in custom_parameter_names
                        and not any(nd in n for nd in no_decay)
                    ],
                    "weight_decay": args.weight_decay,
                },
                {
                    "params": [
                        p
                        for n, p in model.named_parameters()
                        if n not in custom_parameter_names
                        and any(nd in n for nd in no_decay)
                    ],
                    "weight_decay": 0.0,
                },
            ]
        )

    # An explicit warmup_steps setting wins over the ratio-derived value.
    warmup_steps = math.ceil(t_total * args.warmup_ratio)
    args.warmup_steps = (
        warmup_steps if args.warmup_steps == 0 else args.warmup_steps
    )

    if args.optimizer == "AdamW":
        optimizer = AdamW(
            optimizer_grouped_parameters,
            lr=args.learning_rate,
            eps=args.adam_epsilon,
        )
    elif args.optimizer == "Adafactor":
        optimizer = Adafactor(
            optimizer_grouped_parameters,
            lr=args.learning_rate,
            eps=args.adafactor_eps,
            clip_threshold=args.adafactor_clip_threshold,
            decay_rate=args.adafactor_decay_rate,
            beta1=args.adafactor_beta1,
            weight_decay=args.weight_decay,
            scale_parameter=args.adafactor_scale_parameter,
            relative_step=args.adafactor_relative_step,
            warmup_init=args.adafactor_warmup_init,
        )
    else:
        raise ValueError(
            "{} is not a valid optimizer class. Please use one of ('AdamW', 'Adafactor') instead.".format(
                args.optimizer
            )
        )

    if args.scheduler == "constant_schedule":
        scheduler = get_constant_schedule(optimizer)
    elif args.scheduler == "constant_schedule_with_warmup":
        scheduler = get_constant_schedule_with_warmup(
            optimizer, num_warmup_steps=args.warmup_steps
        )
    elif args.scheduler == "linear_schedule_with_warmup":
        scheduler = get_linear_schedule_with_warmup(
            optimizer,
            num_warmup_steps=args.warmup_steps,
            num_training_steps=t_total,
        )
    elif args.scheduler == "cosine_schedule_with_warmup":
        scheduler = get_cosine_schedule_with_warmup(
            optimizer,
            num_warmup_steps=args.warmup_steps,
            num_training_steps=t_total,
            num_cycles=args.cosine_schedule_num_cycles,
        )
    elif args.scheduler == "cosine_with_hard_restarts_schedule_with_warmup":
        scheduler = get_cosine_with_hard_restarts_schedule_with_warmup(
            optimizer,
            num_warmup_steps=args.warmup_steps,
            num_training_steps=t_total,
            num_cycles=args.cosine_schedule_num_cycles,
        )
    elif args.scheduler == "polynomial_decay_schedule_with_warmup":
        scheduler = get_polynomial_decay_schedule_with_warmup(
            optimizer,
            num_warmup_steps=args.warmup_steps,
            num_training_steps=t_total,
            lr_end=args.polynomial_decay_schedule_lr_end,
            power=args.polynomial_decay_schedule_power,
        )
    else:
        raise ValueError("{} is not a valid scheduler.".format(args.scheduler))

    if (
        args.model_name
        and os.path.isfile(os.path.join(args.model_name, "optimizer.pt"))
        and os.path.isfile(os.path.join(args.model_name, "scheduler.pt"))
    ):
        # Load in optimizer and scheduler states
        optimizer.load_state_dict(
            torch.load(os.path.join(args.model_name, "optimizer.pt"))
        )
        scheduler.load_state_dict(
            torch.load(os.path.join(args.model_name, "scheduler.pt"))
        )

    if args.n_gpu > 1:
        model = torch.nn.DataParallel(model)

    logger.info(" Training started")

    global_step = 0
    training_progress_scores = None
    tr_loss, logging_loss = 0.0, 0.0
    model.zero_grad()
    train_iterator = trange(
        int(args.num_train_epochs), desc="Epoch", disable=args.silent, mininterval=0
    )
    epoch_number = 0
    best_eval_metric = None
    early_stopping_counter = 0
    steps_trained_in_current_epoch = 0
    epochs_trained = 0

    if args.model_name and os.path.exists(args.model_name):
        try:
            # set global_step to global_step of last saved checkpoint from model path
            checkpoint_suffix = args.model_name.split("/")[-1].split("-")
            if len(checkpoint_suffix) > 2:
                checkpoint_suffix = checkpoint_suffix[1]
            else:
                checkpoint_suffix = checkpoint_suffix[-1]
            global_step = int(checkpoint_suffix)
            epochs_trained = global_step // updates_per_epoch
            steps_trained_in_current_epoch = global_step % updates_per_epoch

            # loguru formats with str.format(), so %-style placeholders were
            # never substituted; build the messages explicitly instead.
            logger.info(
                " Continuing training from checkpoint, will skip to saved global_step"
            )
            logger.info(f" Continuing training from epoch {epochs_trained}")
            logger.info(f" Continuing training from global step {global_step}")
            logger.info(
                f" Will skip the first {steps_trained_in_current_epoch} steps in the current epoch"
            )
        except ValueError:
            # model_name does not end in a step number: not a checkpoint directory.
            logger.info(" Starting fine-tuning.")

    if args.evaluate_during_training:
        training_progress_scores = self._create_training_progress_scores(**kwargs)

    if args.wandb_project:
        wandb.init(
            project=args.wandb_project,
            config={**asdict(args)},
            **args.wandb_kwargs,
        )
        wandb.run._label(repo="textgen")
        wandb.watch(self.model)
        self.wandb_run_id = wandb.run.id

    if args.fp16:
        from torch.cuda import amp

        scaler = amp.GradScaler()

    for current_epoch in train_iterator:
        model.train()
        if epochs_trained > 0:
            # Epoch already covered by the checkpoint being resumed.
            epochs_trained -= 1
            continue
        train_iterator.set_description(
            f"Epoch {epoch_number + 1} of {args.num_train_epochs}"
        )
        batch_iterator = tqdm(
            train_dataloader,
            desc=f"Running Epoch {epoch_number} of {args.num_train_epochs}",
            disable=args.silent,
            mininterval=0,
        )
        for step, batch in enumerate(batch_iterator):
            if steps_trained_in_current_epoch > 0:
                steps_trained_in_current_epoch -= 1
                continue

            inputs = self._get_inputs_dict(batch)
            if args.fp16:
                with amp.autocast():
                    outputs = model(**inputs)
                    # model outputs are always tuple in pytorch-transformers (see doc)
                    loss = outputs[0]
            else:
                outputs = model(**inputs)
                # model outputs are always tuple in pytorch-transformers (see doc)
                loss = outputs[0]

            if args.n_gpu > 1:
                loss = (
                    loss.mean()
                )  # mean() to average on multi-gpu parallel training

            current_loss = loss.item()

            if show_running_loss:
                batch_iterator.set_description(
                    f"Epochs {epoch_number}/{args.num_train_epochs}. Running Loss: {current_loss:9.4f}"
                )

            if args.gradient_accumulation_steps > 1:
                loss = loss / args.gradient_accumulation_steps

            if args.fp16:
                scaler.scale(loss).backward()
            else:
                loss.backward()

            tr_loss += loss.item()
            if (step + 1) % args.gradient_accumulation_steps == 0:
                if args.fp16:
                    scaler.unscale_(optimizer)
                if args.optimizer == "AdamW":
                    torch.nn.utils.clip_grad_norm_(
                        model.parameters(), args.max_grad_norm
                    )

                if args.fp16:
                    scaler.step(optimizer)
                    scaler.update()
                else:
                    optimizer.step()
                scheduler.step()  # Update learning rate schedule
                model.zero_grad()
                global_step += 1

                if args.logging_steps > 0 and global_step % args.logging_steps == 0:
                    # Log metrics
                    tb_writer.add_scalar(
                        "lr", scheduler.get_last_lr()[0], global_step
                    )
                    tb_writer.add_scalar(
                        "loss",
                        (tr_loss - logging_loss) / args.logging_steps,
                        global_step,
                    )
                    logging_loss = tr_loss
                    if args.wandb_project or self.is_sweeping:
                        wandb.log(
                            {
                                "Training loss": current_loss,
                                "lr": scheduler.get_last_lr()[0],
                                "global_step": global_step,
                            }
                        )

                if args.save_steps > 0 and global_step % args.save_steps == 0:
                    # Save model checkpoint
                    output_dir_current = os.path.join(
                        output_dir, "checkpoint-{}".format(global_step)
                    )
                    self.save_model(
                        output_dir_current, optimizer, scheduler, model=model
                    )

                if args.evaluate_during_training and (
                    args.evaluate_during_training_steps > 0
                    and global_step % args.evaluate_during_training_steps == 0
                ):
                    # Only evaluate when single GPU otherwise metrics may not average well
                    results = self.eval_model(
                        eval_data,
                        verbose=verbose and args.evaluate_during_training_verbose,
                        silent=args.evaluate_during_training_silent,
                        **kwargs,
                    )
                    for key, value in results.items():
                        try:
                            tb_writer.add_scalar(
                                "eval_{}".format(key), value, global_step
                            )
                        except (NotImplementedError, AssertionError):
                            # Best effort: values the writer cannot log as a scalar are skipped.
                            pass

                    output_dir_current = os.path.join(
                        output_dir, "checkpoint-{}".format(global_step)
                    )

                    if args.save_eval_checkpoints:
                        self.save_model(
                            output_dir_current,
                            optimizer,
                            scheduler,
                            model=model,
                            results=results,
                        )

                    self._record_training_progress(
                        training_progress_scores, results, global_step, current_loss
                    )

                    (
                        best_eval_metric,
                        early_stopping_counter,
                        should_stop,
                    ) = self._track_best_model(
                        results,
                        best_eval_metric,
                        early_stopping_counter,
                        args.use_early_stopping,
                        optimizer,
                        scheduler,
                        model,
                        verbose,
                    )
                    if should_stop:
                        train_iterator.close()
                        return self._finish_training(
                            tb_writer, global_step, tr_loss, training_progress_scores
                        )
                    # eval_model() put the network in eval mode; switch back.
                    model.train()

        epoch_number += 1
        output_dir_current = os.path.join(
            output_dir, "checkpoint-{}-epoch-{}".format(global_step, epoch_number)
        )

        if args.save_model_every_epoch:
            self.save_model(output_dir_current, optimizer, scheduler, model=model)

        if args.evaluate_during_training and args.evaluate_each_epoch:
            results = self.eval_model(
                eval_data,
                verbose=verbose and args.evaluate_during_training_verbose,
                silent=args.evaluate_during_training_silent,
                **kwargs,
            )

            if args.save_eval_checkpoints:
                # No model is passed here, so only the evaluation results file
                # is written by this call.
                self.save_model(
                    output_dir_current, optimizer, scheduler, results=results
                )

            self._record_training_progress(
                training_progress_scores, results, global_step, current_loss
            )

            (
                best_eval_metric,
                early_stopping_counter,
                should_stop,
            ) = self._track_best_model(
                results,
                best_eval_metric,
                early_stopping_counter,
                args.use_early_stopping and args.early_stopping_consider_epochs,
                optimizer,
                scheduler,
                model,
                verbose,
            )
            if should_stop:
                train_iterator.close()
                return self._finish_training(
                    tb_writer, global_step, tr_loss, training_progress_scores
                )

    return self._finish_training(
        tb_writer, global_step, tr_loss, training_progress_scores
    )


def _record_training_progress(
    self, training_progress_scores, results, global_step, current_loss
):
    """Append one evaluation to the progress table, rewrite the CSV report and log to wandb."""
    training_progress_scores["global_step"].append(global_step)
    training_progress_scores["train_loss"].append(current_loss)
    for key in results:
        training_progress_scores[key].append(results[key])
    report = pd.DataFrame(training_progress_scores)
    report.to_csv(
        os.path.join(self.args.output_dir, "training_progress_scores.csv"),
        index=False,
    )

    if self.args.wandb_project or self.is_sweeping:
        wandb.log(self._get_last_metrics(training_progress_scores))


def _track_best_model(
    self,
    results,
    best_eval_metric,
    early_stopping_counter,
    early_stopping_enabled,
    optimizer,
    scheduler,
    model,
    verbose,
):
    """Save the best model on improvement and advance the early-stopping counter.

    Returns:
        (best_eval_metric, early_stopping_counter, should_stop) where
        should_stop is True once the patience budget is exhausted.
    """
    args = self.args
    current_metric = results[args.early_stopping_metric]

    def save_best():
        self.save_model(
            args.best_model_dir,
            optimizer,
            scheduler,
            model=model,
            results=results,
        )

    # `is None` rather than truthiness: a best score of exactly 0 is a real value.
    if best_eval_metric is None:
        best_eval_metric = current_metric
        save_best()

    change = current_metric - best_eval_metric
    if args.early_stopping_metric_minimize:
        improved = change < args.early_stopping_delta
    else:
        improved = change > args.early_stopping_delta

    if improved:
        save_best()
        return current_metric, 0, False

    if not early_stopping_enabled:
        return best_eval_metric, early_stopping_counter, False

    if early_stopping_counter < args.early_stopping_patience:
        early_stopping_counter += 1
        if verbose:
            logger.info(f" No improvement in {args.early_stopping_metric}")
            logger.info(f" Current step: {early_stopping_counter}")
            logger.info(f" Early stopping patience: {args.early_stopping_patience}")
        return best_eval_metric, early_stopping_counter, False

    if verbose:
        logger.info(f" Patience of {args.early_stopping_patience} steps reached")
        logger.info(" Training terminated.")
    return best_eval_metric, early_stopping_counter, True


def _finish_training(self, tb_writer, global_step, tr_loss, training_progress_scores):
    """Close the TensorBoard writer and build the (global_step, details) result of train()."""
    tb_writer.close()
    if self.args.evaluate_during_training:
        return global_step, training_progress_scores
    return global_step, tr_loss / global_step
def eval_model(
    self, eval_data, output_dir=None, verbose=True, silent=False, **kwargs
):
    """
    Run evaluation on eval_data and merge the outcome into self.results.

    Args:
        eval_data: Pandas DataFrame with the columns `prefix`, `input_text` and `target_text`.
        output_dir: Where evaluation files are written. Falls back to self.args.output_dir.
        verbose: Log the accumulated results once evaluation has finished.
        silent: Hide tqdm progress bars.
        **kwargs: Metric functions (name=function). Each receives the list of target strings
                    and the list of generated strings; they are only applied when
                    args.evaluate_generated_text is enabled.

    Returns:
        results: self.results, the dictionary of accumulated evaluation results.
    """
    target_dir = output_dir or self.args.output_dir

    self._move_model_to_device()

    dataset = self.load_and_cache_examples(
        eval_data, evaluate=True, verbose=verbose, silent=silent
    )
    os.makedirs(target_dir, exist_ok=True)

    loss_scores = self.evaluate(
        dataset, target_dir, verbose=verbose, silent=silent, **kwargs
    )
    self.results.update(loss_scores)

    if self.args.evaluate_generated_text:
        pairs = zip(eval_data["prefix"], eval_data["input_text"])
        if self.args.preprocess_inputs:
            to_predict = [text for _, text in pairs]
        else:
            to_predict = [prefix + text for prefix, text in pairs]

        # Generation is limited to the first three evaluation batches.
        preds = self.predict(to_predict[:self.args.eval_batch_size*3])
        references = eval_data["target_text"].tolist()[:self.args.eval_batch_size*3]

        self.results.update(self.compute_metrics(references, preds, **kwargs))

    if verbose:
        logger.info(self.results)

    return self.results
def evaluate(self, eval_dataset, output_dir, verbose=True, silent=False, **kwargs):
    """
    Compute the mean loss over eval_dataset and write it to `eval_results.txt` in output_dir.

    Utility function to be used by the eval_model() method. Not intended to be used directly.

    Returns:
        results: Dictionary with the single key "eval_loss".
    """
    args = self.args
    network = self.model

    loader = DataLoader(
        eval_dataset,
        sampler=SequentialSampler(eval_dataset),
        batch_size=args.eval_batch_size,
    )

    if args.n_gpu > 1:
        network = torch.nn.DataParallel(network)

    network.eval()

    use_amp = self.args.fp16
    if use_amp:
        from torch.cuda import amp

    batch_losses = []
    progress = tqdm(
        loader, disable=args.silent or silent, desc="Running Evaluation"
    )
    for batch in progress:
        inputs = self._get_inputs_dict(batch)
        with torch.no_grad():
            # The first element of the model output is the loss.
            if use_amp:
                with amp.autocast():
                    loss = network(**inputs)[0]
            else:
                loss = network(**inputs)[0]
            if self.args.n_gpu > 1:
                loss = loss.mean()
            batch_losses.append(loss.item())

    results = {"eval_loss": sum(batch_losses) / len(batch_losses)}

    report_path = os.path.join(output_dir, "eval_results.txt")
    with open(report_path, "w") as writer:
        writer.writelines(f"{key} = {results[key]}\n" for key in sorted(results))

    return results
def predict(self, to_predict, split_on_space=False):
    """
    Performs predictions on a list of text.

    Args:
        to_predict: A python list of text (str) to be sent to the model for prediction. Note that the prefix should be prepended to the text.
        split_on_space (optional): If True, input is english string, if False, input is chinese string.
            When False every space is removed from the decoded outputs.

    Returns:
        preds: A python list of the generated sequences. When args.num_return_sequences > 1
            this is a list of lists, one inner list per input text.
    """
    self._move_model_to_device()
    # Generation must not run with dropout active; the model is left in
    # training mode after train(), so switch explicitly.
    self.model.eval()

    all_outputs = []
    # Batching
    batches = list(chunks(to_predict, self.args.eval_batch_size))
    for batch in tqdm(
        batches,
        desc="Generating outputs",
        disable=self.args.silent,
    ):
        # Tokenizer __call__ replaces the deprecated prepare_seq2seq_batch().
        input_batch = self.tokenizer(
            batch,
            max_length=self.args.max_seq_length,
            padding="max_length",
            return_tensors="pt",
            truncation=True,
        )
        input_ids = input_batch["input_ids"].to(self.device)
        attention_mask = input_batch["attention_mask"].to(self.device)

        outputs = self.model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            num_beams=self.args.num_beams,
            max_length=self.args.max_length,
            length_penalty=self.args.length_penalty,
            early_stopping=self.args.early_stopping,
            repetition_penalty=self.args.repetition_penalty,
            do_sample=self.args.do_sample,
            top_k=self.args.top_k,
            top_p=self.args.top_p,
            num_return_sequences=self.args.num_return_sequences,
        )
        all_outputs.extend(outputs.cpu().numpy())

    if self.args.use_multiprocessed_decoding:
        # The bound method self._decode is shipped to worker processes, so the
        # model is moved to the CPU for the duration of the pool.
        self.model.to("cpu")
        with Pool(self.args.process_count) as p:
            if self.args.multiprocessing_chunksize == -1:
                chunksize = max(
                    len(all_outputs) // (self.args.process_count * 2), 500
                )
            else:
                chunksize = self.args.multiprocessing_chunksize

            outputs = list(
                tqdm(
                    p.imap(self._decode, all_outputs, chunksize=chunksize),
                    total=len(all_outputs),
                    desc="Decoding outputs",
                    disable=self.args.silent,
                )
            )
        self._move_model_to_device()
    else:
        outputs = [self._decode(output_id) for output_id in all_outputs]

    if not split_on_space:
        outputs = [''.join(gen_text.split(' ')) for gen_text in outputs]

    if self.args.num_return_sequences > 1:
        # Regroup the flat list so each input gets its own list of candidates.
        return [
            outputs[i: i + self.args.num_return_sequences]
            for i in range(0, len(outputs), self.args.num_return_sequences)
        ]
    return outputs
def _decode(self, output_id):
    """Turn one generated id sequence back into text using the tokenizer."""
    decode_options = {
        "skip_special_tokens": self.args.skip_special_tokens,
        "clean_up_tokenization_spaces": True,
    }
    return self.tokenizer.decode(output_id, **decode_options)
def compute_metrics(self, labels, preds, **kwargs):
    """
    Apply every supplied metric function to the targets and the predictions.

    Args:
        labels: List of target sequences
        preds: List of model generated outputs
        **kwargs: Metric functions passed as keyword arguments (name of metric: function to use).
                    Each function is called as `function(labels, preds)`.

    Returns:
        result: Dictionary mapping each metric name to the value its function returned.
    """
    assert len(labels) == len(preds)

    return {name: scorer(labels, preds) for name, scorer in kwargs.items()}
def _move_model_to_device(self):
    """Call `.to()` on self.model with self.device as the target."""
    target = self.device
    self.model.to(target)
def _get_inputs_dict(self, batch):
    """Build the keyword arguments for a model forward pass from one batch.

    With args.use_hf_datasets the batch is a mapping: it is copied, its
    "labels" entry is set to the batch's "input_ids", and every value is moved
    to self.device. Otherwise the batch is a sequence whose first three items
    are input ids, attention mask and labels; label positions equal to the
    tokenizer's pad token id are overwritten in place with -100.
    """
    device = self.device

    if not self.args.use_hf_datasets:
        moved = [tensor.to(device) for tensor in batch]
        input_ids, attention_mask, labels = moved[0], moved[1], moved[2]

        pad_positions = labels == self.tokenizer.pad_token_id
        labels[pad_positions] = -100

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels,
        }

    # NOTE(review): labels are taken from "input_ids" here, not from a
    # separate target column - confirm this is intended for HF datasets.
    inputs = dict(batch)
    inputs["labels"] = batch["input_ids"]
    return {name: tensor.to(device) for name, tensor in inputs.items()}
def load_and_cache_examples(
    self, data, evaluate=False, no_cache=False, verbose=True, silent=False
):
    """
    Build the dataset object for `data`.

    Utility function for train() and eval() methods. Not intended to be used directly.

    Depending on the args this returns the result of load_hf_dataset(), an
    instance of args.dataset_class, or a T5Dataset. The cache directory is
    created unless caching is disabled by the argument or by args.no_cache.
    """
    args = self.args
    tokenizer = self.tokenizer

    no_cache = no_cache or args.no_cache
    if not no_cache:
        os.makedirs(self.args.cache_dir, exist_ok=True)

    if self.args.use_hf_datasets:
        return load_hf_dataset(data, tokenizer, self.args)

    mode = "dev" if evaluate else "train"

    dataset_factory = args.dataset_class
    if dataset_factory:
        return dataset_factory(tokenizer, args, data, mode)

    return T5Dataset(tokenizer, self.args, data, mode)
def _create_training_progress_scores(self, **kwargs):
    """Return the empty score table: one list per tracked column plus one per extra metric name."""
    scores = {"global_step": [], "eval_loss": [], "train_loss": []}
    for metric_name in kwargs:
        scores[metric_name] = []
    return scores
def _get_last_metrics(self, metric_values):
    """Map every metric name to the most recent entry of its value list."""
    latest = {}
    for metric, values in metric_values.items():
        latest[metric] = values[-1]
    return latest
def save_model(
    self, output_dir=None, optimizer=None, scheduler=None, model=None, results=None
):
    """Write model artefacts and/or evaluation results into a directory.

    The directory defaults to self.args.output_dir and is created if missing.
    When a model is given and args.no_save is not set, the model, tokenizer,
    args and (optionally) optimizer/scheduler states are saved. When results
    are given they are written to `eval_results.txt`.
    """
    target_dir = output_dir or self.args.output_dir
    os.makedirs(target_dir, exist_ok=True)

    if model and not self.args.no_save:
        # Unwrap DataParallel-style containers before saving.
        unwrapped = getattr(model, "module", model)
        unwrapped.save_pretrained(target_dir)
        self.tokenizer.save_pretrained(target_dir)
        torch.save(self.args, os.path.join(target_dir, "training_args.bin"))

        if optimizer and scheduler and self.args.save_optimizer_and_scheduler:
            for stateful, filename in (
                (optimizer, "optimizer.pt"),
                (scheduler, "scheduler.pt"),
            ):
                torch.save(stateful.state_dict(), os.path.join(target_dir, filename))

        self.save_model_args(target_dir)

    if results:
        report_path = os.path.join(target_dir, "eval_results.txt")
        with open(report_path, "w") as writer:
            writer.writelines(
                f"{key} = {results[key]}\n" for key in sorted(results)
            )
def save_model_args(self, output_dir):
    """Ensure output_dir exists, then have self.args save itself there."""
    os.makedirs(output_dir, exist_ok=True)
    model_args = self.args
    model_args.save(output_dir)
def _load_model_args(self, input_dir):
    """Create a fresh T5Args, call its load() with input_dir and return it."""
    loaded_args = T5Args()
    loaded_args.load(input_dir)
    return loaded_args
def get_named_parameters(self):
    """Return the names yielded by self.model.named_parameters(), in order."""
    return [name for name, _ in self.model.named_parameters()]