from typing import Any, Callable, Dict, List, Optional, Union

import logging
import os

import numpy as np
import PIL.Image
import torch
import torchvision.transforms as transforms
from PIL import Image, ImageDraw, ImageFont
from torch.nn import Conv2d
from torch.nn.parameter import Parameter
from torch.utils.data import DataLoader, TensorDataset
from torchvision.transforms import InterpolationMode
from torchvision.transforms.functional import pil_to_tensor, resize
from tqdm.auto import tqdm
from transformers import CLIPTextModel, CLIPTextModelWithProjection, CLIPTokenizer

from diffusers import (
    AutoencoderKL,
    DDIMScheduler,
    DiffusionPipeline,
    LCMScheduler,
    UNet2DConditionModel,
)
from diffusers.image_processor import VaeImageProcessor
from diffusers.utils import BaseOutput, load_image, make_image_grid

from .duplicate_unet import DoubleUNet2DConditionModel
from .util.batchsize import find_batch_size
from .util.ensemble import ensemble_depth
from .util.image_util import (
    chw2hwc,
    colorize_depth_maps,
    get_tv_resample_method,
    resize_max_res,
)
|
|
|
def rescale_noise_cfg(noise_cfg, noise_pred_text, guidance_rescale=0.0): |
|
""" |
|
Rescale `noise_cfg` according to `guidance_rescale`. Based on findings of [Common Diffusion Noise Schedules and |
|
Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf). See Section 3.4 |
|
""" |
|
std_text = noise_pred_text.std(dim=list(range(1, noise_pred_text.ndim)), keepdim=True) |
|
std_cfg = noise_cfg.std(dim=list(range(1, noise_cfg.ndim)), keepdim=True) |
|
|
|
noise_pred_rescaled = noise_cfg * (std_text / std_cfg) |
|
|
|
noise_cfg = guidance_rescale * noise_pred_rescaled + (1 - guidance_rescale) * noise_cfg |
|
return noise_cfg |
|
|
|
class MarigoldDepthOutput(BaseOutput): |
|
""" |
|
Output class for Marigold monocular depth prediction pipeline. |
|
|
|
Args: |
|
depth_np (`np.ndarray`): |
|
Predicted depth map, with depth values in the range of [0, 1]. |
|
depth_colored (`PIL.Image.Image`): |
|
Colorized depth map, with the shape of [3, H, W] and values in [0, 1]. |
|
uncertainty (`None` or `np.ndarray`): |
|
            Uncalibrated uncertainty (MAD, median absolute deviation) coming from ensembling.
|
""" |
|
|
|
depth_np: np.ndarray |
|
depth_colored: Union[None, Image.Image] |
|
uncertainty: Union[None, np.ndarray] |
|
|
|
class MarigoldPipeline(DiffusionPipeline): |
|
""" |
|
Pipeline for monocular depth estimation using Marigold: https://marigoldmonodepth.github.io. |
|
|
|
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the |
|
library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.) |
|
|
|
Args: |
|
unet (`UNet2DConditionModel`): |
|
Conditional U-Net to denoise the depth latent, conditioned on image latent. |
|
vae (`AutoencoderKL`): |
|
Variational Auto-Encoder (VAE) Model to encode and decode images and depth maps |
|
to and from latent representations. |
|
scheduler (`DDIMScheduler`): |
|
A scheduler to be used in combination with `unet` to denoise the encoded image latents. |
|
text_encoder (`CLIPTextModel`): |
|
Text-encoder, for empty text embedding. |
|
tokenizer (`CLIPTokenizer`): |
|
CLIP tokenizer. |
|
scale_invariant (`bool`, *optional*): |
|
A model property specifying whether the predicted depth maps are scale-invariant. This value must be set in |
|
the model config. When used together with the `shift_invariant=True` flag, the model is also called |
|
"affine-invariant". NB: overriding this value is not supported. |
|
shift_invariant (`bool`, *optional*): |
|
A model property specifying whether the predicted depth maps are shift-invariant. This value must be set in |
|
the model config. When used together with the `scale_invariant=True` flag, the model is also called |
|
"affine-invariant". NB: overriding this value is not supported. |
|
default_denoising_steps (`int`, *optional*): |
|
The minimum number of denoising diffusion steps that are required to produce a prediction of reasonable |
|
quality with the given model. This value must be set in the model config. When the pipeline is called |
|
without explicitly setting `num_inference_steps`, the default value is used. This is required to ensure |
|
reasonable results with various model flavors compatible with the pipeline, such as those relying on very |
|
short denoising schedules (`LCMScheduler`) and those with full diffusion schedules (`DDIMScheduler`). |
|
default_processing_resolution (`int`, *optional*): |
|
The recommended value of the `processing_resolution` parameter of the pipeline. This value must be set in |
|
the model config. When the pipeline is called without explicitly setting `processing_resolution`, the |
|
default value is used. This is required to ensure reasonable results with various model flavors trained |
|
with varying optimal processing resolution values. |
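
    Example (a minimal usage sketch; the checkpoint path below is a placeholder and should point to this
    repository's actual weights):
        >>> import torch
        >>> from PIL import Image
        >>> pipe = MarigoldPipeline.from_pretrained("path/to/checkpoint", torch_dtype=torch.bfloat16).to("cuda")
        >>> out = pipe(Image.open("input.jpg"), ensemble_size=5)
        >>> out.depth_colored.save("depth_colored.png")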
|
""" |
|
|
|
rgb_latent_scale_factor = 0.18215 |
|
depth_latent_scale_factor = 0.18215 |
|
|
|
def __init__( |
|
self, |
|
unet: DoubleUNet2DConditionModel, |
|
vae: AutoencoderKL, |
|
scheduler: Union[DDIMScheduler, LCMScheduler], |
|
text_encoder: CLIPTextModel, |
|
tokenizer: CLIPTokenizer, |
|
scale_invariant: Optional[bool] = True, |
|
shift_invariant: Optional[bool] = True, |
|
default_denoising_steps: Optional[int] = None, |
|
default_processing_resolution: Optional[int] = None, |
|
requires_safety_checker: bool = False, |
|
): |
|
super().__init__() |
|
|
|
self.register_modules( |
|
unet=unet, |
|
vae=vae, |
|
scheduler=scheduler, |
|
text_encoder=text_encoder, |
|
tokenizer=tokenizer, |
|
) |
|
self.register_to_config( |
|
scale_invariant=scale_invariant, |
|
shift_invariant=shift_invariant, |
|
default_denoising_steps=default_denoising_steps, |
|
default_processing_resolution=default_processing_resolution, |
|
) |
|
|
|
self.scale_invariant = scale_invariant |
|
self.shift_invariant = shift_invariant |
|
self.default_denoising_steps = default_denoising_steps |
|
self.default_processing_resolution = default_processing_resolution |
|
|
|
self.empty_text_embed = None |
|
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1) |
|
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor) |
|
self.register_to_config(requires_safety_checker=requires_safety_checker) |
|
self.separate_list = [0,0] |
|
|
|
@torch.no_grad() |
|
def __call__( |
|
self, |
|
input_image: Union[Image.Image, torch.Tensor], |
|
denoising_steps: Optional[int] = None, |
|
ensemble_size: int = 5, |
|
processing_res: Optional[int] = None, |
|
match_input_res: bool = True, |
|
resample_method: str = "bilinear", |
|
batch_size: int = 0, |
|
generator: Union[torch.Generator, None] = None, |
|
color_map: str = "Spectral", |
|
show_progress_bar: bool = True, |
|
ensemble_kwargs: Dict = None, |
|
) -> MarigoldDepthOutput: |
|
""" |
|
Function invoked when calling the pipeline. |
|
|
|
Args: |
|
input_image (`Image`): |
|
Input RGB (or gray-scale) image. |
|
denoising_steps (`int`, *optional*, defaults to `None`): |
|
Number of denoising diffusion steps during inference. The default value `None` results in automatic |
|
selection. The number of steps should be at least 10 with the full Marigold models, and between 1 and 4 |
|
for Marigold-LCM models. |
|
            ensemble_size (`int`, *optional*, defaults to `5`):
|
Number of predictions to be ensembled. |
|
processing_res (`int`, *optional*, defaults to `None`): |
|
Effective processing resolution. When set to `0`, processes at the original image resolution. This |
|
produces crisper predictions, but may also lead to the overall loss of global context. The default |
|
value `None` resolves to the optimal value from the model config. |
|
match_input_res (`bool`, *optional*, defaults to `True`): |
|
Resize depth prediction to match input resolution. |
|
Only valid if `processing_res` > 0. |
|
resample_method: (`str`, *optional*, defaults to `bilinear`): |
|
                Resampling method used to resize images and depth predictions. This can be one of `bilinear`, `bicubic` or `nearest`.
|
batch_size (`int`, *optional*, defaults to `0`): |
|
                Inference batch size, no bigger than `ensemble_size`.
|
If set to 0, the script will automatically decide the proper batch size. |
|
generator (`torch.Generator`, *optional*, defaults to `None`) |
|
Random generator for initial noise generation. |
|
show_progress_bar (`bool`, *optional*, defaults to `True`): |
|
Display a progress bar of diffusion denoising. |
|
color_map (`str`, *optional*, defaults to `"Spectral"`, pass `None` to skip colorized depth map generation): |
|
Colormap used to colorize the depth map. |
|
ensemble_kwargs (`dict`, *optional*, defaults to `None`): |
|
Arguments for detailed ensembling settings. |
|
Returns: |
|
`MarigoldDepthOutput`: Output class for Marigold monocular depth prediction pipeline, including: |
|
- **depth_np** (`np.ndarray`) Predicted depth map, with depth values in the range of [0, 1] |
|
- **depth_colored** (`PIL.Image.Image`) Colorized depth map, with the shape of [3, H, W] and values in [0, 1], None if `color_map` is `None` |
|
            - **uncertainty** (`None` or `np.ndarray`) Uncalibrated uncertainty (MAD, median absolute deviation)
|
coming from ensembling. None if `ensemble_size = 1` |
|
""" |
|
|
|
if denoising_steps is None: |
|
denoising_steps = self.default_denoising_steps |
|
if processing_res is None: |
|
processing_res = self.default_processing_resolution |
|
|
|
assert processing_res >= 0 |
|
assert ensemble_size >= 1 |
|
|
|
|
|
self._check_inference_step(denoising_steps) |
|
|
|
resample_method: InterpolationMode = get_tv_resample_method(resample_method) |
|
|
|
|
|
|
|
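        # ----------------- Image Preprocess -----------------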
if isinstance(input_image, Image.Image): |
|
input_image = input_image.convert("RGB") |
|
|
|
rgb = pil_to_tensor(input_image) |
|
rgb = rgb.unsqueeze(0) |
|
elif isinstance(input_image, torch.Tensor): |
|
rgb = input_image |
|
else: |
|
raise TypeError(f"Unknown input type: {type(input_image) = }") |
|
input_size = rgb.shape |
|
assert ( |
|
4 == rgb.dim() and 3 == input_size[-3] |
|
), f"Wrong input shape {input_size}, expected [1, rgb, H, W]" |
|
|
|
|
|
if processing_res > 0: |
|
rgb = resize_max_res( |
|
rgb, |
|
max_edge_resolution=processing_res, |
|
resample_method=resample_method, |
|
) |
|
|
|
|
|
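        # Normalize RGB values to [-1, 1]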
rgb_norm: torch.Tensor = rgb / 255.0 * 2.0 - 1.0 |
|
rgb_norm = rgb_norm.to(self.dtype) |
|
assert rgb_norm.min() >= -1.0 and rgb_norm.max() <= 1.0 |
|
|
|
|
|
|
|
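        # ----------------- Predicting depth -----------------
        # Duplicate the input image ensemble_size times and batch the copies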
duplicated_rgb = rgb_norm.expand(ensemble_size, -1, -1, -1) |
|
single_rgb_dataset = TensorDataset(duplicated_rgb) |
|
if batch_size > 0: |
|
_bs = batch_size |
|
else: |
|
_bs = find_batch_size( |
|
ensemble_size=ensemble_size, |
|
input_res=max(rgb_norm.shape[1:]), |
|
dtype=self.dtype, |
|
) |
|
|
|
single_rgb_loader = DataLoader( |
|
single_rgb_dataset, batch_size=_bs, shuffle=False |
|
) |
|
|
|
|
|
depth_pred_ls = [] |
|
if show_progress_bar: |
|
iterable = tqdm( |
|
single_rgb_loader, desc=" " * 2 + "Inference batches", leave=False |
|
) |
|
else: |
|
iterable = single_rgb_loader |
|
for batch in iterable: |
|
(batched_img,) = batch |
|
depth_pred_raw = self.single_infer( |
|
rgb_in=batched_img, |
|
num_inference_steps=denoising_steps, |
|
show_pbar=show_progress_bar, |
|
generator=generator, |
|
) |
|
depth_pred_ls.append(depth_pred_raw.detach()) |
|
depth_preds = torch.concat(depth_pred_ls, dim=0) |
|
torch.cuda.empty_cache() |
|
|
|
|
|
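        # ----------------- Test-time ensembling -----------------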
if ensemble_size > 1: |
|
depth_pred, pred_uncert = ensemble_depth( |
|
depth_preds, |
|
scale_invariant=self.scale_invariant, |
|
shift_invariant=self.shift_invariant, |
|
max_res=50, |
|
**(ensemble_kwargs or {}), |
|
) |
|
else: |
|
depth_pred = depth_preds |
|
pred_uncert = None |
|
|
|
|
|
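        # Resize the prediction back to the input resolution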
if match_input_res: |
|
depth_pred = resize( |
|
depth_pred, |
|
input_size[-2:], |
|
interpolation=resample_method, |
|
antialias=True, |
|
) |
|
|
|
|
|
depth_pred = depth_pred.squeeze() |
|
depth_pred = depth_pred.cpu().numpy() |
|
if pred_uncert is not None: |
|
pred_uncert = pred_uncert.squeeze().cpu().numpy() |
|
|
|
|
|
depth_pred = depth_pred.clip(0, 1) |
|
|
|
|
|
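        # Colorize the depth map for visualization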
if color_map is not None: |
|
depth_colored = colorize_depth_maps( |
|
depth_pred, 0, 1, cmap=color_map |
|
).squeeze() |
|
depth_colored = (depth_colored * 255).astype(np.uint8) |
|
depth_colored_hwc = chw2hwc(depth_colored) |
|
depth_colored_img = Image.fromarray(depth_colored_hwc) |
|
else: |
|
depth_colored_img = None |
|
|
|
return MarigoldDepthOutput( |
|
depth_np=depth_pred, |
|
depth_colored=depth_colored_img, |
|
uncertainty=pred_uncert, |
|
) |
|
|
|
def _replace_unet_conv_in(self): |
|
|
|
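        # Double the input channels of conv_in: keep the pretrained weights for the first
        # 4 (image latent) channels and zero-initialize the new (depth latent) channels.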
_weight = self.unet.conv_in.weight.clone() |
|
_bias = self.unet.conv_in.bias.clone() |
|
zero_weight = torch.zeros(_weight.shape).to(_weight.device) |
|
_weight = torch.cat([_weight, zero_weight], dim=1) |
|
|
|
|
|
|
|
|
|
_n_convin_out_channel = self.unet.conv_in.out_channels |
|
_new_conv_in = Conv2d( |
|
8, _n_convin_out_channel, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1) |
|
) |
|
_new_conv_in.weight = Parameter(_weight) |
|
_new_conv_in.bias = Parameter(_bias) |
|
self.unet.conv_in = _new_conv_in |
|
logging.info("Unet conv_in layer is replaced") |
|
|
|
self.unet.config["in_channels"] = 8 |
|
logging.info("Unet config is updated") |
|
return |
|
|
|
def _replace_unet_conv_out(self): |
|
|
|
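        # Double the output channels of conv_out so the UNet predicts both image and depth
        # latents, initializing the new channels by repeating the pretrained weights.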
_weight = self.unet.conv_out.weight.clone() |
|
_bias = self.unet.conv_out.bias.clone() |
|
_weight = _weight.repeat((2, 1, 1, 1)) |
|
_bias = _bias.repeat((2)) |
|
|
|
|
|
_n_convin_out_channel = self.unet.conv_out.out_channels |
|
_new_conv_out = Conv2d( |
|
_n_convin_out_channel, 8, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1) |
|
) |
|
_new_conv_out.weight = Parameter(_weight) |
|
_new_conv_out.bias = Parameter(_bias) |
|
self.unet.conv_out = _new_conv_out |
|
logging.info("Unet conv_out layer is replaced") |
|
|
|
self.unet.config["out_channels"] = 8 |
|
logging.info("Unet config is updated") |
|
return |
|
|
|
def _check_inference_step(self, n_step: int) -> None: |
|
""" |
|
Check if denoising step is reasonable |
|
Args: |
|
n_step (`int`): denoising steps |
|
""" |
|
assert n_step >= 1 |
|
|
def encode_empty_text(self): |
|
""" |
|
Encode text embedding for empty prompt |
|
""" |
|
prompt = "" |
|
text_inputs = self.tokenizer( |
|
prompt, |
|
padding="max_length", |
|
max_length=self.tokenizer.model_max_length, |
|
truncation=True, |
|
return_tensors="pt", |
|
) |
|
text_input_ids = text_inputs.input_ids.to(self.text_encoder.device) |
|
self.empty_text_embed = self.text_encoder(text_input_ids)[0].to(self.dtype) |
|
|
|
def encode_text(self, prompt): |
|
""" |
|
        Encode text embedding for the given prompt
|
""" |
|
text_inputs = self.tokenizer( |
|
prompt, |
|
padding="max_length", |
|
max_length=self.tokenizer.model_max_length, |
|
truncation=True, |
|
return_tensors="pt", |
|
) |
|
text_input_ids = text_inputs.input_ids.to(self.text_encoder.device) |
|
text_embed = self.text_encoder(text_input_ids)[0].to(self.dtype) |
|
return text_embed |
|
|
|
    def numpy_to_pil(self, images: np.ndarray) -> List[PIL.Image.Image]:
|
""" |
|
        Convert a numpy image or a batch of images to a list of PIL images.
|
""" |
|
if images.ndim == 3: |
|
images = images[None, ...] |
|
images = (images * 255).round().astype("uint8") |
|
if images.shape[-1] == 1: |
|
|
|
pil_images = [Image.fromarray(image.squeeze(), mode="L") for image in images] |
|
else: |
|
pil_images = [Image.fromarray(image) for image in images] |
|
|
|
return pil_images |
|
|
|
@torch.no_grad() |
|
def generate_rgbd( |
|
self, |
|
        prompt: Union[str, List[str]],
|
num_inference_steps: int, |
|
generator: Union[torch.Generator, None], |
|
show_pbar: bool = None, |
|
color_map: str = "Spectral", |
|
height: int = 60, |
|
width: int = 80 |
|
): |
|
""" |
|
        Generate a paired RGB image and colorized depth map (RGBD) from a text prompt.

        Args:
            prompt (`str` or `List[str]`):
                Text prompt(s) used to condition the generation.
            num_inference_steps (`int`):
                Number of diffusion denoising steps (DDIM) during inference.
            generator (`torch.Generator`)
                Random generator for initial noise generation.
            show_pbar (`bool`):
                Display a progress bar of diffusion denoising.
            color_map (`str`, *optional*, defaults to `"Spectral"`):
                Colormap used to colorize the depth map.
            height (`int`), width (`int`):
                Spatial size of the sampled latent.
        Returns:
            `List[torch.Tensor]`: Side-by-side RGB and colorized depth visualizations, one per prompt.
|
""" |
|
device = self.device |
|
ori_type = self.dtype |
|
|
|
|
|
self.scheduler.set_timesteps(num_inference_steps, device=device) |
|
timesteps = self.scheduler.timesteps |
|
|
|
if isinstance(prompt, list): |
|
bs = len(prompt) |
|
batch_text_embed = [] |
|
for p in prompt: |
|
batch_text_embed.append(self.encode_text(p)) |
|
batch_text_embed = torch.cat(batch_text_embed, dim=0) |
|
elif isinstance(prompt, str): |
|
bs = 1 |
|
            batch_text_embed = self.encode_text(prompt)  # encode_text already returns a batch dimension
|
else: |
|
raise NotImplementedError |
|
|
|
if self.empty_text_embed is None: |
|
self.encode_empty_text() |
|
batch_empty_text_embed = self.empty_text_embed.repeat( |
|
(batch_text_embed.shape[0], 1, 1) |
|
).to(device) |
|
|
|
text_embed = torch.cat([batch_empty_text_embed, batch_text_embed], dim=0) |
|
|
|
|
|
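        # Sample the initial joint (image + depth) latent from Gaussian noise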
cat_latent = torch.randn( |
|
[bs, self.unet.config["in_channels"], height, width], |
|
device=device, |
|
dtype=torch.bfloat16, |
|
generator=generator, |
|
) * self.scheduler.init_noise_sigma |
|
|
|
|
|
if show_pbar: |
|
iterable = tqdm( |
|
enumerate(timesteps), |
|
total=len(timesteps), |
|
leave=False, |
|
desc=" " * 4 + "Diffusion denoising", |
|
) |
|
else: |
|
iterable = enumerate(timesteps) |
|
|
|
self.to(torch.bfloat16) |
|
for i, t in iterable: |
|
latent_model_input = torch.cat([cat_latent] * 2) |
|
latent_model_input = self.scheduler.scale_model_input(latent_model_input, t) |
|
|
|
|
|
with torch.no_grad(): |
|
noise_pred = self.unet( |
|
latent_model_input, |
|
t, |
|
t, |
|
encoder_hidden_states=text_embed.to(torch.bfloat16), |
|
return_dict=False, |
|
|
|
)[0] |
|
|
|
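            # Classifier-free guidance with a fixed scale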
guidance_scale = 7.5 |
|
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2) |
|
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) |
|
|
|
|
|
cat_latent = self.scheduler.step(noise_pred, t, cat_latent).prev_sample |
|
|
|
|
|
|
|
|
|
image = self.decode_image(cat_latent[:, 0:4, :, :]) |
|
|
|
image = self.numpy_to_pil(image) |
|
|
|
depth = self.decode_depth(cat_latent[:, 4:, :, :]) |
|
depth = (depth - depth.min()) / (depth.max() - depth.min()) |
|
|
|
|
|
depth_pred = depth.squeeze() |
|
depth_pred = depth_pred.float().cpu().numpy() |
|
depth_pred = depth_pred.clip(0, 1) |
|
|
|
|
|
if color_map is not None: |
|
depth_colored_img = [] |
|
depth_colored = colorize_depth_maps( |
|
depth_pred, 0, 1, cmap=color_map |
|
).squeeze() |
|
depth_colored_img = self.numpy_to_pil(np.transpose(depth_colored, (0, 2, 3, 1))) |
|
else: |
|
depth_colored_img = None |
|
|
|
rgbd_images = self.post_process_rgbd(prompt, image, depth_colored_img) |
|
self.to(ori_type) |
|
|
|
return rgbd_images |
|
|
|
@torch.no_grad() |
|
def image2depth(self, |
|
input_image: Union[Image.Image, torch.Tensor], |
|
denoising_steps: Optional[int] = None, |
|
ensemble_size: int = 5, |
|
processing_res: Optional[int] = None, |
|
match_input_res: bool = True, |
|
resample_method: str = "bilinear", |
|
batch_size: int = 0, |
|
generator: Union[torch.Generator, None] = None, |
|
color_map: str = "Spectral", |
|
show_progress_bar: bool = True, |
|
ensemble_kwargs: Dict = None, |
|
cfg_scale: float = 1.0 |
|
): |
|
|
|
if denoising_steps is None: |
|
denoising_steps = self.default_denoising_steps |
|
if processing_res is None: |
|
processing_res = self.default_processing_resolution |
|
|
|
ori_type = self.dtype |
|
self.to(torch.bfloat16) |
|
|
|
assert processing_res >= 0 |
|
assert ensemble_size >= 1 |
|
|
|
|
|
self._check_inference_step(denoising_steps) |
|
|
|
resample_method: InterpolationMode = get_tv_resample_method(resample_method) |
|
|
|
|
|
|
|
if isinstance(input_image, Image.Image): |
|
input_image = input_image.convert("RGB") |
|
|
|
rgb = pil_to_tensor(input_image) |
|
rgb = rgb.unsqueeze(0) |
|
elif isinstance(input_image, torch.Tensor): |
|
rgb = input_image |
|
else: |
|
raise TypeError(f"Unknown input type: {type(input_image) = }") |
|
input_size = rgb.shape |
|
assert ( |
|
4 == rgb.dim() and 3 == input_size[-3] |
|
), f"Wrong input shape {input_size}, expected [1, rgb, H, W]" |
|
|
|
|
|
if processing_res > 0: |
|
rgb = resize_max_res( |
|
rgb, |
|
max_edge_resolution=processing_res, |
|
resample_method=resample_method, |
|
) |
|
|
|
|
|
rgb_norm: torch.Tensor = rgb / 255.0 * 2.0 - 1.0 |
|
rgb_norm = rgb_norm.to(self.dtype) |
|
assert rgb_norm.min() >= -1.0 and rgb_norm.max() <= 1.0 |
|
|
|
|
|
|
|
duplicated_rgb = rgb_norm.expand(ensemble_size, -1, -1, -1) |
|
single_rgb_dataset = TensorDataset(duplicated_rgb) |
|
if batch_size > 0: |
|
_bs = batch_size |
|
else: |
|
_bs = find_batch_size( |
|
ensemble_size=ensemble_size, |
|
input_res=max(rgb_norm.shape[1:]), |
|
dtype=self.dtype, |
|
) |
|
|
|
single_rgb_loader = DataLoader( |
|
single_rgb_dataset, batch_size=_bs, shuffle=False |
|
) |
|
|
|
|
|
depth_pred_ls = [] |
|
if show_progress_bar: |
|
iterable = tqdm( |
|
single_rgb_loader, desc=" " * 2 + "Inference batches", leave=False |
|
) |
|
else: |
|
iterable = single_rgb_loader |
|
for batch in iterable: |
|
(batched_img,) = batch |
|
depth_pred_raw = self.single_image2depth( |
|
rgb_in=batched_img, |
|
num_inference_steps=denoising_steps, |
|
show_pbar=show_progress_bar, |
|
generator=generator, |
|
cfg_scale=cfg_scale |
|
) |
|
depth_pred_ls.append(depth_pred_raw.detach()) |
|
depth_preds = torch.concat(depth_pred_ls, dim=0) |
|
torch.cuda.empty_cache() |
|
depth_preds = depth_preds.to(torch.float32) |
|
|
|
if ensemble_size > 1: |
|
depth_pred, pred_uncert = ensemble_depth( |
|
depth_preds, |
|
scale_invariant=self.scale_invariant, |
|
shift_invariant=self.shift_invariant, |
|
max_res=50, |
|
**(ensemble_kwargs or {}), |
|
) |
|
else: |
|
depth_pred = depth_preds |
|
pred_uncert = None |
|
|
|
|
|
if match_input_res: |
|
depth_pred = resize( |
|
depth_pred, |
|
input_size[-2:], |
|
interpolation=resample_method, |
|
antialias=True, |
|
) |
|
|
|
|
|
depth_pred = depth_pred.squeeze() |
|
depth_pred = depth_pred.cpu().numpy() |
|
if pred_uncert is not None: |
|
pred_uncert = pred_uncert.squeeze().cpu().numpy() |
|
|
|
|
|
depth_pred = depth_pred.clip(0, 1) |
|
|
|
|
|
if color_map is not None: |
|
depth_colored = colorize_depth_maps( |
|
depth_pred, 0, 1, cmap=color_map |
|
).squeeze() |
|
depth_colored = (depth_colored * 255).astype(np.uint8) |
|
depth_colored_hwc = chw2hwc(depth_colored) |
|
depth_colored_img = Image.fromarray(depth_colored_hwc) |
|
else: |
|
depth_colored_img = None |
|
|
|
self.to(ori_type) |
|
|
|
return MarigoldDepthOutput( |
|
depth_np=depth_pred, |
|
depth_colored=depth_colored_img, |
|
uncertainty=pred_uncert, |
|
) |
|
|
|
@torch.no_grad() |
|
def single_image2depth( |
|
self, |
|
rgb_in: torch.Tensor, |
|
num_inference_steps: int, |
|
generator: Union[torch.Generator, None], |
|
show_pbar: bool, |
|
cfg_scale: float = 1.0 |
|
) -> torch.Tensor: |
|
""" |
|
Perform an individual depth prediction without ensembling. |
|
|
|
Args: |
|
rgb_in (`torch.Tensor`): |
|
Input RGB image. |
|
num_inference_steps (`int`): |
|
                Number of diffusion denoising steps (DDIM) during inference.
|
show_pbar (`bool`): |
|
Display a progress bar of diffusion denoising. |
|
generator (`torch.Generator`) |
|
Random generator for initial noise generation. |
|
Returns: |
|
`torch.Tensor`: Predicted depth map. |
|
""" |
|
device = self.device |
|
rgb_in = rgb_in.to(device) |
|
|
|
|
|
self.scheduler.set_timesteps(num_inference_steps, device=device) |
|
timesteps = self.scheduler.timesteps |
|
|
|
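        # Encode the input image into the VAE latent space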
rgb_latent = self.encode_rgb(rgb_in) |
|
|
|
|
|
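        # Initialize the depth latent from Gaussian noise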
depth_latent = torch.randn( |
|
rgb_latent.shape, |
|
device=device, |
|
dtype=self.dtype, |
|
generator=generator, |
|
) * self.scheduler.init_noise_sigma |
|
|
|
|
|
if self.empty_text_embed is None: |
|
self.encode_empty_text() |
|
batch_empty_text_embed = self.empty_text_embed.repeat( |
|
(rgb_latent.shape[0], 1, 1) |
|
).to(device).to(self.dtype) |
|
|
|
|
|
if show_pbar: |
|
iterable = tqdm( |
|
enumerate(timesteps), |
|
total=len(timesteps), |
|
leave=False, |
|
desc=" " * 4 + "Diffusion denoising", |
|
) |
|
else: |
|
iterable = enumerate(timesteps) |
|
|
|
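        # Denoising loop: the clean image latent (rgb_timestep=0) conditions the depth branch at every step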
for i, t in iterable: |
|
unet_input = torch.cat( |
|
[rgb_latent, depth_latent], dim=1 |
|
) |
|
|
|
noise_pred = self.unet( |
|
unet_input, rgb_timestep=0, depth_timestep=t, encoder_hidden_states=batch_empty_text_embed |
|
).sample |
|
|
|
if cfg_scale > 1: |
|
uncond_noise_pred = self.unet( |
|
unet_input, rgb_timestep=0, depth_timestep=t, encoder_hidden_states=batch_empty_text_embed, rgb2depth_scale=0.3 |
|
).sample |
|
|
|
uncond_pred = uncond_noise_pred[:, 4:, :, :] |
|
cond_pred = noise_pred[:, 4:, :, :] |
|
|
|
cond_pred = uncond_pred + cfg_scale * (cond_pred - uncond_pred) |
|
else: |
|
cond_pred = noise_pred[:, 4:, :, :] |
|
|
|
|
|
depth_latent = self.scheduler.step( |
|
cond_pred, t, depth_latent |
|
).prev_sample |
|
|
|
depth = self.decode_depth(depth_latent) |
|
|
|
|
|
depth = torch.clip(depth, -1.0, 1.0) |
|
|
|
depth = (depth + 1.0) / 2.0 |
|
|
|
return depth |
|
@torch.no_grad() |
|
def rgbd2rgbd(self, |
|
        input_image: Union[torch.Tensor, PIL.Image.Image],

        depth_image: Union[torch.Tensor, PIL.Image.Image],
|
prompt: str = '', |
|
guidance_scale: float = 7.5, |
|
strength: float = 0.75, |
|
generator: Union[torch.Generator, None] = None, |
|
num_inference_steps: int = 50, |
|
show_pbar: bool = False, |
|
resample_method: str = "bilinear", |
|
processing_res: int = 768 |
|
    ) -> PIL.Image.Image:
|
self._check_inference_step(num_inference_steps) |
|
|
|
resample_method: InterpolationMode = get_tv_resample_method(resample_method) |
|
|
|
|
|
if isinstance(prompt, list): |
|
bs = len(prompt) |
|
batch_text_embed = [] |
|
for p in prompt: |
|
batch_text_embed.append(self.encode_text(p)) |
|
batch_text_embed = torch.cat(batch_text_embed, dim=0) |
|
elif isinstance(prompt, str): |
|
bs = 1 |
|
            batch_text_embed = self.encode_text(prompt)  # encode_text already returns a batch dimension
|
else: |
|
raise NotImplementedError |
|
|
|
if self.empty_text_embed is None: |
|
self.encode_empty_text() |
|
batch_empty_text_embed = self.empty_text_embed.repeat( |
|
(batch_text_embed.shape[0], 1, 1) |
|
).to(self.device) |
|
text_embed = torch.cat([batch_empty_text_embed, batch_text_embed], dim=0) |
|
|
|
|
|
|
|
rgb = input_image |
|
input_size = rgb.shape |
|
assert ( |
|
4 == rgb.dim() and 3 == input_size[-3] |
|
), f"Wrong input shape {input_size}, expected [1, rgb, H, W]" |
|
if processing_res > 0: |
|
rgb = resize_max_res( |
|
rgb, |
|
max_edge_resolution=processing_res, |
|
resample_method=resample_method, |
|
) |
|
rgb_norm: torch.Tensor = rgb / 255.0 * 2.0 - 1.0 |
|
rgb_in = rgb_norm.to(self.dtype).to(self.device) |
|
assert rgb_norm.min() >= -1.0 and rgb_norm.max() <= 1.0 |
|
|
|
depth = depth_image |
|
depth = depth.repeat(1, 3, 1, 1) |
|
input_size = depth.shape |
|
assert ( |
|
4 == depth.dim() and 3 == input_size[-3] |
|
), f"Wrong input shape {input_size}, expected [1, rgb, H, W]" |
|
if processing_res > 0: |
|
depth = resize_max_res( |
|
depth, |
|
max_edge_resolution=processing_res, |
|
resample_method=resample_method, |
|
) |
|
depth_norm: torch.Tensor = (depth - depth.min()) / (depth.max() - depth.min()) * 2.0 - 1.0 |
|
depth_in = depth_norm.to(self.dtype).to(self.device) |
|
assert depth_norm.min() >= -1.0 and depth_norm.max() <= 1.0 |
|
|
|
|
|
self.scheduler.set_timesteps(num_inference_steps, device=self.device) |
|
init_timestep = min(int(num_inference_steps * strength), num_inference_steps) |
|
t_start = max(num_inference_steps - init_timestep, 0) |
|
timesteps = self.scheduler.timesteps[t_start * self.scheduler.order:] |
|
num_inference_steps = num_inference_steps - t_start |
|
latent_timestep = timesteps[:1] |
|
|
|
|
|
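        # Encode the conditioning image and depth into latents, then add noise according to the chosen strength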
rgb_latent = self.encode_rgb(rgb_in) |
|
depth_latent = self.encode_depth(depth_in) |
|
input_latent = torch.cat([rgb_latent, depth_latent], dim=1) |
|
noise = torch.randn( |
|
input_latent.shape, |
|
device=self.device, |
|
dtype=self.dtype, |
|
generator=generator, |
|
) |
|
|
|
cat_latent = self.scheduler.add_noise(input_latent, noise, latent_timestep) |
|
|
|
|
|
|
|
for i, t in enumerate(timesteps): |
|
latent_model_input = torch.cat([cat_latent] * 2) |
|
latent_model_input = self.scheduler.scale_model_input(latent_model_input, t) |
|
|
|
|
|
with torch.no_grad(): |
|
noise_pred = self.unet( |
|
latent_model_input, |
|
rgb_timestep=t, |
|
depth_timestep=t, |
|
encoder_hidden_states=text_embed, |
|
return_dict=False, |
|
|
|
)[0] |
|
|
|
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2) |
|
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) |
|
|
|
|
|
cat_latent = self.scheduler.step(noise_pred, t, cat_latent).prev_sample |
|
|
|
image = self.decode_image(cat_latent[:, :4, :, :]) |
|
image = self.numpy_to_pil(image) |
|
d_image = self.decode_depth(cat_latent[:, 4:, :, :]) |
|
d_image = d_image.cpu().permute(0, 2, 3, 1).numpy() |
|
        for i in range(d_image.shape[0]):
|
d_image[i] = (d_image[i] - d_image[i].min()) / (d_image[i].max() - d_image[i].min()) |
|
d_image = self.numpy_to_pil(d_image) |
|
|
|
cat_image = make_image_grid([image[0], d_image[0]], rows=1, cols=2) |
|
return cat_image |
|
|
|
@torch.no_grad() |
|
def single_depth2image( |
|
self, |
|
        depth_image: Union[torch.Tensor, PIL.Image.Image],
|
prompt: str = '', |
|
generator: Union[torch.Generator, None] = None, |
|
num_inference_steps: int = 50, |
|
show_pbar: bool = False, |
|
resample_method: str = "bilinear", |
|
processing_res: int = 640 |
|
    ) -> PIL.Image.Image:
|
""" |
|
        Generate an RGB image conditioned on a depth map and an optional text prompt, without ensembling.

        Args:
            depth_image (`torch.Tensor` or `PIL.Image.Image`):
                Input depth map used as conditioning.
            prompt (`str` or `List[str]`, *optional*):
                Text prompt(s) used to condition the generation.
            num_inference_steps (`int`):
                Number of diffusion denoising steps (DDIM) during inference.
            show_pbar (`bool`):
                Display a progress bar of diffusion denoising.
            generator (`torch.Generator`)
                Random generator for initial noise generation.
        Returns:
            `PIL.Image.Image`: Generated RGB image, resized to the input depth resolution.
|
""" |
|
device = self.device |
|
ori_type = self.dtype |
|
|
|
self._check_inference_step(num_inference_steps) |
|
|
|
resample_method: InterpolationMode = get_tv_resample_method(resample_method) |
|
|
|
|
|
|
|
if isinstance(depth_image, Image.Image): |
|
depth = pil_to_tensor(depth_image) |
|
depth = depth.unsqueeze(0) |
|
elif isinstance(depth_image, torch.Tensor): |
|
depth = depth_image |
|
else: |
|
raise TypeError(f"Unknown input type: {type(depth_image) = }") |
|
depth = depth.repeat(1, 3, 1, 1) |
|
input_size = depth.shape |
|
assert ( |
|
4 == depth.dim() and 3 == input_size[-3] |
|
), f"Wrong input shape {input_size}, expected [1, rgb, H, W]" |
|
|
|
|
|
if processing_res > 0: |
|
depth = resize_max_res( |
|
depth, |
|
max_edge_resolution=processing_res, |
|
resample_method=resample_method, |
|
) |
|
|
|
|
|
depth_norm: torch.Tensor = depth / 255.0 * 2.0 - 1.0 |
|
assert depth_norm.min() >= -1.0 and depth_norm.max() <= 1.0 |
|
depth_in = depth_norm.to(ori_type).to(device) |
|
|
|
|
|
self.scheduler.set_timesteps(num_inference_steps, device=device) |
|
timesteps = self.scheduler.timesteps |
|
|
|
|
|
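        # Encode the conditioning depth map; the image latent starts from pure noise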
depth_latent = self.encode_depth(depth_in) |
|
|
|
|
|
rgb_latent = torch.randn( |
|
depth_latent.shape, |
|
device=device, |
|
dtype=ori_type, |
|
generator=generator, |
|
) * self.scheduler.init_noise_sigma |
|
|
|
|
|
if isinstance(prompt, list): |
|
bs = len(prompt) |
|
batch_text_embed = [] |
|
for p in prompt: |
|
batch_text_embed.append(self.encode_text(p)) |
|
batch_text_embed = torch.cat(batch_text_embed, dim=0) |
|
elif isinstance(prompt, str): |
|
bs = 1 |
|
batch_text_embed = self.encode_text(prompt) |
|
else: |
|
raise NotImplementedError |
|
|
|
if self.empty_text_embed is None: |
|
self.encode_empty_text() |
|
batch_empty_text_embed = self.empty_text_embed.repeat((batch_text_embed.shape[0], 1, 1)).to(device) |
|
|
|
text_embed = torch.cat([batch_empty_text_embed, batch_text_embed], dim=0) |
|
|
|
|
|
if show_pbar: |
|
iterable = tqdm( |
|
enumerate(timesteps), |
|
total=len(timesteps), |
|
leave=False, |
|
desc=" " * 4 + "Diffusion denoising", |
|
) |
|
else: |
|
iterable = enumerate(timesteps) |
|
|
|
self.unet.to(torch.bfloat16) |
|
for i, t in iterable: |
|
cat_latent = torch.cat( |
|
[rgb_latent, depth_latent], dim=1 |
|
) |
|
latent_model_input = torch.cat([cat_latent] * 2) |
|
|
|
with torch.no_grad(): |
|
noise_pred = self.unet( |
|
latent_model_input.to(torch.bfloat16), |
|
rgb_timestep=t, |
|
depth_timestep=0, |
|
encoder_hidden_states=text_embed.to(torch.bfloat16), |
|
return_dict=False, |
|
)[0] |
|
|
|
|
|
guidance_scale = 7.5 |
|
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2) |
|
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) |
|
|
|
|
|
rgb_latent = self.scheduler.step(noise_pred[:, :4, :, :], t, rgb_latent).prev_sample |
|
|
|
image = self.decode_image(rgb_latent) |
|
image = self.numpy_to_pil(image)[0] |
|
image = image.resize((input_size[-1], input_size[-2]), Image.BILINEAR) |
|
|
|
self.unet.to(ori_type) |
|
|
|
return image |
|
|
|
def encode_rgb(self, rgb_in: torch.Tensor) -> torch.Tensor: |
|
""" |
|
Encode RGB image into latent. |
|
|
|
Args: |
|
rgb_in (`torch.Tensor`): |
|
Input RGB image to be encoded. |
|
|
|
Returns: |
|
`torch.Tensor`: Image latent. |
|
""" |
|
|
|
h = self.vae.encoder(rgb_in) |
|
moments = self.vae.quant_conv(h) |
|
mean, logvar = torch.chunk(moments, 2, dim=1) |
|
|
|
rgb_latent = mean * self.rgb_latent_scale_factor |
|
return rgb_latent |
|
|
|
def encode_depth(self, depth_in: torch.Tensor) -> torch.Tensor: |
|
""" |
|
        Encode a depth map into latent.

        Args:
            depth_in (`torch.Tensor`):
                Input depth map to be encoded.

        Returns:
            `torch.Tensor`: Depth latent.
|
""" |
|
|
|
h = self.vae.encoder(depth_in) |
|
moments = self.vae.quant_conv(h) |
|
mean, logvar = torch.chunk(moments, 2, dim=1) |
|
|
|
depth_latent = mean * self.depth_latent_scale_factor |
|
return depth_latent |
|
|
|
def decode_image(self, latents): |
|
latents = 1 / self.vae.config.scaling_factor * latents |
|
z = self.vae.post_quant_conv(latents) |
|
image = self.vae.decoder(z) |
|
image = (image / 2 + 0.5).clamp(0, 1) |
|
|
|
image = image.cpu().permute(0, 2, 3, 1).float().numpy() |
|
return image |
|
|
|
def decode_depth(self, depth_latent: torch.Tensor) -> torch.Tensor: |
|
""" |
|
Decode depth latent into depth map. |
|
|
|
Args: |
|
depth_latent (`torch.Tensor`): |
|
Depth latent to be decoded. |
|
|
|
Returns: |
|
`torch.Tensor`: Decoded depth map. |
|
""" |
|
|
|
depth_latent = depth_latent / self.depth_latent_scale_factor |
|
|
|
z = self.vae.post_quant_conv(depth_latent) |
|
stacked = self.vae.decoder(z) |
|
|
|
depth_mean = stacked.mean(dim=1, keepdim=True) |
|
return depth_mean |
|
|
|
def post_process_rgbd(self, prompts, rgb_image, depth_image): |
|
|
|
rgbd_images = [] |
|
for idx, p in enumerate(prompts): |
|
image1, image2 = rgb_image[idx], depth_image[idx] |
|
|
|
width1, height1 = image1.size |
|
width2, height2 = image2.size |
|
|
|
font = ImageFont.load_default(size=20) |
|
text = p |
|
draw = ImageDraw.Draw(image1) |
|
text_bbox = draw.textbbox((0, 0), text, font=font) |
|
text_width = text_bbox[2] - text_bbox[0] |
|
text_height = text_bbox[3] - text_bbox[1] |
|
|
|
new_image = Image.new('RGB', (width1 + width2, max(height1, height2) + text_height), (255, 255, 255)) |
|
|
|
text_x = (new_image.width - text_width) // 2 |
|
text_y = 0 |
|
draw = ImageDraw.Draw(new_image) |
|
draw.text((text_x, text_y), text, fill="black", font=font) |
|
|
|
new_image.paste(image1, (0, text_height)) |
|
new_image.paste(image2, (width1, text_height)) |
|
|
|
rgbd_images.append(pil_to_tensor(new_image)) |
|
|
|
return rgbd_images |
|
|