from typing import Optional

import json
import os

import torch
from torch import Tensor
from transformers import AutoModel, AutoTokenizer, BatchEncoding, PreTrainedTokenizerBase

class ModelUtils:
    def __init__(self, model_root):
        self.model_root = model_root
        self.model_path = os.path.join(model_root, "model")
        self.tokenizer_path = os.path.join(model_root, "tokenizer")

    def download_model(self):
        BASE_MODEL = "HooshvareLab/bert-fa-zwnj-base"
        tokenizer = AutoTokenizer.from_pretrained(BASE_MODEL)
        model = AutoModel.from_pretrained(BASE_MODEL)
        tokenizer.save_pretrained(self.tokenizer_path)
        model.save_pretrained(self.model_path)

    def make_dirs(self):
        if not os.path.isdir(self.model_root):
            os.mkdir(self.model_root)
        if not os.path.isdir(self.model_path):
            os.mkdir(self.model_path)
        if not os.path.isdir(self.tokenizer_path):
            os.mkdir(self.tokenizer_path)
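
# Usage sketch (hypothetical; the "persian-bert" root directory name is only an
# example). Fetches the base model once and caches it under model_root:
#
#     utils = ModelUtils("persian-bert")
#     utils.make_dirs()
#     utils.download_model()
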

class Preprocess:
    def __init__(self, model_root):
        self.model_root = model_root
        self.model_path = os.path.join(model_root, "model")
        self.tokenizer_path = os.path.join(model_root, "tokenizer")
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    def vectorize(self, text):
        model = AutoModel.from_pretrained(self.model_path).to(self.device)
        tokenizer = AutoTokenizer.from_pretrained(self.tokenizer_path)
        # Split the text into chunks of at most 510 tokens; with stride equal to
        # chunk_size the chunks do not overlap.
        ids, masks = self.transform_single_text(
            text, tokenizer, 510, stride=510, minimal_chunk_length=0, maximal_text_length=None
        )
        tokens = {"input_ids": ids.to(self.device), "attention_mask": masks.to(self.device)}
        output = model(**tokens)
        last_hidden_states = output.last_hidden_state
        # Mean-pool the token embeddings of each chunk: shape <num_chunks, hidden_size>.
        # (The first-token embedding, last_hidden_states[:, 0, :], would be an alternative.)
        mean_pooled_embedding = last_hidden_states.mean(dim=1)
        result = mean_pooled_embedding.flatten().cpu().detach().numpy()
        # Serialize the flattened embedding as a JSON list.
        json_data = json.dumps(result.tolist())
        return json_data

    def transform_list_of_texts(
        self,
        texts: list[str],
        tokenizer: PreTrainedTokenizerBase,
        chunk_size: int,
        stride: int,
        minimal_chunk_length: int,
        maximal_text_length: Optional[int] = None,
    ) -> BatchEncoding:
        model_inputs = [
            self.transform_single_text(text, tokenizer, chunk_size, stride, minimal_chunk_length, maximal_text_length)
            for text in texts
        ]
        input_ids = [model_input[0] for model_input in model_inputs]
        attention_mask = [model_input[1] for model_input in model_inputs]
        return BatchEncoding({"input_ids": input_ids, "attention_mask": attention_mask})

    def transform_single_text(
        self,
        text: str,
        tokenizer: PreTrainedTokenizerBase,
        chunk_size: int,
        stride: int,
        minimal_chunk_length: int,
        maximal_text_length: Optional[int],
    ) -> tuple[Tensor, Tensor]:
        """Transforms (the entire) text to model input of BERT model."""
        if maximal_text_length:
            tokens = self.tokenize_text_with_truncation(text, tokenizer, maximal_text_length)
        else:
            tokens = self.tokenize_whole_text(text, tokenizer)
        input_id_chunks, mask_chunks = self.split_tokens_into_smaller_chunks(tokens, chunk_size, stride, minimal_chunk_length)
        self.add_special_tokens_at_beginning_and_end(input_id_chunks, mask_chunks)
        self.add_padding_tokens(input_id_chunks, mask_chunks)
        input_ids, attention_mask = self.stack_tokens_from_all_chunks(input_id_chunks, mask_chunks)
        return input_ids, attention_mask

    def tokenize_whole_text(self, text: str, tokenizer: PreTrainedTokenizerBase) -> BatchEncoding:
        """Tokenizes the entire text without truncation and without special tokens."""
        tokens = tokenizer(text, add_special_tokens=False, truncation=False, return_tensors="pt")
        return tokens

    def tokenize_text_with_truncation(
        self, text: str, tokenizer: PreTrainedTokenizerBase, maximal_text_length: int
    ) -> BatchEncoding:
        """Tokenizes the text with truncation to maximal_text_length and without special tokens."""
        tokens = tokenizer(
            text, add_special_tokens=False, max_length=maximal_text_length, truncation=True, return_tensors="pt"
        )
        return tokens

    def split_tokens_into_smaller_chunks(
        self,
        tokens: BatchEncoding,
        chunk_size: int,
        stride: int,
        minimal_chunk_length: int,
    ) -> tuple[list[Tensor], list[Tensor]]:
        """Splits tokens into overlapping chunks with given size and stride."""
        input_id_chunks = self.split_overlapping(tokens["input_ids"][0], chunk_size, stride, minimal_chunk_length)
        mask_chunks = self.split_overlapping(tokens["attention_mask"][0], chunk_size, stride, minimal_chunk_length)
        return input_id_chunks, mask_chunks

    def add_special_tokens_at_beginning_and_end(self, input_id_chunks: list[Tensor], mask_chunks: list[Tensor]) -> None:
        """
        Adds a special CLS token (token id = 101) at the beginning
        and a SEP token (token id = 102) at the end of each chunk.
        Adds the corresponding attention-mask entries, equal to 1 (the attention mask is boolean).
        """
        for i in range(len(input_id_chunks)):
            # Adding CLS (token id 101) and SEP (token id 102) tokens.
            # NOTE: 101/102 are the conventional BERT special-token ids; for other
            # vocabularies use tokenizer.cls_token_id and tokenizer.sep_token_id instead.
            input_id_chunks[i] = torch.cat([Tensor([101]), input_id_chunks[i], Tensor([102])])
            # Adding attention-mask entries corresponding to the special tokens.
            mask_chunks[i] = torch.cat([Tensor([1]), mask_chunks[i], Tensor([1])])

    def add_padding_tokens(self, input_id_chunks: list[Tensor], mask_chunks: list[Tensor]) -> None:
        """Adds padding tokens (token id = 0) at the end to make sure that all chunks have exactly 512 tokens."""
        for i in range(len(input_id_chunks)):
            # get required padding length
            pad_len = 512 - input_id_chunks[i].shape[0]
            # check if tensor length satisfies required chunk size
            if pad_len > 0:
                # if padding length is more than 0, we must add padding
                input_id_chunks[i] = torch.cat([input_id_chunks[i], Tensor([0] * pad_len)])
                mask_chunks[i] = torch.cat([mask_chunks[i], Tensor([0] * pad_len)])

    def stack_tokens_from_all_chunks(self, input_id_chunks: list[Tensor], mask_chunks: list[Tensor]) -> tuple[Tensor, Tensor]:
        """Reshapes data to a form compatible with BERT model input."""
        input_ids = torch.stack(input_id_chunks)
        attention_mask = torch.stack(mask_chunks)
        return input_ids.long(), attention_mask.int()

    def split_overlapping(self, tensor: Tensor, chunk_size: int, stride: int, minimal_chunk_length: int) -> list[Tensor]:
        """Helper function for dividing 1-dimensional tensors into overlapping chunks."""
        self.check_split_parameters_consistency(chunk_size, stride, minimal_chunk_length)
        result = [tensor[i : i + chunk_size] for i in range(0, len(tensor), stride)]
        if len(result) > 1:
            # ignore chunks with fewer than minimal_chunk_length tokens
            result = [x for x in result if len(x) >= minimal_chunk_length]
        return result

    def check_split_parameters_consistency(self, chunk_size: int, stride: int, minimal_chunk_length: int) -> None:
        if chunk_size > 510:
            raise RuntimeError("Size of each chunk cannot be bigger than 510 (512 minus the [CLS] and [SEP] tokens)!")
        if minimal_chunk_length > chunk_size:
            raise RuntimeError("minimal_chunk_length cannot be bigger than chunk_size!")
        if stride > chunk_size:
            raise RuntimeError(
                "Stride cannot be bigger than chunk_size! Chunks must overlap or be adjacent!"
            )
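

# Minimal end-to-end sketch, assuming the model has already been downloaded with
# ModelUtils (see the usage sketch above) into a hypothetical "persian-bert"
# directory; the sample text is only illustrative.
if __name__ == "__main__":
    preprocess = Preprocess("persian-bert")
    sample_text = "Example input; in practice this would be a (possibly very long) Persian document."
    # vectorize() returns a JSON-encoded flat list: one mean-pooled hidden-state
    # vector (768 values for this BERT-base model) per 510-token chunk.
    embedding_json = preprocess.vectorize(sample_text)
    print(len(json.loads(embedding_json)))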