|
!pip install -q -U watermark |
|
|
|
!pip install -qq transformers |
|
|
|
|
|
import transformers |
|
from transformers import BertModel, BertTokenizer, AdamW, get_linear_schedule_with_warmup |
|
import torch |
|
|
|
import numpy as np |
|
import pandas as pd |
|
import seaborn as sns |
|
from pylab import rcParams |
|
import matplotlib.pyplot as plt |
|
from matplotlib import rc |
|
from sklearn.model_selection import train_test_split |
|
from sklearn.metrics import confusion_matrix, classification_report |
|
from collections import defaultdict |
|
from textwrap import wrap |
|
|
|
from torch import nn, optim |
|
from torch.utils.data import Dataset, DataLoader |
|
import torch.nn.functional as F |
|
|
|
|
|
|
|
# Plot styling: seaborn theme, a custom colour cycle, and a default figure size.
HAPPY_COLORS_PALETTE = ["#01BEFE", "#FFDD00", "#FF7D00", "#FF006D", "#ADFF02", "#8F00FF"]

sns.set(style='whitegrid', palette='muted', font_scale=1.2)
sns.set_palette(sns.color_palette(HAPPY_COLORS_PALETTE))
rcParams['figure.figsize'] = (12, 8)

# Seed the NumPy and PyTorch random number generators with one shared value.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

# Use the first CUDA device when one is available, otherwise fall back to CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
|
|
|
|
|
!gdown --id 1S6qMioqPJjyBLpLVz4gmRTnJHnjitnuV |
|
!gdown --id 1zdmewp7ayS4js4VtrJEHzAheSW-5NBZv |
|
|
|
df = pd.read_csv("reviews.csv") |
|
|
|
|
|
sns.countplot(x='score', data = df) |
|
plt.xlabel('review score'); |
|
|
|
def to_sentiment(rating):
    """Map a review score to a sentiment class index.

    The score is converted with ``int()`` first. A score of 2 or lower
    gives 0, a score of exactly 3 gives 1, and anything higher gives 2.
    """
    score = int(rating)
    if score > 3:
        return 2
    if score == 3:
        return 1
    return 0
|
|
|
# Derive a sentiment class index for every review from its score.
df['sentiment'] = df['score'].apply(to_sentiment)

# Display names, ordered to line up with the indices to_sentiment() returns.
class_names = ['negative', 'neutral', 'positive']

print(df.sentiment)

# Class-balance chart, with the numeric ticks replaced by the class names.
ax = sns.countplot(data=df, x='sentiment')
plt.xlabel('review sentiment')
ax.set_xticklabels(class_names);
|
|
|
# Checkpoint name shared by the tokenizer below and the BERT models loaded later.
PRE_TRAINED_MODEL_NAME = 'bert-base-uncased'

tokenizer = BertTokenizer.from_pretrained(PRE_TRAINED_MODEL_NAME)

# Walk through tokenization on a single sentence: text -> tokens -> ids.
sample_txt = 'When was I last outside? I am stuck at home for 2 weeks.'

tokens = tokenizer.tokenize(sample_txt)
token_ids = tokenizer.convert_tokens_to_ids(tokens)

print(f' Sentence: {sample_txt}')
print(f' Tokens: {tokens}')
print(f'Token IDs: {token_ids}')

# Inspect the tokenizer's special tokens and their ids (notebook cell outputs).
tokenizer.sep_token, tokenizer.sep_token_id

tokenizer.cls_token, tokenizer.cls_token_id

tokenizer.pad_token, tokenizer.pad_token_id

tokenizer.unk_token, tokenizer.unk_token_id

# Encode the sample to a fixed length of 32. `padding='max_length'` together
# with `truncation=True` replaces the deprecated `pad_to_max_length=True` and
# makes the truncation explicit instead of relying on an implicit fallback.
encoding = tokenizer.encode_plus(
    sample_txt,
    max_length=32,
    add_special_tokens=True,
    return_token_type_ids=False,
    padding='max_length',
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt',
)

encoding.keys()

print(len(encoding['input_ids'][0]))
encoding['input_ids'][0]

print(len(encoding['attention_mask'][0]))
encoding['attention_mask']

tokenizer.convert_ids_to_tokens(encoding['input_ids'][0])

# Token count of every review, capped at 512. `str()` mirrors what
# GPReviewDataset does, so a non-string cell cannot break the tokenizer, and
# `truncation=True` makes the 512 cap explicit.
token_lens = [
    len(tokenizer.encode(str(txt), max_length=512, truncation=True))
    for txt in df.content
]

