import nltk
import sklearn_crfsuite
from sklearn_crfsuite import metrics
from nltk.stem import LancasterStemmer
import numpy as np
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import re
import gradio as gr

lancaster = LancasterStemmer()
nltk.download('brown')
nltk.download('universal_tagset')
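
# A CRF-based POS tagger: Brown corpus sentences (universal tagset) are
# featurized with affix, word-shape, and context-window features and fed to
# sklearn-crfsuite's L-BFGS CRF.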
class CRF_POS_Tagger:
    def __init__(self, train=False):
        print("Loading Data...")
        self.corpus = nltk.corpus.brown.tagged_sents(tagset='universal')
        print("Data Loaded...")
        self.corpus = [[(word, tag) for word, tag in sentence] for sentence in self.corpus]
        self.actual_tag = []
        self.predicted_tag = []
        self.prefixes = [
            "a", "anti", "auto", "bi", "co", "dis", "en", "em", "ex", "in", "im",
            "inter", "mis", "non", "over", "pre", "re", "sub", "trans", "un", "under"
        ]
        self.suffixes = [
            "able", "ible", "al", "ance", "ence", "dom", "er", "or", "ful", "hood",
            "ic", "ing", "ion", "tion", "ity", "ty", "ive", "less", "ly", "ment",
            "ness", "ous", "ship", "y", "es", "s"
        ]
        # Sort alternatives longest-first: Python's re tries alternatives left to
        # right, so "anti" must be tried before "a", "inter" before "in", etc.
        self.prefix_pattern = f"^({'|'.join(sorted(self.prefixes, key=len, reverse=True))})"
        self.suffix_pattern = f"({'|'.join(sorted(self.suffixes, key=len, reverse=True))})$"
        self.X = [[self.word_features(sentence, i) for i in range(len(sentence))] for sentence in self.corpus]
        self.y = [[postag for _, postag in sentence] for sentence in self.corpus]
        self.split = int(0.8 * len(self.X))
        self.X_train = self.X[:self.split]
        self.y_train = self.y[:self.split]
        self.X_test = self.X[self.split:]
        self.y_test = self.y[self.split:]
        print("Features Extracted...")
        self.crf_model = sklearn_crfsuite.CRF(algorithm='lbfgs', c1=0.1, c2=0.1, max_iterations=100, all_possible_transitions=True)
        print("Model Created...")
        if train:
            self.train()
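
    # Split a word into (prefix, stem, suffix) using the affix lists above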
    def word_splitter(self, word):
        prefix = ""
        stem = word
        suffix = ""
        prefix_match = re.match(self.prefix_pattern, word)
        if prefix_match:
            prefix = prefix_match.group(1)
            stem = word[len(prefix):]
        suffix_match = re.search(self.suffix_pattern, stem)
        if suffix_match:
            suffix = suffix_match.group(1)
            stem = stem[: -len(suffix)]
        return prefix, stem, suffix

    # Extract the feature dictionary for the word at position i in a sentence
    def word_features(self, sentence, i):
        word = sentence[i][0]
        prefix, stem, suffix = self.word_splitter(word)
        features = {
            'word': word,
            'prefix': prefix,
            # 'stem': stem,
            'stem': lancaster.stem(word),
            'suffix': suffix,
            'position': i,
            'is_first': i == 0,  # word is the first in the sentence
            'is_last': i == len(sentence) - 1,  # word is the last in the sentence
            # 'is_capitalized': word[0].upper() == word[0],
            'is_all_caps': word.isupper(),  # word is all uppercase
            'is_all_lower': word.islower(),  # word is all lowercase
            'prefix-1': word[0],
            'prefix-2': word[:2],
            'prefix-3': word[:3],
            'suffix-1': word[-1],
            'suffix-2': word[-2:],
            'suffix-3': word[-3:],
            'prefix-un': word[:2] == 'un',  # word starts with "un"
            'prefix-re': word[:2] == 're',  # word starts with "re"
            'prefix-over': word[:4] == 'over',  # word starts with "over"
            'prefix-dis': word[:3] == 'dis',  # word starts with "dis"
            'prefix-mis': word[:3] == 'mis',  # word starts with "mis"
            'prefix-pre': word[:3] == 'pre',  # word starts with "pre"
            'prefix-non': word[:3] == 'non',  # word starts with "non"
            'prefix-de': word[:2] == 'de',  # word starts with "de"
            'prefix-in': word[:2] == 'in',  # word starts with "in"
            'prefix-en': word[:2] == 'en',  # word starts with "en"
            'suffix-ed': word[-2:] == 'ed',  # word ends with "ed"
            'suffix-ing': word[-3:] == 'ing',  # word ends with "ing"
            'suffix-es': word[-2:] == 'es',  # word ends with "es"
            'suffix-ly': word[-2:] == 'ly',  # word ends with "ly"
            'suffix-ment': word[-4:] == 'ment',  # word ends with "ment"
            'suffix-er': word[-2:] == 'er',  # word ends with "er"
            'suffix-ive': word[-3:] == 'ive',
            'suffix-ous': word[-3:] == 'ous',
            'suffix-ness': word[-4:] == 'ness',
            'ends_with_s': word[-1] == 's',
            'ends_with_es': word[-2:] == 'es',
            'has_hyphen': '-' in word,  # word contains a hyphen
            'is_numeric': word.isdigit(),  # word consists only of digits
            'capitals_inside': word[1:].lower() != word[1:],
            'is_title_case': word.istitle(),  # first letter uppercase, rest lowercase
        }
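
        # Context features from the neighboring words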
        if i > 0:
            # prev_word, prev_postag = sentence[i-1]
            prev_word = sentence[i-1][0]
            prev_prefix, prev_stem, prev_suffix = self.word_splitter(prev_word)
            features.update({
                'prev_word': prev_word,
                # 'prev_postag': prev_postag,
                'prev_prefix': prev_prefix,
                'prev_stem': lancaster.stem(prev_word),
                'prev_suffix': prev_suffix,
                'prev:is_all_caps': prev_word.isupper(),
                'prev:is_all_lower': prev_word.islower(),
                'prev:is_numeric': prev_word.isdigit(),
                'prev:is_title_case': prev_word.istitle(),
            })
        if i < len(sentence) - 1:
            next_word = sentence[i+1][0]
            next_prefix, next_stem, next_suffix = self.word_splitter(next_word)
            features.update({
                'next_word': next_word,
                'next_prefix': next_prefix,
                'next_stem': lancaster.stem(next_word),
                'next_suffix': next_suffix,
                'next:is_all_caps': next_word.isupper(),
                'next:is_all_lower': next_word.islower(),
                'next:is_numeric': next_word.isdigit(),
                'next:is_title_case': next_word.istitle(),
            })
        return features
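
    # Fit the CRF; uses the 80% training split unless (features, tags) pairs are passed in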
    def train(self, data=None):
        if data:
            X_train, y_train = zip(*data)
        else:
            X_train, y_train = self.X_train, self.y_train
        print("Training CRF Model...", len(X_train), len(y_train))
        # Ensure X_train is a list of lists of dictionaries
        X_train = [list(map(dict, x)) for x in X_train]
        self.crf_model.fit(X_train, y_train)

    def predict(self, X_test):
        return self.crf_model.predict(X_test)
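
    # Token-level (flat) accuracy; also accumulates gold/predicted tags for the confusion matrix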
    def accuracy(self, test_data):
        X_test, y_test = zip(*test_data)
        y_pred = self.predict(X_test)
        self.actual_tag.extend([item for sublist in y_test for item in sublist])
        self.predicted_tag.extend([item for sublist in y_pred for item in sublist])
        print(len(self.actual_tag), len(self.predicted_tag))
        return metrics.flat_accuracy_score(y_test, y_pred)
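
    # 5-fold cross-validation: retrain on four fifths of the data, score the held-out fifth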
    def cross_validation(self):
        validator = CRF_POS_Tagger()
        data = list(zip(self.X, self.y))
        print("Cross-Validation...")
        accuracies = []
        for i in range(5):
            n1 = int(i / 5.0 * len(data))
            n2 = int((i + 1) / 5.0 * len(data))
            test_data = data[n1:n2]
            train_data = data[:n1] + data[n2:]
            validator.train(train_data)
            acc = validator.accuracy(test_data)
            accuracies.append(acc)
        self.actual_tag = validator.actual_tag
        self.predicted_tag = validator.predicted_tag
        return accuracies, sum(accuracies) / 5.0
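
    # Plot a row-normalized confusion-matrix heatmap and return the raw count matrix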
    def con_matrix(self):
        self.labels = np.unique(self.actual_tag)
        print(self.labels, self.actual_tag, self.predicted_tag)
        conf_matrix = confusion_matrix(self.actual_tag, self.predicted_tag, labels=self.labels)
        normalized_matrix = conf_matrix / np.sum(conf_matrix, axis=1, keepdims=True)
        plt.figure(figsize=(10, 7))
        sns.heatmap(normalized_matrix, annot=True, fmt='.2f', cmap='Blues', xticklabels=self.labels, yticklabels=self.labels)
        plt.xlabel('Predicted Tags')
        plt.ylabel('Actual Tags')
        plt.title('Confusion Matrix Heatmap')
        plt.savefig("Confusion_matrix.png")
        plt.show()
        # Return raw counts so per_pos_accuracy computes precision correctly
        return conf_matrix
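
    # Per-tag precision, recall, and F-scores (F1, F0.5, F2) derived from the confusion matrix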
    def per_pos_accuracy(self, conf_matrix):
        print("Per Tag Precision, Recall, and F-Score:")
        per_tag_metrics = {}
        for i, tag in enumerate(self.labels):
            true_positives = conf_matrix[i, i]
            false_positives = np.sum(conf_matrix[:, i]) - true_positives
            false_negatives = np.sum(conf_matrix[i, :]) - true_positives
            precision = true_positives / (true_positives + false_positives) if (true_positives + false_positives) > 0 else 0
            recall = true_positives / (true_positives + false_negatives) if (true_positives + false_negatives) > 0 else 0
            f1_score = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
            beta_0_5 = 0.5
            beta_2 = 2.0
            f0_5_score = (1 + beta_0_5**2) * (precision * recall) / ((beta_0_5**2 * precision) + recall) if (precision + recall) > 0 else 0
            f2_score = (1 + beta_2**2) * (precision * recall) / ((beta_2**2 * precision) + recall) if (precision + recall) > 0 else 0
            per_tag_metrics[tag] = {
                'Precision': precision,
                'Recall': recall,
                'f1-Score': f1_score,
                'f05-Score': f0_5_score,
                'f2-Score': f2_score
            }
            print(f"{tag}: Precision = {precision:.2f}, Recall = {recall:.2f}, f1-Score = {f1_score:.2f}, "
                  f"f05-Score = {f0_5_score:.2f}, f2-Score = {f2_score:.2f}")
        return per_tag_metrics
    def tagging(self, text):
        # Detach common punctuation from the preceding token, then split on whitespace
        sentence = (re.sub(r'(\S)([.,;:!?])', r'\1 \2', text.strip())).split()
        sentence_list = [[word] for word in sentence]
        features = [self.word_features(sentence_list, i) for i in range(len(sentence_list))]
        predicted_tags = self.crf_model.predict([features])
        output = "".join(f"{sentence[i]}[{predicted_tags[0][i]}] " for i in range(len(sentence)))
        return output.strip()

# validate = CRF_POS_Tagger()
# accuracies, avg_accuracy = validate.cross_validation()
# print(f"Cross-Validation Accuracies: {accuracies}")
# print(f"Average Accuracy: {avg_accuracy}")
# conf_matrix = validate.con_matrix()
# print(validate.per_pos_accuracy(conf_matrix))
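
# Train the tagger at startup and serve it through a Gradio text-to-text interface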
tagger = CRF_POS_Tagger(True)
interface = gr.Interface(fn=tagger.tagging,
                         inputs=gr.Textbox(
                             label="Input Sentence",
                             placeholder="Enter your sentence here...",
                         ),
                         outputs=gr.Textbox(
                             label="Tagged Output",
                             placeholder="Tagged sentence appears here...",
                         ),
                         title="Conditional Random Field POS Tagger",
                         description="CS626 Assignment 1B (Autumn 2024)",
                         theme=gr.themes.Soft())
interface.launch(inline=False, share=True)