File size: 3,448 Bytes
a0f6bbc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
from torchaudio.models import Conformer
from torchaudio.models.rnnt import _TimeReduction
from transformers import PretrainedConfig, PreTrainedModel
import torch
import torchaudio
import math
import numpy as np
from torch import nn
from typing import List, Tuple, Optional

HF_CTC_VOCAB = [
    '',
    'a',
    'b',
    'c',
    'd',
    'e',
    'f',
    'g',
    'h',
    'i',
    'j',
    'k',
    'l',
    'm',
    'n',
    'o',
    'p',
    'q',
    'r',
    's',
    't',
    'u',
    'v',
    'w',
    'x',
    'y',
    'z',
    '0',
    '1',
    '2',
    '3',
    '4',
    '5',
    '6',
    '7',
    '8',
    '9',
    ' ',
    '?',
    '_'
]

DECIBEL = 2 * 20 * math.log10(torch.iinfo(torch.int16).max)
GAIN = pow(10, 0.05 * DECIBEL)

spectrogram_transform = torchaudio.transforms.MelSpectrogram(
    sample_rate=16000, n_fft=400, n_mels=80, hop_length=160)


def piecewise_linear_log(x):
    x = x * GAIN
    x[x > math.e] = torch.log(x[x > math.e])
    x[x <= math.e] = x[x <= math.e] / math.e
    return x


def melspectrogram(x):
    if isinstance(x, np.ndarray):
        x = torch.Tensor(x)
    x = spectrogram_transform(x).transpose(1, 0)
    return piecewise_linear_log(x)


class ConformerConfig(PretrainedConfig):
    model_type = 'conformer'


class ConformerEncoder(PreTrainedModel):
    config_class = ConformerConfig

    def __init__(
        self,
        config,
    ) -> None:
        super().__init__(config)
        self.time_reduction = _TimeReduction(config.time_reduction_stride)
        self.input_linear = torch.nn.Linear(
            config.input_dim * config.time_reduction_stride,
            config.conformer_input_dim)
        self.conformer = Conformer(
            num_layers=config.conformer_num_layers,
            input_dim=config.conformer_input_dim,
            ffn_dim=config.conformer_ffn_dim,
            num_heads=config.conformer_num_heads,
            depthwise_conv_kernel_size=config.conformer_depthwise_conv_kernel_size,
            dropout=config.conformer_dropout,
            use_group_norm=True,
            convolution_first=True,
        )
        self.output_linear = torch.nn.Linear(config.conformer_input_dim, config.output_dim)

    def forward(self, inputs, lengths, labels=None):
        time_reduction_out, time_reduction_lengths = self.time_reduction(inputs, lengths)
        input_linear_out = self.input_linear(time_reduction_out)
        x, input_lengths = self.conformer(input_linear_out, time_reduction_lengths)
        logits = self.output_linear(x)

        loss = None
        if labels is not None:
            labels_mask = labels >= 0
            target_lengths = labels_mask.sum(-1)
            flattened_targets = labels.masked_select(labels_mask)
            log_probs = nn.functional.log_softmax(
                logits,
                dim=-1,
                dtype=torch.float32
            ).transpose(0, 1)

            with torch.backends.cudnn.flags(enabled=False):
                loss = nn.functional.ctc_loss(
                    log_probs,
                    flattened_targets,
                    input_lengths,
                    target_lengths,
                    blank=self.config.pad_token_id,
                    reduction=self.config.ctc_loss_reduction,
                    zero_infinity=self.config.ctc_zero_infinity,
                )

        output = (logits, input_lengths)
        return ((loss,) + output) if loss is not None else output