# ClearVoice/models/mossformer2_se/mossformer2_block.py
"""
This source code is modified by Shengkui Zhao based on https://github.com/lucidrains/FLASH-pytorch
"""
import math
import torch
import torch.nn.functional as F
from torch import nn, einsum
from einops import rearrange
from rotary_embedding_torch import RotaryEmbedding
from models.mossformer2_se.conv_module import ConvModule, GLU, FFConvM_Dilated
from models.mossformer2_se.fsmn import UniDeepFsmn, UniDeepFsmn_dilated
from torchinfo import summary
from models.mossformer2_se.layer_norm import CLayerNorm, GLayerNorm, GlobLayerNorm, ILayerNorm
# Helper functions
def identity(t, *args, **kwargs):
"""
Returns the input tensor unchanged.
Args:
t (torch.Tensor): Input tensor.
*args: Additional arguments (ignored).
**kwargs: Additional keyword arguments (ignored).
Returns:
torch.Tensor: The input tensor.
"""
return t
def append_dims(x, num_dims):
"""
Adds additional dimensions to the input tensor.
Args:
x (torch.Tensor): Input tensor.
num_dims (int): Number of dimensions to append.
Returns:
torch.Tensor: Tensor with appended dimensions.
"""
if num_dims <= 0:
return x
return x.view(*x.shape, *((1,) * num_dims)) # Reshape to append dimensions
def exists(val):
"""
Checks if a value exists (is not None).
Args:
val: The value to check.
Returns:
bool: True if value exists, False otherwise.
"""
return val is not None
def default(val, d):
"""
Returns a default value if the given value does not exist.
Args:
val: The value to check.
d: Default value to return if val does not exist.
Returns:
The original value if it exists, otherwise the default value.
"""
return val if exists(val) else d
def padding_to_multiple_of(n, mult):
"""
Calculates the amount of padding needed to make a number a multiple of another.
Args:
n (int): The number to pad.
mult (int): The multiple to match.
Returns:
int: The padding amount required to make n a multiple of mult.
"""
remainder = n % mult
if remainder == 0:
return 0
return mult - remainder # Return the required padding
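# Worked example (illustrative, not part of the original source):
#   padding_to_multiple_of(100, 256)  # -> 156, so 100 + 156 fills exactly one group of 256
#   padding_to_multiple_of(512, 256)  # -> 0, already a multiple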
# Scale Normalization class
class ScaleNorm(nn.Module):
"""
ScaleNorm implements a scaled normalization technique for neural network layers.
    Attributes:
        scale (float): Scaling factor, set to dim ** -0.5.
        eps (float): Small value to prevent division by zero.
        g (nn.Parameter): Learnable gain parameter.
"""
def __init__(self, dim, eps=1e-5):
super().__init__()
self.scale = dim ** -0.5 # Calculate scale factor
self.eps = eps # Set epsilon
self.g = nn.Parameter(torch.ones(1)) # Initialize scaling parameter
def forward(self, x):
"""
Forward pass for the ScaleNorm layer.
Args:
x (torch.Tensor): Input tensor.
Returns:
torch.Tensor: Scaled and normalized output tensor.
"""
norm = torch.norm(x, dim=-1, keepdim=True) * self.scale # Compute norm
return x / norm.clamp(min=self.eps) * self.g # Normalize and scale
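# Usage sketch (illustrative; the 512/100 sizes are assumed example values):
#   norm = ScaleNorm(dim=512)
#   x = torch.randn(2, 100, 512)   # (batch, seq_len, dim)
#   y = norm(x)                    # same shape; L2 norm over the last dim, rescaled by dim**-0.5 and g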
# Absolute positional encodings class
class ScaledSinuEmbedding(nn.Module):
"""
ScaledSinuEmbedding provides sinusoidal positional encodings for inputs.
Attributes:
scale (nn.Parameter): Learnable scale factor for the embeddings.
inv_freq (torch.Tensor): Inverse frequency used for sine and cosine calculations.
"""
def __init__(self, dim):
super().__init__()
self.scale = nn.Parameter(torch.ones(1,)) # Initialize scale
inv_freq = 1. / (10000 ** (torch.arange(0, dim, 2).float() / dim)) # Calculate inverse frequency
self.register_buffer('inv_freq', inv_freq) # Register as a buffer
def forward(self, x):
"""
Forward pass for the ScaledSinuEmbedding layer.
Args:
x (torch.Tensor): Input tensor of shape (batch_size, sequence_length).
Returns:
            torch.Tensor: Positional encoding tensor of shape (sequence_length, dim), broadcastable over the batch.
"""
n, device = x.shape[1], x.device # Extract sequence length and device
t = torch.arange(n, device=device).type_as(self.inv_freq) # Create time steps
sinu = einsum('i , j -> i j', t, self.inv_freq) # Calculate sine and cosine embeddings
emb = torch.cat((sinu.sin(), sinu.cos()), dim=-1) # Concatenate sine and cosine embeddings
return emb * self.scale # Scale the embeddings
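# Usage sketch (illustrative; the embedding depends only on the sequence length of x):
#   pos_emb = ScaledSinuEmbedding(dim=512)
#   x = torch.randn(2, 100, 512)   # (batch, seq_len, dim)
#   pe = pos_emb(x)                # (100, 512); broadcasts over the batch when added to x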
class OffsetScale(nn.Module):
"""
OffsetScale applies learned offsets and scales to the input tensor.
Attributes:
gamma (nn.Parameter): Learnable scale parameter for each head.
beta (nn.Parameter): Learnable offset parameter for each head.
"""
def __init__(self, dim, heads=1):
super().__init__()
self.gamma = nn.Parameter(torch.ones(heads, dim)) # Initialize scale parameters
self.beta = nn.Parameter(torch.zeros(heads, dim)) # Initialize offset parameters
nn.init.normal_(self.gamma, std=0.02) # Normal initialization for gamma
def forward(self, x):
"""
Forward pass for the OffsetScale layer.
Args:
x (torch.Tensor): Input tensor.
Returns:
List[torch.Tensor]: A list of tensors with applied offsets and scales for each head.
"""
out = einsum('... d, h d -> ... h d', x, self.gamma) + self.beta # Apply scaling and offsets
return out.unbind(dim=-2) # Unbind heads into a list
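# Usage sketch (illustrative): with heads=4 this yields four independently scaled/offset views
# of one shared projection, used below as quad_q, lin_q, quad_k, lin_k.
#   offset_scale = OffsetScale(dim=128, heads=4)
#   qk = torch.randn(2, 100, 128)
#   quad_q, lin_q, quad_k, lin_k = offset_scale(qk)   # each of shape (2, 100, 128)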
# Feed-Forward Convolutional Module
class FFConvM(nn.Module):
"""
FFConvM is a feed-forward convolutional module with normalization and dropout.
Attributes:
dim_in (int): Input dimension of the features.
dim_out (int): Output dimension after processing.
norm_klass (nn.Module): Normalization class to be used.
dropout (float): Dropout probability.
"""
def __init__(
self,
dim_in,
dim_out,
norm_klass=nn.LayerNorm,
dropout=0.1
):
super().__init__()
self.mdl = nn.Sequential(
norm_klass(dim_in), # Normalize input
nn.Linear(dim_in, dim_out), # Linear transformation
nn.SiLU(), # Activation function
ConvModule(dim_out), # Convolution module
nn.Dropout(dropout) # Apply dropout
)
def forward(self, x):
"""
Forward pass for the FFConvM module.
Args:
x (torch.Tensor): Input tensor.
Returns:
torch.Tensor: Output tensor after processing.
"""
output = self.mdl(x) # Pass through the model
return output
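# Usage sketch (illustrative; assumes ConvModule preserves the (batch, seq_len, dim) shape,
# as in the conformer-style module imported above):
#   ffconvm = FFConvM(dim_in=512, dim_out=1024)
#   x = torch.randn(2, 100, 512)   # (batch, seq_len, dim_in)
#   y = ffconvm(x)                 # (2, 100, 1024) after LayerNorm -> Linear -> SiLU -> ConvModule -> Dropout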
class FFM(nn.Module):
"""
FFM is a feed-forward module with normalization and dropout.
Attributes:
dim_in (int): Input dimension of the features.
dim_out (int): Output dimension after processing.
norm_klass (nn.Module): Normalization class to be used.
dropout (float): Dropout probability.
"""
def __init__(
self,
dim_in,
dim_out,
norm_klass=nn.LayerNorm,
dropout=0.1
):
super().__init__()
self.mdl = nn.Sequential(
norm_klass(dim_in), # Normalize input
nn.Linear(dim_in, dim_out), # Linear transformation
nn.SiLU(), # Activation function
nn.Dropout(dropout) # Apply dropout
)
def forward(self, x):
"""
Forward pass for the FFM module.
Args:
x (torch.Tensor): Input tensor.
Returns:
torch.Tensor: Output tensor after processing.
"""
output = self.mdl(x) # Pass through the model
return output
class FLASH_ShareA_FFConvM(nn.Module):
"""
Fast Shared Dual Attention Mechanism with feed-forward convolutional blocks.
Published in paper: "MossFormer: Pushing the Performance Limit of Monaural Speech Separation
using Gated Single-Head Transformer with Convolution-Augmented Joint Self-Attentions", ICASSP 2023.
(https://arxiv.org/abs/2302.11824)
Args:
dim (int): Input dimension.
group_size (int, optional): Size of groups for processing. Defaults to 256.
query_key_dim (int, optional): Dimension of the query and key. Defaults to 128.
        expansion_factor (float, optional): Factor to expand the hidden dimension. Defaults to 1.
            (the Mossformer blocks below pass 4.0, which makes the gated output match the dim * 2 input of to_out).
causal (bool, optional): Whether to use causal masking. Defaults to False.
dropout (float, optional): Dropout rate. Defaults to 0.1.
rotary_pos_emb (optional): Rotary positional embeddings for attention. Defaults to None.
norm_klass (callable, optional): Normalization class to use. Defaults to nn.LayerNorm.
shift_tokens (bool, optional): Whether to shift tokens for attention calculation. Defaults to True.
"""
def __init__(
self,
*,
dim,
group_size=256,
query_key_dim=128,
expansion_factor=1.,
causal=False,
dropout=0.1,
rotary_pos_emb=None,
norm_klass=nn.LayerNorm,
shift_tokens=True
):
super().__init__()
hidden_dim = int(dim * expansion_factor)
self.group_size = group_size
self.causal = causal
self.shift_tokens = shift_tokens
# Initialize positional embeddings, dropout, and projections
self.rotary_pos_emb = rotary_pos_emb
self.dropout = nn.Dropout(dropout)
# Feed-forward layers
self.to_hidden = FFConvM(
dim_in=dim,
dim_out=hidden_dim,
norm_klass=norm_klass,
dropout=dropout,
)
self.to_qk = FFConvM(
dim_in=dim,
dim_out=query_key_dim,
norm_klass=norm_klass,
dropout=dropout,
)
# Offset and scale for query and key
self.qk_offset_scale = OffsetScale(query_key_dim, heads=4)
self.to_out = FFConvM(
dim_in=dim * 2,
dim_out=dim,
norm_klass=norm_klass,
dropout=dropout,
)
self.gateActivate = nn.Sigmoid()
def forward(self, x, *, mask=None):
"""
Forward pass for FLASH layer.
Args:
x (Tensor): Input tensor of shape (batch, seq_len, features).
mask (Tensor, optional): Mask for attention. Defaults to None.
Returns:
Tensor: Output tensor after applying attention and projections.
"""
        # No separate pre-norm here; normalization is applied inside the FFConvM projections below
        normed_x = x
residual = x # Save residual for skip connection
# Token shifting if enabled
if self.shift_tokens:
x_shift, x_pass = normed_x.chunk(2, dim=-1)
x_shift = F.pad(x_shift, (0, 0, 1, -1), value=0.)
normed_x = torch.cat((x_shift, x_pass), dim=-1)
# Initial projections
v, u = self.to_hidden(normed_x).chunk(2, dim=-1)
qk = self.to_qk(normed_x)
# Offset and scale
quad_q, lin_q, quad_k, lin_k = self.qk_offset_scale(qk)
        att_v, att_u = self.cal_attention(x, quad_q, lin_q, quad_k, lin_k, v, u, mask=mask)
# Output calculation with gating
out = (att_u * v) * self.gateActivate(att_v * u)
x = x + self.to_out(out) # Residual connection
return x
def cal_attention(self, x, quad_q, lin_q, quad_k, lin_k, v, u, mask=None):
"""
Calculate attention output using quadratic and linear attention mechanisms.
Args:
x (Tensor): Input tensor of shape (batch, seq_len, features).
quad_q (Tensor): Quadratic query representation.
lin_q (Tensor): Linear query representation.
quad_k (Tensor): Quadratic key representation.
lin_k (Tensor): Linear key representation.
v (Tensor): Value representation.
u (Tensor): Additional value representation.
mask (Tensor, optional): Mask for attention. Defaults to None.
Returns:
Tuple[Tensor, Tensor]: Attention outputs for v and u.
"""
b, n, device, g = x.shape[0], x.shape[-2], x.device, self.group_size
# Apply mask to linear keys if provided
if exists(mask):
lin_mask = rearrange(mask, '... -> ... 1')
lin_k = lin_k.masked_fill(~lin_mask, 0.)
# Rotate queries and keys with rotary positional embeddings
if exists(self.rotary_pos_emb):
quad_q, lin_q, quad_k, lin_k = map(self.rotary_pos_emb.rotate_queries_or_keys, (quad_q, lin_q, quad_k, lin_k))
# Padding for group processing
padding = padding_to_multiple_of(n, g)
if padding > 0:
quad_q, quad_k, lin_q, lin_k, v, u = map(lambda t: F.pad(t, (0, 0, 0, padding), value=0.), (quad_q, quad_k, lin_q, lin_k, v, u))
mask = default(mask, torch.ones((b, n), device=device, dtype=torch.bool))
mask = F.pad(mask, (0, padding), value=False)
# Group along sequence for attention
quad_q, quad_k, lin_q, lin_k, v, u = map(lambda t: rearrange(t, 'b (g n) d -> b g n d', n=self.group_size), (quad_q, quad_k, lin_q, lin_k, v, u))
if exists(mask):
mask = rearrange(mask, 'b (g j) -> b g 1 j', j=g)
# Calculate quadratic attention output
sim = einsum('... i d, ... j d -> ... i j', quad_q, quad_k) / g
attn = F.relu(sim) ** 2 # ReLU activation
attn = self.dropout(attn)
# Apply mask to attention if provided
if exists(mask):
attn = attn.masked_fill(~mask, 0.)
# Apply causal mask if needed
if self.causal:
causal_mask = torch.ones((g, g), dtype=torch.bool, device=device).triu(1)
attn = attn.masked_fill(causal_mask, 0.)
# Calculate output from attention
quad_out_v = einsum('... i j, ... j d -> ... i d', attn, v)
quad_out_u = einsum('... i j, ... j d -> ... i d', attn, u)
# Calculate linear attention output
if self.causal:
lin_kv = einsum('b g n d, b g n e -> b g d e', lin_k, v) / g
lin_kv = lin_kv.cumsum(dim=1) # Cumulative sum for linear attention
lin_kv = F.pad(lin_kv, (0, 0, 0, 0, 1, -1), value=0.)
lin_out_v = einsum('b g d e, b g n d -> b g n e', lin_kv, lin_q)
lin_ku = einsum('b g n d, b g n e -> b g d e', lin_k, u) / g
lin_ku = lin_ku.cumsum(dim=1) # Cumulative sum for linear attention
lin_ku = F.pad(lin_ku, (0, 0, 0, 0, 1, -1), value=0.)
lin_out_u = einsum('b g d e, b g n d -> b g n e', lin_ku, lin_q)
else:
lin_kv = einsum('b g n d, b g n e -> b d e', lin_k, v) / n
lin_out_v = einsum('b g n d, b d e -> b g n e', lin_q, lin_kv)
lin_ku = einsum('b g n d, b g n e -> b d e', lin_k, u) / n
lin_out_u = einsum('b g n d, b d e -> b g n e', lin_q, lin_ku)
# Reshape and remove padding from outputs
return map(lambda t: rearrange(t, 'b g n d -> b (g n) d')[:, :n], (quad_out_v + lin_out_v, quad_out_u + lin_out_u))
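# Usage sketch (illustrative; sizes are assumed example values; expansion_factor=4. keeps the gated
# output at dim * 2, matching to_out):
#   attn = FLASH_ShareA_FFConvM(dim=512, group_size=256, query_key_dim=128, expansion_factor=4.,
#                               causal=False, rotary_pos_emb=RotaryEmbedding(dim=32))
#   x = torch.randn(2, 1000, 512)  # (batch, seq_len, dim); seq_len need not be a multiple of group_size
#   y = attn(x)                    # (2, 1000, 512): quadratic intra-group + linear inter-group attention, gated, plus residual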
class Gated_FSMN(nn.Module):
"""
    Gated Feedforward Sequential Memory Network (FSMN) block.
    This class implements a gated FSMN that combines two feed-forward
    convolutional branches with an FSMN memory module.
    Args:
        in_channels (int): Number of input channels.
        out_channels (int): Number of output channels.
        lorder (int): Order (left-context length) of the FSMN filter.
        hidden_size (int): Number of hidden units in the network.
    """
def __init__(self, in_channels, out_channels, lorder, hidden_size):
super().__init__()
# Feedforward network for the first branch (u)
self.to_u = FFConvM(
dim_in=in_channels,
dim_out=hidden_size,
norm_klass=nn.LayerNorm,
dropout=0.1,
)
# Feedforward network for the second branch (v)
self.to_v = FFConvM(
dim_in=in_channels,
dim_out=hidden_size,
norm_klass=nn.LayerNorm,
dropout=0.1,
)
        # Feedforward sequential memory network (FSMN)
self.fsmn = UniDeepFsmn(in_channels, out_channels, lorder, hidden_size)
def forward(self, x):
"""
Forward pass for the Gated FSMN.
Args:
            x (Tensor): Input tensor of shape (batch_size, sequence_length, in_channels).
Returns:
Tensor: Output tensor after applying gated FSMN operations.
"""
input = x
x_u = self.to_u(x) # Process input through the first branch
x_v = self.to_v(x) # Process input through the second branch
x_u = self.fsmn(x_u) # Apply FSMN to the output of the first branch
x = x_v * x_u + input # Combine outputs with the original input
return x
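# Usage sketch (illustrative; mirrors how Gated_FSMN_Block instantiates this module and assumes
# UniDeepFsmn preserves the (batch, seq_len, channels) shape):
#   gfsmn = Gated_FSMN(in_channels=256, out_channels=256, lorder=20, hidden_size=256)
#   x = torch.randn(2, 100, 256)   # (batch, seq_len, channels)
#   y = gfsmn(x)                   # (2, 100, 256): v-branch gates the FSMN memory of the u-branch, plus residual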
class Gated_FSMN_Block(nn.Module):
"""
A 1-D convolutional block that incorporates a gated FSMN.
This block consists of two convolutional layers, followed by a
gated FSMN and normalization layers.
Args:
dim (int): Dimensionality of the input.
inner_channels (int): Number of channels in the inner layers.
        group_size (int): Group size (kept for interface consistency; not used directly in this block).
norm_type (str): Type of normalization to use ('scalenorm' or 'layernorm').
"""
def __init__(self, dim, inner_channels=256, group_size=256, norm_type='scalenorm'):
super(Gated_FSMN_Block, self).__init__()
# Choose normalization class based on the provided type
if norm_type == 'scalenorm':
norm_klass = ScaleNorm
elif norm_type == 'layernorm':
norm_klass = nn.LayerNorm
self.group_size = group_size
# First convolutional layer with PReLU activation
self.conv1 = nn.Sequential(
nn.Conv1d(dim, inner_channels, kernel_size=1),
nn.PReLU(),
)
self.norm1 = CLayerNorm(inner_channels) # Normalization after first convolution
self.gated_fsmn = Gated_FSMN(inner_channels, inner_channels, lorder=20, hidden_size=inner_channels) # Gated FSMN layer
self.norm2 = CLayerNorm(inner_channels) # Normalization after FSMN
self.conv2 = nn.Conv1d(inner_channels, dim, kernel_size=1) # Final convolutional layer
def forward(self, input):
"""
Forward pass for the Gated FSMN Block.
Args:
            input (Tensor): Input tensor of shape (batch_size, sequence_length, dim).
Returns:
Tensor: Output tensor after processing through the block.
"""
conv1 = self.conv1(input.transpose(2, 1)) # Apply first convolution
norm1 = self.norm1(conv1) # Apply normalization
seq_out = self.gated_fsmn(norm1.transpose(2, 1)) # Apply gated FSMN
norm2 = self.norm2(seq_out.transpose(2, 1)) # Apply second normalization
conv2 = self.conv2(norm2) # Apply final convolution
return conv2.transpose(2, 1) + input # Residual connection
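# Usage sketch (illustrative; dim=512 is an assumed example value):
#   block = Gated_FSMN_Block(dim=512, inner_channels=256)
#   x = torch.randn(2, 100, 512)   # (batch, seq_len, dim)
#   y = block(x)                   # (2, 100, 512): 1x1 conv in, gated FSMN, 1x1 conv out, plus residual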
class MossformerBlock_GFSMN(nn.Module):
"""
Mossformer Block with Gated FSMN.
This block combines attention mechanisms and gated FSMN layers
to process input sequences.
Args:
dim (int): Dimensionality of the input.
depth (int): Number of layers in the block.
        group_size (int): Chunk size used for grouped attention.
query_key_dim (int): Dimension of the query and key in attention.
expansion_factor (float): Expansion factor for feedforward layers.
causal (bool): If True, enables causal attention.
attn_dropout (float): Dropout rate for attention layers.
norm_type (str): Type of normalization to use ('scalenorm' or 'layernorm').
shift_tokens (bool): If True, shifts tokens in the attention layer.
"""
def __init__(self, *, dim, depth, group_size=256, query_key_dim=128, expansion_factor=4., causal=False, attn_dropout=0.1, norm_type='scalenorm', shift_tokens=True):
super().__init__()
assert norm_type in ('scalenorm', 'layernorm'), 'norm_type must be one of scalenorm or layernorm'
if norm_type == 'scalenorm':
norm_klass = ScaleNorm
elif norm_type == 'layernorm':
norm_klass = nn.LayerNorm
self.group_size = group_size
# Rotary positional embedding for attention
rotary_pos_emb = RotaryEmbedding(dim=min(32, query_key_dim))
# Create a list of Gated FSMN blocks
self.fsmn = nn.ModuleList([Gated_FSMN_Block(dim) for _ in range(depth)])
# Create a list of attention layers using FLASH_ShareA_FFConvM
self.layers = nn.ModuleList([
FLASH_ShareA_FFConvM(
dim=dim,
group_size=group_size,
query_key_dim=query_key_dim,
expansion_factor=expansion_factor,
causal=causal,
dropout=attn_dropout,
rotary_pos_emb=rotary_pos_emb,
norm_klass=norm_klass,
shift_tokens=shift_tokens
) for _ in range(depth)
])
def _build_repeats(self, in_channels, out_channels, lorder, hidden_size, repeats=1):
"""
Builds repeated UniDeep FSMN layers.
Args:
in_channels (int): Number of input channels.
out_channels (int): Number of output channels.
lorder (int): Order of the filter for FSMN.
hidden_size (int): Number of hidden units.
repeats (int): Number of repetitions.
Returns:
Sequential: A sequential container with repeated layers.
"""
repeats = [
UniDeepFsmn(in_channels, out_channels, lorder, hidden_size)
            for _ in range(repeats)
]
return nn.Sequential(*repeats)
def forward(self, x, *, mask=None):
"""
Forward pass for the Mossformer Block with Gated FSMN.
Args:
            x (Tensor): Input tensor of shape (batch_size, sequence_length, dim).
mask (Tensor, optional): Mask tensor for attention operations.
Returns:
Tensor: Output tensor after processing through the block.
"""
        for fsmn, flash in zip(self.fsmn, self.layers):  # Paired attention layer and Gated FSMN block
            x = flash(x, mask=mask)  # Apply attention layer with optional mask
            x = fsmn(x)  # Apply the corresponding Gated FSMN block
        return x
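# Usage sketch (illustrative; sizes are assumed example values):
#   block = MossformerBlock_GFSMN(dim=512, depth=2)
#   x = torch.randn(2, 1000, 512)  # (batch, seq_len, dim)
#   y = block(x)                   # (2, 1000, 512) after depth x (attention layer -> Gated FSMN block)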
class MossformerBlock(nn.Module):
"""
Mossformer Block with attention mechanisms.
This block is designed to process input sequences using attention
layers and incorporates rotary positional embeddings. It allows
for configurable normalization types and can handle causal
attention.
Args:
dim (int): Dimensionality of the input.
depth (int): Number of attention layers in the block.
        group_size (int, optional): Chunk size used for grouped attention. Default is 256.
query_key_dim (int, optional): Dimension of the query and key in attention. Default is 128.
expansion_factor (float, optional): Expansion factor for feedforward layers. Default is 4.
causal (bool, optional): If True, enables causal attention. Default is False.
attn_dropout (float, optional): Dropout rate for attention layers. Default is 0.1.
norm_type (str, optional): Type of normalization to use ('scalenorm' or 'layernorm'). Default is 'scalenorm'.
shift_tokens (bool, optional): If True, shifts tokens in the attention layer. Default is True.
"""
def __init__(
self,
*,
dim,
depth,
group_size=256,
query_key_dim=128,
expansion_factor=4.0,
causal=False,
attn_dropout=0.1,
norm_type='scalenorm',
shift_tokens=True
):
super().__init__()
# Ensure normalization type is valid
assert norm_type in ('scalenorm', 'layernorm'), 'norm_type must be one of scalenorm or layernorm'
# Select normalization class based on the provided type
if norm_type == 'scalenorm':
norm_klass = ScaleNorm
elif norm_type == 'layernorm':
norm_klass = nn.LayerNorm
        self.group_size = group_size  # Chunk size for grouped attention
# Rotary positional embedding for attention
rotary_pos_emb = RotaryEmbedding(dim=min(32, query_key_dim))
# Max rotary embedding dimensions of 32, partial Rotary embeddings, from Wang et al - GPT-J
# Create a list of attention layers using FLASH_ShareA_FFConvM
self.layers = nn.ModuleList([
FLASH_ShareA_FFConvM(
dim=dim,
group_size=group_size,
query_key_dim=query_key_dim,
expansion_factor=expansion_factor,
causal=causal,
dropout=attn_dropout,
rotary_pos_emb=rotary_pos_emb,
norm_klass=norm_klass,
shift_tokens=shift_tokens
) for _ in range(depth)
])
def _build_repeats(self, in_channels, out_channels, lorder, hidden_size, repeats=1):
"""
Builds repeated UniDeep FSMN layers.
Args:
in_channels (int): Number of input channels.
out_channels (int): Number of output channels.
lorder (int): Order of the filter for FSMN.
hidden_size (int): Number of hidden units.
repeats (int, optional): Number of repetitions. Default is 1.
Returns:
Sequential: A sequential container with repeated layers.
"""
repeats = [
UniDeepFsmn(in_channels, out_channels, lorder, hidden_size)
for _ in range(repeats)
]
return nn.Sequential(*repeats)
def forward(self, x, *, mask=None):
"""
Forward pass for the Mossformer Block.
Args:
            x (Tensor): Input tensor of shape (batch_size, sequence_length, dim).
mask (Tensor, optional): Mask tensor for attention operations.
Returns:
Tensor: Output tensor after processing through the block.
"""
# Process input through each attention layer
for flash in self.layers:
x = flash(x, mask=mask) # Apply attention layer with optional mask
return x # Return the final output tensor
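if __name__ == '__main__':
    # Minimal smoke test (illustrative sketch, not part of the original file).
    # Hyperparameters and shapes are assumed example values; both blocks take
    # (batch, seq_len, dim) inputs and return tensors of the same shape.
    dummy = torch.randn(1, 512, 64)                        # (batch=1, seq_len=512, dim=64)
    plain = MossformerBlock(dim=64, depth=2, group_size=256, query_key_dim=128)
    gated = MossformerBlock_GFSMN(dim=64, depth=2, group_size=256, query_key_dim=128)
    print(plain(dummy).shape)                              # expected: torch.Size([1, 512, 64])
    print(gated(dummy).shape)                              # expected: torch.Size([1, 512, 64])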