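"""Hidden Markov Model POS tagger.

Trains bigram transition and word-emission counts on the Brown corpus
(universal tagset), decodes with Viterbi in log space, evaluates via a
held-out split and 5-fold cross-validation, and serves a Gradio demo.
"""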
import math
import re
from collections import Counter, defaultdict

import gradio as gr
import matplotlib.pyplot as plt
import nltk
import numpy as np
from sklearn import metrics
from sklearn.model_selection import KFold

nltk.download('brown')
nltk.download('universal_tagset')
class HMM:
    def __init__(self):
        self.tagged_sentences = nltk.corpus.brown.tagged_sents(tagset='universal')
        self.tagset = ['.', 'ADJ', 'ADP', 'ADV', 'CONJ', 'DET', 'NOUN', 'NUM', 'PRON', 'PRT', 'VERB', 'X']
        self.start_token = '^'
        self.end_token = '$'
        # Wrap every sentence in explicit start/end markers and lowercase the words.
        self.tagged_sentences = [[(self.start_token, self.start_token)] + sentence + [(self.end_token, self.end_token)]
                                 for sentence in self.tagged_sentences]
        self.tagged_sentences = [[(word.lower(), tag) for word, tag in sentence]
                                 for sentence in self.tagged_sentences]
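    # Hold-out evaluation: fit the count tables on the first 80% of the corpus,
    # Viterbi-tag the remaining 20%, and report weighted precision, recall, and
    # F-beta scores. Returns the tables so they can be reused on user input.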
    def train(self):
        tagged_sent = np.array(self.tagged_sentences, dtype='object')
        y_pred = []
        y_true = []
        train = int(0.8 * len(tagged_sent))
        train_sentences = tagged_sent[:train]
        test_sentences = tagged_sent[train:]
        tagsCount, wordTagMapping, tagTagMapping = self.mapping(train_sentences)
        for sentence in test_sentences:
            untaggedWords = [word for word, tag in sentence]
            prediction = self.viterbi(untaggedWords, tagsCount, wordTagMapping, tagTagMapping)
            # Score only the real words, skipping the '^' and '$' markers.
            for i in range(1, len(prediction) - 1):
                y_pred.append(prediction[i])
                y_true.append(sentence[i][1])
        f05_Score = metrics.fbeta_score(y_true, y_pred, beta=0.5, average='weighted', zero_division=0)
        f1_Score = metrics.fbeta_score(y_true, y_pred, beta=1, average='weighted', zero_division=0)
        f2_Score = metrics.fbeta_score(y_true, y_pred, beta=2, average='weighted', zero_division=0)
        precision = metrics.precision_score(y_true, y_pred, average='weighted', zero_division=0)
        recall = metrics.recall_score(y_true, y_pred, average='weighted', zero_division=0)
        print(f"Precision = {precision:.2f}, Recall = {recall:.2f}, f05-Score = {f05_Score:.2f}, "
              f"f1-Score = {f1_Score:.2f}, f2-Score = {f2_Score:.2f}")
        return tagsCount, wordTagMapping, tagTagMapping
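    # Viterbi decoding in log space: for each word position and tag, keep the
    # best log-probability over predecessor tags (emission * transition), then
    # follow the stored back-pointers to recover the most likely tag sequence.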
    def viterbi(self, untaggedWords, tagsCount, wordTagMapping, tagTagMapping):
        sent_len = len(untaggedWords)
        # prev/curr hold the best log-probability of any path ending in a tag;
        # path[i][tag] records that path's predecessor tag at position i-1.
        prev = {tag: float('-inf') for tag in tagsCount}
        prev[self.start_token] = 0.0  # log(1): every path starts at '^'
        curr = {}
        path = defaultdict(dict)
        for i in range(1, sent_len - 1):
            word = untaggedWords[i]
            for tag in tagsCount:
                curr[tag] = float('-inf')
                for prev_tag in tagsCount:
                    lprob = (prev[prev_tag]
                             + math.log(self.lexical_probability(word, tag, tagsCount, wordTagMapping))
                             + math.log(self.transition_probability(prev_tag, tag, tagsCount, tagTagMapping)))
                    if lprob > curr[tag]:
                        curr[tag] = lprob
                        path[i][tag] = prev_tag
            prev = dict(curr)
        taglist = [self.end_token] * sent_len
        taglist[0] = self.start_token
        if sent_len > 2:
            # Pick the best tag for the last real word, then backtrack.
            taglist[sent_len - 2] = max(curr, key=curr.get)
            for i in range(sent_len - 3, 0, -1):
                taglist[i] = path[i + 1][taglist[i + 1]]
        return taglist
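    # Build the three count tables the model needs: unigram tag counts,
    # word -> tag emission counts, and tag -> next-tag transition counts.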
    def mapping(self, sentences):
        word_tag_pairs = [(word, tag) for sentence in sentences for word, tag in sentence]
        tagsCount = Counter(tag for _, tag in word_tag_pairs)
        wordTagMapping = defaultdict(Counter)
        for word, tag in word_tag_pairs:
            wordTagMapping[word][tag] += 1
        tagTagMapping = defaultdict(Counter)
        for sentence in sentences:
            for i in range(len(sentence) - 1):
                tagTagMapping[sentence[i][1]][sentence[i + 1][1]] += 1
        return tagsCount, wordTagMapping, tagTagMapping
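    # P(next_tag | curr_tag): maximum-likelihood estimate from bigram tag counts.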
    def transition_probability(self, curr_tag, next_tag, tagsCount, tagTagMapping):
        currToNextCount = tagTagMapping[curr_tag][next_tag]
        currCount = tagsCount[curr_tag]
        probability = currToNextCount / currCount
        # Floor unseen transitions at a tiny constant so log() stays finite.
        return 1e-9 if probability == 0 else probability
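    # P(word | tag) with add-one smoothing, so unseen words still get a
    # nonzero emission probability for every tag.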
    def lexical_probability(self, word, tag, tagsCount, wordTagMapping):
        wordTagCount = wordTagMapping[word][tag]
        tagCount = tagsCount[tag]
        return (wordTagCount + 1) / (tagCount + len(wordTagMapping))  # Laplace smoothing
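    # 5-fold cross-validation over shuffled sentences: per-fold accuracy plus
    # fold-averaged precision/recall/F-beta, a per-tag report heatmap, and a
    # row-normalized confusion matrix, both saved as PNGs.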
    def cross_validation(self, tagged_sentences):
        kfold = KFold(n_splits=5, shuffle=True, random_state=1)
        tagged_sent = np.array(tagged_sentences, dtype='object')
        y_pred_list = []
        y_true_list = []
        for fold, (train, test) in enumerate(kfold.split(tagged_sent)):
            train_sentences = tagged_sent[train]
            test_sentences = tagged_sent[test]
            tagsCount, wordTagMapping, tagTagMapping = self.mapping(train_sentences)
            y_pred = []
            y_true = []
            for sentence in test_sentences:
                untaggedWords = [word for word, _ in sentence]
                pred_taglist = self.viterbi(untaggedWords, tagsCount, wordTagMapping, tagTagMapping)
                # Skip the '^' and '$' boundary markers at either end.
                for i in range(1, len(pred_taglist) - 1):
                    y_pred.append(pred_taglist[i])
                    y_true.append(sentence[i][1])
            y_pred_list.append(np.array(y_pred))
            y_true_list.append(np.array(y_true))
            accuracy = metrics.accuracy_score(y_true_list[-1], y_pred_list[-1])
            print(f'Fold {fold + 1} Accuracy: {accuracy:.4f}')
        n_folds = len(y_pred_list)
        f05_Score, f1_Score, f2_Score, precision, recall = 0, 0, 0, 0, 0
        for y_true, y_pred in zip(y_true_list, y_pred_list):
            precision += metrics.precision_score(y_true, y_pred, average='weighted', zero_division=0)
            recall += metrics.recall_score(y_true, y_pred, average='weighted', zero_division=0)
            f05_Score += metrics.fbeta_score(y_true, y_pred, beta=0.5, average='weighted', zero_division=0)
            f1_Score += metrics.fbeta_score(y_true, y_pred, beta=1, average='weighted', zero_division=0)
            f2_Score += metrics.fbeta_score(y_true, y_pred, beta=2, average='weighted', zero_division=0)
        precision /= n_folds
        recall /= n_folds
        f05_Score /= n_folds
        f1_Score /= n_folds
        f2_Score /= n_folds
        print(f"Average Precision = {precision:.2f}, Average Recall = {recall:.2f}, "
              f"Average f05-Score = {f05_Score:.2f}, Average f1-Score = {f1_Score:.2f}, "
              f"Average f2-Score = {f2_Score:.2f}")
        self.per_pos_report(y_true_list, y_pred_list)
        self.confusion_matrix(y_true_list, y_pred_list)
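    # Average the per-fold confusion matrices, row-normalize so each cell is
    # P(predicted tag | true tag), and save the heatmap.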
    def confusion_matrix(self, y_true_list, y_pred_list):
        total = 0.0
        for y_true, y_pred in zip(y_true_list, y_pred_list):
            total += metrics.confusion_matrix(y_true, y_pred, labels=self.tagset)
        matrix = total / len(y_true_list)
        normalized_matrix = matrix / np.sum(matrix, axis=1, keepdims=True)
        plt.subplots(figsize=(12, 10))
        plt.imshow(normalized_matrix, interpolation='nearest', cmap=plt.cm.Greens)
        plt.xticks(np.arange(len(self.tagset)), self.tagset)
        plt.yticks(np.arange(len(self.tagset)), self.tagset)
        for i in range(normalized_matrix.shape[0]):
            for j in range(normalized_matrix.shape[1]):
                plt.text(j, i, format(normalized_matrix[i, j], '0.2f'), horizontalalignment="center")
        plt.colorbar()
        plt.savefig('Confusion_Matrix.png')
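    # Parse sklearn's per-fold classification_report text, average the per-tag
    # precision/recall/F1 (and support) across folds, and render a heatmap.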
    def per_pos_report(self, y_true_list, y_pred_list):
        report, support = 0, 0
        for y_true, y_pred in zip(y_true_list, y_pred_list):
            cr = metrics.classification_report(y_true, y_pred, labels=self.tagset, zero_division=0)
            cr = cr.replace('macro avg', 'MacroAvg').replace('micro avg', 'MicroAvg').replace('weighted avg', 'WeightedAvg')
            rows = cr.split('\n')
            tags, reportValues, supportValues = [], [], []
            for row in rows[1:]:
                row = row.strip().split()
                # Keep only full "tag precision recall f1 support" rows; this
                # also skips the blank lines and the single-value 'accuracy' row,
                # which would otherwise make the array ragged.
                if len(row) != 5:
                    continue
                tags.append(row[0])
                reportValues.append([float(j) for j in row[1:-1]])
                supportValues.append(int(row[-1]))
            report += np.array(reportValues)
            support += np.array(supportValues)
        n_folds = len(y_true_list)
        report = report / n_folds
        support = support / n_folds
        xlabels = ['Precision', 'Recall', 'F1 Score']
        ylabels = ['{0}[{1}]'.format(tags[i], int(sup)) for i, sup in enumerate(support)]
        plt.subplots(figsize=(18, 10))
        plt.imshow(report, aspect='auto', cmap=plt.cm.RdYlGn)
        plt.xticks(np.arange(3), xlabels)
        plt.yticks(np.arange(len(tags)), ylabels)
        plt.colorbar()
        for i in range(report.shape[0]):
            for j in range(report.shape[1]):
                plt.text(j, i, format(report[i, j], '.2f'), horizontalalignment="center", verticalalignment="center")
        plt.savefig('Per_POS_Accuracy.png')
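    # Tag a raw input sentence: split off trailing punctuation, lowercase,
    # wrap in '^'/'$' markers, Viterbi-decode, and format as 'word[TAG]'.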
    def doTagging(self, input_sentence, prevTagsCount, prevWordTagMapping, prevTagTagMapping):
        # Put a space before sentence-internal punctuation so it tokenizes as its own word.
        input_sentence = re.sub(r'(\S)([.,;:!?])', r'\1 \2', input_sentence.strip())
        untaggedWords = [self.start_token] + input_sentence.lower().split() + [self.end_token]
        tags = self.viterbi(untaggedWords, prevTagsCount, prevWordTagMapping, prevTagTagMapping)
        return ' '.join(f'{untaggedWords[i]}[{tags[i]}]' for i in range(1, len(untaggedWords) - 1))
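# Build the model, report cross-validated scores, then fit the 80% split via
# train() to get the count tables that back the demo below.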
hmm = HMM()
hmm.cross_validation(hmm.tagged_sentences)
tagsCount, wordTagMapping, tagTagMapping = hmm.train()

# test_sent = "the united kingdom and the usa are on two sides of the atlantic"
def tagging(input_sentence):
    return hmm.doTagging(input_sentence, tagsCount, wordTagMapping, tagTagMapping)
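# Quick sanity check of the output format (exact tags depend on the trained
# counts, so treat this as illustrative, not guaranteed):
#   tagging("the dog barked")  ->  "the[DET] dog[NOUN] barked[VERB]"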
interface = gr.Interface(fn=tagging,
                         inputs=gr.Textbox(
                             label="Input Sentence",
                             placeholder="Enter your sentence here...",
                         ),
                         outputs=gr.Textbox(
                             label="Tagged Output",
                             placeholder="Tagged sentence appears here...",
                         ),
                         title="Hidden Markov Model POS Tagger",
                         description="CS626 Assignment 1A (Autumn 2024)",
                         theme=gr.themes.Soft())
interface.launch(inline=False, share=True)