# NOTE(review): `sns.distplot` is deprecated in recent seaborn releases; it is
# kept here so the figure does not change.
sns.distplot(token_lens)
plt.xlim([0, 256]);
plt.xlabel('Token count');

# Fixed sequence length used for every encoding from here on.
MAX_LEN = 160
|
|
|
class GPReviewDataset(Dataset):
    """Map-style dataset that tokenizes one review per item.

    Args:
        reviews: Indexable collection of review texts.
        targets: Indexable collection of class indices, aligned with
            ``reviews``.
        tokenizer: Object exposing ``encode_plus`` (the file passes a
            ``BertTokenizer``).
        max_len: Fixed length every encoded review is padded or truncated to.
    """

    def __init__(self, reviews, targets, tokenizer, max_len):
        self.reviews = reviews
        self.targets = targets
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        """Return the number of reviews."""
        return len(self.reviews)

    def __getitem__(self, item):
        """Return the encoded review and its target at position ``item``.

        Returns:
            dict with ``review_text`` (str), ``input_ids`` and
            ``attention_mask`` (flattened tensors from the tokenizer) and
            ``targets`` (a ``torch.long`` tensor).
        """
        # str() guards against non-string cells in the review collection.
        review = str(self.reviews[item])
        target = self.targets[item]

        # `padding='max_length'` + `truncation=True` replace the deprecated
        # `pad_to_max_length=True` and make the truncation explicit, so every
        # item has exactly `max_len` positions and batches can be stacked.
        encoding = self.tokenizer.encode_plus(
            review,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        return {
            'review_text': review,
            # return_tensors='pt' adds a leading batch axis; flatten drops it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'targets': torch.tensor(target, dtype=torch.long),
        }
|
|
|
# First hold out 10% of the rows, then halve that hold-out into validation
# and test. Both splits are seeded with RANDOM_SEED.
df_train, df_test = train_test_split(
    df,
    test_size=0.1,
    random_state=RANDOM_SEED,
)
df_val, df_test = train_test_split(
    df_test,
    test_size=0.5,
    random_state=RANDOM_SEED,
)

# Row/column counts of the three splits (notebook cell output).
df_train.shape, df_val.shape, df_test.shape
|
|
|
def create_data_loader(df, tokenizer, max_len, batch_size, shuffle=False, num_workers=4):
    """Wrap a reviews frame in a ``GPReviewDataset`` and return a ``DataLoader``.

    Args:
        df: Frame with a ``content`` column (review text) and a ``sentiment``
            column (class index).
        tokenizer: Tokenizer handed to the dataset.
        max_len: Fixed encoded length handed to the dataset.
        batch_size: Number of items per batch.
        shuffle: Whether the loader reshuffles items every epoch. Defaults to
            ``False``, matching the previous fixed behaviour.
        num_workers: Number of loader worker processes. Defaults to 4, the
            value that used to be hard-coded; pass 0 to load in the main
            process.

    Returns:
        A ``DataLoader`` over the dataset.
    """
    ds = GPReviewDataset(
        reviews=df.content.to_numpy(),
        targets=df.sentiment.to_numpy(),
        tokenizer=tokenizer,
        max_len=max_len,
    )

    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        num_workers=num_workers,
    )
|
|
|
BATCH_SIZE = 16

# One loader per split, all sharing the tokenizer, sequence length and batch size.
train_data_loader = create_data_loader(
    df_train, tokenizer, max_len=MAX_LEN, batch_size=BATCH_SIZE
)
val_data_loader = create_data_loader(
    df_val, tokenizer, max_len=MAX_LEN, batch_size=BATCH_SIZE
)
test_data_loader = create_data_loader(
    df_test, tokenizer, max_len=MAX_LEN, batch_size=BATCH_SIZE
)

# Pull a single training batch to inspect its keys and tensor shapes.
data = next(iter(train_data_loader))
data.keys()

print(data['input_ids'].shape)
print(data['attention_mask'].shape)
print(data['targets'].shape)
|
|
|
bert_model = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)

# Run the sample encoding through the bare model. `return_dict=False` is
# passed so the result can be unpacked as a two-element tuple.
last_hidden_state, pooled_output = bert_model(
    input_ids=encoding['input_ids'],
    attention_mask=encoding['attention_mask'],
    return_dict=False,
)

# Inspect output shapes and the configured hidden size (notebook cell outputs).
last_hidden_state.shape

bert_model.config.hidden_size

