|
|
|
|
|
import math |
|
from functools import partial |
|
import json |
|
import os |
|
from collections import namedtuple |
|
import torch |
|
import torch.nn as nn |
|
from dataclasses import dataclass, field |
|
from mamba_ssm.modules.mamba_simple import Mamba, Block |
|
from mamba_ssm.utils.generation import GenerationMixin |
|
from mamba_ssm.utils.hf import load_config_hf, load_state_dict_hf |
|
|
|
try: |
|
from mamba_ssm.ops.triton.layernorm import RMSNorm, layer_norm_fn, rms_norm_fn |
|
except ImportError: |
|
RMSNorm, layer_norm_fn, rms_norm_fn = None, None, None |
|
|
|
# Maps a model name to the nn.Module subclass registered under that name.
_MODEL_REGISTRY = {}


def register_model(name):
    """Return a class decorator that records the class under ``name``.

    The decorated class is stored in ``_MODEL_REGISTRY`` and returned
    unchanged.

    Raises:
        ValueError: if ``name`` is already registered, or if the decorated
            class is not a subclass of ``nn.Module``.
    """

    def decorator(cls):
        # The duplicate check runs first, so a clashing name is reported
        # even when the class itself would also be rejected.
        name_taken = name in _MODEL_REGISTRY
        if name_taken:
            raise ValueError(f"Duplicate model name {name}")

        is_module = issubclass(cls, nn.Module)
        if not is_module:
            raise ValueError(f"Model {cls.__name__} does not inherit from nn.Module")

        _MODEL_REGISTRY[name] = cls
        return cls

    return decorator
|
|
|
|
|
@dataclass
class MambaConfig:
    """Hyper-parameters consumed by ``MambaLMHeadModel``.

    Every attribute is a real dataclass field, so each one can be passed to
    the constructor as a keyword and each one appears in ``__dict__`` (which
    is what ``save_pretrained`` serialises to ``config.json``).
    """

    d_model: int = 2560
    n_layer: int = 64
    vocab_size: int = 50277
    # Extra keyword arguments forwarded to each Mamba mixer.
    ssm_cfg: dict = field(default_factory=dict)
    rms_norm: bool = True
    residual_in_fp32: bool = True
    fused_add_norm: bool = True
    pad_vocab_size_multiple: int = 8
    # Key used to look up the backbone class in the model registry.
    # Previously an un-annotated class attribute, which the constructor
    # rejected as a keyword and which was absent from ``__dict__`` unless
    # assigned on the instance. Declared after the original fields so
    # existing positional construction keeps its meaning.
    model_type: str = "mamba"
    # Input width of the projection applied to externally supplied
    # embeddings (read by MambaLMHeadModel as ``config.esm_embed_dim``).
    # NOTE(review): the default of 1280 is an assumption (ESM-2 650M
    # embedding width) -- confirm against the embeddings actually used.
    esm_embed_dim: int = 1280
|
|
|
|
|
def create_block(
    d_model,
    ssm_cfg=None,
    norm_epsilon=1e-5,
    rms_norm=False,
    residual_in_fp32=False,
    fused_add_norm=False,
    layer_idx=None,
    device=None,
    dtype=None,
):
    """Build one ``Block`` wrapping a ``Mamba`` mixer and a norm layer.

    Args:
        d_model: first positional argument passed to ``Block``.
        ssm_cfg: optional dict of extra keyword arguments for ``Mamba``;
            ``None`` is treated as an empty dict.
        norm_epsilon: ``eps`` handed to the norm constructor.
        rms_norm: pick ``RMSNorm`` when true, ``nn.LayerNorm`` otherwise.
        residual_in_fp32: forwarded to ``Block``.
        fused_add_norm: forwarded to ``Block``.
        layer_idx: forwarded to ``Mamba`` and also stored on the block.
        device: forwarded to both the mixer and the norm constructors.
        dtype: forwarded to both the mixer and the norm constructors.

    Returns:
        The constructed ``Block`` with its ``layer_idx`` attribute set.
    """
    mixer_kwargs = {} if ssm_cfg is None else ssm_cfg
    placement = dict(device=device, dtype=dtype)

    # Block receives constructors (not instances) for its mixer and norm.
    mixer_factory = partial(Mamba, layer_idx=layer_idx, **mixer_kwargs, **placement)
    norm_impl = RMSNorm if rms_norm else nn.LayerNorm
    norm_factory = partial(norm_impl, eps=norm_epsilon, **placement)

    new_block = Block(
        d_model,
        mixer_factory,
        norm_cls=norm_factory,
        fused_add_norm=fused_add_norm,
        residual_in_fp32=residual_in_fp32,
    )
    new_block.layer_idx = layer_idx
    return new_block
