# File size: 8,271 Bytes
# abed76f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
from typing import Optional

from transformers import AutoTokenizer, AutoModel
import numpy as np
import os

import torch
from torch import Tensor
from transformers import BatchEncoding, PreTrainedTokenizerBase
import json

class ModelUtils:
    """Manage the on-disk copy of the pretrained model and tokenizer.

    All artefacts live below ``model_root``:

    * ``<model_root>/model``      - weights saved by ``save_pretrained``
    * ``<model_root>/tokenizer``  - tokenizer files saved by ``save_pretrained``
    """

    def __init__(self, model_root):
        """Remember ``model_root`` and derive the model/tokenizer sub-paths.

        Nothing is created on disk here; call :meth:`make_dirs` for that.
        """
        self.model_root = model_root
        self.model_path = os.path.join(model_root, "model")
        self.tokenizer_path = os.path.join(model_root, "tokenizer")

    def download_model(self):
        """Fetch the base checkpoint and store it under ``model_root``.

        Loads the tokenizer and model for ``HooshvareLab/bert-fa-zwnj-base``
        with ``from_pretrained`` and writes them to ``self.tokenizer_path``
        and ``self.model_path`` respectively.
        """
        BASE_MODEL = "HooshvareLab/bert-fa-zwnj-base"
        tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)
        model = AutoModel.from_pretrained(BASE_MODEL)

        tokenizer.save_pretrained(self.tokenizer_path)
        model.save_pretrained(self.model_path)

    def make_dirs(self):
        """Create ``model_root`` and its ``model``/``tokenizer`` sub-directories.

        Missing parent directories are created as well, and directories that
        already exist are left untouched, so the call is safe to repeat.
        """
        for path in (self.model_root, self.model_path, self.tokenizer_path):
            # exist_ok avoids the check-then-create race of isdir() + mkdir()
            # and makedirs also builds any missing parents of model_root.
            os.makedirs(path, exist_ok=True)

