# modeling_prosst.py
from collections.abc import Sequence
from typing import Optional, Tuple, Union
import torch
import torch.utils.checkpoint
from torch import nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from transformers.activations import ACT2FN
from transformers.modeling_outputs import (
BaseModelOutput,
MaskedLMOutput,
SequenceClassifierOutput,
TokenClassifierOutput,
)
from transformers.modeling_utils import PreTrainedModel
from .configuration_prosst import ProSSTConfig
import torch.nn.functional as F
from functools import partial
def rbf(values, v_min, v_max, n_bins=16):
"""
Returns RBF encodings in a new dimension at the end.
https://github.com/evolutionaryscale/esm/blob/main/esm/utils/misc.py
"""
rbf_centers = torch.linspace(
v_min, v_max, n_bins, device=values.device, dtype=values.dtype
)
rbf_centers = rbf_centers.view([1] * len(values.shape) + [-1])
rbf_std = (v_max - v_min) / n_bins
z = (values.unsqueeze(-1) - rbf_centers) / rbf_std
return torch.exp(-(z**2))
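# Illustrative usage sketch (not part of the model): the shapes below are
# hypothetical, but mirror how ProSSTEmbeddings encodes pLDDT via rbf_16_fn.
def _example_rbf_usage():
    """Encode per-residue pLDDT values (assumed normalized to [0, 1]) into 16
    radial-basis features; a new RBF dimension is appended at the end."""
    plddt = torch.rand(2, 100)  # (batch, seq_len)
    features = rbf(plddt, v_min=0.0, v_max=1.0, n_bins=16)
    assert features.shape == (2, 100, 16)
    return features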
def build_relative_position(query_size, key_size, device):
"""
Build relative position according to the query and key
We assume the absolute position of query \\(P_q\\) is range from (0, query_size) and the absolute position of key
\\(P_k\\) is range from (0, key_size), The relative positions from query to key is \\(R_{q \\rightarrow k} = P_q -
P_k\\)
Args:
query_size (int): the length of query
key_size (int): the length of key
Return:
`torch.LongTensor`: A tensor with shape [1, query_size, key_size]
"""
q_ids = torch.arange(query_size, dtype=torch.long, device=device)
k_ids = torch.arange(key_size, dtype=torch.long, device=device)
rel_pos_ids = q_ids[:, None] - k_ids.view(1, -1).repeat(query_size, 1)
rel_pos_ids = rel_pos_ids[:query_size, :]
rel_pos_ids = rel_pos_ids.unsqueeze(0)
return rel_pos_ids
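# Illustrative sketch: the relative-position matrix produced for small sizes.
def _example_build_relative_position():
    """For query_size=3 and key_size=3 the result (with a leading batch dim) is
    [[[ 0, -1, -2],
      [ 1,  0, -1],
      [ 2,  1,  0]]].
    """
    rel_pos = build_relative_position(3, 3, torch.device("cpu"))
    assert rel_pos.shape == (1, 3, 3)
    return rel_pos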
@torch.jit.script
def c2p_dynamic_expand(c2p_pos, query_layer, relative_pos):
return c2p_pos.expand(
[
query_layer.size(0),
query_layer.size(1),
query_layer.size(2),
relative_pos.size(-1),
]
)
@torch.jit.script
def p2c_dynamic_expand(c2p_pos, query_layer, key_layer):
return c2p_pos.expand(
[
query_layer.size(0),
query_layer.size(1),
key_layer.size(-2),
key_layer.size(-2),
]
)
@torch.jit.script
def pos_dynamic_expand(pos_index, p2c_att, key_layer):
return pos_index.expand(
p2c_att.size()[:2] + (pos_index.size(-2), key_layer.size(-2))
)
def rotate_half(x):
x1, x2 = x.chunk(2, dim=-1)
return torch.cat((-x2, x1), dim=-1)
def apply_rotary_pos_emb(x, cos, sin):
cos = cos[:, :, : x.shape[-2], :]
sin = sin[:, :, : x.shape[-2], :]
return (x * cos) + (rotate_half(x) * sin)
class RotaryEmbedding(torch.nn.Module):
"""
Rotary position embeddings based on those in
[RoFormer](https://huggingface.co/docs/transformers/model_doc/roformer). Query and keys are transformed by rotation
matrices which depend on their relative positions.
"""
def __init__(self, dim: int):
super().__init__()
        # Generate and save the inverse frequency buffer (non-trainable)
        inv_freq = 1.0 / (
            10000 ** (torch.arange(0, dim, 2, dtype=torch.int64).float() / dim)
        )
self.register_buffer("inv_freq", inv_freq)
self._seq_len_cached = None
self._cos_cached = None
self._sin_cached = None
def _update_cos_sin_tables(self, x, seq_dimension=2):
seq_len = x.shape[seq_dimension]
# Reset the tables if the sequence length has changed,
# or if we're on a new device (possibly due to tracing for instance)
if seq_len != self._seq_len_cached or self._cos_cached.device != x.device:
self._seq_len_cached = seq_len
t = torch.arange(x.shape[seq_dimension], device=x.device).type_as(
self.inv_freq
)
freqs = torch.outer(t, self.inv_freq)
emb = torch.cat((freqs, freqs), dim=-1).to(x.device)
self._cos_cached = emb.cos()[None, None, :, :]
self._sin_cached = emb.sin()[None, None, :, :]
return self._cos_cached, self._sin_cached
def forward(
self, q: torch.Tensor, k: torch.Tensor
) -> Tuple[torch.Tensor, torch.Tensor]:
self._cos_cached, self._sin_cached = self._update_cos_sin_tables(
k, seq_dimension=-2
)
return (
apply_rotary_pos_emb(q, self._cos_cached, self._sin_cached),
apply_rotary_pos_emb(k, self._cos_cached, self._sin_cached),
)
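# Illustrative usage sketch: rotary embeddings applied to query/key tensors
# shaped (batch, num_heads, seq_len, head_dim), as DisentangledSelfAttention
# does when position_embedding_type == "rotary". Sizes below are hypothetical.
def _example_rotary_embedding_usage():
    head_dim = 64  # must be even so rotate_half can split it in two
    rope = RotaryEmbedding(dim=head_dim)
    q = torch.randn(2, 8, 100, head_dim)
    k = torch.randn(2, 8, 100, head_dim)
    q_rot, k_rot = rope(q, k)
    assert q_rot.shape == q.shape and k_rot.shape == k.shape
    return q_rot, k_rot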
class MaskedConv1d(nn.Conv1d):
"""A masked 1-dimensional convolution layer.
    Takes the same arguments as torch.nn.Conv1d, except that the padding is set automatically.
Shape:
Input: (N, L, in_channels)
input_mask: (N, L, 1), optional
Output: (N, L, out_channels)
"""
def __init__(
self,
in_channels: int,
out_channels: int,
kernel_size: int,
stride: int = 1,
dilation: int = 1,
groups: int = 1,
bias: bool = True,
):
"""
:param in_channels: input channels
:param out_channels: output channels
:param kernel_size: the kernel width
:param stride: filter shift
:param dilation: dilation factor
:param groups: perform depth-wise convolutions
:param bias: adds learnable bias to output
"""
padding = dilation * (kernel_size - 1) // 2
super().__init__(
in_channels,
out_channels,
kernel_size,
stride=stride,
dilation=dilation,
groups=groups,
bias=bias,
padding=padding,
)
def forward(self, x, input_mask=None):
if input_mask is not None:
x = x * input_mask
return super().forward(x.transpose(1, 2)).transpose(1, 2)
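# Illustrative usage sketch (hypothetical sizes): MaskedConv1d takes (N, L, C_in)
# inputs, zeroes out padded positions via input_mask, and returns (N, L, C_out).
def _example_masked_conv1d_usage():
    conv = MaskedConv1d(in_channels=32, out_channels=1, kernel_size=1)
    x = torch.randn(4, 50, 32)   # (batch, length, channels)
    mask = torch.ones(4, 50, 1)  # 1 = real residue, 0 = padding
    out = conv(x, input_mask=mask)
    assert out.shape == (4, 50, 1)
    return out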
class Attention1dPooling(nn.Module):
def __init__(self, config):
super().__init__()
self.layer = MaskedConv1d(config.hidden_size, 1, 1)
def forward(self, x, input_mask=None):
        batch_size = x.shape[0]
        attn = self.layer(x)
        attn = attn.view(batch_size, -1)
        if input_mask is not None:
            attn = attn.masked_fill_(
                ~input_mask.view(batch_size, -1).bool(), float("-inf")
            )
        attn = F.softmax(attn, dim=-1).view(batch_size, -1, 1)
out = (attn * x).sum(dim=1)
return out
class MeanPooling(nn.Module):
"""Mean Pooling for sentence-level classification tasks."""
def __init__(self):
super().__init__()
def forward(self, features, input_mask=None):
if input_mask is not None:
# Applying input_mask to zero out masked values
masked_features = features * input_mask.unsqueeze(2)
sum_features = torch.sum(masked_features, dim=1)
mean_pooled_features = sum_features / input_mask.sum(dim=1, keepdim=True)
else:
mean_pooled_features = torch.mean(features, dim=1)
return mean_pooled_features
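# Illustrative usage sketch (hypothetical sizes): mask-aware mean pooling that
# averages only over the non-padded positions of each sequence.
def _example_mean_pooling_usage():
    pool = MeanPooling()
    features = torch.randn(2, 10, 16)  # (batch, seq_len, hidden)
    input_mask = torch.ones(2, 10)
    input_mask[1, 5:] = 0              # second sequence has only 5 real tokens
    pooled = pool(features, input_mask=input_mask)
    assert pooled.shape == (2, 16)
    return pooled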
class ContextPooler(nn.Module):
def __init__(self, config):
super().__init__()
scale_hidden = getattr(config, "scale_hidden", 1)
        if config.pooling_head == "mean":
            self.mean_pooling = MeanPooling()
        elif config.pooling_head == "attention":
            self.mean_pooling = Attention1dPooling(config)
        else:
            raise ValueError(
                f"Unsupported pooling_head: {config.pooling_head!r} (expected 'mean' or 'attention')"
            )
self.dense = nn.Linear(
config.pooler_hidden_size, scale_hidden * config.pooler_hidden_size
)
self.dropout = nn.Dropout(config.pooler_dropout)
self.config = config
def forward(self, hidden_states, input_mask=None):
# We "pool" the model by simply taking the hidden state corresponding
# to the first token.
context_token = self.mean_pooling(hidden_states, input_mask)
context_token = self.dropout(context_token)
pooled_output = self.dense(context_token)
pooled_output = torch.tanh(pooled_output)
return pooled_output
@property
def output_dim(self):
return self.config.hidden_size
class ProSSTLayerNorm(nn.Module):
"""LayerNorm module in the TF style (epsilon inside the square root)."""
def __init__(self, size, eps=1e-12):
super().__init__()
self.weight = nn.Parameter(torch.ones(size))
self.bias = nn.Parameter(torch.zeros(size))
self.variance_epsilon = eps
def forward(self, hidden_states):
input_type = hidden_states.dtype
hidden_states = hidden_states.float()
mean = hidden_states.mean(-1, keepdim=True)
variance = (hidden_states - mean).pow(2).mean(-1, keepdim=True)
hidden_states = (hidden_states - mean) / torch.sqrt(
variance + self.variance_epsilon
)
hidden_states = hidden_states.to(input_type)
y = self.weight * hidden_states + self.bias
return y
class DisentangledSelfAttention(nn.Module):
def __init__(self, config: ProSSTConfig):
super().__init__()
self.config = config
self.num_attention_heads = config.num_attention_heads
self.attention_head_size = int(config.hidden_size / config.num_attention_heads)
self.all_head_size = self.num_attention_heads * self.attention_head_size
# Q, K, V projection layers
self.query = nn.Linear(config.hidden_size, self.all_head_size)
self.key = nn.Linear(config.hidden_size, self.all_head_size)
self.value = nn.Linear(config.hidden_size, self.all_head_size)
# AA->SS, AA->POS, SS->AA, POS->AA and AA->AA attention layers
if config.pos_att_type is not None:
self.pos_att_type = config.pos_att_type
else:
self.pos_att_type = []
self.relative_attention = config.relative_attention
self.position_embedding_type = config.position_embedding_type
if self.position_embedding_type == "rotary":
self.rotary_embeddings = RotaryEmbedding(dim=self.attention_head_size)
if self.relative_attention:
if "aa2ss" in self.pos_att_type:
self.ss_proj = nn.Linear(
config.hidden_size, self.all_head_size, bias=False
)
if "ss2aa" in self.pos_att_type:
self.ss_q_proj = nn.Linear(config.hidden_size, self.all_head_size)
elif self.position_embedding_type == "relative":
if self.relative_attention:
self.max_relative_positions = config.max_relative_positions
self.pos_dropout = nn.Dropout(config.hidden_dropout_prob)
# AA2POS
if "aa2pos" in self.pos_att_type:
self.pos_proj = nn.Linear(
config.hidden_size, self.all_head_size, bias=False
) # Key
# POS2AA
if "pos2aa" in self.pos_att_type:
self.pos_q_proj = nn.Linear(
config.hidden_size, self.all_head_size
) # Query
# AA2SS
if "aa2ss" in self.pos_att_type:
self.ss_proj = nn.Linear(
config.hidden_size, self.all_head_size, bias=False
)
# SS2AA
if "ss2aa" in self.pos_att_type:
self.ss_q_proj = nn.Linear(config.hidden_size, self.all_head_size)
self.dropout = nn.Dropout(config.attention_probs_dropout_prob)
def transpose_for_scores(self, x):
# x [batch_size, seq_len, all_head_size]
new_x_shape = x.size()[:-1] + (self.num_attention_heads, -1)
# x [batch_size, seq_len, num_attention_heads, attention_head_size]
x = x.view(new_x_shape)
# x [batch_size, num_attention_heads, seq_len, attention_head_size]
return x.permute(0, 2, 1, 3)
def forward(
self,
hidden_states,
attention_mask,
ss_hidden_states=None,
relative_pos=None,
rel_embeddings=None,
output_attentions=False,
):
query_layer = self.transpose_for_scores(self.query(hidden_states))
key_layer = self.transpose_for_scores(self.key(hidden_states))
value_layer = self.transpose_for_scores(self.value(hidden_states))
if self.position_embedding_type == "rotary":
query_layer, key_layer = self.rotary_embeddings(query_layer, key_layer)
rel_att = None
scale_factor = 1 + len(self.pos_att_type)
scale = torch.sqrt(
torch.tensor(query_layer.size(-1), dtype=torch.float) * scale_factor
)
query_layer = query_layer / scale.to(dtype=query_layer.dtype)
attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
if self.relative_attention:
if self.position_embedding_type == "relative":
rel_embeddings = self.pos_dropout(rel_embeddings)
rel_att, output_attentions_dict = self.disentangled_att_bias(
query_layer,
key_layer,
relative_pos,
rel_embeddings,
scale_factor,
ss_hidden_states,
)
output_attentions_dict["aa2aa"] = attention_scores
attention_scores = attention_scores + rel_att
rmask = ~(attention_mask.to(torch.bool))
attention_probs = attention_scores.masked_fill(
rmask, torch.finfo(attention_scores.dtype).min
)
attention_probs = torch.softmax(attention_probs, -1)
# attention_probs = attention_probs.masked_fill(rmask, 0.0)
attention_probs = self.dropout(attention_probs)
context_layer = torch.matmul(attention_probs, value_layer)
context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
new_context_layer_shape = context_layer.size()[:-2] + (-1,)
context_layer = context_layer.view(new_context_layer_shape)
if output_attentions:
if self.relative_attention:
return (context_layer, output_attentions_dict)
else:
return (context_layer, attention_probs)
else:
return context_layer
def disentangled_att_bias(
self,
query_layer,
key_layer,
relative_pos,
rel_embeddings,
scale_factor,
ss_hidden_states,
):
disentangled_attentions = {}
if self.position_embedding_type == "relative":
if relative_pos.dim() == 2:
relative_pos = relative_pos.unsqueeze(0).unsqueeze(0)
elif relative_pos.dim() == 3:
relative_pos = relative_pos.unsqueeze(1)
# bxhxqxk
elif relative_pos.dim() != 4:
                raise ValueError(
                    f"Relative position ids must have dim 2, 3 or 4, but got {relative_pos.dim()}"
                )
att_span = min(
max(query_layer.size(-2), key_layer.size(-2)),
self.max_relative_positions,
)
relative_pos = relative_pos.long().to(query_layer.device)
rel_embeddings = rel_embeddings[
self.max_relative_positions
- att_span : self.max_relative_positions
+ att_span,
:,
].unsqueeze(0)
score = 0
if "aa2pos" in self.pos_att_type:
pos_key_layer = self.pos_proj(rel_embeddings)
pos_key_layer = self.transpose_for_scores(pos_key_layer)
aa2p_att = torch.matmul(query_layer, pos_key_layer.transpose(-1, -2))
aa2p_pos = torch.clamp(relative_pos + att_span, 0, att_span * 2 - 1)
aa2p_att = torch.gather(
aa2p_att,
dim=-1,
index=c2p_dynamic_expand(aa2p_pos, query_layer, relative_pos),
)
score += aa2p_att
disentangled_attentions["aa2pos"] = aa2p_att
if "pos2aa" in self.pos_att_type:
pos_query_layer = self.pos_q_proj(rel_embeddings)
pos_query_layer = self.transpose_for_scores(pos_query_layer)
pos_query_layer /= torch.sqrt(
torch.tensor(pos_query_layer.size(-1), dtype=torch.float)
* scale_factor
)
if query_layer.size(-2) != key_layer.size(-2):
r_pos = build_relative_position(
key_layer.size(-2), key_layer.size(-2), query_layer.device
)
else:
r_pos = relative_pos
p2aa_pos = torch.clamp(-r_pos + att_span, 0, att_span * 2 - 1)
p2aa_att = torch.matmul(
key_layer,
pos_query_layer.transpose(-1, -2).to(dtype=key_layer.dtype),
)
p2aa_att = torch.gather(
p2aa_att,
dim=-1,
index=p2c_dynamic_expand(p2aa_pos, query_layer, key_layer),
).transpose(-1, -2)
if query_layer.size(-2) != key_layer.size(-2):
pos_index = relative_pos[:, :, :, 0].unsqueeze(-1)
p2aa_att = torch.gather(
p2aa_att,
dim=-2,
index=pos_dynamic_expand(pos_index, p2aa_att, key_layer),
)
score += p2aa_att
disentangled_attentions["pos2aa"] = p2aa_att
# content -> structure
if "aa2ss" in self.pos_att_type:
assert ss_hidden_states is not None
ss_key_layer = self.ss_proj(ss_hidden_states)
ss_key_layer = self.transpose_for_scores(ss_key_layer)
# [batch_size, num_attention_heads, seq_len, seq_len]
aa2ss_att = torch.matmul(query_layer, ss_key_layer.transpose(-1, -2))
score += aa2ss_att
disentangled_attentions["aa2ss"] = aa2ss_att
if "ss2aa" in self.pos_att_type:
assert ss_hidden_states is not None
ss_query_layer = self.ss_q_proj(ss_hidden_states)
ss_query_layer = self.transpose_for_scores(ss_query_layer)
ss_query_layer /= torch.sqrt(
torch.tensor(ss_query_layer.size(-1), dtype=torch.float)
* scale_factor
)
ss2aa_att = torch.matmul(
key_layer, query_layer.transpose(-1, -2).to(dtype=key_layer.dtype)
)
score += ss2aa_att
disentangled_attentions["ss2aa"] = ss2aa_att
return score, disentangled_attentions
elif self.position_embedding_type == "rotary":
score = 0
if "aa2ss" in self.pos_att_type:
assert ss_hidden_states is not None
ss_key_layer = self.ss_proj(ss_hidden_states)
ss_key_layer = self.transpose_for_scores(ss_key_layer)
aa2ss_att = torch.matmul(query_layer, ss_key_layer.transpose(-1, -2))
score += aa2ss_att
disentangled_attentions["aa2ss"] = aa2ss_att
if "ss2aa" in self.pos_att_type:
assert ss_hidden_states is not None
ss_query_layer = self.ss_q_proj(ss_hidden_states)
ss_query_layer = self.transpose_for_scores(ss_query_layer)
ss_query_layer /= torch.sqrt(
torch.tensor(ss_query_layer.size(-1), dtype=torch.float)
* scale_factor
)
ss2aa_att = torch.matmul(
key_layer, query_layer.transpose(-1, -2).to(dtype=key_layer.dtype)
)
score += ss2aa_att
disentangled_attentions["ss2aa"] = ss2aa_att
return score, disentangled_attentions
class ProSSTSelfOutput(nn.Module):
def __init__(self, config):
super().__init__()
self.dense = nn.Linear(config.hidden_size, config.hidden_size)
self.LayerNorm = ProSSTLayerNorm(config.hidden_size, config.layer_norm_eps)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
def forward(self, hidden_states, input_tensor):
hidden_states = self.dense(hidden_states)
hidden_states = self.dropout(hidden_states)
hidden_states = self.LayerNorm(hidden_states + input_tensor)
return hidden_states
class ProSSTAttention(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.self = DisentangledSelfAttention(config)
self.output = ProSSTSelfOutput(config)
def forward(
self,
hidden_states,
attention_mask,
ss_hidden_states=None,
relative_pos=None,
rel_embeddings=None,
output_attentions=False,
):
self_output = self.self(
hidden_states=hidden_states,
attention_mask=attention_mask,
output_attentions=output_attentions,
relative_pos=relative_pos,
rel_embeddings=rel_embeddings,
ss_hidden_states=ss_hidden_states
)
if output_attentions:
self_output, att_matrix = self_output
attention_output = self.output(self_output, hidden_states)
if output_attentions:
return (attention_output, att_matrix)
else:
return attention_output
class ProSSTIntermediate(nn.Module):
def __init__(self, config):
super().__init__()
self.dense = nn.Linear(config.hidden_size, config.intermediate_size)
if isinstance(config.hidden_act, str):
self.intermediate_act_fn = ACT2FN[config.hidden_act]
else:
self.intermediate_act_fn = config.hidden_act
def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
hidden_states = self.dense(hidden_states)
hidden_states = self.intermediate_act_fn(hidden_states)
return hidden_states
class ProSSTOutput(nn.Module):
def __init__(self, config):
super().__init__()
self.dense = nn.Linear(config.intermediate_size, config.hidden_size)
self.LayerNorm = ProSSTLayerNorm(config.hidden_size, config.layer_norm_eps)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
self.config = config
def forward(self, hidden_states, input_tensor):
hidden_states = self.dense(hidden_states)
hidden_states = self.dropout(hidden_states)
hidden_states = self.LayerNorm(hidden_states + input_tensor)
return hidden_states
class ProSSTLayer(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.attention = ProSSTAttention(config)
self.intermediate = ProSSTIntermediate(config)
self.output = ProSSTOutput(config)
def forward(
self,
hidden_states,
attention_mask,
ss_hidden_states=None,
relative_pos=None,
rel_embeddings=None,
output_attentions=False,
):
attention_output = self.attention(
hidden_states,
attention_mask,
output_attentions=output_attentions,
relative_pos=relative_pos,
rel_embeddings=rel_embeddings,
ss_hidden_states=ss_hidden_states,
)
if output_attentions:
attention_output, att_matrix = attention_output
intermediate_output = self.intermediate(attention_output)
layer_output = self.output(intermediate_output, attention_output)
if output_attentions:
return (layer_output, att_matrix)
else:
return layer_output
class ProSSTEncoder(nn.Module):
"""Modified BertEncoder with relative position bias support"""
def __init__(self, config):
super().__init__()
self.layer = nn.ModuleList(
[ProSSTLayer(config) for _ in range(config.num_hidden_layers)]
)
self.relative_attention = config.relative_attention
if self.relative_attention:
self.max_relative_positions = config.max_relative_positions
self.rel_embeddings = nn.Embedding(
self.max_relative_positions * 2, config.hidden_size
)
def get_attention_mask(self, attention_mask):
if attention_mask.dim() <= 2:
extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)
attention_mask = extended_attention_mask * extended_attention_mask.squeeze(
-2
).unsqueeze(-1)
elif attention_mask.dim() == 3:
attention_mask = attention_mask.unsqueeze(1)
return attention_mask
def get_rel_pos(self, hidden_states):
query_size = hidden_states.size(-2)
key_size = hidden_states.size(-2)
relative_pos = build_relative_position(
query_size, key_size, hidden_states.device
)
return relative_pos
def forward(
self,
hidden_states,
attention_mask,
ss_hidden_states=None,
output_hidden_states=False,
output_attentions=False,
) -> BaseModelOutput:
attention_mask = self.get_attention_mask(attention_mask)
relative_pos = self.get_rel_pos(hidden_states)
all_hidden_states = []
all_attentions = []
rel_embeddings = self.rel_embeddings.weight
for i, layer_module in enumerate(self.layer):
if output_hidden_states:
all_hidden_states.append(hidden_states)
hidden_states = layer_module(
hidden_states,
attention_mask,
relative_pos=relative_pos,
rel_embeddings=rel_embeddings,
output_attentions=output_attentions,
ss_hidden_states=ss_hidden_states,
)
if output_attentions:
hidden_states, att_matrix = hidden_states
all_attentions.append(att_matrix)
if output_hidden_states:
all_hidden_states.append(hidden_states)
return BaseModelOutput(
last_hidden_state=hidden_states,
hidden_states=all_hidden_states,
attentions=all_attentions,
)
class ProSSTEmbeddings(nn.Module):
"""Construct the embeddings from word, position and token_type embeddings."""
def __init__(self, config):
super().__init__()
self.config = config
self.pad_token_id = config.pad_token_id
self.embedding_size = config.hidden_size
self.word_embeddings = nn.Embedding(
config.vocab_size, self.embedding_size, padding_idx=self.pad_token_id
)
self.LayerNorm = ProSSTLayerNorm(config.hidden_size, config.layer_norm_eps)
        # Absolute position embeddings
self.position_biased_input = config.position_biased_input
if not self.position_biased_input:
self.position_embeddings = None
else:
self.position_embeddings = nn.Embedding(
config.max_position_embeddings,
self.embedding_size,
padding_idx=self.pad_token_id,
)
# Token-type embeddings
if config.type_vocab_size > 0:
self.token_type_embeddings = nn.Embedding(
config.type_vocab_size, self.embedding_size
)
# SS embeddings
if config.ss_vocab_size > 0:
self.ss_embeddings = nn.Embedding(config.ss_vocab_size, self.embedding_size)
self.ss_layer_norm = ProSSTLayerNorm(
config.hidden_size, config.layer_norm_eps
)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
# PLDDT Embeddings
self.rbf_16_fn = partial(rbf, v_min=0.0, v_max=1.0, n_bins=16)
self.plddt_proj = nn.Linear(16, self.embedding_size)
self.avg_plddt_proj = nn.Linear(16, self.embedding_size)
# position_ids (1, len position emb) is contiguous in memory and exported when serialized
if self.position_biased_input:
self.register_buffer(
"position_ids",
torch.arange(config.max_position_embeddings).expand((1, -1)),
persistent=False,
)
def forward(
self,
input_ids,
attention_mask,
ss_input_ids=None,
plddt=None,
avg_plddt=None,
token_type_ids=None,
):
inputs_embeds = self.word_embeddings(input_ids)
        # Scale gradients flowing into the word embeddings by 0.1 while leaving
        # the forward values unchanged (0.9 of the activation is detached).
        inputs_embeds = 0.1 * inputs_embeds + 0.9 * inputs_embeds.detach()
embeddings = inputs_embeds
if self.position_biased_input:
input_shape = input_ids.size()
seq_length = input_shape[1]
position_ids = self.position_ids[:, :seq_length]
            if seq_length > position_ids.size(1):
                # Positions beyond the registered position_ids buffer are all
                # assigned the fixed index 2047.
                overflow_position_ids = (
                    torch.zeros(
                        (input_shape[0], seq_length - position_ids.size(1)),
                        dtype=torch.long,
                        device=position_ids.device,
                    )
                    + 2047
                )
                position_ids = torch.cat([position_ids, overflow_position_ids], dim=1)
position_embeddings = self.position_embeddings(position_ids.long())
embeddings += position_embeddings
if self.config.type_vocab_size > 0:
token_type_embeddings = self.token_type_embeddings(token_type_ids)
embeddings += token_type_embeddings
embeddings = self.LayerNorm(embeddings)
embeddings = embeddings * attention_mask.unsqueeze(-1)
embeddings = self.dropout(embeddings)
if self.config.ss_vocab_size > 0:
ss_embeddings = self.ss_embeddings(ss_input_ids)
ss_embeddings = ss_embeddings * attention_mask.unsqueeze(-1)
# plddt
plddt_embedding = self.plddt_proj(self.rbf_16_fn(plddt))
            # NOTE: avg_plddt is also projected through plddt_proj; the
            # avg_plddt_proj layer defined in __init__ is not used here.
            avg_plddt_embedding = self.plddt_proj(self.rbf_16_fn(avg_plddt))
            ss_embeddings = ss_embeddings + plddt_embedding + avg_plddt_embedding
            ss_embeddings = self.ss_layer_norm(ss_embeddings)
            # Same 0.1/0.9 gradient-scaling trick as for the word embeddings above.
            ss_embeddings = ss_embeddings * 0.1 + ss_embeddings.detach() * 0.9
ss_embeddings = self.dropout(ss_embeddings)
return embeddings, ss_embeddings
return embeddings, None
class ProSSTPreTrainedModel(PreTrainedModel):
"""
An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained
models.
"""
config_class = ProSSTConfig
base_model_prefix = "ProSST"
_keys_to_ignore_on_load_unexpected = ["position_embeddings"]
supports_gradient_checkpointing = True
def _init_weights(self, module):
"""Initialize the weights."""
if isinstance(module, nn.Linear):
# Slightly different from the TF version which uses truncated_normal for initialization
# cf https://github.com/pytorch/pytorch/pull/5617
module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
if module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.Embedding):
module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
if module.padding_idx is not None:
module.weight.data[module.padding_idx].zero_()
def _set_gradient_checkpointing(self, module, value=False):
if isinstance(module, ProSSTEncoder):
module.gradient_checkpointing = value
class ProSSTModel(ProSSTPreTrainedModel):
def __init__(self, config):
super().__init__(config)
self.config = config
self.embeddings = ProSSTEmbeddings(config)
self.encoder = ProSSTEncoder(config)
self.post_init()
def forward(
self,
input_ids,
attention_mask,
ss_input_ids=None,
plddt=None,
avg_plddt=None,
token_type_ids=None,
output_attentions=False,
output_hidden_states=False,
) -> BaseModelOutput:
embedding_output, ss_embeddings = self.embeddings(
input_ids=input_ids,
attention_mask=attention_mask,
ss_input_ids=ss_input_ids,
plddt=plddt,
avg_plddt=avg_plddt,
token_type_ids=token_type_ids,
)
encoder_outputs = self.encoder(
embedding_output,
attention_mask,
output_hidden_states=output_hidden_states,
output_attentions=output_attentions,
ss_hidden_states=ss_embeddings,
)
return BaseModelOutput(
last_hidden_state=encoder_outputs.last_hidden_state,
hidden_states=encoder_outputs.hidden_states if output_hidden_states else None,
attentions=encoder_outputs.attentions if output_attentions else None,
)
class ProSSTPredictionHeadTransform(nn.Module):
def __init__(self, config):
super().__init__()
self.embedding_size = getattr(config, "embedding_size", config.hidden_size)
self.dense = nn.Linear(config.hidden_size, self.embedding_size)
if isinstance(config.hidden_act, str):
self.transform_act_fn = ACT2FN[config.hidden_act]
else:
self.transform_act_fn = config.hidden_act
self.LayerNorm = nn.LayerNorm(self.embedding_size, eps=config.layer_norm_eps)
def forward(self, hidden_states):
hidden_states = self.dense(hidden_states)
hidden_states = self.transform_act_fn(hidden_states)
hidden_states = self.LayerNorm(hidden_states)
return hidden_states
class ProSSTLMPredictionHead(nn.Module):
def __init__(self, config):
super().__init__()
self.config = config
self.transform = ProSSTPredictionHeadTransform(config)
self.embedding_size = config.hidden_size
self.decoder = nn.Linear(self.embedding_size, config.vocab_size, bias=False)
def forward(self, hidden_states):
hidden_states = self.transform(hidden_states)
hidden_states = self.decoder(hidden_states)
return hidden_states
class ProSSTOnlyMLMHead(nn.Module):
def __init__(self, config):
super().__init__()
self.predictions = ProSSTLMPredictionHead(config)
def forward(self, sequence_output):
prediction_scores = self.predictions(sequence_output)
return prediction_scores
class ProSSTForMaskedLM(ProSSTPreTrainedModel):
_tied_weights_keys = [
"cls.predictions.decoder.weight",
"cls.predictions.decoder.bias",
]
def __init__(self, config):
super().__init__(config)
self.prosst = ProSSTModel(config)
self.cls = ProSSTOnlyMLMHead(config)
self.post_init()
def forward(
self,
input_ids,
attention_mask,
ss_input_ids=None,
plddt=None,
avg_plddt=None,
token_type_ids=None,
labels=None,
output_attentions=False,
output_hidden_states=False,
) -> MaskedLMOutput:
outputs = self.prosst(
input_ids=input_ids,
attention_mask=attention_mask,
ss_input_ids=ss_input_ids,
plddt=plddt,
avg_plddt=avg_plddt,
token_type_ids=token_type_ids,
output_attentions=output_attentions,
output_hidden_states=output_hidden_states,
)
sequence_output = outputs[0]
prediction_scores = self.cls(sequence_output)
        masked_lm_loss = None
        if labels is not None:
            # Positions whose label is 0 are ignored when computing the loss.
            loss_fct = CrossEntropyLoss(ignore_index=0)
            masked_lm_loss = loss_fct(
                prediction_scores.view(-1, self.config.vocab_size), labels.view(-1)
            )
return MaskedLMOutput(
loss=masked_lm_loss,
logits=prediction_scores,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
)
class ProSSTForSequenceClassification(ProSSTPreTrainedModel):
def __init__(self, config):
super().__init__(config)
num_labels = getattr(config, "num_labels", 2)
self.num_labels = num_labels
self.scale_hidden = getattr(config, "scale_hidden", 1)
self.prosst = ProSSTModel(config)
self.pooler = ContextPooler(config)
output_dim = self.pooler.output_dim * self.scale_hidden
self.classifier = nn.Linear(output_dim, num_labels)
drop_out = getattr(config, "cls_dropout", None)
drop_out = self.config.hidden_dropout_prob if drop_out is None else drop_out
self.dropout = nn.Dropout(drop_out)
# Initialize weights and apply final processing
self.post_init()
def get_input_embeddings(self):
return self.prosst.get_input_embeddings()
def set_input_embeddings(self, new_embeddings):
self.prosst.set_input_embeddings(new_embeddings)
def forward(
self,
input_ids: Optional[torch.Tensor] = None,
ss_input_ids: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
token_type_ids: Optional[torch.Tensor] = None,
position_ids: Optional[torch.Tensor] = None,
inputs_embeds: Optional[torch.Tensor] = None,
labels: Optional[torch.Tensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple, SequenceClassifierOutput]:
r"""
labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
Labels for computing the sequence classification/regression loss. Indices should be in `[0, ...,
config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square loss), If
`config.num_labels > 1` a classification loss is computed (Cross-Entropy).
"""
return_dict = (
return_dict if return_dict is not None else self.config.use_return_dict
)
        # NOTE: `position_ids` and `inputs_embeds` are accepted for interface
        # compatibility but are not consumed by ProSSTModel.
        outputs = self.prosst(
            input_ids,
            attention_mask=attention_mask,
            ss_input_ids=ss_input_ids,
            token_type_ids=token_type_ids,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
        )
encoder_layer = outputs[0]
pooled_output = self.pooler(encoder_layer, attention_mask)
pooled_output = self.dropout(pooled_output)
logits = self.classifier(pooled_output)
loss = None
if labels is not None:
if self.config.problem_type is None:
if self.num_labels == 1:
# regression task
loss_fn = nn.MSELoss()
logits = logits.view(-1).to(labels.dtype)
loss = loss_fn(logits, labels.view(-1))
elif labels.dim() == 1 or labels.size(-1) == 1:
label_index = (labels >= 0).nonzero()
labels = labels.long()
if label_index.size(0) > 0:
labeled_logits = torch.gather(
logits,
0,
label_index.expand(label_index.size(0), logits.size(1)),
)
labels = torch.gather(labels, 0, label_index.view(-1))
loss_fct = CrossEntropyLoss()
loss = loss_fct(
labeled_logits.view(-1, self.num_labels).float(),
labels.view(-1),
)
else:
loss = torch.tensor(0).to(logits)
else:
log_softmax = nn.LogSoftmax(-1)
loss = -((log_softmax(logits) * labels).sum(-1)).mean()
elif self.config.problem_type == "regression":
loss_fct = MSELoss()
if self.num_labels == 1:
loss = loss_fct(logits.squeeze(), labels.squeeze())
else:
loss = loss_fct(logits, labels)
elif self.config.problem_type == "binary_classification":
loss_fct = BCEWithLogitsLoss()
loss = loss_fct(logits.squeeze(), labels.squeeze().to(logits.dtype))
elif self.config.problem_type == "single_label_classification":
loss_fct = CrossEntropyLoss()
loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
elif self.config.problem_type == "multi_label_classification":
loss_fct = BCEWithLogitsLoss()
loss = loss_fct(logits, labels.to(logits.dtype))
if not return_dict:
output = (logits,) + outputs[1:]
return ((loss,) + output) if loss is not None else output
return SequenceClassifierOutput(
loss=loss,
logits=logits,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
)
class ProSSTForTokenClassification(ProSSTPreTrainedModel):
def __init__(self, config):
super().__init__(config)
self.num_labels = config.num_labels
self.prosst = ProSSTModel(config)
self.dropout = nn.Dropout(config.hidden_dropout_prob)
self.classifier = nn.Linear(config.hidden_size, config.num_labels)
# Initialize weights and apply final processing
self.post_init()
def forward(
self,
input_ids: Optional[torch.Tensor] = None,
attention_mask: Optional[torch.Tensor] = None,
token_type_ids: Optional[torch.Tensor] = None,
position_ids: Optional[torch.Tensor] = None,
inputs_embeds: Optional[torch.Tensor] = None,
labels: Optional[torch.Tensor] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple, TokenClassifierOutput]:
r"""
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`.
"""
return_dict = (
return_dict if return_dict is not None else self.config.use_return_dict
)
        # NOTE: `position_ids` and `inputs_embeds` are accepted for interface
        # compatibility but are not consumed by ProSSTModel.
        outputs = self.prosst(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
        )
sequence_output = outputs[0]
sequence_output = self.dropout(sequence_output)
logits = self.classifier(sequence_output)
loss = None
if labels is not None:
loss_fct = CrossEntropyLoss()
loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
if not return_dict:
output = (logits,) + outputs[1:]
return ((loss,) + output) if loss is not None else output
return TokenClassifierOutput(
loss=loss,
logits=logits,
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
)
ProSSTModel.register_for_auto_class("AutoModel")
ProSSTForMaskedLM.register_for_auto_class("AutoModelForMaskedLM")
ProSSTForSequenceClassification.register_for_auto_class(
"AutoModelForSequenceClassification"
)
ProSSTForTokenClassification.register_for_auto_class("AutoModelForTokenClassification")
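# Minimal usage sketch, assuming this file is loaded through trust_remote_code
# from its Hugging Face repository. The checkpoint id and the random token /
# structure inputs below are placeholders; real inputs come from the ProSST
# tokenizer and the structure-quantization pipeline.
if __name__ == "__main__":
    from transformers import AutoModelForMaskedLM

    model = AutoModelForMaskedLM.from_pretrained(
        "<prosst-checkpoint-id>",  # hypothetical identifier, replace before running
        trust_remote_code=True,
    )
    seq_len = 64
    input_ids = torch.randint(0, model.config.vocab_size, (1, seq_len))
    attention_mask = torch.ones(1, seq_len, dtype=torch.long)
    ss_input_ids = torch.randint(0, max(model.config.ss_vocab_size, 1), (1, seq_len))
    plddt = torch.rand(1, seq_len)      # per-residue pLDDT, assumed normalized to [0, 1]
    avg_plddt = torch.rand(1, seq_len)  # assumed same shape as plddt
    with torch.no_grad():
        out = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            ss_input_ids=ss_input_ids,
            plddt=plddt,
            avg_plddt=avg_plddt,
        )
    print(out.logits.shape)  # (1, seq_len, vocab_size)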