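"""Downsampled grouped residual finite scalar quantization (FSQ).

Features are temporally downsampled with strided convolutions, quantized with
a grouped residual FSQ bottleneck, then upsampled back to the original rate.
"""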
from dataclasses import dataclass

import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange
from vector_quantize_pytorch import GroupedResidualFSQ

from .firefly import ConvNeXtBlock


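# Bundle of quantizer outputs: `z` is the reconstructed (upsampled) feature
# map, `codes` the discrete indices, `latents` the pre-quantization
# downsampled features.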
@dataclass
class FSQResult:
    z: torch.Tensor
    codes: torch.Tensor
    latents: torch.Tensor


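# Strided-conv downsampling -> grouped residual FSQ -> transposed-conv
# upsampling back to the input frame rate.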
class DownsampleFiniteScalarQuantize(nn.Module):
    def __init__(
        self,
        input_dim: int = 512,
        n_codebooks: int = 9,
        n_groups: int = 1,
        levels: tuple[int, ...] = (8, 5, 5, 5),  # prod(levels) = 1000, approximately 2**10
        downsample_factor: tuple[int, ...] = (2, 2),
        downsample_dims: tuple[int, ...] | None = None,
    ):
        super().__init__()

        if downsample_dims is None:
            downsample_dims = [input_dim for _ in range(len(downsample_factor))]

        all_dims = (input_dim,) + tuple(downsample_dims)

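        # The quantizer operates on the most downsampled features; each of the
        # n_codebooks residual quantizers has prod(levels) = 8*5*5*5 = 1000 codes.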
        self.residual_fsq = GroupedResidualFSQ(
            dim=all_dims[-1],
            levels=levels,
            num_quantizers=n_codebooks,
            groups=n_groups,
        )

        self.downsample_factor = downsample_factor
        self.downsample_dims = downsample_dims

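        # One (strided Conv1d -> ConvNeXtBlock) stage per downsampling factor.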
        self.downsample = nn.Sequential(
            *[
                nn.Sequential(
                    nn.Conv1d(
                        all_dims[idx],
                        all_dims[idx + 1],
                        kernel_size=factor,
                        stride=factor,
                    ),
                    ConvNeXtBlock(dim=all_dims[idx + 1]),
                )
                for idx, factor in enumerate(downsample_factor)
            ]
        )

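        # Mirror of the downsampling stack, applied in reverse order.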
        self.upsample = nn.Sequential(
            *[
                nn.Sequential(
                    nn.ConvTranspose1d(
                        all_dims[idx + 1],
                        all_dims[idx],
                        kernel_size=factor,
                        stride=factor,
                    ),
                    ConvNeXtBlock(dim=all_dims[idx]),
                )
                for idx, factor in reversed(list(enumerate(downsample_factor)))
            ]
        )

        self.apply(self._init_weights)

    def _init_weights(self, m):
        if isinstance(m, (nn.Conv1d, nn.Linear)):
            nn.init.trunc_normal_(m.weight, std=0.02)
            nn.init.constant_(m.bias, 0)

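    # `z` is (batch, channels, time); GroupedResidualFSQ expects channels-last,
    # hence the .mT transposes around the quantizer call.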
    def forward(self, z) -> FSQResult:
        original_shape = z.shape
        z = self.downsample(z)
        quantized, indices = self.residual_fsq(z.mT)
        result = FSQResult(
            z=quantized.mT,
            codes=indices.mT,
            latents=z,
        )
        result.z = self.upsample(result.z)

        # Pad or crop z to match original shape
        diff = original_shape[-1] - result.z.shape[-1]
        left = diff // 2
        right = diff - left

        if diff > 0:
            result.z = F.pad(result.z, (left, right))
        elif diff < 0:
            result.z = result.z[..., left:-right]

        return result

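    # Returned indices are flattened across groups and residual quantizers:
    # (batch, n_groups * n_codebooks, downsampled_time).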
    def encode(self, z):
        z = self.downsample(z)
        _, indices = self.residual_fsq(z.mT)
        indices = rearrange(indices, "g b l r -> b (g r) l")
        return indices

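    # Inverse of encode(): look up code vectors and upsample back to the
    # original frame rate.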
    def decode(self, indices: torch.Tensor):
        indices = rearrange(indices, "b (g r) l -> g b l r", g=self.residual_fsq.groups)
        z_q = self.residual_fsq.get_output_from_indices(indices)
        z_q = self.upsample(z_q.mT)
        return z_q

    # def from_latents(self, latents: torch.Tensor):
    #     z_q, z_p, codes = super().from_latents(latents)
    #     z_q = self.upsample(z_q)
    #     return z_q, z_p, codes


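# Smoke test: a single-codebook quantizer over (batch=16, channels=512, frames=80).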
if __name__ == "__main__":
    rvq = DownsampleFiniteScalarQuantize(
        n_codebooks=1,
        downsample_factor=(2, 2),
    )
    x = torch.randn(16, 512, 80)
    result = rvq(x)

    print(rvq)
    print(result.latents.shape, result.codes.shape, result.z.shape)

    # y = rvq.from_codes(result.codes)
    # print(y[0].shape)
    # y = rvq.from_latents(result.latents)
    # print(y[0].shape)
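    # A sketch (not in the original): round-trip through encode()/decode().
    # Uses the same (16, 512, 80) input; with downsample_factor=(2, 2) the
    # code sequence should be 80 / 4 = 20 frames long.
    codes = rvq.encode(x)
    recon = rvq.decode(codes)
    print(codes.shape, recon.shape)  # expected: (16, 1, 20), (16, 512, 80)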