""" Implementation borrowed from transformers package and extended to support multiple prediction heads: https://github.com/huggingface/transformers/blob/master/src/transformers/models/bert/modeling_bert.py """ import math import torch import torch.utils.checkpoint from packaging import version from torch import nn from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss from transformers import PretrainedConfig from transformers.activations import ACT2FN, gelu from transformers.file_utils import ( add_code_sample_docstrings, add_start_docstrings, add_start_docstrings_to_model_forward, replace_return_docstrings, ) from transformers.modeling_outputs import ( BaseModelOutputWithPastAndCrossAttentions, BaseModelOutputWithPoolingAndCrossAttentions, CausalLMOutputWithCrossAttentions, MaskedLMOutput, MultipleChoiceModelOutput, QuestionAnsweringModelOutput, SequenceClassifierOutput, TokenClassifierOutput, ) from transformers.modeling_utils import ( PreTrainedModel, apply_chunking_to_forward, find_pruneable_heads_and_indices, prune_linear_layer, ) from transformers.utils import logging # from transformers.models.roberta.configuration_roberta import RobertaConfig logger = logging.get_logger(__name__) _CHECKPOINT_FOR_DOC = "roberta-base" _CONFIG_FOR_DOC = "RobertaConfig" _TOKENIZER_FOR_DOC = "RobertaTokenizer" from transformers.models.roberta.modeling_roberta import ( RobertaPreTrainedModel, RobertaClassificationHead, RobertaModel, ) class RobertaClassificationHead(nn.Module): """Head for sentence-level classification tasks.""" def __init__(self, config, num_labels): super().__init__() self.dense = nn.Linear(config.hidden_size, config.hidden_size) classifier_dropout = ( config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob ) self.dropout = nn.Dropout(classifier_dropout) self.out_proj = nn.Linear(config.hidden_size, num_labels) def forward(self, features, **kwargs): x = features[:, 0, :] # take token (equiv. to [CLS]) x = self.dropout(x) x = self.dense(x) x = torch.tanh(x) x = self.dropout(x) x = self.out_proj(x) return x class RobertaForMultitaskQA(RobertaPreTrainedModel): _keys_to_ignore_on_load_missing = [r"position_ids"] def __init__(self, config, **kwargs): super().__init__(PretrainedConfig()) self.num_labels = kwargs.get("task_labels_map", {}) self.config = config self.roberta = RobertaModel(config, add_pooling_layer=False) ## for squad2 QA task self.qa_outputs = nn.Linear( config.hidden_size, list(self.num_labels.values())[0] ) ## for boolq self.classifier = RobertaClassificationHead( config, num_labels=list(self.num_labels.values())[1] ) self.init_weights() def forward( self, input_ids=None, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, labels=None, start_positions=None, end_positions=None, output_attentions=None, output_hidden_states=None, return_dict=None, task_name=None, ): r""" labels (:obj:`torch.LongTensor` of shape :obj:`(batch_size,)`, `optional`): Labels for computing the sequence classification/regression loss. Indices should be in :obj:`[0, ..., config.num_labels - 1]`. If :obj:`config.num_labels == 1` a regression loss is computed (Mean-Square loss), If :obj:`config.num_labels > 1` a classification loss is computed (Cross-Entropy). """ return_dict = ( return_dict if return_dict is not None else self.config.use_return_dict ) outputs = self.roberta( input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids, position_ids=position_ids, head_mask=head_mask, inputs_embeds=inputs_embeds, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) sequence_output = outputs[0] logits = self.qa_outputs(sequence_output) start_logits, end_logits = logits.split(1, dim=-1) start_logits = start_logits.squeeze(-1).contiguous() end_logits = end_logits.squeeze(-1).contiguous() total_loss = None if start_positions is not None and end_positions is not None: # If we are on multi-GPU, split add a dimension if len(start_positions.size()) > 1: start_positions = start_positions.squeeze(-1) if len(end_positions.size()) > 1: end_positions = end_positions.squeeze(-1) # sometimes the start/end positions are outside our model inputs, we ignore these terms ignored_index = start_logits.size(1) start_positions = start_positions.clamp(0, ignored_index) end_positions = end_positions.clamp(0, ignored_index) loss_fct = CrossEntropyLoss(ignore_index=ignored_index) start_loss = loss_fct(start_logits, start_positions) end_loss = loss_fct(end_logits, end_positions) total_loss = (start_loss + end_loss) / 2 if not return_dict: output = (start_logits, end_logits) + outputs[2:] return ((total_loss,) + output) if total_loss is not None else output qa_result = QuestionAnsweringModelOutput( loss=total_loss, start_logits=start_logits, end_logits=end_logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) loss = None logits = self.classifier(sequence_output) if labels is not None: if self.config.problem_type is None: if list(self.num_labels.values())[1] == 1: self.config.problem_type = "regression" elif list(self.num_labels.values())[1] > 1 and ( labels.dtype == torch.long or labels.dtype == torch.int ): self.config.problem_type = "single_label_classification" else: self.config.problem_type = "multi_label_classification" if self.config.problem_type == "regression": loss_fct = MSELoss() if list(self.num_labels.values())[1] == 1: loss = loss_fct(logits.squeeze(), labels.squeeze()) else: loss = loss_fct(logits, labels) elif self.config.problem_type == "single_label_classification": loss_fct = CrossEntropyLoss() loss = loss_fct( logits.view(-1, list(self.num_labels.values())[1]), labels.view(-1), ) elif self.config.problem_type == "multi_label_classification": loss_fct = BCEWithLogitsLoss() loss = loss_fct(logits, labels) if not return_dict: output = (logits,) + outputs[2:] return ((loss,) + output) if loss is not None else output classifier_result = SequenceClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) return qa_result, classifier_result