class Preprocess:
    """Turn raw text into BERT embeddings, chunking texts longer than one model input.

    The text is tokenized without special tokens, cut into chunks of at most
    ``MAX_CHUNK_TOKENS`` tokens, wrapped in CLS/SEP, padded to
    ``MODEL_INPUT_LENGTH`` and stacked into one batch for the model stored
    under ``<model_root>/model``.
    """

    # Largest number of content tokens per chunk; two positions are kept for CLS and SEP.
    MAX_CHUNK_TOKENS = 510
    # Length of every chunk once special tokens and padding have been added.
    MODEL_INPUT_LENGTH = 512
    # Fallback ids, used only when the tokenizer does not expose its own.
    DEFAULT_CLS_TOKEN_ID = 101
    DEFAULT_SEP_TOKEN_ID = 102
    DEFAULT_PAD_TOKEN_ID = 0

    def __init__(self, model_root):
        """Derive the model/tokenizer paths from ``model_root`` and pick the device.

        The model and tokenizer themselves are loaded lazily on the first
        :meth:`vectorize` call.
        """
        self.model_root = model_root
        self.model_path = os.path.join(model_root, "model")
        self.tokenizer_path = os.path.join(model_root, "tokenizer")
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self._model = None
        self._tokenizer = None

    def _load_model_and_tokenizer(self):
        """Return the cached ``(model, tokenizer)`` pair, loading it from disk once."""
        # getattr keeps this working for instances created without __init__.
        if getattr(self, "_model", None) is None:
            self._model = AutoModel.from_pretrained(self.model_path).to(self.device)
            self._model.eval()
        if getattr(self, "_tokenizer", None) is None:
            self._tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_path)
        return self._model, self._tokenizer

    @staticmethod
    def _token_id(tokenizer, attribute: str, default: int) -> int:
        """Read a special-token id from ``tokenizer``, falling back to ``default``."""
        token_id = getattr(tokenizer, attribute, None)
        return default if token_id is None else token_id

    def vectorize(self, text):
        """Embed ``text`` and return the vector as a JSON array string.

        The text is split into non-overlapping chunks (stride equals chunk
        size), every chunk is run through the model in a single batch, and
        the last hidden states are averaged over the token axis.  The
        per-chunk means are then flattened into one list, so a text spanning
        several chunks yields a proportionally longer vector.

        NOTE(review): the mean runs over all 512 positions, padding included;
        a mask-weighted mean may be what is wanted - left unchanged so that
        vectors stay comparable with ones produced earlier.
        """
        model, tokenizer = self._load_model_and_tokenizer()
        ids, masks = self.transform_single_text(
            text,
            tokenizer,
            self.MAX_CHUNK_TOKENS,
            stride=self.MAX_CHUNK_TOKENS,
            minimal_chunk_length=0,
            maximal_text_length=None,
        )

        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            output = model(
                input_ids=ids.to(self.device),
                attention_mask=masks.to(self.device),
            )
        last_hidden_states = output.last_hidden_state

        # One mean-pooled row per chunk.
        mean_pooled_embedding = last_hidden_states.mean(dim=1)

        result = mean_pooled_embedding.flatten().cpu().detach().numpy()
        return json.dumps(result.tolist())

    def transform_list_of_texts(
        self,
        texts: list[str],
        tokenizer: "PreTrainedTokenizerBase",
        chunk_size: int,
        stride: int,
        minimal_chunk_length: int,
        maximal_text_length: Optional[int] = None,
    ) -> tuple[list[Tensor], list[Tensor]]:
        """Apply :meth:`transform_single_text` to every text.

        Returns two parallel lists ``(input_ids, attention_mask)`` holding one
        stacked tensor per input text.
        """
        input_ids = []
        attention_mask = []
        for text in texts:
            ids, mask = self.transform_single_text(
                text, tokenizer, chunk_size, stride, minimal_chunk_length, maximal_text_length
            )
            input_ids.append(ids)
            attention_mask.append(mask)
        return input_ids, attention_mask

    def transform_single_text(
        self,
        text: str,
        tokenizer: "PreTrainedTokenizerBase",
        chunk_size: int,
        stride: int,
        minimal_chunk_length: int,
        maximal_text_length: Optional[int] = None,
    ) -> tuple[Tensor, Tensor]:
        """Transform (the entire) text into model input for a BERT model.

        The text is tokenized (truncated to ``maximal_text_length`` tokens
        when that is set), split into chunks, wrapped in CLS/SEP, padded and
        stacked.  Special-token ids are taken from ``tokenizer`` so they match
        the vocabulary that was actually loaded.

        Returns ``(input_ids, attention_mask)``, each with one row per chunk.
        """
        if maximal_text_length:
            tokens = self.tokenize_text_with_truncation(text, tokenizer, maximal_text_length)
        else:
            tokens = self.tokenize_whole_text(text, tokenizer)
        input_id_chunks, mask_chunks = self.split_tokens_into_smaller_chunks(
            tokens, chunk_size, stride, minimal_chunk_length
        )
        self.add_special_tokens_at_beginning_and_end(
            input_id_chunks,
            mask_chunks,
            cls_token_id=self._token_id(tokenizer, "cls_token_id", self.DEFAULT_CLS_TOKEN_ID),
            sep_token_id=self._token_id(tokenizer, "sep_token_id", self.DEFAULT_SEP_TOKEN_ID),
        )
        self.add_padding_tokens(
            input_id_chunks,
            mask_chunks,
            pad_token_id=self._token_id(tokenizer, "pad_token_id", self.DEFAULT_PAD_TOKEN_ID),
        )
        return self.stack_tokens_from_all_chunks(input_id_chunks, mask_chunks)

    def tokenize_whole_text(self, text: str, tokenizer: "PreTrainedTokenizerBase") -> "BatchEncoding":
        """Tokenize the entire text without truncation and without special tokens."""
        return tokenizer(text, add_special_tokens=False, truncation=False, return_tensors="pt")

    def tokenize_text_with_truncation(
        self, text: str, tokenizer: "PreTrainedTokenizerBase", maximal_text_length: int
    ) -> "BatchEncoding":
        """Tokenize the text, truncated to ``maximal_text_length``, without special tokens."""
        return tokenizer(
            text, add_special_tokens=False, max_length=maximal_text_length, truncation=True, return_tensors="pt"
        )

    def split_tokens_into_smaller_chunks(
        self,
        tokens: "BatchEncoding",
        chunk_size: int,
        stride: int,
        minimal_chunk_length: int,
    ) -> tuple[list[Tensor], list[Tensor]]:
        """Split the first row of ``input_ids`` and ``attention_mask`` into chunks.

        Both tensors are cut with the same size and stride, so the two
        returned lists line up element by element.
        """
        input_id_chunks = self.split_overlapping(tokens["input_ids"][0], chunk_size, stride, minimal_chunk_length)
        mask_chunks = self.split_overlapping(tokens["attention_mask"][0], chunk_size, stride, minimal_chunk_length)
        return input_id_chunks, mask_chunks

    def add_special_tokens_at_beginning_and_end(
        self,
        input_id_chunks: list[Tensor],
        mask_chunks: list[Tensor],
        cls_token_id: int = DEFAULT_CLS_TOKEN_ID,
        sep_token_id: int = DEFAULT_SEP_TOKEN_ID,
    ) -> None:
        """Wrap every chunk in CLS ... SEP, modifying both lists in place.

        ``cls_token_id`` is prepended and ``sep_token_id`` appended to each
        id chunk; the matching mask chunk gets a 1 at both ends.  The added
        values use the dtype of the chunk they are attached to.
        """
        for i, (ids, mask) in enumerate(zip(input_id_chunks, mask_chunks)):
            # new_tensor keeps dtype/device, so integer ids are not promoted to float.
            input_id_chunks[i] = torch.cat(
                [ids.new_tensor([cls_token_id]), ids, ids.new_tensor([sep_token_id])]
            )
            mask_chunks[i] = torch.cat([mask.new_tensor([1]), mask, mask.new_tensor([1])])

    def add_padding_tokens(
        self,
        input_id_chunks: list[Tensor],
        mask_chunks: list[Tensor],
        pad_token_id: int = DEFAULT_PAD_TOKEN_ID,
        max_length: int = MODEL_INPUT_LENGTH,
    ) -> None:
        """Right-pad every chunk to ``max_length`` tokens, in place.

        Id chunks are padded with ``pad_token_id`` and mask chunks with 0.
        Chunks that already have ``max_length`` or more entries are left as
        they are.
        """
        for i, (ids, mask) in enumerate(zip(input_id_chunks, mask_chunks)):
            pad_len = max_length - ids.shape[0]
            if pad_len > 0:
                input_id_chunks[i] = torch.cat([ids, ids.new_full((pad_len,), pad_token_id)])
                mask_chunks[i] = torch.cat([mask, mask.new_zeros(pad_len)])

    def stack_tokens_from_all_chunks(
        self, input_id_chunks: list[Tensor], mask_chunks: list[Tensor]
    ) -> tuple[Tensor, Tensor]:
        """Stack the chunks into 2-D tensors (one row per chunk).

        Ids are returned as ``long`` and the attention mask as ``int``.
        """
        input_ids = torch.stack(input_id_chunks)
        attention_mask = torch.stack(mask_chunks)

        return input_ids.long(), attention_mask.int()

    def split_overlapping(
        self, tensor: Tensor, chunk_size: int, stride: int, minimal_chunk_length: int
    ) -> list[Tensor]:
        """Divide a 1-dimensional tensor into (possibly overlapping) chunks.

        A new chunk of up to ``chunk_size`` elements starts every ``stride``
        elements.  When there is more than one chunk, those shorter than
        ``minimal_chunk_length`` are dropped.  At least one chunk is always
        returned: an empty tensor yields a single empty chunk, and the first
        chunk is kept if the length filter would remove all of them.
        """
        self.check_split_parameters_consistency(chunk_size, stride, minimal_chunk_length)
        result = [tensor[i : i + chunk_size] for i in range(0, len(tensor), stride)]
        if not result:
            # Empty input: keep one empty chunk so later stacking has something to stack.
            return [tensor]
        if len(result) > 1:
            # ignore chunks with less than minimal_length number of tokens
            long_enough = [x for x in result if len(x) >= minimal_chunk_length]
            result = long_enough or result[:1]
        return result

    def check_split_parameters_consistency(self, chunk_size: int, stride: int, minimal_chunk_length: int) -> None:
        """Validate the chunking parameters.

        Raises:
            ValueError: if ``stride`` is not positive.
            RuntimeError: if ``chunk_size`` exceeds 510, or if
                ``minimal_chunk_length`` or ``stride`` exceed ``chunk_size``.
        """
        if stride < 1:
            raise ValueError("Stride must be a positive integer!")
        if chunk_size > 510:
            raise RuntimeError("Size of each chunk cannot be bigger than 510!")
        if minimal_chunk_length > chunk_size:
            raise RuntimeError("Minimal length cannot be bigger than size!")
        if stride > chunk_size:
            raise RuntimeError(
                "Stride cannot be bigger than size! Chunks must overlap or be near each other!"
            )