from typing import Dict
from typing import Tuple

import torch
import torch.nn.functional as F
from typeguard import check_argument_types

from espnet.nets.pytorch_backend.nets_utils import make_pad_mask
from espnet2.lm.abs_model import AbsLM
from espnet2.torch_utils.device_funcs import force_gatherable
from espnet2.train.abs_espnet_model import AbsESPnetModel


class ESPnetLanguageModel(AbsESPnetModel):
    def __init__(self, lm: AbsLM, vocab_size: int, ignore_id: int = 0):
        assert check_argument_types()
        super().__init__()
        self.lm = lm
        # The last vocabulary index serves as both <sos> and <eos>.
        self.sos = vocab_size - 1
        self.eos = vocab_size - 1
        # ignore_id may be assumed as 0, shared with CTC-blank symbol for ASR.
        self.ignore_id = ignore_id
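        # Example (illustrative): with vocab_size=5, token ids run over 0..4;
        # id 4 doubles as the <sos>/<eos> symbol and id 0 (the default
        # ignore_id) marks padding positions excluded from the loss.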

    def nll(
        self, text: torch.Tensor, text_lengths: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        batch_size = text.size(0)
        # For data parallel: trim padding to the longest length in this batch
        text = text[:, : text_lengths.max()]

        # 1. Create a sentence pair like '<sos> w1 w2 w3' and 'w1 w2 w3 <eos>'
        # text: (Batch, Length) -> x, t: (Batch, Length + 1)
        x = F.pad(text, [1, 0], "constant", self.eos)
        t = F.pad(text, [0, 1], "constant", self.ignore_id)
        for i, l in enumerate(text_lengths):
            t[i, l] = self.sos
        x_lengths = text_lengths + 1
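        # Worked example (hypothetical ids, Batch=1): for text = [[7, 8, 9]]
        # with sos = eos = vocab_size - 1,
        #   x = [[eos, 7, 8, 9]]  # model input, prefixed with <sos>
        #   t = [[7, 8, 9, sos]]  # prediction target, terminated with <eos>
        # Since self.sos == self.eos, one index plays both roles.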

        # 2. Forward Language model
        # x: (Batch, Length + 1) -> y: (Batch, Length + 1, NVocab)
        y, _ = self.lm(x, None)

        # 3. Calc negative log likelihood
        # nll: (BxL,)
        nll = F.cross_entropy(y.view(-1, y.shape[-1]), t.view(-1), reduction="none")
        # Zero the loss at padded positions beyond each sequence's length
        nll.masked_fill_(make_pad_mask(x_lengths).to(nll.device).view(-1), 0.0)
        # nll: (BxL,) -> (B, L)
        nll = nll.view(batch_size, -1)
        return nll, x_lengths
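
    # Aside (illustrative): token-level perplexity follows from the pair
    # returned by nll() as
    #   ppl = torch.exp(nll.sum() / x_lengths.sum())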

    def forward(
        self, text: torch.Tensor, text_lengths: torch.Tensor
    ) -> Tuple[torch.Tensor, Dict[str, torch.Tensor], torch.Tensor]:
        nll, y_lengths = self.nll(text, text_lengths)
        ntokens = y_lengths.sum()
        loss = nll.sum() / ntokens
        stats = dict(loss=loss.detach())

        # force_gatherable: to-device and to-tensor if scalar for DataParallel
        loss, stats, weight = force_gatherable((loss, stats, ntokens), loss.device)
        return loss, stats, weight

    def collect_feats(
        self, text: torch.Tensor, text_lengths: torch.Tensor
    ) -> Dict[str, torch.Tensor]:
        # A text-only LM has no acoustic features to collect.
        return {}
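

# Minimal usage sketch (an illustration, not part of ESPnet): "ToyLM" and the
# sizes below are hypothetical. It assumes AbsLM's forward follows the
# (input, hidden) -> (logits, hidden) contract relied on by `self.lm(x, None)`
# in nll() above.
if __name__ == "__main__":

    class ToyLM(AbsLM):
        def __init__(self, vocab_size: int, dim: int = 16):
            super().__init__()
            self.embed = torch.nn.Embedding(vocab_size, dim)
            self.out = torch.nn.Linear(dim, vocab_size)

        def forward(self, input: torch.Tensor, hidden=None):
            # (Batch, Length) -> (Batch, Length, NVocab); hidden is unused here
            return self.out(self.embed(input)), hidden

    vocab = 10
    model = ESPnetLanguageModel(ToyLM(vocab), vocab_size=vocab)
    # Two sequences of true lengths 3 and 2; the shorter one is padded with 0
    text = torch.tensor([[7, 8, 9], [5, 6, 0]])
    text_lengths = torch.tensor([3, 2])
    loss, stats, weight = model(text, text_lengths)
    # weight == (3 + 1) + (2 + 1) == 7 tokens, counting the appended <eos>
    print(loss.item(), stats, weight.item())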