from transformers import PreTrainedModel
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from .config_gzipembed import *
from tqdm.auto import tqdm
import torch
import gzip
import multiprocessing


class GZIPEmbeddingModel(PreTrainedModel):
    """Embeds text by computing the Normalized Compression Distance (NCD)
    between the input and every document in a fixed reference corpus."""

    config_class = GZIPEmbeddingConfig

    def __init__(self, config):
        super().__init__(config)
        if config.reduction:
            # Optional learned projection from len(corpus) down to reduced_dimension.
            self.reduction_head = torch.nn.Linear(len(config.corpus), config.reduced_dimension)
        else:
            self.reduction_head = None
        # Keeps the module non-empty for frameworks that expect at least one parameter.
        self.dummy_parameter = torch.nn.Parameter(torch.ones(1))

    def forward(self, prompt, num_procs=16, return_tensor=True):
        # The worker function and the current prompt are promoted to module-level
        # globals so multiprocessing workers can resolve them by name; this relies
        # on the fork start method (the default on Linux).
        global calculate_ncd_row
        global p

        def calculate_ncd_row(data_row):
            i, reference_document = data_row
            return i, self.ncd(reference_document, p)

        if isinstance(prompt, str):
            prompt = [prompt]

        x = []
        for p in prompt:
            ncd = [0] * len(self.config.corpus)
            with multiprocessing.Pool(num_procs) as pool:
                data = enumerate(self.config.corpus)
                results = pool.map(calculate_ncd_row, data)
                for i, row in results:
                    ncd[i] = row
            x.append(ncd)

        if self.reduction_head is not None:
            x = torch.tensor(x)
            x = x.to(self.reduction_head.dtype).to(self.reduction_head.device)
            return self.reduction_head(x)
        return torch.tensor(x) if return_tensor else x

    def encode(self, sentences, batch_size=32, **kwargs):
        """
        Returns a list of embeddings for the given sentences.

        Args:
            sentences (`List[str]`): List of sentences to encode
            batch_size (`int`): Batch size for the encoding

        Returns:
            `List[np.ndarray]`: List of embeddings for the given sentences
        """
        import numpy as np

        x = self.forward(sentences, num_procs=batch_size, return_tensor=False)
        return [np.array(i) for i in x]

    def normalize(self, x):
        # Lowercase, strip non-alphabetic characters, and remove stop words.
        x = ''.join([char for char in x.lower() if char in "abcdefghijklmnopqrstuvwxyz "])
        x = word_tokenize(x)
        x = [w for w in x if w not in self.config.stop_words]
        return ' '.join(x)

    def ncd(self, x, y):
        # Normalized Compression Distance between documents x and y, computed from
        # gzip-compressed lengths: (C(xy) - min(C(x), C(y))) / max(C(x), C(y)).
        _x = self.normalize(x) if self.config.normalize else x
        _y = self.normalize(y) if (not self.config.normalized_corpus) and self.config.normalize else y
        x_c = len(gzip.compress(_x.encode()))
        y_c = len(gzip.compress(_y.encode()))
        xy_c = len(gzip.compress(f"{_x} {_y}".encode()))
        return (xy_c - min(x_c, y_c)) / max(x_c, y_c)

    def gzip_embed(self, corpus, document, verbose=False):
        # Sequential (single-process) embedding of one document against a corpus.
        embedding = []
        for reference_document in (tqdm(corpus) if verbose else corpus):
            embedding.append(self.ncd(reference_document, document))
        return embedding

    def dimensionality(self):
        # Without the reduction head, the embedding dimension equals the corpus size.
        return len(self.config.corpus)
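

# --- Usage sketch (not part of the original module) ---
# A minimal, hedged example of how this model might be instantiated and queried.
# It assumes GZIPEmbeddingConfig accepts the attributes referenced above
# (corpus, reduction, reduced_dimension, normalize, normalized_corpus, stop_words)
# as keyword arguments; the actual constructor lives in .config_gzipembed and may
# differ. forward() relies on fork-based multiprocessing, so this is expected to
# run on Linux via `python -m <package>.<module>` rather than as a standalone script.
if __name__ == "__main__":
    reference_corpus = [
        "the cat sat on the mat",
        "compression distances can act as a similarity measure",
        "transformers provides the PreTrainedModel base class",
    ]
    config = GZIPEmbeddingConfig(  # assumed signature; see .config_gzipembed
        corpus=reference_corpus,
        reduction=False,
        reduced_dimension=2,
        normalize=False,
        normalized_corpus=False,
        stop_words=[],
    )
    model = GZIPEmbeddingModel(config)
    embeddings = model.encode(["a cat on a mat"], batch_size=2)
    # One embedding per input sentence, each with len(reference_corpus) dimensions.
    print(len(embeddings), embeddings[0].shape)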