pooled_output.shape
|
|
|
class SentimentClassifier(nn.Module):
    """BERT encoder followed by dropout and a linear classification layer."""

    def __init__(self, n_classes):
        """Load the pre-trained encoder and size the output layer.

        Args:
            n_classes: Number of output units of the final linear layer.
        """
        super().__init__()
        self.bert = BertModel.from_pretrained(PRE_TRAINED_MODEL_NAME)
        self.drop = nn.Dropout(p=0.3)
        self.out = nn.Linear(self.bert.config.hidden_size, n_classes)

    def forward(self, input_ids, attention_mask):
        """Return the linear layer's output for the given encoded batch."""
        encoder_result = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )
        # Dropout is applied to the pooled output before the linear layer.
        dropped = self.drop(encoder_result["pooler_output"])
        return self.out(dropped)
|
|
|
# Build a classifier with one output per class name and move it to `device`.
model = SentimentClassifier(len(class_names)).to(device)

# Move the sample batch fetched earlier onto the same device.
input_ids = data['input_ids'].to(device)
attention_mask = data['attention_mask'].to(device)

print(input_ids.shape)
print(attention_mask.shape)

# Softmax over dim 1 normalizes each row of model outputs (notebook cell output).
F.softmax(model(input_ids, attention_mask), dim=1)
|
|
|
|
|
EPOCHS = 6

optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)

# train_epoch() steps the scheduler once per batch, so the schedule spans
# (batches per epoch) * (epochs) steps.
total_steps = EPOCHS * len(train_data_loader)

scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=0,
    num_training_steps=total_steps,
)

loss_fn = nn.CrossEntropyLoss().to(device)
|
|
|
def train_epoch(
    model,
    data_loader,
    loss_fn,
    optimizer,
    device,
    scheduler,
    n_examples
):
    """Run one training pass over ``data_loader``.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: Iterable of batches; each batch is a mapping with
            ``input_ids``, ``attention_mask`` and ``targets`` tensors.
        loss_fn: Callable ``loss_fn(outputs, targets)`` returning a scalar
            tensor.
        optimizer: Optimizer stepped once per batch.
        device: Device the batch tensors are moved to.
        scheduler: Learning-rate scheduler stepped once per batch.
        n_examples: Divisor used to turn the correct-prediction count into
            an accuracy.

    Returns:
        ``(accuracy, mean_loss)``: accuracy is a 0-d float64 tensor and
        mean_loss is the mean of the per-batch losses (``nan`` when the
        loader yields no batches).
    """
    model = model.train()

    losses = []
    # Start from a tensor rather than the int 0 so `.double()` below works
    # even when the loader yields no batches.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)

    for d in data_loader:
        input_ids = d["input_ids"].to(device)
        attention_mask = d["attention_mask"].to(device)
        targets = d["targets"].to(device)

        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

        # The index of the largest output in each row is the predicted class.
        _, preds = torch.max(outputs, dim=1)
        loss = loss_fn(outputs, targets)

        correct_predictions += torch.sum(preds == targets)
        losses.append(loss.item())

        loss.backward()
        # Cap the gradient norm at 1.0 before the update.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

    # Avoid NumPy's "mean of empty slice" warning for an empty loader.
    mean_loss = np.mean(losses) if losses else float("nan")
    return correct_predictions.double() / n_examples, mean_loss
|
|
|
def eval_model(model, data_loader, loss_fn, device, n_examples):
    """Evaluate ``model`` over ``data_loader`` without tracking gradients.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: Iterable of batches; each batch is a mapping with
            ``input_ids``, ``attention_mask`` and ``targets`` tensors.
        loss_fn: Callable ``loss_fn(outputs, targets)`` returning a scalar
            tensor.
        device: Device the batch tensors are moved to.
        n_examples: Divisor used to turn the correct-prediction count into
            an accuracy.

    Returns:
        ``(accuracy, mean_loss)``: accuracy is a 0-d float64 tensor and
        mean_loss is the mean of the per-batch losses (``nan`` when the
        loader yields no batches).
    """
    model = model.eval()

    losses = []
    # Start from a tensor rather than the int 0 so `.double()` below works
    # even when the loader yields no batches.
    correct_predictions = torch.zeros((), dtype=torch.long, device=device)

    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)
            targets = d["targets"].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            # The index of the largest output in each row is the predicted class.
            _, preds = torch.max(outputs, dim=1)

            loss = loss_fn(outputs, targets)

            correct_predictions += torch.sum(preds == targets)
            losses.append(loss.item())

    # Avoid NumPy's "mean of empty slice" warning for an empty loader.
    mean_loss = np.mean(losses) if losses else float("nan")
    return correct_predictions.double() / n_examples, mean_loss
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# NOTE(review): `history` is not assigned anywhere in the visible part of this
# file. It presumably maps 'train_acc' / 'val_acc' to per-epoch accuracy
# tensors produced by a training loop; confirm it exists before running this.
print(history['train_acc'])