|
|
|
|
|
|
|
def _init_weights(
    module,
    n_layer,
    initializer_range=0.02,
    rescale_prenorm_residual=True,
    n_residuals_per_layer=1,
):
    """Initialise one module; written to be used through ``nn.Module.apply``.

    * ``nn.Linear``: the bias, when present, is zeroed unless it carries a
      truthy ``_no_reinit`` attribute.
    * ``nn.Embedding``: the weight is drawn from a normal distribution with
      standard deviation ``initializer_range``.
    * When ``rescale_prenorm_residual`` is true, every parameter of
      ``module`` whose name is exactly ``"out_proj.weight"`` or
      ``"fc2.weight"`` is re-initialised with Kaiming-uniform and then
      divided in place by ``sqrt(n_residuals_per_layer * n_layer)``.

    Args:
        module: the module to initialise.
        n_layer: number of layers, used in the rescaling factor.
        initializer_range: std for embedding weights.
        rescale_prenorm_residual: enable the output-projection rescaling.
        n_residuals_per_layer: multiplier on ``n_layer`` in the rescaling.
    """
    if isinstance(module, nn.Linear):
        bias = module.bias
        if bias is not None and not getattr(bias, "_no_reinit", False):
            nn.init.zeros_(bias)
    elif isinstance(module, nn.Embedding):
        nn.init.normal_(module.weight, std=initializer_range)

    if not rescale_prenorm_residual:
        return

    rescaled_names = ("out_proj.weight", "fc2.weight")
    for param_name, param in module.named_parameters():
        if param_name not in rescaled_names:
            continue
        nn.init.kaiming_uniform_(param, a=math.sqrt(5))
        # In-place division must not be tracked by autograd.
        with torch.no_grad():
            param /= math.sqrt(n_residuals_per_layer * n_layer)
|
|
|
|
|
@register_model("mamba")
class MixerModel(nn.Module):
    """Token embedding, a stack of blocks from ``create_block``, final norm.

    Registered in the model registry under the name ``"mamba"``.
    """

    def __init__(
        self,
        d_model: int,
        n_layer: int,
        vocab_size: int,
        ssm_cfg=None,
        norm_epsilon: float = 1e-5,
        rms_norm: bool = False,
        initializer_cfg=None,
        fused_add_norm=False,
        residual_in_fp32=False,
        device=None,
        dtype=None,
    ) -> None:
        """Create the embedding, ``n_layer`` blocks and the final norm.

        Args:
            d_model: embedding width and block width.
            n_layer: number of blocks.
            vocab_size: number of rows in the embedding table.
            ssm_cfg: forwarded to ``create_block``.
            norm_epsilon: ``eps`` for every norm layer.
            rms_norm: use ``RMSNorm`` instead of ``nn.LayerNorm``.
            initializer_cfg: optional dict of keyword overrides for
                ``_init_weights``.
            fused_add_norm: use the fused add+norm kernels in ``forward``.
            residual_in_fp32: forwarded to the blocks and the fused kernel.
            device: forwarded to every sub-module constructor.
            dtype: forwarded to every sub-module constructor.

        Raises:
            ImportError: if ``fused_add_norm`` is requested but the Triton
                norm kernels could not be imported.
        """
        super().__init__()
        placement = {"device": device, "dtype": dtype}

        self.residual_in_fp32 = residual_in_fp32
        self.embedding = nn.Embedding(vocab_size, d_model, **placement)

        self.fused_add_norm = fused_add_norm
        kernels_missing = layer_norm_fn is None or rms_norm_fn is None
        if self.fused_add_norm and kernels_missing:
            raise ImportError("Failed to import Triton LayerNorm / RMSNorm kernels")

        blocks = []
        for idx in range(n_layer):
            blocks.append(
                create_block(
                    d_model,
                    ssm_cfg=ssm_cfg,
                    norm_epsilon=norm_epsilon,
                    rms_norm=rms_norm,
                    residual_in_fp32=residual_in_fp32,
                    fused_add_norm=fused_add_norm,
                    layer_idx=idx,
                    **placement,
                )
            )
        self.layers = nn.ModuleList(blocks)

        final_norm_cls = RMSNorm if rms_norm else nn.LayerNorm
        self.norm_f = final_norm_cls(d_model, eps=norm_epsilon, **placement)

        init_overrides = {} if initializer_cfg is None else initializer_cfg
        self.apply(partial(_init_weights, n_layer=n_layer, **init_overrides))

    def allocate_inference_cache(self, batch_size, max_seqlen, dtype=None, **kwargs):
        """Return ``{position: cache}`` with one entry per block.

        Each value is whatever the block's own ``allocate_inference_cache``
        returns for the given arguments.
        """
        cache = {}
        for position, layer in enumerate(self.layers):
            cache[position] = layer.allocate_inference_cache(
                batch_size, max_seqlen, dtype=dtype, **kwargs
            )
        return cache

    def forward(self, input_ids, embedding=None, inference_params=None):
        """Run the block stack and apply the final norm.

        Args:
            input_ids: token ids, looked up in the embedding table only
                when ``embedding`` is ``None``.
            embedding: optional pre-computed input that replaces the
                embedding lookup entirely.
            inference_params: passed through to every block.

        Returns:
            The normalised hidden states after the last block.
        """
        if embedding is None:
            hidden_states = self.embedding(input_ids)
        else:
            hidden_states = embedding

        residual = None
        for layer in self.layers:
            hidden_states, residual = layer(
                hidden_states, residual, inference_params=inference_params
            )

        if self.fused_add_norm:
            # The fused kernel performs the residual add and the norm in a
            # single call.
            norm_kernel = (
                rms_norm_fn if isinstance(self.norm_f, RMSNorm) else layer_norm_fn
            )
            return norm_kernel(
                hidden_states,
                self.norm_f.weight,
                self.norm_f.bias,
                eps=self.norm_f.eps,
                residual=residual,
                prenorm=False,
                residual_in_fp32=self.residual_in_fp32,
            )

        if residual is None:
            summed = hidden_states
        else:
            summed = hidden_states + residual
        return self.norm_f(summed.to(dtype=self.norm_f.weight.dtype))
