import multiprocessing
import time

import numpy as np
import torch
import tqdm

from models import DynamicLinearClassifier, FBankCrossEntropyNet

MODEL_PATH = './weights/triplet_loss_trained_model.pth'

# Pre-trained embedding network, used throughout this module as a frozen
# feature extractor (it is never optimized here), hence eval mode.
model_instance = FBankCrossEntropyNet()
model_instance.load_state_dict(torch.load(MODEL_PATH, map_location=lambda storage, loc: storage))
model_instance.eval()

use_cuda = False
kwargs = {'num_workers': multiprocessing.cpu_count(),
          'pin_memory': True} if use_cuda else {}


def train_classification(model, device, train_loader, optimizer, epoch, log_interval):
    model.train()
    losses = []
    accuracy = 0
    for batch_idx, (x, y) in enumerate(tqdm.tqdm(train_loader)):
        x, y = x.to(device), y.to(device)

        # Embed the batch with the frozen pre-trained extractor; gradients are
        # only needed for the classifier, so skip autograd here.
        with torch.no_grad():
            x = model_instance(x)

        optimizer.zero_grad()
        out = model(x)
        loss = model.loss(out, y)

        with torch.no_grad():
            pred = torch.argmax(out, dim=1)
            accuracy += torch.sum((pred == y))

        losses.append(loss.item())
        loss.backward()
        optimizer.step()

        if batch_idx % log_interval == 0:
            print('{} Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                time.ctime(time.time()),
                epoch, batch_idx * len(x), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

    accuracy_mean = (100. * accuracy) / len(train_loader.dataset)
    return np.mean(losses), accuracy_mean.item()
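
# Example of driving one training epoch (a sketch, not part of the original
# script): `train_dataset` and the hyper-parameters below are assumptions and
# would come from the surrounding training code.
#
#   classifier = DynamicLinearClassifier(num_layers=2, output_size=3)
#   optimizer = torch.optim.Adam(classifier.parameters(), lr=1e-3)
#   train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True, **kwargs)
#   train_loss, train_acc = train_classification(
#       classifier, torch.device('cpu'), train_loader, optimizer, epoch=1, log_interval=100)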


def test_classification(model, device, test_loader, log_interval=None):
    model.eval()
    losses = []
    accuracy = 0
    with torch.no_grad():
        for batch_idx, (x, y) in enumerate(tqdm.tqdm(test_loader)):
            x, y = x.to(device), y.to(device)

            # Embed the batch with the frozen extractor, then classify.
            x = model_instance(x)
            out = model(x)
            test_loss_on = model.loss(out, y).item()
            losses.append(test_loss_on)

            pred = torch.argmax(out, dim=1)
            accuracy += torch.sum((pred == y))

            if log_interval is not None and batch_idx % log_interval == 0:
                print('{} Test: [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    time.ctime(time.time()),
                    batch_idx * len(x), len(test_loader.dataset),
                    100. * batch_idx / len(test_loader), test_loss_on))

    test_loss = np.mean(losses)
    accuracy_mean = (100. * accuracy) / len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.4f}%)\n'.format(
        test_loss, accuracy, len(test_loader.dataset), accuracy_mean))
    return test_loss, accuracy_mean.item()
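
# Example evaluation call (a sketch; `test_dataset` and `classifier` are assumptions):
#
#   test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False, **kwargs)
#   test_loss, test_acc = test_classification(classifier, torch.device('cpu'), test_loader, log_interval=50)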


def speaker_probability(tensor):
    """Return the fraction of frames assigned to each predicted speaker id."""
    counts = {}
    total = 0
    for value in tensor:
        value = int(value)
        counts[value] = counts.get(value, 0) + 1
        total += 1

    probabilities = {}
    for key, value in counts.items():
        probabilities['speaker ' + str(key)] = value / total

    return probabilities
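
# For example, a prediction tensor of [0, 0, 1, 2] maps to
# {'speaker 0': 0.5, 'speaker 1': 0.25, 'speaker 2': 0.25}.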


def inference_speaker_classification(
        file_speaker,
        num_class=3,
        num_layers=2,
        model_instance=model_instance,
        model_path='saved_models_cross_entropy_classification/0.pth'):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    from utils.preprocessing import extract_fbanks
    # Filter-bank features for the input utterance.
    fbanks = extract_fbanks(file_speaker)

    # Load the trained classifier head.
    model = DynamicLinearClassifier(num_layers=num_layers, output_size=num_class)
    ckpt = torch.load(model_path, map_location=device)
    model.load_state_dict(ckpt)
    model = model.double()
    model.to(device)
    model.eval()

    # Frozen embedding extractor.
    model_instance = model_instance.double()
    model_instance.eval()
    model_instance.to(device)

    with torch.no_grad():
        x = torch.from_numpy(fbanks)
        embeddings = model_instance(x.to(device))
        output = model(embeddings)
        output = torch.argmax(output, dim=-1)

    speaker_pro = speaker_probability(output)
    print(speaker_pro)
    return speaker_pro
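

if __name__ == '__main__':
    # Minimal usage sketch: the audio path below is a placeholder, and the
    # default checkpoint path is only valid once the classifier has been trained.
    inference_speaker_classification(
        file_speaker='path/to/sample.wav',
        num_class=3,
        num_layers=2,
        model_path='saved_models_cross_entropy_classification/0.pth',
    )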