# Convert each accuracy tensor to a NumPy value on the CPU so it can be plotted.
list_of_train_accuracy= [acc.cpu().numpy() for acc in history['train_acc']]
list_of_train_accuracy

print(history['val_acc'])

list_of_val_accuracy= [acc.cpu().numpy() for acc in history['val_acc']]
list_of_val_accuracy

# Training vs. validation accuracy on a shared 0..1 axis.
plt.plot(list_of_train_accuracy, label='train accuracy')
plt.plot(list_of_val_accuracy, label='validation accuracy')

plt.title('Training history')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend()
plt.ylim([0, 1]);

# Accuracy on the held-out test split; the returned mean loss is discarded.
test_acc, _ = eval_model(model, test_data_loader, loss_fn, device, len(df_test))

print('\n')
print('Test Accuracy : ', test_acc.item())
|
|
|
def get_predictions(model, data_loader, device=None):
    """Collect texts, predictions, probabilities and targets for a loader.

    Args:
        model: Module called as ``model(input_ids=..., attention_mask=...)``.
        data_loader: Iterable of batches; each batch is a mapping with
            ``review_text``, ``input_ids``, ``attention_mask`` and
            ``targets``.
        device: Device the inputs are moved to. When omitted, the device of
            the model's first parameter is used (CPU if the model has no
            parameters), so the function no longer depends on a global.

    Returns:
        ``(review_texts, predictions, prediction_probs, real_values)``:
        a list of texts and three CPU tensors with one row per example.
        For an empty loader the list and all three tensors are empty.
    """
    model = model.eval()

    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    review_texts = []
    pred_batches = []
    prob_batches = []
    target_batches = []

    with torch.no_grad():
        for d in data_loader:
            input_ids = d["input_ids"].to(device)
            attention_mask = d["attention_mask"].to(device)

            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask
            )
            # The index of the largest output in each row is the predicted class.
            _, preds = torch.max(outputs, dim=1)
            probs = F.softmax(outputs, dim=1)

            review_texts.extend(d["review_text"])
            # Keep whole batches and concatenate once at the end instead of
            # splitting every batch into per-row tensors and re-stacking.
            pred_batches.append(preds.cpu())
            prob_batches.append(probs.cpu())
            target_batches.append(d["targets"].cpu())

    if not pred_batches:
        # Concatenating an empty list raises; return empty tensors instead.
        return (
            review_texts,
            torch.empty(0, dtype=torch.long),
            torch.empty(0, 0),
            torch.empty(0, dtype=torch.long),
        )

    predictions = torch.cat(pred_batches)
    prediction_probs = torch.cat(prob_batches)
    real_values = torch.cat(target_batches)
    return review_texts, predictions, prediction_probs, real_values
|
|
|
# Gather texts, predicted classes, class probabilities and true labels for the test split.
y_review_texts, y_pred, y_pred_probs, y_test = get_predictions(model, test_data_loader)

# Per-class metrics, labelled with the readable class names.
report = classification_report(y_test, y_pred, target_names=class_names)
print(report)
|
|
|
def show_confusion_matrix(confusion_matrix):
    """Draw ``confusion_matrix`` as an annotated blue heat map.

    Tick labels are right-aligned; y labels stay horizontal and x labels
    are rotated by 30 degrees. The y axis is titled 'True sentiment' and
    the x axis 'Predicted sentiment'.
    """
    hmap = sns.heatmap(confusion_matrix, annot=True, fmt="d", cmap="Blues")
    # Same label treatment for both axes; only the rotation differs.
    for axis, angle in ((hmap.yaxis, 0), (hmap.xaxis, 30)):
        axis.set_ticklabels(axis.get_ticklabels(), rotation=angle, ha='right')
    plt.ylabel('True sentiment')
    plt.xlabel('Predicted sentiment');
|
|
|
# Confusion matrix over the test split, labelled with class names on both axes.
cm = confusion_matrix(y_test, y_pred)
df_cm = pd.DataFrame(cm, index=class_names, columns=class_names)
show_confusion_matrix(df_cm)

