|
import importlib |
|
import os |
|
import numpy as np |
|
import cv2 |
|
import torch |
|
import torch.distributed as dist |
|
import torchvision |
|
|
|
|
|
def count_params(model, verbose=False):
    """Return the total number of elements over ``model.parameters()``.

    Args:
        model: object exposing ``parameters()`` whose items provide
            ``numel()``.
        verbose: when truthy, also print the model's class name together
            with the count expressed in millions.

    Returns:
        The summed element count.
    """
    total_params = 0
    for tensor in model.parameters():
        total_params += tensor.numel()

    if verbose:
        print(f"{model.__class__.__name__} has {total_params*1.e-6:.2f} M params.")

    return total_params
|
|
|
|
|
def check_istarget(name, para_list):
    """Report whether any entry of ``para_list`` occurs inside ``name``.

    name: full name of the source parameter
    para_list: partial names of the target parameters

    Returns True on the first entry for which ``entry in name`` holds,
    otherwise False (including when ``para_list`` is empty).
    """
    return any(fragment in name for fragment in para_list)
|
|
|
|
|
def instantiate_from_config(config):
    """Build the object described by ``config``.

    When ``config`` contains a ``"target"`` key, its value is resolved with
    ``get_obj_from_str`` and the result is called with the keyword arguments
    stored under ``"params"`` (no arguments when that key is absent).

    The two placeholder values ``"__is_first_stage__"`` and
    ``"__is_unconditional__"`` yield ``None`` instead of an object.

    Raises:
        KeyError: ``config`` has no ``"target"`` and is not a placeholder.
    """
    if "target" in config:
        factory = get_obj_from_str(config["target"])
        kwargs = config.get("params", dict())
        return factory(**kwargs)

    # No target: only the two placeholder markers are acceptable.
    if config == "__is_first_stage__":
        return None
    if config == "__is_unconditional__":
        return None

    raise KeyError("Expected key `target` to instantiate.")
|
|
|
|
|
def get_obj_from_str(string, reload=False):
    """Resolve a dotted path such as ``"package.module.Name"`` to an object.

    The text after the last dot is the attribute name; everything before it
    is imported as the module.

    Args:
        string: dotted path containing at least one ``"."``.
        reload: when truthy, the module is reloaded with
            ``importlib.reload`` before the attribute is looked up.

    Returns:
        The attribute fetched from the imported module.
    """
    module_path, attr_name = string.rsplit(".", 1)

    if reload:
        importlib.reload(importlib.import_module(module_path))

    module_obj = importlib.import_module(module_path, package=None)
    return getattr(module_obj, attr_name)
|
|
|
|
|
def load_npz_from_dir(data_dir):
    """Load ``arr_0`` from every file in ``data_dir`` and join them on axis 0.

    Every directory entry is handed to ``np.load``, so the directory is
    expected to contain only ``.npz`` archives that expose an ``arr_0`` key.

    Args:
        data_dir: path of the directory to scan (not recursive).

    Returns:
        A single array made by concatenating the per-file arrays along
        axis 0, in sorted file-name order.

    Raises:
        ValueError: raised by ``np.concatenate`` when the directory is empty.
    """
    arrays = []
    # os.listdir gives entries in arbitrary order; sort so that the row
    # order of the result is reproducible.
    for data_name in sorted(os.listdir(data_dir)):
        # np.load keeps the archive's file handle open until it is closed;
        # indexing copies the array into memory, so closing right after is safe.
        with np.load(os.path.join(data_dir, data_name)) as archive:
            arrays.append(archive["arr_0"])
    return np.concatenate(arrays, axis=0)
