import argparse import logging import os from datetime import datetime import numpy as np import torch from backbones import get_model from dataset import get_dataloader from losses import CombinedMarginLoss from lr_scheduler import PolyScheduler from partial_fc_v2 import PartialFC_V2 from torch import distributed from torch.utils.data import DataLoader from torch.utils.tensorboard import SummaryWriter from utils.utils_callbacks import CallBackLogging from utils.utils_callbacks import CallBackVerification from utils.utils_config import get_config from utils.utils_distributed_sampler import setup_seed from utils.utils_logging import AverageMeter from utils.utils_logging import init_logging assert ( torch.__version__ >= "1.12.0" ), "In order to enjoy the features of the new torch, \ we have upgraded the torch to 1.12.0. torch before than 1.12.0 may not work in the future." try: rank = int(os.environ["RANK"]) local_rank = int(os.environ["LOCAL_RANK"]) world_size = int(os.environ["WORLD_SIZE"]) distributed.init_process_group("nccl") except KeyError: rank = 0 local_rank = 0 world_size = 1 distributed.init_process_group( backend="nccl", init_method="tcp://127.0.0.1:12584", rank=rank, world_size=world_size, ) def main(args): # get config cfg = get_config(args.config) # global control random seed setup_seed(seed=cfg.seed, cuda_deterministic=False) torch.cuda.set_device(local_rank) os.makedirs(cfg.output, exist_ok=True) init_logging(rank, cfg.output) summary_writer = SummaryWriter(log_dir=os.path.join(cfg.output, "tensorboard")) if rank == 0 else None wandb_logger = None if cfg.using_wandb: import wandb # Sign in to wandb try: wandb.login(key=cfg.wandb_key) except Exception as e: print("WandB Key must be provided in config file (base.py).") print(f"Config Error: {e}") # Initialize wandb run_name = datetime.now().strftime("%y%m%d_%H%M") + f"_GPU{rank}" run_name = run_name if cfg.suffix_run_name is None else run_name + f"_{cfg.suffix_run_name}" try: wandb_logger = ( wandb.init( entity=cfg.wandb_entity, project=cfg.wandb_project, sync_tensorboard=True, resume=cfg.wandb_resume, name=run_name, notes=cfg.notes, ) if rank == 0 or cfg.wandb_log_all else None ) if wandb_logger: wandb_logger.config.update(cfg) except Exception as e: print("WandB Data (Entity and Project name) must be provided in config file (base.py).") print(f"Config Error: {e}") train_loader = get_dataloader(cfg.rec, local_rank, cfg.batch_size, cfg.dali, cfg.seed, cfg.num_workers) backbone = get_model(cfg.network, dropout=0.0, fp16=cfg.fp16, num_features=cfg.embedding_size).cuda() backbone = torch.nn.parallel.DistributedDataParallel( module=backbone, broadcast_buffers=False, device_ids=[local_rank], bucket_cap_mb=16, find_unused_parameters=True ) backbone.train() # FIXME using gradient checkpoint if there are some unused parameters will cause error backbone._set_static_graph() margin_loss = CombinedMarginLoss( 64, cfg.margin_list[0], cfg.margin_list[1], cfg.margin_list[2], cfg.interclass_filtering_threshold ) if cfg.optimizer == "sgd": module_partial_fc = PartialFC_V2(margin_loss, cfg.embedding_size, cfg.num_classes, cfg.sample_rate, cfg.fp16) module_partial_fc.train().cuda() # TODO the params of partial fc must be last in the params list opt = torch.optim.SGD( params=[{"params": backbone.parameters()}, {"params": module_partial_fc.parameters()}], lr=cfg.lr, momentum=0.9, weight_decay=cfg.weight_decay, ) elif cfg.optimizer == "adamw": module_partial_fc = PartialFC_V2(margin_loss, cfg.embedding_size, cfg.num_classes, cfg.sample_rate, cfg.fp16) module_partial_fc.train().cuda() opt = torch.optim.AdamW( params=[{"params": backbone.parameters()}, {"params": module_partial_fc.parameters()}], lr=cfg.lr, weight_decay=cfg.weight_decay, ) else: raise cfg.total_batch_size = cfg.batch_size * world_size cfg.warmup_step = cfg.num_image // cfg.total_batch_size * cfg.warmup_epoch cfg.total_step = cfg.num_image // cfg.total_batch_size * cfg.num_epoch lr_scheduler = PolyScheduler( optimizer=opt, base_lr=cfg.lr, max_steps=cfg.total_step, warmup_steps=cfg.warmup_step, last_epoch=-1 ) start_epoch = 0 global_step = 0 if cfg.resume: dict_checkpoint = torch.load(os.path.join(cfg.output, f"checkpoint_gpu_{rank}.pt")) start_epoch = dict_checkpoint["epoch"] global_step = dict_checkpoint["global_step"] backbone.module.load_state_dict(dict_checkpoint["state_dict_backbone"]) module_partial_fc.load_state_dict(dict_checkpoint["state_dict_softmax_fc"]) opt.load_state_dict(dict_checkpoint["state_optimizer"]) lr_scheduler.load_state_dict(dict_checkpoint["state_lr_scheduler"]) del dict_checkpoint for key, value in cfg.items(): num_space = 25 - len(key) logging.info(": " + key + " " * num_space + str(value)) callback_verification = CallBackVerification( val_targets=cfg.val_targets, rec_prefix=cfg.rec, summary_writer=summary_writer, wandb_logger=wandb_logger ) callback_logging = CallBackLogging( frequent=cfg.frequent, total_step=cfg.total_step, batch_size=cfg.batch_size, start_step=global_step, writer=summary_writer, ) loss_am = AverageMeter() amp = torch.cuda.amp.grad_scaler.GradScaler(growth_interval=100) for epoch in range(start_epoch, cfg.num_epoch): if isinstance(train_loader, DataLoader): train_loader.sampler.set_epoch(epoch) for _, (img, local_labels) in enumerate(train_loader): global_step += 1 local_embeddings = backbone(img) loss: torch.Tensor = module_partial_fc(local_embeddings, local_labels) if cfg.fp16: amp.scale(loss).backward() if global_step % cfg.gradient_acc == 0: amp.unscale_(opt) torch.nn.utils.clip_grad_norm_(backbone.parameters(), 5) amp.step(opt) amp.update() opt.zero_grad() else: loss.backward() if global_step % cfg.gradient_acc == 0: torch.nn.utils.clip_grad_norm_(backbone.parameters(), 5) opt.step() opt.zero_grad() lr_scheduler.step() with torch.no_grad(): if wandb_logger: wandb_logger.log( { "Loss/Step Loss": loss.item(), "Loss/Train Loss": loss_am.avg, "Process/Step": global_step, "Process/Epoch": epoch, } ) loss_am.update(loss.item(), 1) callback_logging(global_step, loss_am, epoch, cfg.fp16, lr_scheduler.get_last_lr()[0], amp) if global_step % cfg.verbose == 0 and global_step > 0: callback_verification(global_step, backbone) if cfg.save_all_states: checkpoint = { "epoch": epoch + 1, "global_step": global_step, "state_dict_backbone": backbone.module.state_dict(), "state_dict_softmax_fc": module_partial_fc.state_dict(), "state_optimizer": opt.state_dict(), "state_lr_scheduler": lr_scheduler.state_dict(), } torch.save(checkpoint, os.path.join(cfg.output, f"checkpoint_gpu_{rank}.pt")) if rank == 0: path_module = os.path.join(cfg.output, "model.pt") torch.save(backbone.module.state_dict(), path_module) if wandb_logger and cfg.save_artifacts: artifact_name = f"{run_name}_E{epoch}" model = wandb.Artifact(artifact_name, type="model") model.add_file(path_module) wandb_logger.log_artifact(model) if cfg.dali: train_loader.reset() if rank == 0: path_module = os.path.join(cfg.output, "model.pt") torch.save(backbone.module.state_dict(), path_module) if wandb_logger and cfg.save_artifacts: artifact_name = f"{run_name}_Final" model = wandb.Artifact(artifact_name, type="model") model.add_file(path_module) wandb_logger.log_artifact(model) if __name__ == "__main__": torch.backends.cudnn.benchmark = True parser = argparse.ArgumentParser(description="Distributed Arcface Training in Pytorch") parser.add_argument("config", type=str, help="py config file") main(parser.parse_args())