from dataclasses import dataclass
import torch
import torch.nn as nn
import torch.nn.functional as F
from einops import rearrange
from vector_quantize_pytorch import GroupedResidualFSQ
from .firefly import ConvNeXtBlock


@dataclass
class FSQResult:
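    """Quantizer output bundle: upsampled features (z), code indices (codes), and pre-quantization latents."""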
z: torch.Tensor
codes: torch.Tensor
    latents: torch.Tensor


class DownsampleFiniteScalarQuantize(nn.Module):
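    """Downsamples features with strided Conv1d + ConvNeXt stages, applies grouped
    residual finite scalar quantization (FSQ), then upsamples back to the input resolution.
    """
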
def __init__(
self,
input_dim: int = 512,
n_codebooks: int = 9,
n_groups: int = 1,
        levels: tuple[int, ...] = (8, 5, 5, 5),  # 8 * 5 * 5 * 5 = 1000, approximately 2**10 codes
        downsample_factor: tuple[int, ...] = (2, 2),
        downsample_dims: tuple[int, ...] | None = None,
):
super().__init__()
if downsample_dims is None:
downsample_dims = [input_dim for _ in range(len(downsample_factor))]
all_dims = (input_dim,) + tuple(downsample_dims)
self.residual_fsq = GroupedResidualFSQ(
dim=all_dims[-1],
levels=levels,
num_quantizers=n_codebooks,
groups=n_groups,
)
self.downsample_factor = downsample_factor
self.downsample_dims = downsample_dims
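        # Strided Conv1d + ConvNeXt stages reduce the temporal resolution by each factor in turn.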
self.downsample = nn.Sequential(
*[
nn.Sequential(
nn.Conv1d(
all_dims[idx],
all_dims[idx + 1],
kernel_size=factor,
stride=factor,
),
ConvNeXtBlock(dim=all_dims[idx + 1]),
)
for idx, factor in enumerate(downsample_factor)
]
)
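        # Mirror of the downsample path: transposed convolutions restore the original temporal resolution.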
self.upsample = nn.Sequential(
*[
nn.Sequential(
nn.ConvTranspose1d(
all_dims[idx + 1],
all_dims[idx],
kernel_size=factor,
stride=factor,
),
ConvNeXtBlock(dim=all_dims[idx]),
)
for idx, factor in reversed(list(enumerate(downsample_factor)))
]
)
        self.apply(self._init_weights)

    def _init_weights(self, m):
if isinstance(m, (nn.Conv1d, nn.Linear)):
nn.init.trunc_normal_(m.weight, std=0.02)
            nn.init.constant_(m.bias, 0)

    def forward(self, z) -> FSQResult:
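        """Downsample z of shape (batch, channels, frames), quantize with grouped residual FSQ,
        then upsample and pad/crop back to the input length.
        """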
original_shape = z.shape
z = self.downsample(z)
quantized, indices = self.residual_fsq(z.mT)
result = FSQResult(
z=quantized.mT,
codes=indices.mT,
latents=z,
)
result.z = self.upsample(result.z)
        # Pad or crop along the time axis so result.z matches the input length exactly
        diff = original_shape[-1] - result.z.shape[-1]
        if diff > 0:
            # Upsampled output is shorter than the input: pad symmetrically
            left = diff // 2
            right = diff - left
            result.z = F.pad(result.z, (left, right))
        elif diff < 0:
            # Upsampled output is longer than the input: crop symmetrically
            left = (-diff) // 2
            right = (-diff) - left
            result.z = result.z[..., left : result.z.shape[-1] - right]
        return result

    def encode(self, z):
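        """Return code indices of shape (batch, groups * n_codebooks, downsampled frames)."""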
z = self.downsample(z)
_, indices = self.residual_fsq(z.mT)
indices = rearrange(indices, "g b l r -> b (g r) l")
        return indices

    def decode(self, indices: torch.Tensor):
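        """Reconstruct upsampled features from code indices as produced by encode()."""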
indices = rearrange(indices, "b (g r) l -> g b l r", g=self.residual_fsq.groups)
z_q = self.residual_fsq.get_output_from_indices(indices)
z_q = self.upsample(z_q.mT)
        return z_q

    # def from_latents(self, latents: torch.Tensor):
# z_q, z_p, codes = super().from_latents(latents)
# z_q = self.upsample(z_q)
# return z_q, z_p, codes


if __name__ == "__main__":
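    # Quick smoke test: quantize a random (batch=16, channels=512, frames=80) feature map and report shapes.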
rvq = DownsampleFiniteScalarQuantize(
n_codebooks=1,
downsample_factor=(2, 2),
)
x = torch.randn(16, 512, 80)
result = rvq(x)
print(rvq)
print(result.latents.shape, result.codes.shape, result.z.shape)
# y = rvq.from_codes(result.codes)
# print(y[0].shape)
# y = rvq.from_latents(result.latents)
# print(y[0].shape)