Spaces:
Runtime error
Runtime error
import requests | |
import streamlit as st | |
import wikipedia | |
from wikipedia import WikipediaPage | |
import pandas as pd | |
import spacy | |
import unicodedata | |
from nltk.corpus import stopwords | |
import numpy as np | |
import nltk | |
from newspaper import Article | |
nltk.download('stopwords') | |
from string import punctuation | |
import json | |
import time | |
from datetime import datetime, timedelta | |
import urllib | |
from io import BytesIO | |
from PIL import Image, UnidentifiedImageError | |
from SPARQLWrapper import SPARQLWrapper, JSON, N3 | |
from fuzzywuzzy import process, fuzz | |
from st_aggrid import GridOptionsBuilder, AgGrid, GridUpdateMode, DataReturnMode | |
from transformers import pipeline | |
import en_core_web_lg | |
sparql = SPARQLWrapper('https://dbpedia.org/sparql') | |
class ExtractArticleEntities: | |
""" Extract article entities from a document using natural language processing (NLP) and fuzzy matching. | |
Parameters | |
- text: a string or the text of a news article to be parsed | |
Usage: | |
import ExtractArticleEntities | |
instantiate with text parameter ie. entities = ExtractArticleEntities(text) | |
retrieve Who, What, When, Where entities with entities.www_json | |
Non-organised entities with entiities.json | |
""" | |
def __init__(self, text): | |
self.text = text # preprocess text at initialisation | |
self.text = self.preprocessing(self.text) | |
print(self.text) | |
print('_____text_____') | |
self.json = {} | |
# Create empty dataframe to hold entity data for ease of processing | |
self.entity_df = pd.DataFrame(columns=["entity", "description"]) | |
# Load the spacy model | |
self.nlp = en_core_web_lg.load() | |
# self.nlp = pipeline(model="spacy/en_core_web_lg") | |
# Parse the text | |
self.entity_df = self.get_who_what_where_when() | |
# Disambiguate entities | |
self.entity_df = self.fuzzy_disambiguation() | |
self.get_related_entity() | |
self.get_popularity() | |
# Create JSON representation of entities | |
self.entity_df = self.entity_df.drop_duplicates(subset=["description"]) | |
self.entity_df = self.entity_df.reset_index(drop=True) | |
# ungrouped entity returned as json | |
self.json = self.entity_json() | |
# return json with entities grouped into who, what, where, when keys | |
self.www_json = self.get_wwww_json() | |
# def get_related_entity(self): | |
# entities = self.entity_df.description | |
# labels = self.entity_df.entity | |
# related_entity = [] | |
# for entity, label in zip(entities, labels): | |
# if label in ('PERSON', 'ORG','GPE','NORP','LOC'): | |
# related_entity.append(wikipedia.search(entity, 3)) | |
# else: | |
# related_entity.append([None]) | |
# self.entity_df['Wikipedia Entity'] = related_entity | |
def get_popularity(self): | |
# names = self.entity_df.description | |
# related_names = self.entity_df['Matched Entity'] | |
# for name, related_name in zip(names, related_names): | |
# if related_name: | |
# related_name.append(name) | |
# pytrends.build_payload(related_name, timeframe='now 4-d') | |
# st.dataframe(pytrends.interest_over_time()) | |
# time.sleep(2) | |
master_df = pd.DataFrame() | |
view_list = [] | |
for entity in self.entity_df['Matched Entity']: | |
if entity: | |
entity_to_look = entity[0] | |
# print(entity_to_look, '_______') | |
entity_to_look = entity_to_look.replace(' ','_') | |
print(entity_to_look, '_______') | |
headers = { | |
'accept': 'application/json', | |
'User-Agent': 'Foo bar' | |
} | |
now = datetime.now() | |
now_dt = now.strftime(r'%Y%m%d') | |
week_back = now - timedelta(days=7) | |
week_back_dt = week_back.strftime(r'%Y%m%d') | |
resp = requests.get(f'https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/en.wikipedia.org/all-access/all-agents/{entity_to_look}/daily/{week_back_dt}/{now_dt}', headers=headers) | |
data = resp.json() | |
# print(data) | |
df = pd.json_normalize(data['items']) | |
view_count = sum(df['views']) | |
else: | |
view_count = 0 | |
view_list.append(view_count) | |
self.entity_df['Views'] = view_list | |
for entity in ('PERSON','ORG','GPE','NORP','LOC'): | |
related_entity_view_list = [] | |
grouped_df = self.entity_df[self.entity_df['entity'] == entity] | |
grouped_df['Matched count'] = grouped_df['fuzzy_match'].apply(len) | |
grouped_df['Wiki count'] = grouped_df['Matched Entity'].apply(len) | |
grouped_df = grouped_df.sort_values(by=['Views', 'Matched count', 'Wiki count'], ascending=False).reset_index(drop=True) | |
if not grouped_df.empty: | |
# st.dataframe(grouped_df) | |
master_df = pd.concat([master_df, grouped_df]) | |
self.sorted_entity_df = master_df | |
if 'Views' in self.sorted_entity_df: | |
self.sorted_entity_df = self.sorted_entity_df.sort_values(by=['Views'], ascending=False).reset_index(drop=True) | |
# st.dataframe(self.sorted_entity_df) | |
# names = grouped_df['description'][:5].values | |
# print(names, type(names)) | |
# if names.any(): | |
# # pytrends.build_payload(names, timeframe='now 1-m') | |
# st.dataframe(pytrends.get_historical_interest(names, | |
# year_start=2022, month_start=10, day_start=1, | |
# hour_start=0, | |
# year_end=2022, month_end=10, day_end=21, | |
# hour_end=0, cat=0, geo='', gprop='', sleep=0)) | |
# st.dataframe() | |
# time.sleep(2) | |
# st.dataframe(grouped_df) | |
def get_related_entity(self): | |
names = self.entity_df.description | |
entities = self.entity_df.entity | |
self.related_entity = [] | |
match_scores = [] | |
for i, (name, entity) in enumerate(zip(names, entities)): | |
if entity in ('PERSON','ORG','GPE','NORP','LOC'): | |
related_names = wikipedia.search(name, 10) | |
# Implementing logic for getting related names instead of original text | |
if related_names: | |
names.iloc[i] = related_names[0] | |
self.related_entity.append(related_names) | |
matches = process.extract(name, related_names) | |
match_scores.append([match[0] for match in matches if match[1]>= 90 ]) | |
else: | |
self.related_entity.append([None]) | |
match_scores.append([]) | |
# Remove nulls | |
self.entity_df['Wikipedia Entity'] = self.related_entity | |
self.entity_df['Matched Entity'] = match_scores | |
def fuzzy_disambiguation(self): | |
# Load the entity data | |
self.entity_df['fuzzy_match'] = '' | |
# Load the entity data | |
person_choices = self.entity_df.loc[self.entity_df['entity'] == 'PERSON'] | |
org_choices = self.entity_df.loc[self.entity_df['entity'] == 'ORG'] | |
where_choices = self.entity_df.loc[self.entity_df['entity'] == 'GPE'] | |
norp_choices = self.entity_df.loc[self.entity_df['entity'] == 'NORP'] | |
loc_choices = self.entity_df.loc[self.entity_df['entity'] == 'LOC'] | |
date_choices = self.entity_df.loc[self.entity_df['entity'] == 'DATE'] | |
def fuzzy_match(row, choices): | |
'''This function disambiguates entities by looking for maximum three matches with a score of 80 or more | |
for each of the entity types. If there is no match, then the function returns None. ''' | |
match = process.extract(row["description"], choices["description"], limit=3) | |
match = [m[0] for m in match if m[1] > 80 and m[1] != 100] | |
if len(match) == 0: | |
match = [] | |
if match: | |
self.fuzzy_match_dict[row["description"]] = match | |
return match | |
# Apply the fuzzy matching function to the entity dataframe | |
self.fuzzy_match_dict = {} | |
for i, row in self.entity_df.iterrows(): | |
if row['entity'] == 'PERSON': | |
self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, person_choices) | |
elif row['entity'] == 'ORG': | |
self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, org_choices) | |
elif row['entity'] == 'GPE': | |
self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, where_choices) | |
elif row['entity'] == 'NORP': | |
self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, norp_choices) | |
elif row['entity'] == 'LOC': | |
self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, loc_choices) | |
elif row['entity'] == 'DATE': | |
self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, date_choices) | |
return self.entity_df | |
def preprocessing(self, text): | |
"""This function takes a text string and strips out all punctuation. It then normalizes the string to a | |
normalized form (using the "NFKD" normalization algorithm). Finally, it strips any special characters and | |
converts them to their unicode equivalents. """ | |
# remove punctuation | |
text = text.translate(str.maketrans("", "", punctuation)) | |
# normalize the text | |
stop_words = stopwords.words('english') | |
# Removing Stop words can cause losing context, instead stopwords can be utilized for knowledge | |
filtered_words = [word for word in self.text.split()] #if word not in stop_words] | |
# This is very hacky. Need a better way of handling bad encoding | |
pre_text = " ".join(filtered_words) | |
pre_text = pre_text = pre_text.replace(' ', ' ') | |
pre_text = pre_text.replace('’', "'") | |
pre_text = pre_text.replace('“', '"') | |
pre_text = pre_text.replace('â€', '"') | |
pre_text = pre_text.replace('‘', "'") | |
pre_text = pre_text.replace('…', '...') | |
pre_text = pre_text.replace('–', '-') | |
pre_text = pre_text.replace("\x9d", '-') | |
# normalize the text | |
pre_text = unicodedata.normalize("NFKD", pre_text) | |
# strip punctuation again as some remains in first pass | |
pre_text = pre_text.translate(str.maketrans("", "", punctuation)) | |
return pre_text | |
def fuzzy_remove_duplicate_ent(self, deduped_ents, threshold=85, limit=1): | |
search_space = list(deduped_ents) | |
for ent in deduped_ents: | |
duplicates_found = process.extract(ent, search_space.remove(ent), limit =1) # process.extract return the ent match amongst search_space with it's score | |
duplicates_found = [entity[0] for entity in duplicates_found if entity[1]> threshold] | |
if (len(duplicates_found) >0 ): | |
deduped_ents =[entity for entity in deduped_ents if entity not in duplicates_found] | |
return deduped_ents | |
def get_who_what_where_when(self): | |
"""Get entity information in a document. | |
This function will return a DataFrame with the following columns: | |
- entity: the entity being queried | |
- description: a brief description of the entity | |
Usage: | |
get_who_what_where_when(text) | |
Example: | |
> get_who_what_where_when('This is a test') | |
PERSON | |
ORG | |
GPE | |
LOC | |
PRODUCT | |
EVENT | |
LAW | |
LANGUAGE | |
NORP | |
DATE | |
GPE | |
TIME""" | |
# list to hold entity data | |
article_entity_list = [] | |
# tokenize the text | |
doc = self.nlp(self.text) | |
# iterate over the entities in the document but only keep those which are meaningful | |
desired_entities = ['PERSON', 'ORG', 'GPE', 'LOC', 'PRODUCT', 'EVENT', 'LAW', 'LANGUAGE', 'NORP', 'DATE', 'GPE', | |
'TIME'] | |
self.label_dict = {} | |
# stop_words = stopwords.words('english') | |
for ent in doc.ents: | |
self.label_dict[ent] = ent.label_ | |
if ent.label_ in desired_entities: | |
# add the entity to the list | |
entity_dict = {ent.label_: ent.text} | |
article_entity_list.append(entity_dict) | |
# dedupe the entities but only on exact match of values as occasional it will assign an ORG entity to PER | |
deduplicated_entities = {frozenset(item.values()): | |
item for item in article_entity_list}.values() | |
#to remove duplicate names | |
deduplicated_entities = self.fuzzy_remove_duplicate_ent(deduplicated_entities, threshold = 85, limit = 1) | |
# create a dataframe from the entities | |
for record in deduplicated_entities: | |
record_df = pd.DataFrame(record.items(), columns=["entity", "description"]) | |
self.entity_df = pd.concat([self.entity_df, record_df], ignore_index=True) | |
return self.entity_df | |
def entity_json(self): | |
"""Returns a JSON representation of an entity defined by the `entity_df` dataframe. The `entity_json` function | |
will return a JSON object with the following fields: | |
- entity: The type of the entity in the text | |
- description: The name of the entity as described in the input text | |
- fuzzy_match: A list of fuzzy matches for the entity. This is useful for disambiguating entities that are similar | |
""" | |
self.json = json.loads(self.entity_df.to_json(orient='records')) | |
# self.json = json.dumps(self.json, indent=2) | |
return self.json | |
def get_wwww_json(self): | |
"""This function returns a JSON representation of the `get_who_what_where_when` function. The `get_www_json` | |
function will return a JSON object with the following fields: | |
- entity: The type of the entity in the text | |
- description: The name of the entity as described in the input text | |
- fuzzy_match: A list of fuzzy matches for the entity. This is useful for disambiguating entities that are similar | |
""" | |
# create a json object from the entity dataframe | |
who_dict = {"who": [ent for ent in self.entity_json() if ent['entity'] in ['ORG', 'PERSON']]} | |
where_dict = {"where": [ent for ent in self.entity_json() if ent['entity'] in ['GPE', 'LOC']]} | |
when_dict = {"when": [ent for ent in self.entity_json() if ent['entity'] in ['DATE', 'TIME']]} | |
what_dict = { | |
"what": [ent for ent in self.entity_json() if ent['entity'] in ['PRODUCT', 'EVENT', 'LAW', 'LANGUAGE', | |
'NORP']]} | |
article_wwww = [who_dict, where_dict, when_dict, what_dict] | |
self.wwww_json = json.dumps(article_wwww,indent=2) | |
return self.wwww_json | |
news_article = st.text_input('Paste an Article here to be parsed') | |
if 'parsed' not in st.session_state: | |
st.session_state['parsed'] = None | |
st.session_state['article'] = None | |
if news_article: | |
st.write('Your news article is') | |
st.write(news_article) | |
if st.button('Get details'): | |
parsed = ExtractArticleEntities(news_article) | |
if parsed: | |
st.session_state['article'] = parsed.sorted_entity_df | |
st.session_state['parsed'] = True | |
st.session_state['json'] = parsed.www_json | |
# if not st.session_state['article'].empty: | |
def preprocessing(text): | |
"""This function takes a text string and strips out all punctuation. It then normalizes the string to a | |
normalized form (using the "NFKD" normalization algorithm). Finally, it strips any special characters and | |
converts them to their unicode equivalents. """ | |
# remove punctuation | |
if text: | |
text = text.translate(str.maketrans("", "", punctuation)) | |
# normalize the text | |
stop_words = stopwords.words('english') | |
# Removing Stop words can cause losing context, instead stopwords can be utilized for knowledge | |
filtered_words = [word for word in text.split()] #if word not in stop_words] | |
# This is very hacky. Need a better way of handling bad encoding | |
pre_text = " ".join(filtered_words) | |
pre_text = pre_text = pre_text.replace(' ', ' ') | |
pre_text = pre_text.replace('’', "'") | |
pre_text = pre_text.replace('“', '"') | |
pre_text = pre_text.replace('â€', '"') | |
pre_text = pre_text.replace('‘', "'") | |
pre_text = pre_text.replace('…', '...') | |
pre_text = pre_text.replace('–', '-') | |
pre_text = pre_text.replace("\x9d", '-') | |
# normalize the text | |
pre_text = unicodedata.normalize("NFKD", pre_text) | |
# strip punctuation again as some remains in first pass | |
pre_text = pre_text.translate(str.maketrans("", "", punctuation)) | |
else: | |
pre_text = None | |
return pre_text | |
def filter_wiki_df(df): | |
key_list = df.keys()[:2] | |
# df.to_csv('test.csv') | |
df = df[key_list] | |
# if len(df.keys()) == 2: | |
df['Match Check'] = np.where(df[df.keys()[0]] != df[df.keys()[1]], True, False) | |
df = df[df['Match Check']!= False] | |
df = df[key_list] | |
df = df.dropna(how='any').reset_index(drop=True) | |
# filtered_term = [] | |
# for terms in df[df.keys()[0]]: | |
# if isinstance(terms, str): | |
# filtered_term.append(preprocessing(terms)) | |
# else: | |
# filtered_term.append(None) | |
# df[df.keys()[0]] = filtered_term | |
df.rename(columns = {key_list[0]: 'Attribute', key_list[1]: 'Value'}, inplace = True) | |
return df | |
def get_entity_from_selectbox(related_entity): | |
entity = st.selectbox('Please select the term:', related_entity, key='foo') | |
if entity: | |
summary_entity = wikipedia.summary(entity, 3) | |
return summary_entity | |
if st.session_state['parsed']: | |
df = st.session_state['article'] | |
# left, right = st.columns(2) | |
# with left: | |
df_to_st = pd.DataFrame() | |
df_to_st['Name'] = df['description'] | |
df_to_st['Is a type of'] = df['entity'] | |
df_to_st['Related to'] = df['Matched Entity'] | |
df_to_st['Is a type of'] = df_to_st['Is a type of'].replace({'PERSON':'Person', | |
'ORG':'Organization', | |
'GPE':'Political Location', | |
'NORP':'Political or Religious Groups', | |
'LOC':'Non Political Location'}) | |
gb = GridOptionsBuilder.from_dataframe(df_to_st) | |
gb.configure_pagination(paginationAutoPageSize=True) #Add pagination | |
gb.configure_side_bar() #Add a sidebar | |
gb.configure_selection('multiple', use_checkbox=True, groupSelectsChildren="Group checkbox select children") #Enable multi-row selection | |
gridOptions = gb.build() | |
# st.dataframe(df_to_st) | |
grid_response = AgGrid( | |
df_to_st, | |
gridOptions=gridOptions, | |
data_return_mode='AS_INPUT', | |
update_mode='MODEL_CHANGED', | |
fit_columns_on_grid_load=False, | |
enable_enterprise_modules=True, | |
height=350, | |
width='100%', | |
reload_data=True | |
) | |
data = grid_response['data'] | |
selected = grid_response['selected_rows'] | |
selected_df = pd.DataFrame(selected) | |
if not selected_df.empty: | |
selected_entity = selected_df[['Name', 'Is a type of', 'Related to']] | |
st.dataframe(selected_entity) | |
# with right: | |
# st.json(st.session_state['json']) | |
entities_list = df['description'] | |
# selected_entity = st.selectbox('Which entity you want to choose?', | |
# entities_list) | |
if not selected_df.empty and selected_entity['Name'].any(): | |
# lookup_url = rf'https://lookup.dbpedia.org/api/search?query={selected_entity}' | |
# r = requests.get(lookup_url) | |
selected_row = df.loc[df['description'] == selected_entity['Name'][0]] | |
entity_value = selected_row.values | |
# st.write('Entity is a ', entity_value[0][0]) | |
label, name, fuzzy, related, related_match,_,_,_ = entity_value[0] | |
not_matched = [word for word in related if word not in related_match] | |
fuzzy = fuzzy[0] if len(fuzzy) > 0 else '' | |
related = related[0] if len(related) > 0 else '' | |
not_matched = not_matched[0] if len(not_matched) > 0 else related | |
related_entity_list = [name, fuzzy, not_matched] | |
related_entity = entity_value[0][1:] | |
google_query_term = ' '.join(related_entity_list) | |
# search() | |
try: | |
urls = [i for i in search(google_query_term ,stop = 10,pause = 2.0, tld='com', lang='en', tbs='0', user_agent = get_random_user_agent())] | |
except: | |
urls = [] | |
# urls = search(google_query_term+' news latest', num_results=10) | |
st.session_state['wiki_summary'] = False | |
all_related_entity = [] | |
for el in related_entity[:-2]: | |
if isinstance(el, str): | |
all_related_entity.append(el) | |
elif isinstance(el, int): | |
all_related_entity.append(str(el)) | |
else: | |
all_related_entity.extend(el) | |
# [ if type(el) == 'int' all_related_entity.extend(el) else all_related_entity.extend([el])for el in related_entity] | |
for entity in all_related_entity: | |
# try: | |
if True: | |
if entity: | |
entity = entity.replace(' ', '_') | |
query = f''' | |
SELECT ?name ?comment ?image | |
WHERE {{ dbr:{entity} rdfs:label ?name. | |
dbr:{entity} rdfs:comment ?comment. | |
dbr:{entity} dbo:thumbnail ?image. | |
FILTER (lang(?name) = 'en') | |
FILTER (lang(?comment) = 'en') | |
}}''' | |
sparql.setQuery(query) | |
sparql.setReturnFormat(JSON) | |
qres = sparql.query().convert() | |
if qres['results']['bindings']: | |
result = qres['results']['bindings'][0] | |
name, comment, image_url = result['name']['value'], result['comment']['value'], result['image']['value'] | |
# urllib.request.urlretrieve(image_url, "img.jpg") | |
# img = Image.open("/Users/anujkarn/NER/img.jpg") | |
wiki_url = f'https://en.wikipedia.org/wiki/{entity}' | |
st.write(name) | |
# st.image(img) | |
st.write(image_url) | |
# try: | |
response = requests.get(image_url) | |
try: | |
related_image = Image.open(BytesIO(response.content)) | |
st.image(related_image) | |
except UnidentifiedImageError: | |
st.write('Not able to get image') | |
pass | |
# except error as e: | |
# st.write(f'Image not parsed because of : {e}') | |
summary_entity = comment | |
wiki_knowledge_df = pd.read_html(wiki_url)[0] | |
wiki_knowledge_df = filter_wiki_df(wiki_knowledge_df) | |
st.write('Showing desciption for entity:', name) | |
st.dataframe(wiki_knowledge_df) | |
# if st.button('Want something else?'): | |
# summary_entity = get_entity_from_selectbox(all_related_entity) | |
break | |
# summary_entity = wikipedia.summary(entity, 3) | |
else: | |
summary_entity = None | |
if not summary_entity: | |
try: | |
summary_entity = get_entity_from_selectbox(all_related_entity) | |
# page = WikipediaPage(entity) | |
except wikipedia.exceptions.DisambiguationError: | |
st.write('Disambiguation is there for term') | |
if selected_entity['Name'].any(): | |
st.write(f'Summary for {selected_entity["Name"][0]}') | |
st.write(summary_entity) | |