# HMM-NLP / app.py
import numpy as np
import math
import nltk
import matplotlib.pyplot as plt
import re
import gradio as gr
from collections import Counter, defaultdict
from sklearn.model_selection import KFold
from sklearn import metrics
nltk.download('brown')
nltk.download('universal_tagset')
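# Bigram HMM tagger: tag-transition probabilities P(t_i | t_{i-1}) and word
# emission probabilities P(w_i | t_i) are estimated by counting over the Brown
# corpus; Viterbi decoding then picks the tag sequence maximizing
#   prod_i P(t_i | t_{i-1}) * P(w_i | t_i),
# computed as a sum of logs for numerical stability.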
class HMM:
    """Bigram HMM POS tagger trained on the Brown corpus (universal tagset)."""

    def __init__(self):
        self.tagged_sentences = nltk.corpus.brown.tagged_sents(tagset='universal')
        self.tagset = ['.', 'ADJ', 'ADP', 'ADV', 'CONJ', 'DET', 'NOUN', 'NUM', 'PRON', 'PRT', 'VERB', 'X']
        self.start_token = '^'
        self.end_token = '$'
        # Wrap each sentence in explicit start/end markers and lowercase every word.
        self.tagged_sentences = [[(self.start_token, self.start_token)] + sentence + [(self.end_token, self.end_token)]
                                 for sentence in self.tagged_sentences]
        self.tagged_sentences = [[(word.lower(), tag) for word, tag in sentence]
                                 for sentence in self.tagged_sentences]
    def train(self):
        """Train on the first 80% of the corpus, report scores on the remaining 20%."""
        tagged_sent = np.array(self.tagged_sentences, dtype='object')
        y_pred = []
        y_true = []
        train = int(0.8 * len(tagged_sent))
        train_sentences = tagged_sent[:train]
        test_sentences = tagged_sent[train:]
        tagsCount, wordTagMapping, tagTagMapping = self.mapping(train_sentences)
        for sentence in test_sentences:
            untaggedWords = [word for word, tag in sentence]
            prediction = self.viterbi(untaggedWords, tagsCount, wordTagMapping, tagTagMapping)
            # Skip the start/end markers when scoring.
            for i in range(1, len(prediction) - 1):
                y_pred.append(prediction[i])
                y_true.append(sentence[i][1])
        f05_Score = metrics.fbeta_score(y_true, y_pred, beta=0.5, average='weighted', zero_division=0)
        f1_Score = metrics.fbeta_score(y_true, y_pred, beta=1, average='weighted', zero_division=0)
        f2_Score = metrics.fbeta_score(y_true, y_pred, beta=2, average='weighted', zero_division=0)
        precision = metrics.precision_score(y_true, y_pred, average='weighted', zero_division=0)
        recall = metrics.recall_score(y_true, y_pred, average='weighted', zero_division=0)
        print(f"Precision = {precision:.2f}, Recall = {recall:.2f}, f05-Score = {f05_Score:.2f}, f1-Score = {f1_Score:.2f}, f2-Score = {f2_Score:.2f}")
        return tagsCount, wordTagMapping, tagTagMapping
    def viterbi(self, untaggedWords, tagsCount, wordTagMapping, tagTagMapping):
        """Viterbi decoding in log space; O(n * |T|^2) per sentence over tag set T.

        prev/curr hold the log-probability of the best path ending in each tag;
        path[i][tag] remembers the best predecessor of `tag` at position i.
        """
        sent_len = len(untaggedWords)
        path = defaultdict(dict)
        # In log space the start token has probability log(1) = 0 and every
        # other tag is impossible (-inf) at position 0.
        prev = {tag: float('-inf') for tag in tagsCount}
        prev[self.start_token] = 0.0
        curr = {}
        for i in range(1, sent_len - 1):
            word = untaggedWords[i]
            for tag in tagsCount:
                curr[tag] = float('-inf')
                for prev_tag in tagsCount:
                    lprob = (prev[prev_tag]
                             + math.log(self.lexical_probability(word, tag, tagsCount, wordTagMapping))
                             + math.log(self.transition_probability(prev_tag, tag, tagsCount, tagTagMapping)))
                    if lprob > curr[tag]:
                        curr[tag] = lprob
                        path[i][tag] = prev_tag
            prev = dict(curr)
        # Backtrace: pick the best tag for the last real word, then follow path.
        taglist = [self.end_token for _ in range(sent_len)]
        for tag in tagsCount:
            if curr[tag] > curr[taglist[sent_len - 2]]:
                taglist[sent_len - 2] = tag
        for i in range(sent_len - 3, 0, -1):
            taglist[i] = path[i + 1][taglist[i + 1]]
        taglist[0] = self.start_token
        return taglist
    def mapping(self, sentences):
        """Collect tag counts, word->tag emission counts, and tag->tag bigram counts."""
        word_tag_pairs = [(word, tag) for sentence in sentences for word, tag in sentence]
        tagsCount = Counter(tag for _, tag in word_tag_pairs)
        wordTagMapping = defaultdict(Counter)
        for word, tag in word_tag_pairs:
            wordTagMapping[word][tag] += 1
        tagTagMapping = defaultdict(Counter)
        for sentence in sentences:
            for i in range(len(sentence) - 1):
                tagTagMapping[sentence[i][1]][sentence[i + 1][1]] += 1
        return tagsCount, wordTagMapping, tagTagMapping
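    # Illustrative shapes of the returned structures (counts are not exact):
    #   tagsCount: Counter({'NOUN': ..., 'VERB': ..., '^': n_sents, '$': n_sents, ...})
    #   wordTagMapping['the']: Counter({'DET': ...})  # 'the' is almost always DET
    #   tagTagMapping['^']: Counter over the tags that open sentences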
    def transition_probability(self, curr_tag, next_tag, tagsCount, tagTagMapping):
        """MLE bigram transition probability P(next_tag | curr_tag), floored to avoid log(0)."""
        currToNextCount = tagTagMapping[curr_tag][next_tag]
        currCount = tagsCount[curr_tag]
        probability = currToNextCount / currCount
        return 10**-9 if probability == 0 else probability
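    # For example, P(NOUN | DET) = C(DET, NOUN) / C(DET); in Brown, most of
    # DET's outgoing transition mass goes to NOUN and ADJ (illustrative, not
    # exact). Unseen transitions get a tiny floor (1e-9) so log() stays defined.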
    def lexical_probability(self, word, tag, tagsCount, wordTagMapping):
        """Laplace-smoothed emission probability P(word | tag)."""
        wordTagCount = wordTagMapping[word][tag]
        tagCount = tagsCount[tag]
        probability = (wordTagCount + 1) / (tagCount + len(wordTagMapping))  # add-one (Laplace) smoothing
        return probability
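    # With add-one smoothing an unseen (word, tag) pair still receives
    # probability 1 / (C(tag) + |vocabulary|), so out-of-vocabulary words
    # never zero out a Viterbi path.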
    def cross_validation(self, tagged_sentences):
        """5-fold cross-validation; prints per-fold accuracy and averaged scores."""
        kfold = KFold(n_splits=5, shuffle=True, random_state=1)
        tagged_sent = np.array(tagged_sentences, dtype='object')
        y_pred_list = []
        y_true_list = []
        for fold, (train, test) in enumerate(kfold.split(tagged_sent)):
            train_sentences = tagged_sent[train]
            test_sentences = tagged_sent[test]
            tagsCount, wordTagMapping, tagTagMapping = self.mapping(train_sentences)
            y_pred = []
            y_true = []
            for sentence in test_sentences:
                untaggedWords = [word for word, _ in sentence]
                pred_taglist = self.viterbi(untaggedWords, tagsCount, wordTagMapping, tagTagMapping)
                # Skip the start/end markers when scoring.
                for i in range(1, len(pred_taglist) - 1):
                    y_pred.append(pred_taglist[i])
                    y_true.append(sentence[i][1])
            y_pred_list.append(np.array(y_pred))
            y_true_list.append(np.array(y_true))
            accuracy = metrics.accuracy_score(y_true_list[-1], y_pred_list[-1], normalize=True)
            print(f'Fold {fold + 1} Accuracy: {accuracy}')
        # Average the weighted scores over the five folds.
        f05_Score, f1_Score, f2_Score, precision, recall = 0, 0, 0, 0, 0
        for i in range(5):
            precision += metrics.precision_score(y_true_list[i], y_pred_list[i], average='weighted', zero_division=0)
            recall += metrics.recall_score(y_true_list[i], y_pred_list[i], average='weighted', zero_division=0)
            f05_Score += metrics.fbeta_score(y_true_list[i], y_pred_list[i], beta=0.5, average='weighted', zero_division=0)
            f1_Score += metrics.fbeta_score(y_true_list[i], y_pred_list[i], beta=1, average='weighted', zero_division=0)
            f2_Score += metrics.fbeta_score(y_true_list[i], y_pred_list[i], beta=2, average='weighted', zero_division=0)
        precision = precision / 5.0
        recall = recall / 5.0
        f05_Score = f05_Score / 5.0
        f1_Score = f1_Score / 5.0
        f2_Score = f2_Score / 5.0
        print(f"Average Precision = {precision:.2f}, Average Recall = {recall:.2f}, Average f05-Score = {f05_Score:.2f}, Average f1-Score = {f1_Score:.2f}, Average f2-Score = {f2_Score:.2f}")
        self.per_pos_report(y_true_list, y_pred_list)
        self.confusion_matrix(y_true_list, y_pred_list)
    def confusion_matrix(self, y_true_list, y_pred_list):
        """Plot the row-normalized confusion matrix averaged over the folds."""
        total = 0.0
        for y_true, y_pred in zip(y_true_list, y_pred_list):
            cm = metrics.confusion_matrix(y_true, y_pred, labels=self.tagset)
            total += cm
        matrix = total / len(y_true_list)
        normalized_matrix = matrix / np.sum(matrix, axis=1, keepdims=True)
        plt.subplots(figsize=(12, 10))
        plt.xticks(np.arange(len(self.tagset)), self.tagset)
        plt.yticks(np.arange(len(self.tagset)), self.tagset)
        for i in range(normalized_matrix.shape[0]):
            for j in range(normalized_matrix.shape[1]):
                plt.text(j, i, format(normalized_matrix[i, j], '0.2f'), horizontalalignment="center")
        plt.imshow(normalized_matrix, interpolation='nearest', cmap=plt.cm.Greens)
        plt.colorbar()
        plt.savefig('Confusion_Matrix.png')
    def per_pos_report(self, y_true_list, y_pred_list):
        """Plot per-tag precision/recall/F1 (with support) averaged over the folds."""
        report, support = 0, 0
        for y_true, y_pred in zip(y_true_list, y_pred_list):
            cr = metrics.classification_report(y_true, y_pred, labels=self.tagset, zero_division=0)
            # Collapse two-word row labels so that str.split() yields aligned columns.
            cr = cr.replace('macro avg', 'MacroAvg').replace('micro avg', 'MicroAvg').replace('weighted avg', 'WeightedAvg')
            rows = cr.split('\n')
            tags, reportValues, supportValues = [], [], []
            for row in rows[1:]:
                row = row.strip().split()
                if len(row) < 2:
                    continue
                if row[0] == 'accuracy':
                    # The accuracy row has a single score; skip it so every
                    # parsed row keeps the same precision/recall/F1 width.
                    continue
                tagScores = [float(j) for j in row[1: len(row) - 1]]
                supportValues.append(int(row[-1]))
                tags.append(row[0])
                reportValues.append(tagScores)
            report += np.array(reportValues)
            support += np.array(supportValues)
        report = report / 5.0
        support = support / 5.0
        xlabels = ['Precision', 'Recall', 'F1 Score']
        ylabels = ['{0}[{1:.0f}]'.format(tags[i], sup) for i, sup in enumerate(support)]
        _, ax = plt.subplots(figsize=(18, 10))
        ax.xaxis.set_tick_params()
        ax.yaxis.set_tick_params()
        plt.imshow(report, aspect='auto', cmap=plt.cm.RdYlGn)
        plt.xticks(np.arange(3), xlabels)
        plt.yticks(np.arange(len(tags)), ylabels)
        plt.colorbar()
        for i in range(report.shape[0]):
            for j in range(report.shape[1]):
                plt.text(j, i, format(report[i, j], '.2f'), horizontalalignment="center", verticalalignment="center")
        plt.savefig('Per_POS_Accuracy.png')
    def doTagging(self, input_sentence, prevTagsCount, prevWordTagMapping, prevTagTagMapping):
        """Tag a raw input sentence and return it as 'word[TAG]' pairs."""
        # Separate trailing punctuation from words so split() tokenizes it.
        input_sentence = re.sub(r'(\S)([.,;:!?])', r'\1 \2', input_sentence.strip())
        untaggedWords = input_sentence.lower().split()
        untaggedWords = [self.start_token] + untaggedWords + [self.end_token]
        tags = self.viterbi(untaggedWords, prevTagsCount, prevWordTagMapping, prevTagTagMapping)
        output_sentence = ' '.join(f'{untaggedWords[i]}[{tags[i]}]' for i in range(1, len(untaggedWords) - 1))
        return output_sentence
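    # Illustrative call (exact tags depend on the trained counts):
    #   hmm.doTagging("the dog runs .", tagsCount, wordTagMapping, tagTagMapping)
    #   -> 'the[DET] dog[NOUN] runs[VERB] .[.]'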
hmm = HMM()
hmm.cross_validation(hmm.tagged_sentences)
tagsCount, wordTagMapping, tagTagMapping = hmm.train()
# test_sent = "the united kingdom and the usa are on two sides of the atlantic"

def tagging(input_sentence):
    return hmm.doTagging(input_sentence, tagsCount, wordTagMapping, tagTagMapping)

interface = gr.Interface(fn=tagging,
                         inputs=gr.Textbox(
                             label="Input Sentence",
                             placeholder="Enter your sentence here...",
                         ),
                         outputs=gr.Textbox(
                             label="Tagged Output",
                             placeholder="Tagged sentence appears here...",
                         ),
                         title="Hidden Markov Model POS Tagger",
                         description="CS626 Assignment 1A (Autumn 2024)",
                         theme=gr.themes.Soft())
interface.launch(inline=False, share=True)