import requests
import streamlit as st
import wikipedia
from wikipedia import WikipediaPage
import pandas as pd
import spacy
import unicodedata
from nltk.corpus import stopwords
import numpy as np
import nltk
from newspaper import Article
nltk.download('stopwords')
from string import punctuation
import json
import time
from datetime import datetime, timedelta
import urllib
from io import BytesIO
from PIL import Image, UnidentifiedImageError
from SPARQLWrapper import SPARQLWrapper, JSON, N3
from fuzzywuzzy import process, fuzz
from st_aggrid import GridOptionsBuilder, AgGrid, GridUpdateMode, DataReturnMode
from transformers import pipeline
import en_core_web_lg
# Assumed source of `search()` / `get_random_user_agent()` used further down; the call signature
# (stop/pause/tld/user_agent) matches the legacy `google` package's googlesearch module.
from googlesearch import search, get_random_user_agent

sparql = SPARQLWrapper('https://dbpedia.org/sparql')
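# All DBpedia lookups below go through this single endpoint; a query is built per selected
# entity in the Streamlit section further down (label, abstract/comment, thumbnail).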


class ExtractArticleEntities:
    """Extract article entities from a document using natural language processing (NLP) and fuzzy matching.

    Parameters
    - text: a string holding the text of a news article to be parsed

    Usage:
    import ExtractArticleEntities
    instantiate with text parameter ie. entities = ExtractArticleEntities(text)
    retrieve Who, What, When, Where entities with entities.www_json
    Non-organised entities with entities.json
    """

    def __init__(self, text):
        self.text = text  # preprocess text at initialisation
        self.text = self.preprocessing(self.text)
        print(self.text)
        print('_____text_____')
        self.json = {}
        # Create empty dataframe to hold entity data for ease of processing
        self.entity_df = pd.DataFrame(columns=["entity", "description"])
        # Load the spacy model
        self.nlp = en_core_web_lg.load()
        # self.nlp = pipeline(model="spacy/en_core_web_lg")
        # Parse the text
        self.entity_df = self.get_who_what_where_when()
        # Disambiguate entities
        self.entity_df = self.fuzzy_disambiguation()
        self.get_related_entity()
        self.get_popularity()
        # Create JSON representation of entities
        self.entity_df = self.entity_df.drop_duplicates(subset=["description"])
        self.entity_df = self.entity_df.reset_index(drop=True)
        # ungrouped entity returned as json
        self.json = self.entity_json()
        # return json with entities grouped into who, what, where, when keys
        self.www_json = self.get_wwww_json()

    # def get_related_entity(self):
    #     entities = self.entity_df.description
    #     labels = self.entity_df.entity
    #     related_entity = []
    #     for entity, label in zip(entities, labels):
    #         if label in ('PERSON', 'ORG', 'GPE', 'NORP', 'LOC'):
    #             related_entity.append(wikipedia.search(entity, 3))
    #         else:
    #             related_entity.append([None])
    #     self.entity_df['Wikipedia Entity'] = related_entity

    def get_popularity(self):
        # names = self.entity_df.description
        # related_names = self.entity_df['Matched Entity']
        # for name, related_name in zip(names, related_names):
        #     if related_name:
        #         related_name.append(name)
        #         pytrends.build_payload(related_name, timeframe='now 4-d')
        #         st.dataframe(pytrends.interest_over_time())
        #         time.sleep(2)
        master_df = pd.DataFrame()
        view_list = []
        for entity in self.entity_df['Matched Entity']:
            if entity:
                entity_to_look = entity[0]
                # print(entity_to_look, '_______')
                entity_to_look = entity_to_look.replace(' ', '_')
                print(entity_to_look, '_______')
                headers = {
                    'accept': 'application/json',
                    'User-Agent': 'Foo bar'
                }
                now = datetime.now()
                now_dt = now.strftime(r'%Y%m%d')
                week_back = now - timedelta(days=7)
                week_back_dt = week_back.strftime(r'%Y%m%d')
                # Wikimedia REST API: daily pageview counts for the matched article over the last seven days
                resp = requests.get(
                    f'https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/en.wikipedia.org/all-access/all-agents/{entity_to_look}/daily/{week_back_dt}/{now_dt}',
                    headers=headers)
                data = resp.json()
                # print(data)
                df = pd.json_normalize(data['items'])
                view_count = sum(df['views'])
            else:
                view_count = 0
            view_list.append(view_count)
        self.entity_df['Views'] = view_list
        for entity in ('PERSON', 'ORG', 'GPE', 'NORP', 'LOC'):
            related_entity_view_list = []
            # .copy() avoids pandas SettingWithCopyWarning when the count columns are added below
            grouped_df = self.entity_df[self.entity_df['entity'] == entity].copy()
            grouped_df['Matched count'] = grouped_df['fuzzy_match'].apply(len)
            grouped_df['Wiki count'] = grouped_df['Matched Entity'].apply(len)
            grouped_df = grouped_df.sort_values(by=['Views', 'Matched count', 'Wiki count'],
                                                ascending=False).reset_index(drop=True)
            if not grouped_df.empty:
                # st.dataframe(grouped_df)
                master_df = pd.concat([master_df, grouped_df])
        self.sorted_entity_df = master_df
        if 'Views' in self.sorted_entity_df:
            self.sorted_entity_df = self.sorted_entity_df.sort_values(by=['Views'],
                                                                      ascending=False).reset_index(drop=True)
        # st.dataframe(self.sorted_entity_df)
        # names = grouped_df['description'][:5].values
        # print(names, type(names))
        # if names.any():
        #     # pytrends.build_payload(names, timeframe='now 1-m')
        #     st.dataframe(pytrends.get_historical_interest(names,
        #                                                   year_start=2022, month_start=10, day_start=1,
        #                                                   hour_start=0,
        #                                                   year_end=2022, month_end=10, day_end=21,
        #                                                   hour_end=0, cat=0, geo='', gprop='', sleep=0))
        #     st.dataframe()
        #     time.sleep(2)
        # st.dataframe(grouped_df)

    def get_related_entity(self):
        names = self.entity_df.description
        entities = self.entity_df.entity
        self.related_entity = []
        match_scores = []
        for name, entity in zip(names, entities):
            if entity in ('PERSON', 'ORG', 'GPE', 'NORP', 'LOC'):
                related_names = wikipedia.search(name, 10)
                self.related_entity.append(related_names)
                matches = process.extract(name, related_names)
                match_scores.append([match[0] for match in matches if match[1] >= 90])
            else:
                self.related_entity.append([None])
                match_scores.append([])
        # Remove nulls
        self.entity_df['Wikipedia Entity'] = self.related_entity
        self.entity_df['Matched Entity'] = match_scores

    def fuzzy_disambiguation(self):
        # Load the entity data
        self.entity_df['fuzzy_match'] = ''
        # Group the candidate matches by entity type
        person_choices = self.entity_df.loc[self.entity_df['entity'] == 'PERSON']
        org_choices = self.entity_df.loc[self.entity_df['entity'] == 'ORG']
        where_choices = self.entity_df.loc[self.entity_df['entity'] == 'GPE']
        norp_choices = self.entity_df.loc[self.entity_df['entity'] == 'NORP']
        loc_choices = self.entity_df.loc[self.entity_df['entity'] == 'LOC']
        date_choices = self.entity_df.loc[self.entity_df['entity'] == 'DATE']

        def fuzzy_match(row, choices):
            '''This function disambiguates entities by looking for up to three matches with a score above 80
            for each of the entity types. If there is no match, the function returns an empty list. '''
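            # Illustrative sketch of the matching step (scores are made up):
            #   process.extract("Joe Biden", ["Joe Biden", "President Biden", "Biden Administration"], limit=3)
            #   -> [("Joe Biden", 100), ("President Biden", 86), ("Biden Administration", 82)]
            # Only candidates scoring above 80, excluding exact 100 self-matches, are kept.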
            match = process.extract(row["description"], choices["description"], limit=3)
            match = [m[0] for m in match if m[1] > 80 and m[1] != 100]
            if match:
                self.fuzzy_match_dict[row["description"]] = match
            return match

        # Apply the fuzzy matching function to the entity dataframe
        self.fuzzy_match_dict = {}
        for i, row in self.entity_df.iterrows():
            if row['entity'] == 'PERSON':
                self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, person_choices)
            elif row['entity'] == 'ORG':
                self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, org_choices)
            elif row['entity'] == 'GPE':
                self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, where_choices)
            elif row['entity'] == 'NORP':
                self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, norp_choices)
            elif row['entity'] == 'LOC':
                self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, loc_choices)
            elif row['entity'] == 'DATE':
                self.entity_df.at[i, 'fuzzy_match'] = fuzzy_match(row, date_choices)
        return self.entity_df

    def preprocessing(self, text):
        """This function takes a text string and strips out all punctuation. It then normalises the string
        (using the "NFKD" normalisation algorithm) and replaces common mis-encoded characters with their
        plain-text equivalents. """
        # remove punctuation
        text = text.translate(str.maketrans("", "", punctuation))
        # normalize the text
        stop_words = stopwords.words('english')
        # Removing stop words can cause loss of context; instead, stopwords can be utilised for knowledge
        filtered_words = [word for word in text.split()]  # if word not in stop_words]
        # This is very hacky. Need a better way of handling bad encoding
        pre_text = " ".join(filtered_words)
        pre_text = pre_text.replace(' ', ' ')
        pre_text = pre_text.replace('’', "'")
        pre_text = pre_text.replace('“', '"')
        pre_text = pre_text.replace('â€', '"')
        pre_text = pre_text.replace('‘', "'")
        pre_text = pre_text.replace('…', '...')
        pre_text = pre_text.replace('–', '-')
        pre_text = pre_text.replace("\x9d", '-')
        # normalize the text
        pre_text = unicodedata.normalize("NFKD", pre_text)
        # strip punctuation again as some remains in first pass
        pre_text = pre_text.translate(str.maketrans("", "", punctuation))
        return pre_text

    def get_who_what_where_when(self):
        """Get entity information in a document.

        This function will return a DataFrame with the following columns:
        - entity: the entity label assigned by spaCy
        - description: the entity text as it appears in the input

        Usage:
        get_who_what_where_when(text)

        Example:
        > get_who_what_where_when('This is a test')

        Entity labels extracted: PERSON, ORG, GPE, LOC, PRODUCT, EVENT, LAW, LANGUAGE, NORP, DATE, TIME"""
        # list to hold entity data
        article_entity_list = []
        # tokenize the text
        doc = self.nlp(self.text)
        # iterate over the entities in the document but only keep those which are meaningful
        desired_entities = ['PERSON', 'ORG', 'GPE', 'LOC', 'PRODUCT', 'EVENT', 'LAW', 'LANGUAGE', 'NORP', 'DATE',
                            'TIME']
        self.label_dict = {}
        # stop_words = stopwords.words('english')
        for ent in doc.ents:
            self.label_dict[ent] = ent.label_
            if ent.label_ in desired_entities:
                # add the entity to the list
                entity_dict = {ent.label_: ent.text}
                article_entity_list.append(entity_dict)
        # dedupe the entities, but only on exact match of values, as occasionally it will assign an ORG entity to PER
        deduplicated_entities = {frozenset(item.values()):
                                 item for item in article_entity_list}.values()
        # create a dataframe from the entities
        for record in deduplicated_entities:
            record_df = pd.DataFrame(record.items(), columns=["entity", "description"])
            self.entity_df = pd.concat([self.entity_df, record_df], ignore_index=True)
        print(self.entity_df)
        print('______________________')
        return self.entity_df

    def entity_json(self):
        """Returns a JSON representation of the entities held in the `entity_df` dataframe. The `entity_json`
        function will return a JSON object with the following fields:
        - entity: The type of the entity in the text
        - description: The name of the entity as described in the input text
        - fuzzy_match: A list of fuzzy matches for the entity. This is useful for disambiguating entities that are similar
        """
        self.json = json.loads(self.entity_df.to_json(orient='records'))
        # self.json = json.dumps(self.json, indent=2)
        return self.json

    def get_wwww_json(self):
        """This function returns a JSON representation of the `get_who_what_where_when` output, grouped into
        who/what/where/when. The `get_wwww_json` function will return a JSON object with the following fields:
        - entity: The type of the entity in the text
        - description: The name of the entity as described in the input text
        - fuzzy_match: A list of fuzzy matches for the entity. This is useful for disambiguating entities that are similar
        """
        # create a json object from the entity dataframe
        who_dict = {"who": [ent for ent in self.entity_json() if ent['entity'] in ['ORG', 'PERSON']]}
        where_dict = {"where": [ent for ent in self.entity_json() if ent['entity'] in ['GPE', 'LOC']]}
        when_dict = {"when": [ent for ent in self.entity_json() if ent['entity'] in ['DATE', 'TIME']]}
        what_dict = {
            "what": [ent for ent in self.entity_json() if ent['entity'] in ['PRODUCT', 'EVENT', 'LAW', 'LANGUAGE',
                                                                            'NORP']]}
        article_wwww = [who_dict, where_dict, when_dict, what_dict]
        self.wwww_json = json.dumps(article_wwww, indent=2)
        return self.wwww_json
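

# Minimal usage sketch outside Streamlit, following the class docstring above
# (the sample sentence is illustrative only):
#   entities = ExtractArticleEntities("Apple opened an office in London in March 2023.")
#   print(entities.www_json)  # entities grouped into who/what/where/when
#   print(entities.json)      # flat list of entities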

news_article = st.text_input('Paste an Article here to be parsed')

if 'parsed' not in st.session_state:
    st.session_state['parsed'] = None
    st.session_state['article'] = None

if news_article:
    st.write('Your news article is')
    st.write(news_article)
    if st.button('Get details'):
        parsed = ExtractArticleEntities(news_article)
        if parsed:
            st.session_state['article'] = parsed.sorted_entity_df
            st.session_state['parsed'] = True
            st.session_state['json'] = parsed.www_json

# if not st.session_state['article'].empty:


def preprocessing(text):
    """This function takes a text string and strips out all punctuation. It then normalises the string
    (using the "NFKD" normalisation algorithm) and replaces common mis-encoded characters with their
    plain-text equivalents. """
    # remove punctuation
    if text:
        text = text.translate(str.maketrans("", "", punctuation))
        # normalize the text
        stop_words = stopwords.words('english')
        # Removing stop words can cause loss of context; instead, stopwords can be utilised for knowledge
        filtered_words = [word for word in text.split()]  # if word not in stop_words]
        # This is very hacky. Need a better way of handling bad encoding
        pre_text = " ".join(filtered_words)
        pre_text = pre_text.replace(' ', ' ')
        pre_text = pre_text.replace('’', "'")
        pre_text = pre_text.replace('“', '"')
        pre_text = pre_text.replace('â€', '"')
        pre_text = pre_text.replace('‘', "'")
        pre_text = pre_text.replace('…', '...')
        pre_text = pre_text.replace('–', '-')
        pre_text = pre_text.replace("\x9d", '-')
        # normalize the text
        pre_text = unicodedata.normalize("NFKD", pre_text)
        # strip punctuation again as some remains in first pass
        pre_text = pre_text.translate(str.maketrans("", "", punctuation))
    else:
        pre_text = None
    return pre_text


def filter_wiki_df(df):
    key_list = df.keys()[:2]
    # df.to_csv('test.csv')
    df = df[key_list]
    # if len(df.keys()) == 2:
    df['Match Check'] = np.where(df[df.keys()[0]] != df[df.keys()[1]], True, False)
    df = df[df['Match Check'] != False]
    df = df[key_list]
    df = df.dropna(how='any').reset_index(drop=True)
    # filtered_term = []
    # for terms in df[df.keys()[0]]:
    #     if isinstance(terms, str):
    #         filtered_term.append(preprocessing(terms))
    #     else:
    #         filtered_term.append(None)
    # df[df.keys()[0]] = filtered_term
    df.rename(columns={key_list[0]: 'Attribute', key_list[1]: 'Value'}, inplace=True)
    return df


def get_entity_from_selectbox(related_entity):
    entity = st.selectbox('Please select the term:', related_entity, key='foo')
    if entity:
        summary_entity = wikipedia.summary(entity, 3)
        return summary_entity


if st.session_state['parsed']:
    df = st.session_state['article']
    # left, right = st.columns(2)
    # with left:
    df_to_st = pd.DataFrame()
    df_to_st['Name'] = df['description']
    df_to_st['Is a type of'] = df['entity']
    df_to_st['Related to'] = df['Matched Entity']
    df_to_st['Is a type of'] = df_to_st['Is a type of'].replace({'PERSON': 'Person',
                                                                 'ORG': 'Organization',
                                                                 'GPE': 'Political Location',
                                                                 'NORP': 'Political or Religious Groups',
                                                                 'LOC': 'Non Political Location'})
    gb = GridOptionsBuilder.from_dataframe(df_to_st)
    gb.configure_pagination(paginationAutoPageSize=True)  # Add pagination
    gb.configure_side_bar()  # Add a sidebar
    gb.configure_selection('multiple', use_checkbox=True,
                           groupSelectsChildren="Group checkbox select children")  # Enable multi-row selection
    gridOptions = gb.build()
    st.dataframe(df_to_st)
    grid_response = AgGrid(
        df_to_st,
        gridOptions=gridOptions,
        data_return_mode='AS_INPUT',
        update_mode='MODEL_CHANGED',
        fit_columns_on_grid_load=False,
        enable_enterprise_modules=True,
        height=350,
        width='100%',
        reload_data=True
    )
    data = grid_response['data']
    selected = grid_response['selected_rows']
    selected_df = pd.DataFrame(selected)
    if not selected_df.empty:
        selected_entity = selected_df[['Name', 'Is a type of', 'Related to']]
        st.dataframe(selected_entity)
    # with right:
    #     st.json(st.session_state['json'])
    entities_list = df['description']
    # selected_entity = st.selectbox('Which entity you want to choose?',
    #                                entities_list)
    if not selected_df.empty and selected_entity['Name'].any():
        # lookup_url = rf'https://lookup.dbpedia.org/api/search?query={selected_entity}'
        # r = requests.get(lookup_url)
        selected_row = df.loc[df['description'] == selected_entity['Name'][0]]
        entity_value = selected_row.values
        # st.write('Entity is a ', entity_value[0][0])
        label, name, fuzzy, related, related_match, _, _, _ = entity_value[0]
        not_matched = [word for word in related if word not in related_match]
        fuzzy = fuzzy[0] if len(fuzzy) > 0 else ''
        related = related[0] if len(related) > 0 else ''
        not_matched = not_matched[0] if len(not_matched) > 0 else related
        related_entity_list = [name, fuzzy, not_matched]
        related_entity = entity_value[0][1:]
        google_query_term = ' '.join(related_entity_list)
        # search()
        try:
            urls = [i for i in search(google_query_term, stop=10, pause=2.0, tld='com', lang='en', tbs='0',
                                      user_agent=get_random_user_agent())]
        except Exception:
            urls = []
        # urls = search(google_query_term+' news latest', num_results=10)
        st.session_state['wiki_summary'] = False
        all_related_entity = []
        print(related_entity, ' _____')
        for el in related_entity[:-2]:
            if isinstance(el, str):
                all_related_entity.append(el)
            elif isinstance(el, int):
                all_related_entity.append(str(el))
            else:
                all_related_entity.extend(el)
        # [ if type(el) == 'int' all_related_entity.extend(el) else all_related_entity.extend([el])for el in related_entity]
        # Ensure summary_entity is defined even when no related entity yields a DBpedia result
        summary_entity = None
        for entity in all_related_entity:
            # print(all_related_entity)
            if entity:
                print(entity)
                entity = entity.replace(' ', '_')
                query = f'''
                SELECT ?name ?comment ?image
                WHERE {{ dbr:{entity} rdfs:label ?name.
                         dbr:{entity} rdfs:comment ?comment.
                         dbr:{entity} dbo:thumbnail ?image.
                         FILTER (lang(?name) = 'en')
                         FILTER (lang(?comment) = 'en')
                }}'''
                sparql.setQuery(query)
                sparql.setReturnFormat(JSON)
                qres = sparql.query().convert()
                if qres['results']['bindings']:
                    result = qres['results']['bindings'][0]
                    name, comment, image_url = result['name']['value'], result['comment']['value'], result['image']['value']
                    # urllib.request.urlretrieve(image_url, "img.jpg")
                    # img = Image.open("/Users/anujkarn/NER/img.jpg")
                    wiki_url = f'https://en.wikipedia.org/wiki/{entity}'
                    st.write(name)
                    # st.image(img)
                    st.write(image_url)
                    # try:
                    response = requests.get(image_url)
                    # display(Image.open(BytesIO(response.content)))
                    try:
                        related_image = Image.open(BytesIO(response.content))
                        st.image(related_image)
                    except UnidentifiedImageError:
                        st.write('Not able to get image')
                    # except error as e:
                    #     st.write(f'Image not parsed because of : {e}')
                    summary_entity = comment
                    wiki_knowledge_df = pd.read_html(wiki_url)[0]
                    wiki_knowledge_df = filter_wiki_df(wiki_knowledge_df)
                    # st.write('Showing description for entity:', name)
                    # if st.button('Want something else?'):
                    #     summary_entity = get_entity_from_selectbox(all_related_entity)
                    break
                    # summary_entity = wikipedia.summary(entity, 3)
                else:
                    print(qres)
                    print(query)
                    summary_entity = None
        if not summary_entity:
            try:
                summary_entity = get_entity_from_selectbox(all_related_entity)
                # page = WikipediaPage(entity)
            except wikipedia.exceptions.DisambiguationError:
                st.write('Disambiguation is there for term')
        if selected_entity['Name'].any():
            st.write(f'Summary for {selected_entity["Name"][0]}')
            st.write(summary_entity)