import time
import datetime

import numpy as np
import torch
# torch.optim.AdamW is used here; transformers' own AdamW is deprecated in recent releases.
from torch.optim import AdamW
from torch.utils.data import TensorDataset, random_split, DataLoader, RandomSampler, SequentialSampler

from transformers import BertTokenizer, BertForSequenceClassification, get_linear_schedule_with_warmup


class BERTClassifier:

    def __init__(self, model_name="bert-base-uncased", tokenizer_name="bert-base-uncased") -> None:
        print(f'Loading BERT: {model_name}...')

        self.model_name = model_name

        self.tokenizer = BertTokenizer.from_pretrained(tokenizer_name, do_lower_case=True)

        if model_name.startswith('jeevavijay10'):
            # A previously fine-tuned checkpoint: its config already defines the classification head.
            self.model = BertForSequenceClassification.from_pretrained(model_name)
        else:
            # Base checkpoint with a fresh 14-way classification head.
            self.model = BertForSequenceClassification.from_pretrained(
                self.model_name,
                num_labels=14,
                output_attentions=False,
                output_hidden_states=False
            )

        self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
        self.model.to(self.device)

    def tokenizeText(self, sentence: str):
        # Encode a single sentence, padding/truncating to 64 tokens.
        encoded_dict = self.tokenizer.encode_plus(
            sentence,
            add_special_tokens=True,
            max_length=64,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt')
        return encoded_dict['input_ids'], encoded_dict['attention_mask']

    def tokenizeSentences(self, sentences: list, labels: list):
        input_ids = []
        attention_masks = []
        for sent in sentences:
            input_id, attention_mask = self.tokenizeText(sent)
            input_ids.append(input_id)
            attention_masks.append(attention_mask)

        input_ids = torch.cat(input_ids, dim=0)
        attention_masks = torch.cat(attention_masks, dim=0)
        # TensorDataset requires tensors, so convert the label list.
        labels = torch.tensor(labels)

        dataset = TensorDataset(input_ids, attention_masks, labels)

        # 90/10 train/validation split.
        train_size = int(0.9 * len(dataset))
        val_size = len(dataset) - train_size
        return random_split(dataset, [train_size, val_size])

    def flat_accuracy(self, preds, labels):
        pred_flat = np.argmax(preds, axis=1).flatten()
        labels_flat = labels.flatten()
        return np.sum(pred_flat == labels_flat) / len(labels_flat)

    def format_time(self, elapsed):
        # Round to whole seconds and format as h:mm:ss.
        elapsed_rounded = int(round(elapsed))
        return str(datetime.timedelta(seconds=elapsed_rounded))

    def trainModel(self, sentences: list, labels: list, epochs=4, batch_size=32):
        optimizer = AdamW(self.model.parameters(), lr=2e-5, eps=1e-8)

        train_dataset, val_dataset = self.tokenizeSentences(sentences, labels)

        train_dataloader = DataLoader(
            train_dataset,
            sampler=RandomSampler(train_dataset),
            batch_size=batch_size
        )

        # Held-out split; prepared for a validation pass, which train() does not currently run.
        validation_dataloader = DataLoader(
            val_dataset,
            sampler=SequentialSampler(val_dataset),
            batch_size=batch_size
        )

        total_steps = len(train_dataloader) * epochs

        scheduler = get_linear_schedule_with_warmup(optimizer,
                                                    num_warmup_steps=0,
                                                    num_training_steps=total_steps)

        self.train(train_dataloader, optimizer, scheduler, epochs)
        torch.save(self.model, f"Bert_GoEmotions_BS{batch_size}_E{epochs}.model")

    def train(self, train_dataloader, optimizer, scheduler, epochs):
        total_t0 = time.time()

        for epoch_i in range(epochs):
            print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
            print('Training...')

            t0 = time.time()
            total_train_loss = 0

            # Put the model in training mode (enables dropout, etc.).
            self.model.train()

            for step, batch in enumerate(train_dataloader):
                # Report progress every 40 batches.
                if step % 40 == 0 and step != 0:
                    elapsed = self.format_time(time.time() - t0)
                    print(' Batch {:>5,} of {:>5,}. Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))

                # Unpack the batch and move its tensors to the target device.
                b_input_ids = batch[0].to(self.device)
                b_input_mask = batch[1].to(self.device)
                b_labels = batch[2].to(self.device)

                # Clear gradients accumulated from the previous step.
                self.model.zero_grad()

                # Forward pass; passing labels makes the model return the loss as well.
                output = self.model(b_input_ids,
                                    token_type_ids=None,
                                    attention_mask=b_input_mask,
                                    labels=b_labels)

                loss = output.loss
                logits = output.logits

                total_train_loss += loss.item()

                # Backward pass, then clip gradients to stabilise training.
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)

                # Update parameters and advance the learning-rate schedule.
                optimizer.step()
                scheduler.step()

            avg_train_loss = total_train_loss / len(train_dataloader)
            training_time = self.format_time(time.time() - t0)

            print("")
            print(" Average training loss: {0:.2f}".format(avg_train_loss))
            print(" Training epoch took: {:}".format(training_time))

        print("")
        print("Training complete!")
        print("Total training took {:} (h:mm:ss)".format(self.format_time(time.time() - total_t0)))

    def evaluate(self, sentences: list):
        input_ids = []
        attention_masks = []

        for sent in sentences:
            input_id, attention_mask = self.tokenizeText(sent)
            input_ids.append(input_id)
            attention_masks.append(attention_mask)

        input_ids = torch.cat(input_ids, dim=0)
        attention_masks = torch.cat(attention_masks, dim=0)
        # Dummy labels so the prediction set matches the TensorDataset layout used in training.
        labels = torch.zeros(len(sentences), dtype=torch.long)

        batch_size = 32

        prediction_data = TensorDataset(input_ids, attention_masks, labels)
        prediction_sampler = SequentialSampler(prediction_data)
        prediction_dataloader = DataLoader(prediction_data, sampler=prediction_sampler, batch_size=batch_size)

        # Evaluation mode disables dropout.
        self.model.eval()

        predictions = []

        for batch in prediction_dataloader:
            batch = tuple(t.to(self.device) for t in batch)

            b_input_ids, b_input_mask, _ = batch

            with torch.no_grad():
                outputs = self.model(b_input_ids, token_type_ids=None,
                                     attention_mask=b_input_mask)

            logits = outputs[0]

            logits = logits.detach().cpu().numpy()
            predictions.append(logits)

        # Concatenate the per-batch logits and return the predicted class index for each sentence.
        predictions = np.concatenate(predictions, axis=0)
        return [int(p.argmax()) for p in predictions]
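

# Hypothetical usage sketch (not part of the original module): it shows how the class
# above might be driven end to end. The sentences, integer labels (0-13, to match
# num_labels=14) and the epoch/batch settings are illustrative placeholders only.
if __name__ == "__main__":
    classifier = BERTClassifier()

    # Toy training data: each label is an integer class id in [0, 13].
    train_sentences = ["I love this so much!", "This is terrible news."]
    train_labels = [0, 1]

    classifier.trainModel(train_sentences, train_labels, epochs=1, batch_size=2)

    # Predict class indices for unseen sentences.
    print(classifier.evaluate(["What a wonderful surprise!"]))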