# Look at one test example in detail.
idx = 2

review_text = y_review_texts[idx]
true_sentiment = y_test[idx]
pred_df = pd.DataFrame(
    {
        'class_names': class_names,
        'values': y_pred_probs[idx],
    }
)

print("\n".join(wrap(review_text)))
print()
print(f'True sentiment: {class_names[true_sentiment]}')

# Horizontal bars of the predicted probability for each class.
sns.barplot(data=pred_df, x='values', y='class_names', orient='h')
plt.ylabel('sentiment')
plt.xlabel('probability')
plt.xlim([0, 1]);
|
|
|
# Classify a comment typed in by the user.
review_text = input("Enter a comment for sentiment analysis: ")

# `padding='max_length'` + `truncation=True` replace the deprecated
# `pad_to_max_length=True` and make the truncation to MAX_LEN explicit.
encoded_review = tokenizer.encode_plus(
    review_text,
    max_length=MAX_LEN,
    add_special_tokens=True,
    return_token_type_ids=False,
    padding='max_length',
    truncation=True,
    return_attention_mask=True,
    return_tensors='pt',
)

input_ids = encoded_review['input_ids'].to(device)
attention_mask = encoded_review['attention_mask'].to(device)

# Inference only: skip autograd bookkeeping for this forward pass.
with torch.no_grad():
    output = model(input_ids, attention_mask)
_, prediction = torch.max(output, dim=1)

print(f'Review text: {review_text}')
# .item() turns the one-element tensor into a plain int before indexing the list.
print(f'Sentiment : {class_names[prediction.item()]}')
|
|
|
def suggest_improved_text(review_text, model, tokenizer):
    """Return an "improved" version of a review that is not positive.

    The review is classified with ``analyze_sentiment``. If the result is
    'negative' or 'neutral', the text and its class index are passed to
    ``generate_improved_text``; otherwise the text is returned unchanged.

    Args:
        review_text: Text to analyse.
        model: Classifier forwarded to ``analyze_sentiment``.
        tokenizer: Tokenizer forwarded to ``analyze_sentiment``.

    Returns:
        The text produced by ``generate_improved_text`` or the original
        ``review_text``.
    """
    sentiment = analyze_sentiment(review_text, model, tokenizer)

    if sentiment not in ('negative', 'neutral'):
        return review_text

    # analyze_sentiment() returns an entry of class_names, so its position is
    # the class index. Reusing it avoids a second encode + forward pass.
    predicted_sentiment = class_names.index(sentiment)

    # Pass `review_text`; the previous code passed the undefined name `text`,
    # which raised NameError on every non-positive review.
    return generate_improved_text(review_text, predicted_sentiment)
|
|
|
def analyze_sentiment(review_text, model, tokenizer):
    """Classify ``review_text`` and return the matching class name.

    Args:
        review_text: Text to classify.
        model: Module called as ``model(input_ids, attention_mask)``.
        tokenizer: Object exposing ``encode_plus``.

    Returns:
        The entry of ``class_names`` at the index of the largest model output.
    """
    # `padding='max_length'` + `truncation=True` replace the deprecated
    # `pad_to_max_length=True` and make the truncation to MAX_LEN explicit.
    encoded_input = tokenizer.encode_plus(
        review_text,
        max_length=MAX_LEN,
        add_special_tokens=True,
        return_token_type_ids=False,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt',
    )

    input_ids = encoded_input['input_ids'].to(device)
    attention_mask = encoded_input['attention_mask'].to(device)

    # Predict in eval mode (dropout off) and without autograd, then restore
    # whatever mode the caller had the model in.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            outputs = model(input_ids, attention_mask)
    finally:
        model.train(was_training)

    _, predicted_sentiment = torch.max(outputs, dim=1)

    # .item() turns the one-element tensor into a plain int before indexing.
    return class_names[predicted_sentiment.item()]
|
def generate_improved_text(review_text, predicted_sentiment):
    """Append a fixed list of positive words to text classified as class 0.

    Args:
        review_text: Original text.
        predicted_sentiment: Class index; only a value equal to 0 triggers
            the change.

    Returns:
        ``review_text`` followed by a space and the space-separated positive
        words when ``predicted_sentiment == 0``, otherwise ``review_text``
        unchanged.
    """
    positive_words = ("marvellous", "fantastic", "excellent", "admirable", "formidable")
    suffix = " " + " ".join(positive_words)
    return (review_text + suffix) if predicted_sentiment == 0 else review_text