# Source: AraBERT_claim_retrieval / Preprocessing.py
# Author: watheq
# Commit: "Upload the model" (4839ed5)
import os, sys
sys.path.insert(0, os.getcwd())
import pandas as pd
import global_variables as gb
import urlexpander
import tweepy
import requests, json
import re
import global_utils as utils
class PreProcessor():
def __init__(self) -> None:
self.logger = utils.get_index_preprocessor_logger()
self.api = self.get_api()
def expand_url_using_requests(self, url):
try:
session = requests.Session() # so connections are recycled
resp = session.head(url, allow_redirects=True,timeout=10)
return resp.url
except:
return ""
def expand_url(self, shortened_url):
# shortened_url = shortened_url.replace(' ','')
CLIENT_ERROR = "__CLIENT_ERROR__"
CONNECTIONPOOL_ERROR = "__CONNECTIONPOOL_ERROR__"
expanded_url = ""
try:
expanded_url = urlexpander.expand(shortened_url) # expand url using urlexpander library
if CLIENT_ERROR in expanded_url:
expanded_url = self.expand_url_using_requests(shortened_url) # expand url using custom function. Another way if the first didn't work
if CLIENT_ERROR in expanded_url:
self.logger.warn("Client error while expanding url: ", shortened_url)
expanded_url = ""
if CONNECTIONPOOL_ERROR in expanded_url:
expanded_url = self.expand_url_using_requests(shortened_url) # expand url using custom function. Another way if the first didn't work
if CONNECTIONPOOL_ERROR in expanded_url:
print("CONNECTION POOL error while expanding url: ", shortened_url)
expanded_url = ""
except Exception as e:
self.logger.warn("Cannot expand this url {} for this reason \n {} ".format(shortened_url, e))
expanded_url = ""
return expanded_url
def get_webpage_title(self, expanded_url):
PAGE_404 = "page 404"
PAGE_NOT_FOUND = "Page not found"
title = ""
try:
meta = urlexpander.html_utils.get_webpage_meta(expanded_url)
title = meta["title"]
if title is None:
title = ""
if PAGE_NOT_FOUND.lower() in title.lower() or PAGE_404 in title:
self.logger.warn("Page not found for this url: ", expanded_url)
title = ""
except Exception as e:
self.logger.warn("Cannot find the title for this url {} for this reason \n {} ".format(expanded_url, e))
title = ""
return title
def get_api(self,):
consumer_key = cf.consumer_key
consumer_secret = cf.consumer_secret
access_token = cf.access_token
access_token_secret = cf.access_token_secret
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
# perform authentication and get api object
api = tweepy.API(auth)
return api
def get_username_out_of_handle(self, user_handle):
user_name = ""
try:
# clean the handle
user_handle = user_handle.lower()
user_handle = user_handle.replace('@','')
user_handle = user_handle.replace(' ','')
user = self.api.get_user(user_handle)
user_id = user.id
user_name = user.name
except Exception as e:
self.logger.warn("Cannot get username for this handle {} for this reason \n {} ".format(user_handle, e))
user_name = ""
return user_name
def get_media_guess_from_url(self, image_url):
try:
server_url = "http://localhost:5000/search"
data = {"image_url": image_url}
headers = {'Content-type': 'application/json'}
response = requests.post(server_url, headers=headers, data=json.dumps(data))
json_response = response.json()
best_guess = json_response['best_guess']
if best_guess == "language" or best_guess == "event":
# if the best guess is only one of these words, then no need to add them to the query
best_guess = ""
except Exception as e:
self.logger.warn("cannot get the best guess for this image url: {} for this reason \n {} ".format(image_url, e))
best_guess = ""
return best_guess
def get_media_guess(self, tweet):
media_best_guess= ""
if 'media' in tweet.entities:
for media in tweet['extended_entities']['media']:
try:
media_url = media['media_url'] # for getting the image URL
media_best_guess = media_best_guess + self.get_media_guess_from_url(media_url) + " "
except:
self.logger.exception("Error: Unable to extract best guess for this tweet : {}".format(tweet['id_str']))
for media in tweet['extended_entities']['media']:
try:
media_url = media['expanded_url'] # in case there is a video
media_best_guess = media_best_guess + self.get_media_guess_from_url(media_url) + " "
except:
self.logger.exception("Error: Unable to extract best guess for this tweet: {}".format(tweet['id_str']))
return media_best_guess
def get_media_guess_from_tweet_id(self, tweet_id):
try:
tweet = self.api.get_status(tweet_id, tweet_mode="extended")
except:
self.logger.exception("Error: No data available for specified ID {} ".format(tweet_id))
return ""
with open('tweet.json', 'w', encoding='utf8') as file:
json.dump(tweet._json, file,)
media_best_guess = self.get_media_guess(tweet)
return media_best_guess
def reformat_urls(self, tweet):
''' Separate consecutive URLs with spaces, and add https prior to pic.twitter'''
tweet = tweet.replace("https", " https")
tweet = tweet.replace("http", " http")
tweet = tweet.replace("pic.twitter", " https://pic.twitter")
return tweet
def remove_handle_from_second_part(self, tweet):
try:
if '—' in tweet:
second_part = tweet.split('—')[1]
new_second_part = re.sub(r"@[\w]*", " ", second_part) # remove handles
tweet = tweet.replace(second_part, new_second_part)
except:
print("Cannot remove handle from second part for this tweet: ", tweet)
return tweet
def get_tweet_id(self, tweet_url):
try:
# 1. get everything after 'status/' 2. Remove everything after '?' 3. Get the id before the first '/'
tweet_id = tweet_url.split('status/')[-1].split('?')[0].split('/')[0]
except:
print("Error: cannot get the id out of this url: ", tweet_url)
tweet_id = ""
return tweet_id
def expand_tweet(self, tweet):
tweet_text = tweet['full_text']
# 1. Format URLs in a readable way and remove handle from second part. Username for the handle in the second part is already exist
tweet_text = self.reformat_urls(tweet_text)
# tweet = remove_handle_from_second_part(tweet)
# 2. Replace handles with their names
handle_pattern = re.compile(r"@[\w]*")
iterator = handle_pattern.finditer(tweet_text)
for match in iterator:
user_handle = match.group() # group: Return the string matched by the RE
user_name = self.get_username_out_of_handle(user_handle)
tweet_text = tweet_text.replace(user_handle, user_name)
# 3. replace URLs with their titles
for item in tweet["entities"]['urls']:
try:
url = item['url']
expanded_url = item['expanded_url']
webpage_title = self.get_webpage_title(expanded_url)
webpage_title = re.sub(r'\W*$', '', webpage_title) # remove punctuation from the tail
# expanded_url = self.expand_url(url)
if webpage_title in tweet_text: # to avoid repetition
webpage_title = ""
tweet_text = tweet_text.replace(url, webpage_title)
except:
self.logger.exception("Error: Unable to extract best guess for this tweet : {} ".format(tweet['id_str']))
if 'media' not in tweet['entities']: # No images or videos,
return tweet_text
# 4. replace images/videos with their titles
for media in tweet['entities']['media']:
try:
url = media["url"]
expanded_url = media['expanded_url'] # for getting the image/video URL
media_best_guess = " , " + self.get_media_guess_from_url(expanded_url)
tweet_text = tweet_text.replace(url, media_best_guess)
except:
self.logger.exception("Error: Unable to extract best guess for this tweet : {} ".format(tweet['id_str']))
# tweet[gb.EXPANDED_TEXT] = tweet_text
return tweet_text