# Spaces: Running on Zero
import importlib | |
import os | |
import numpy as np | |
import cv2 | |
import torch | |
import torch.distributed as dist | |
import torchvision | |
def count_params(model, verbose=False):
    """Return the total number of elements over all of ``model.parameters()``.

    Args:
        model: object exposing ``parameters()`` whose items provide ``numel()``.
        verbose: when true, also print the count in millions, labelled with
            the model's class name.

    Returns:
        The summed element count as an int.
    """
    total_params = 0
    for param in model.parameters():
        total_params += param.numel()

    if verbose:
        print(f"{model.__class__.__name__} has {total_params*1.e-6:.2f} M params.")
    return total_params
def check_istarget(name, para_list):
    """
    Tell whether ``name`` contains at least one entry of ``para_list``.

    name: full name of source para
    para_list: partial name of target para

    Returns True on the first entry that is a substring of ``name`` and
    False when none matches (including when ``para_list`` is empty).
    """
    return any(para in name for para in para_list)
def instantiate_from_config(config):
    """Build the object described by ``config``.

    When ``config`` contains a ``"target"`` entry, that dotted path is resolved
    with ``get_obj_from_str`` and the result is called with the keyword
    arguments stored under ``"params"`` (no arguments if the key is absent).

    The two sentinel values ``"__is_first_stage__"`` and
    ``"__is_unconditional__"`` yield ``None`` instead of an object.

    Raises:
        KeyError: ``config`` has no ``"target"`` and is not one of the sentinels.
    """
    if "target" in config:
        factory = get_obj_from_str(config["target"])
        kwargs = config.get("params", dict())
        return factory(**kwargs)

    # No target: only the two sentinel markers are accepted.
    if config == "__is_first_stage__" or config == "__is_unconditional__":
        return None
    raise KeyError("Expected key `target` to instantiate.")
def get_obj_from_str(string, reload=False):
    """Resolve a dotted path such as ``"pkg.module.Name"`` to the named attribute.

    Args:
        string: dotted path; everything before the last dot is imported as a
            module and the final component is looked up on it.
        reload: when true, the module is imported and reloaded before the
            attribute lookup.

    Returns:
        The attribute found on the imported module.

    Raises:
        ValueError: ``string`` contains no dot, so it cannot be split in two.
    """
    module_path, attr_name = string.rsplit(".", 1)
    if reload:
        importlib.reload(importlib.import_module(module_path))
    module_obj = importlib.import_module(module_path, package=None)
    return getattr(module_obj, attr_name)
def load_npz_from_dir(data_dir):
    """Load ``arr_0`` from every file in ``data_dir`` and concatenate on axis 0.

    Args:
        data_dir: directory whose entries are all ``.npz`` archives holding an
            ``arr_0`` array.

    Returns:
        A single array made by concatenating each file's ``arr_0`` along
        axis 0, in sorted filename order.

    Raises:
        ValueError: the directory is empty (nothing to concatenate).
    """
    arrays = []
    # os.listdir gives no ordering guarantee; sort so the result is
    # reproducible across runs and file systems.
    for data_name in sorted(os.listdir(data_dir)):
        # The archive object keeps a file handle open until closed, so read
        # the array inside a context manager.
        with np.load(os.path.join(data_dir, data_name)) as archive:
            arrays.append(archive["arr_0"])
    return np.concatenate(arrays, axis=0)
def load_npz_from_paths(data_paths):
    """Load ``arr_0`` from each given ``.npz`` path and concatenate on axis 0.

    Args:
        data_paths: iterable of paths to ``.npz`` archives holding ``arr_0``.

    Returns:
        A single array made by concatenating each file's ``arr_0`` along
        axis 0, in the order the paths were given.

    Raises:
        ValueError: ``data_paths`` is empty (nothing to concatenate).
    """
    arrays = []
    for data_path in data_paths:
        # The archive object keeps a file handle open until closed, so read
        # the array inside a context manager.
        with np.load(data_path) as archive:
            arrays.append(archive["arr_0"])
    return np.concatenate(arrays, axis=0)
def resize_numpy_image(image, max_resolution=512 * 512, resize_short_edge=None):
    """Resize ``image`` so that both sides become multiples of 64.

    Args:
        image: array whose first two dimensions are height and width.
        max_resolution: target pixel count (height * width) used to derive the
            scale factor when ``resize_short_edge`` is not given.
        resize_short_edge: if given, the scale factor is chosen so the shorter
            side maps to this length (before rounding to a multiple of 64).

    Returns:
        The image resized with Lanczos interpolation.
    """
    h, w = image.shape[:2]
    if resize_short_edge is not None:
        k = resize_short_edge / min(h, w)
    else:
        # Square root turns the area ratio into a per-side scale factor.
        k = (max_resolution / (h * w)) ** 0.5

    # Round each side to the nearest multiple of 64, but never below 64:
    # a small target or an extreme aspect ratio would otherwise round to 0
    # and make the resize call fail on an empty output size.
    new_h = max(64, int(np.round(h * k / 64)) * 64)
    new_w = max(64, int(np.round(w * k / 64)) * 64)

    # cv2.resize takes the target size as (width, height).
    return cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LANCZOS4)
def setup_dist(args):
    """Initialise the default process group unless one already exists.

    Selects the CUDA device given by ``args.local_rank`` and then creates an
    ``"nccl"`` process group using the ``env://`` init method. Does nothing
    when ``torch.distributed`` reports it is already initialised.
    """
    if not dist.is_initialized():
        torch.cuda.set_device(args.local_rank)
        dist.init_process_group("nccl", init_method="env://")
def save_videos(batch_tensors, savedir, filenames, fps=16):
    """Write one ``.mp4`` per batch entry, tiling that entry's samples per frame.

    Args:
        batch_tensors: tensor iterated over its first dimension; assumed to be
            laid out as (b, samples, c, t, h, w) with values in [-1, 1]
            (values outside that range are clamped) - TODO confirm with callers.
        savedir: directory the files are written into.
        filenames: per-entry base names; entry ``i`` is saved as
            ``<savedir>/<filenames[i]>.mp4``.
        fps: frame rate passed to the video writer.
    """
    samples_per_row = int(batch_tensors.shape[1])

    for idx, sample_group in enumerate(batch_tensors):
        # Bring the time axis to the front so iteration yields, for each
        # frame, the stack of all samples: (t, samples, c, h, w).
        frames_first = (
            sample_group.detach().cpu().float().clamp(-1.0, 1.0).permute(2, 0, 1, 3, 4)
        )

        # One image grid per frame, with every sample placed on a single row.
        per_frame_grids = []
        for framesheet in frames_first:
            per_frame_grids.append(
                torchvision.utils.make_grid(framesheet, nrow=samples_per_row)
            )
        grid = torch.stack(per_frame_grids, dim=0)  # frames stacked along dim 0

        # Map [-1, 1] to [0, 1], scale to bytes, and move channels last for the writer.
        grid = (grid + 1.0) / 2.0
        frames_uint8 = (grid * 255).to(torch.uint8).permute(0, 2, 3, 1)

        torchvision.io.write_video(
            os.path.join(savedir, f"{filenames[idx]}.mp4"),
            frames_uint8,
            fps=fps,
            video_codec="h264",
            options={"crf": "10"},
        )