|
|
|
|
|
@register_model("bidirectional_mamba")
class BiDirectionMixerModel(nn.Module):
    """Bidirectional block stack with a gated mix of two input embeddings.

    Each level has a forward block, a backward block (run on the input
    flipped along dim 1) and a linear layer that fuses the two outputs back
    to ``d_model``. Registered under the name ``"bidirectional_mamba"``.
    """

    def __init__(
        self,
        d_model: int,
        n_layer: int,
        vocab_size: int,
        ssm_cfg=None,
        norm_epsilon: float = 1e-5,
        rms_norm: bool = False,
        initializer_cfg=None,
        fused_add_norm=False,
        residual_in_fp32=False,
        device=None,
        dtype=None,
    ) -> None:
        """Create the embedding, gate, both block stacks, fusers and norm.

        Args:
            d_model: embedding width and block width.
            n_layer: number of forward/backward block pairs.
            vocab_size: number of rows in the embedding table.
            ssm_cfg: forwarded to ``create_block``.
            norm_epsilon: ``eps`` for every norm layer.
            rms_norm: use ``RMSNorm`` instead of ``nn.LayerNorm``.
            initializer_cfg: optional dict of keyword overrides for
                ``_init_weights``.
            fused_add_norm: use the fused add+norm kernels in ``forward``.
            residual_in_fp32: forwarded to the blocks and the fused kernel.
            device: forwarded to every sub-module constructor.
            dtype: forwarded to every sub-module constructor.

        Raises:
            ImportError: if ``fused_add_norm`` is requested but the Triton
                norm kernels could not be imported.
        """
        factory_kwargs = {"device": device, "dtype": dtype}
        super().__init__()
        self.residual_in_fp32 = residual_in_fp32

        self.embedding = nn.Embedding(vocab_size, d_model, **factory_kwargs)
        # The gate must live on the same device/dtype as the embedding it is
        # applied to, so it takes the factory kwargs too.
        self.gate = nn.Linear(2 * d_model, 1, **factory_kwargs)

        self.fused_add_norm = fused_add_norm
        if self.fused_add_norm:
            if layer_norm_fn is None or rms_norm_fn is None:
                raise ImportError("Failed to import Triton LayerNorm / RMSNorm kernels")

        self.forward_layers = nn.ModuleList(
            [
                create_block(
                    d_model,
                    ssm_cfg=ssm_cfg,
                    norm_epsilon=norm_epsilon,
                    rms_norm=rms_norm,
                    residual_in_fp32=residual_in_fp32,
                    fused_add_norm=fused_add_norm,
                    layer_idx=i,
                    **factory_kwargs,
                )
                for i in range(n_layer)
            ]
        )
        self.backward_layers = nn.ModuleList(
            [
                create_block(
                    d_model,
                    ssm_cfg=ssm_cfg,
                    norm_epsilon=norm_epsilon,
                    rms_norm=rms_norm,
                    residual_in_fp32=residual_in_fp32,
                    fused_add_norm=fused_add_norm,
                    # Offset by n_layer so a backward block never has the
                    # same layer_idx as a forward block; the index is the
                    # key used in allocate_inference_cache below.
                    layer_idx=n_layer + i,
                    **factory_kwargs,
                )
                for i in range(n_layer)
            ]
        )
        # One fuser per level: concat(forward, backward) -> d_model.
        self.hidden_fc = nn.ModuleList(
            [nn.Linear(2 * d_model, d_model, **factory_kwargs) for _ in range(n_layer)]
        )

        self.norm_f = (nn.LayerNorm if not rms_norm else RMSNorm)(
            d_model, eps=norm_epsilon, **factory_kwargs
        )

        self.apply(
            partial(
                _init_weights,
                n_layer=n_layer,
                **(initializer_cfg if initializer_cfg is not None else {}),
            )
        )

    def allocate_inference_cache(self, batch_size, max_seqlen, dtype=None, **kwargs):
        """Return ``{layer_idx: cache}`` for every forward and backward block.

        Forward blocks use keys ``0 .. n_layer-1`` and backward blocks use
        ``n_layer .. 2*n_layer-1``, matching the ``layer_idx`` each block
        was created with. Each value is whatever the block's own
        ``allocate_inference_cache`` returns.
        """
        all_layers = list(self.forward_layers) + list(self.backward_layers)
        return {
            layer.layer_idx: layer.allocate_inference_cache(
                batch_size, max_seqlen, dtype=dtype, **kwargs
            )
            for layer in all_layers
        }

    def forward(self, input_ids, embedding=None, inference_params=None):
        """Mix token and external embeddings, then run the bidirectional stack.

        Args:
            input_ids: token ids looked up in the embedding table.
            embedding: optional external embedding with the same shape as
                the token embedding; zeros are used when it is ``None``.
            inference_params: passed through to every block.

        Returns:
            The normalised hidden states after the last level.
        """
        hidden_states = self.embedding(input_ids)
        embedding = torch.zeros_like(hidden_states) if embedding is None else embedding
        # Sigmoid gate (last dim of size 1) that blends the two embeddings.
        gate = self.gate(torch.cat([hidden_states, embedding], dim=-1)).sigmoid()
        hidden_states = hidden_states * gate + embedding * (1 - gate)

        residual = None
        for f_layer, b_layer, h_fc in zip(
            self.forward_layers, self.backward_layers, self.hidden_fc
        ):
            hidden_states_f, residual_f = f_layer(
                hidden_states, residual, inference_params=inference_params
            )
            # The backward block sees its inputs reversed along dim 1
            # (presumably the sequence axis of (batch, seq, dim) -- confirm).
            flip_residual = residual.flip([1]) if residual is not None else None
            hidden_states_b, residual_b = b_layer(
                hidden_states.flip([1]), flip_residual, inference_params=inference_params
            )
            # Flip the backward outputs back before combining.
            hidden_states = h_fc(
                torch.cat([hidden_states_f, hidden_states_b.flip([1])], dim=-1)
            )
            residual = 0.5 * (residual_f + residual_b.flip([1]))

        if not self.fused_add_norm:
            residual = (
                (hidden_states + residual) if residual is not None else hidden_states
            )
            hidden_states = self.norm_f(residual.to(dtype=self.norm_f.weight.dtype))
        else:
            # The fused kernel performs the residual add and the norm in a
            # single call.
            fused_add_norm_fn = (
                rms_norm_fn if isinstance(self.norm_f, RMSNorm) else layer_norm_fn
            )
            hidden_states = fused_add_norm_fn(
                hidden_states,
                self.norm_f.weight,
                self.norm_f.bias,
                eps=self.norm_f.eps,
                residual=residual,
                prenorm=False,
                residual_in_fp32=self.residual_in_fp32,
            )
        return hidden_states
