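"""Streamlit demo for SSCL aspect inference on review comments.

The script loads a pretrained embedding table (vectors.npy / vocab.txt),
aspect cluster centroids (aspect_centroid.txt / aspect_mapping.txt) and two
trained heads (asp_weight.model, attn_kernel.model), then predicts which
aspects (do_an, gia_ca, khong_gian, phuc_vu) a comment mentions.
"""
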
import argparse
import os
import re

import numpy as np
import streamlit as st
import torch

from data_preprocessing import (clean_text, format_price, format_price_v2,
                                format_punctuation, normalize_format,
                                remove_emojis, remove_punctuation,
                                remove_stopwords, remove_xem_them,
                                word_segment)


class inferSSCL:

    def __init__(self, args=None):
        self.args = args
        self.base_models = {}
        self.batch_data = {}
        self.test_data = []

    def load_vocab_pretrain(self, file_pretrain_vocab, file_pretrain_vec, pad_tokens=True):
        # Build the token <-> id maps, reserving id 0 for the <pad> token.
        vocab2id = {'<pad>': 0}
        id2vocab = {0: '<pad>'}

        cnt = len(id2vocab)
        with open(file_pretrain_vocab, 'r', encoding='utf-8') as fp:
            for line in fp:
                # Each vocabulary line is split on spaces; the token is read
                # from the second field (the first is assumed to be a count/id).
                arr = re.split(' ', line[:-1])
                vocab2id[arr[1]] = cnt
                id2vocab[cnt] = arr[1]
                cnt += 1

        # Prepend an all-zero vector for <pad> to the pretrained embeddings.
        pretrain_vec = np.load(file_pretrain_vec)
        pad_vec = np.zeros([1, pretrain_vec.shape[1]])
        pretrain_vec = np.vstack((pad_vec, pretrain_vec))
        return vocab2id, id2vocab, pretrain_vec

    def load_vocabulary(self):
        cluster_dir = './'
        file_wordvec = 'vectors.npy'
        file_vocab = 'vocab.txt'
        file_kmeans_centroid = 'aspect_centroid.txt'
        file_aspect_mapping = 'aspect_mapping.txt'

        vocab2id, id2vocab, pretrain_vec = self.load_vocab_pretrain(
            os.path.join(cluster_dir, file_vocab),
            os.path.join(cluster_dir, file_wordvec))
        vocab_size = len(vocab2id)

        self.batch_data['vocab2id'] = vocab2id
        self.batch_data['id2vocab'] = id2vocab
        self.batch_data['pretrain_emb'] = pretrain_vec
        self.batch_data['vocab_size'] = vocab_size

        # K-means centroids of the aspect clusters, one row per aspect.
        aspect_vec = np.loadtxt(os.path.join(cluster_dir, file_kmeans_centroid), dtype=float)

        # aspect_mapping.txt is expected to hold one aspect label per line
        # (an index followed by a name such as "do_an"); aspects mapped to
        # "none" are zeroed out so they do not contribute to the centroids.
        tmp = []
        with open(os.path.join(cluster_dir, file_aspect_mapping), 'r') as fp:
            for line in fp:
                line = re.sub(r'[0-9]+', '', line)
                line = line.replace(' ', '').replace('\n', '')
                if line == "none":
                    tmp.append([0.] * emb_size)
                else:
                    tmp.append([1.] * emb_size)

        aspect_vec = aspect_vec * tmp
        aspect_vec = torch.FloatTensor(aspect_vec).to(device)
        self.batch_data['aspect_centroid'] = aspect_vec
        self.batch_data['n_aspects'] = aspect_vec.shape[0]

    def load_models(self):
        # Embedding layer initialised from the pretrained vectors.
        self.base_models['embedding'] = torch.nn.Embedding(self.batch_data['vocab_size'], emb_size).to(device)
        emb_para = torch.FloatTensor(self.batch_data['pretrain_emb']).to(device)
        self.base_models['embedding'].weight = torch.nn.Parameter(emb_para)

        # Linear head that maps a sentence vector to aspect weights.
        self.base_models['asp_weight'] = torch.nn.Linear(emb_size, self.batch_data['n_aspects']).to(device)
        self.base_models['asp_weight'].load_state_dict(
            torch.load('./asp_weight.model', map_location=device))

        # Attention kernel used by the encoder.
        self.base_models['attn_kernel'] = torch.nn.Linear(emb_size, emb_size).to(device)
        self.base_models['attn_kernel'].load_state_dict(
            torch.load('./attn_kernel.model', map_location=device), strict=False)

    def build_pipe(self):
        attn_pos, lbl_pos = self.encoder(
            self.batch_data['pos_sen_var'],
            self.batch_data['pos_pad_mask']
        )

        # Per-token attention weights for the (single) input comment,
        # rounded and truncated to the comment length.
        outw = np.around(attn_pos.data.cpu().numpy(), 4).tolist()
        outw = outw[0][:len(self.batch_data['comment'][0].split())]

        asp_weight = self.base_models['asp_weight'](lbl_pos)
        asp_weight = torch.softmax(asp_weight, dim=1)

        return asp_weight

    def encoder(self, input_, mask_):
        with torch.no_grad():
            emb_ = self.base_models['embedding'](input_)

        # Zero out the embeddings of padding positions.
        emb_ = emb_ * mask_.unsqueeze(2)

        # Average the token embeddings to get a sentence vector.
        emb_avg = torch.sum(emb_, dim=1)
        norm = torch.sum(mask_, dim=1, keepdim=True) + 1e-20
        enc_ = emb_avg.div(norm.expand_as(emb_avg))

        # Score each token against the sentence vector through the attention kernel.
        emb_trn = self.base_models['attn_kernel'](emb_)
        attn_ = enc_.unsqueeze(1) @ emb_trn.transpose(1, 2)
        attn_ = attn_.squeeze(1)

        attn_ = self.args.smooth_factor * torch.tanh(attn_)
        attn_ = attn_.masked_fill(mask_ == 0, -1e20)
        attn_ = torch.softmax(attn_, dim=1)

        # Attention-weighted sentence representation.
        lbl_ = attn_.unsqueeze(1) @ emb_
        lbl_ = lbl_.squeeze(1)

        return attn_, lbl_

    def build_batch(self, review):
        vocab2id = self.batch_data['vocab2id']

        sen_text = []
        cmt = []

        # Maximum sentence length (tied to emb_size, i.e. 256 tokens).
        sen_text_len = emb_size

        # Keep only in-vocabulary tokens.
        senid = [vocab2id[wd] for wd in review.split() if wd in vocab2id]
        sen_text.append(senid)
        cmt.append(review)

        # Truncate to the actual length and pad to a rectangular batch.
        sen_text_len = min(len(senid), sen_text_len)
        sen_text = [itm[:sen_text_len] + [vocab2id['<pad>'] for _ in range(sen_text_len - len(itm))] for itm in sen_text]

        sen_text_var = torch.LongTensor(sen_text).to(device)
        # Mask is 1 for real tokens and 0 for padding.
        sen_pad_mask = (sen_text_var != vocab2id['<pad>']).long()

        self.batch_data['comment'] = cmt
        self.batch_data['pos_sen_var'] = sen_text_var
        self.batch_data['pos_pad_mask'] = sen_pad_mask

    def calculate_atten_weight(self):
        attn_pos, lbl_pos = self.encoder(
            self.batch_data['pos_sen_var'],
            self.batch_data['pos_pad_mask']
        )

        asp_weight = self.base_models['asp_weight'](lbl_pos)
        asp_weight = torch.softmax(asp_weight, dim=1)

        return asp_weight

    def get_test_data(self):
        asp_weight = self.calculate_atten_weight()
        asp_weight = asp_weight.data.cpu().numpy().tolist()

        output = {}
        output['comment'] = self.batch_data['comment']
        output['aspect_weight'] = asp_weight[0]
        self.test_data.append(output)

    def select_top(self, data):
        # Modified z-score based on the median absolute deviation (MAD):
        # large values mark aspect weights that stand out from the rest.
        d = np.abs(np.asarray(data) - np.median(data))
        mdev = np.median(d)
        s = d / mdev if mdev else np.zeros_like(d)

        return s

    def get_predict(self, top_pred, aspect_label, threshold=1):
        # Mark an aspect as present when its outlier score exceeds the threshold.
        pred = {'none': 0, 'do_an': 0, 'gia_ca': 0, 'khong_gian': 0, 'phuc_vu': 0}
        try:
            for i in range(len(top_pred)):
                if top_pred[i] > threshold:
                    pred[aspect_label[i]] = 1
        except (IndexError, KeyError):
            print('Error')
        return pred

    def get_evaluate_result(self, input_):
        # Read the aspect names (second column of aspect_mapping.txt).
        aspect_label = []
        with open('./aspect_mapping.txt', 'r', encoding='utf8') as fp:
            for line in fp:
                aspect_label.append(line.split()[1])

        top_score = self.select_top(input_['aspect_weight'])
        print(top_score)
        curr_pred = self.get_predict(top_score, aspect_label)

        aspect_key = []
        for key, value in curr_pred.items():
            if int(value) == 1:
                aspect_key.append(key)

        return self.get_aspect(aspect_key)

    def get_aspect(self, pred, ignore='none'):
        # Drop the ignored label and fall back to ['None'] when nothing is left.
        aspects = [p for p in pred if p != ignore]
        return aspects if aspects else ['None']

    def infer(self, text=''):
        self.args.task = 'sscl-infer'

        # Text normalisation pipeline from data_preprocessing.
        text = remove_xem_them(text)
        text = remove_emojis(text)
        text = format_punctuation(text)
        text = remove_punctuation(text)
        text = clean_text(text)
        text = normalize_format(text)
        text = word_segment(text)
        text = remove_stopwords(text)
        text = format_price(text)
        input_ = format_price_v2(text)
        print(input_)

        self.load_vocabulary()
        self.load_models()

        self.build_batch(input_)
        self.get_test_data()

        val_result = self.test_data

        return self.get_evaluate_result(val_result[0])


parser = argparse.ArgumentParser()
parser.add_argument('--task', default='infer')
parser.add_argument('--smooth_factor', type=float, default=0.9)
args = parser.parse_args(args=[])

# Fall back to CPU when CUDA is not available.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
emb_size = 256

model = inferSSCL(args)

cmt = st.text_area('Enter some text:')
if cmt:
    output = model.infer(cmt)
    if output:
        st.title(', '.join(output))
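
# To launch the demo (assuming this file is saved as app.py and the model /
# vocabulary files listed above sit in the working directory):
#   streamlit run app.py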