|
|
|
|
|
def load_npz_from_paths(data_paths):
    """Load ``arr_0`` from each given ``.npz`` file and join them on axis 0.

    Args:
        data_paths: iterable of paths to archives exposing an ``arr_0`` key.

    Returns:
        A single array made by concatenating the per-file arrays along
        axis 0, in the order the paths were given.

    Raises:
        ValueError: raised by ``np.concatenate`` when ``data_paths`` is empty.
    """
    arrays = []
    for data_path in data_paths:
        # np.load keeps the archive's file handle open until it is closed;
        # indexing copies the array into memory, so closing right after is safe.
        with np.load(data_path) as archive:
            arrays.append(archive["arr_0"])
    return np.concatenate(arrays, axis=0)
|
|
|
|
|
def resize_numpy_image(image, max_resolution=512 * 512, resize_short_edge=None):
    """Resize ``image`` so that both sides become multiples of 64.

    The first two entries of ``image.shape`` are treated as height and width.

    Args:
        image: array accepted by ``cv2.resize``.
        max_resolution: target pixel count (height * width) used when
            ``resize_short_edge`` is ``None``; the scale factor is the square
            root of ``max_resolution / (h * w)`` so the aspect ratio is kept
            up to rounding.
        resize_short_edge: when given, the scale factor is chosen so that
            the shorter side maps to this length (before rounding) and
            ``max_resolution`` is ignored.

    Returns:
        The image resized with Lanczos interpolation.
    """
    h, w = image.shape[:2]

    if resize_short_edge is not None:
        k = resize_short_edge / min(h, w)
    else:
        # Area ratio -> linear scale factor.
        k = (max_resolution / (h * w)) ** 0.5

    # Snap each side to the nearest multiple of 64. Rounding can give 0 for a
    # very small target or an extreme aspect ratio, and cv2.resize rejects an
    # empty output size, so fall back to the smallest valid multiple.
    h = max(64, int(np.round(h * k / 64)) * 64)
    w = max(64, int(np.round(w * k / 64)) * 64)

    # cv2.resize takes the size as (width, height).
    return cv2.resize(image, (w, h), interpolation=cv2.INTER_LANCZOS4)
|
|
|
|
|
def setup_dist(args):
    """Initialise the default process group unless one already exists.

    Selects the CUDA device given by ``args.local_rank`` and then creates
    the process group with the ``"nccl"`` backend and the ``env://`` init
    method. Does nothing when ``dist.is_initialized()`` is already true.
    """
    if not dist.is_initialized():
        torch.cuda.set_device(args.local_rank)
        dist.init_process_group("nccl", init_method="env://")
|
|
|
|
|
def save_videos(batch_tensors, savedir, filenames, fps=16):
    """Write one ``.mp4`` grid video per entry along dim 0 of ``batch_tensors``.

    For each entry, values are clamped to [-1, 1], every slice along the
    entry's dim 2 is tiled into a single-row grid, and the grids are rescaled
    to 0..255 ``uint8`` and written to ``<savedir>/<filenames[idx]>.mp4``.

    NOTE(review): the indexing implies a 6-D input, presumably
    (batch, n_samples, channels, frames, height, width) - confirm with callers.

    Args:
        batch_tensors: tensor whose dim-0 entries are 5-D.
        savedir: directory that receives the files.
        filenames: names (without extension), indexed by position along dim 0.
        fps: frame rate handed to the video writer.
    """
    grid_columns = int(batch_tensors.shape[1])

    for index, clip in enumerate(batch_tensors):
        clip = torch.clamp(clip.detach().cpu().float(), -1.0, 1.0)

        # Bring dim 2 to the front so iterating yields one sheet per step.
        sheets = clip.permute(2, 0, 1, 3, 4)

        tiled = []
        for sheet in sheets:
            tiled.append(torchvision.utils.make_grid(sheet, nrow=grid_columns))
        frames = torch.stack(tiled, dim=0)

        # Map [-1, 1] to [0, 1], then to 0..255 with channels moved last.
        frames = (frames + 1.0) / 2.0
        frames = (frames * 255).to(torch.uint8).permute(0, 2, 3, 1)

        target = os.path.join(savedir, f"{filenames[index]}.mp4")
        torchvision.io.write_video(
            target, frames, fps=fps, video_codec="h264", options={"crf": "10"}
        )