|
|
|
|
|
class MambaLMHeadModel(nn.Module, GenerationMixin):
    """Language-model wrapper: registry-selected backbone plus output head.

    ``lm_head`` shares its weight with the backbone's token embedding.
    ``esm_head`` projects an optional external embedding to ``d_model``
    before it is handed to the backbone.
    """

    def __init__(
        self,
        config: MambaConfig,
        initializer_cfg=None,
        device=None,
        dtype=None,
    ) -> None:
        """Build the backbone named by ``config.model_type`` and both heads.

        Args:
            config: model hyper-parameters; must also provide
                ``esm_embed_dim`` (input width of ``esm_head``).
            initializer_cfg: optional dict of keyword overrides for
                ``_init_weights``.
            device: forwarded to every sub-module constructor.
            dtype: forwarded to every sub-module constructor.

        Raises:
            KeyError: if ``config.model_type`` is not a registered model.
        """
        # Initialise nn.Module before assigning any attribute on self.
        super().__init__()
        self.config = config

        d_model = config.d_model
        n_layer = config.n_layer
        vocab_size = config.vocab_size
        esm_embed_dim = config.esm_embed_dim
        factory_kwargs = {"device": device, "dtype": dtype}

        try:
            backbone_cls = _MODEL_REGISTRY[config.model_type]
        except KeyError:
            # Same exception type as a bare lookup, with a usable message.
            raise KeyError(
                f"Unknown model_type {config.model_type!r}; "
                f"registered models: {sorted(_MODEL_REGISTRY)}"
            ) from None

        self.backbone = backbone_cls(
            d_model=d_model,
            n_layer=n_layer,
            vocab_size=vocab_size,
            ssm_cfg=config.ssm_cfg,
            rms_norm=config.rms_norm,
            initializer_cfg=initializer_cfg,
            fused_add_norm=config.fused_add_norm,
            residual_in_fp32=config.residual_in_fp32,
            **factory_kwargs,
        )
        self.lm_head = nn.Linear(d_model, vocab_size, bias=False, **factory_kwargs)
        self.esm_head = nn.Linear(esm_embed_dim, d_model, bias=False, **factory_kwargs)

        self.apply(
            partial(
                _init_weights,
                n_layer=n_layer,
                **(initializer_cfg if initializer_cfg is not None else {}),
            )
        )
        self.tie_weights()

    def tie_weights(self):
        """Make ``lm_head`` use the backbone embedding's weight parameter."""
        self.lm_head.weight = self.backbone.embedding.weight

    def allocate_inference_cache(self, batch_size, max_seqlen, dtype=None, **kwargs):
        """Delegate cache allocation to the backbone."""
        return self.backbone.allocate_inference_cache(
            batch_size, max_seqlen, dtype=dtype, **kwargs
        )

    def forward(
        self,
        input_ids,
        embedding=None,
        position_ids=None,
        inference_params=None,
        num_last_tokens=0,
    ):
        """Compute logits for ``input_ids``.

        Args:
            input_ids: token ids passed to the backbone.
            embedding: optional external embedding; when given it is first
                projected by ``esm_head`` and then passed to the backbone.
            position_ids: accepted only for compatibility with Transformer
                generation code; it is not used.
            inference_params: passed through to the backbone.
            num_last_tokens: if > 0, only return the logits for the last
                n tokens.

        Returns:
            A ``CausalLMOutput`` named tuple with fields ``logits`` and
            ``hidden_states``.
        """
        if embedding is not None:
            embedding = self.esm_head(embedding)
        hidden_states = self.backbone(
            input_ids, embedding=embedding, inference_params=inference_params
        )
        if num_last_tokens > 0:
            hidden_states = hidden_states[:, -num_last_tokens:]
        lm_logits = self.lm_head(hidden_states)
        CausalLMOutput = namedtuple("CausalLMOutput", ["logits", "hidden_states"])
        return CausalLMOutput(logits=lm_logits, hidden_states=hidden_states)

    @classmethod
    def from_pretrained(cls, pretrained_model_name, device=None, dtype=None, **kwargs):
        """Build a model from a stored config and load its stored weights.

        The config is obtained with ``load_config_hf`` and the weights with
        ``load_state_dict_hf``; extra ``kwargs`` go to the constructor.
        """
        config_data = load_config_hf(pretrained_model_name)
        config = MambaConfig(**config_data)
        model = cls(config, device=device, dtype=dtype, **kwargs)
        model.load_state_dict(
            load_state_dict_hf(pretrained_model_name, device=device, dtype=dtype)
        )
        return model

    def save_pretrained(self, save_directory):
        """
        Minimal implementation of save_pretrained for MambaLMHeadModel.
        Save the model and its configuration file to a directory.

        Writes ``pytorch_model.bin`` (the state dict) and ``config.json``
        (the config's ``__dict__``) into ``save_directory``, creating the
        directory if needed.
        """
        # exist_ok avoids the check-then-create race of a separate
        # os.path.exists test.
        os.makedirs(save_directory, exist_ok=True)

        model_path = os.path.join(save_directory, "pytorch_model.bin")
        torch.save(self.state_dict(), model_path)

        config_path = os.path.join(save_directory, "config.json")
        with open(config_path, "w") as f:
            json.dump(self.config.__dict__, f)
|
|
|
|