|
import os, sys |
|
sys.path.insert(0, os.getcwd()) |
|
import pandas as pd |
|
import global_variables as gb |
|
import urlexpander |
|
import tweepy |
|
import requests, json |
|
import re |
|
import global_utils as utils |
|
|
|
|
|
class PreProcessor():
    """Pre-processes tweets for indexing.

    Expands shortened URLs, resolves Twitter @handles to display names,
    fetches web-page titles, queries a local reverse-image-search service
    for media "best guess" labels, and rewrites tweet text into plain prose.
    """

    def __init__(self) -> None:
        self.logger = utils.get_index_preprocessor_logger()
        self.api = self.get_api()

    def expand_url_using_requests(self, url):
        """Follow redirects with a HEAD request and return the final URL.

        Fallback used when urlexpander reports an error; returns "" on any
        request failure (best-effort).
        """
        try:
            # Context manager closes the session deterministically
            # (the original leaked the Session object).
            with requests.Session() as session:
                resp = session.head(url, allow_redirects=True, timeout=10)
                return resp.url
        except requests.RequestException:
            return ""

    def expand_url(self, shortened_url):
        """Expand a shortened URL via urlexpander, falling back to a direct
        HEAD request when urlexpander reports a client or connection-pool
        error. Returns "" when expansion fails entirely.
        """
        # urlexpander embeds these markers in its return value on failure.
        CLIENT_ERROR = "__CLIENT_ERROR__"
        CONNECTIONPOOL_ERROR = "__CONNECTIONPOOL_ERROR__"
        expanded_url = ""
        try:
            expanded_url = urlexpander.expand(shortened_url)

            if CLIENT_ERROR in expanded_url:
                expanded_url = self.expand_url_using_requests(shortened_url)
                if CLIENT_ERROR in expanded_url:
                    # warning() with lazy %s args: the original called
                    # warn("msg: ", url), which never rendered the url.
                    self.logger.warning("Client error while expanding url: %s", shortened_url)
                    expanded_url = ""

            if CONNECTIONPOOL_ERROR in expanded_url:
                expanded_url = self.expand_url_using_requests(shortened_url)
                if CONNECTIONPOOL_ERROR in expanded_url:
                    # Log through the logger (the original used print here),
                    # consistent with the rest of the class.
                    self.logger.warning("CONNECTION POOL error while expanding url: %s", shortened_url)
                    expanded_url = ""

        except Exception as e:
            self.logger.warning("Cannot expand this url %s for this reason \n %s ", shortened_url, e)
            expanded_url = ""
        return expanded_url

    def get_webpage_title(self, expanded_url):
        """Return the page title for expanded_url, or "" when the page is
        unreachable, has no title, or looks like a 404 page."""
        PAGE_404 = "page 404"
        PAGE_NOT_FOUND = "Page not found"
        title = ""
        try:
            meta = urlexpander.html_utils.get_webpage_meta(expanded_url)
            title = meta["title"]

            if title is None:
                title = ""

            # Treat obvious "not found" pages as having no usable title.
            if PAGE_NOT_FOUND.lower() in title.lower() or PAGE_404 in title:
                self.logger.warning("Page not found for this url: %s", expanded_url)
                title = ""
        except Exception as e:
            self.logger.warning("Cannot find the title for this url %s for this reason \n %s ", expanded_url, e)
            title = ""

        return title

    def get_api(self):
        """Build an authenticated tweepy API client.

        NOTE(review): the original read the credentials from an undefined
        name `cf` (NameError at runtime); `gb` (global_variables) is the
        only config module imported here — confirm it carries these
        attributes.
        """
        auth = tweepy.OAuthHandler(gb.consumer_key, gb.consumer_secret)
        auth.set_access_token(gb.access_token, gb.access_token_secret)
        return tweepy.API(auth)

    def get_username_out_of_handle(self, user_handle):
        """Resolve a Twitter @handle to the account's display name.

        The handle is lower-cased and stripped of '@' and spaces before the
        API call. Returns "" when the lookup fails.
        """
        user_name = ""
        try:
            user_handle = user_handle.lower().replace('@', '').replace(' ', '')
            user = self.api.get_user(user_handle)
            user_name = user.name
        except Exception as e:
            self.logger.warning("Cannot get username for this handle %s for this reason \n %s ", user_handle, e)
            user_name = ""

        return user_name

    def get_media_guess_from_url(self, image_url):
        """Query the local reverse-image-search service for its "best guess"
        label for image_url. Returns "" on failure or when the guess is a
        generic label that carries no signal ("language"/"event")."""
        try:
            server_url = "http://localhost:5000/search"
            data = {"image_url": image_url}
            headers = {'Content-type': 'application/json'}
            response = requests.post(server_url, headers=headers, data=json.dumps(data))
            best_guess = response.json()['best_guess']
            if best_guess in ("language", "event"):
                best_guess = ""
        except Exception as e:
            self.logger.warning("cannot get the best guess for this image url: %s for this reason \n %s ", image_url, e)
            best_guess = ""

        return best_guess

    def get_media_guess(self, tweet):
        """Concatenate best-guess labels for every media item attached to a
        tweet, querying both the media_url and the expanded_url of each item.
        Returns "" when the tweet carries no media.

        Accepts either a tweepy Status or its raw JSON dict: the original
        mixed attribute access (tweet.entities) with item access
        (tweet['extended_entities']), which cannot both work on one object.
        """
        data = tweet._json if hasattr(tweet, "_json") else tweet
        media_best_guess = ""
        if 'media' not in data.get('entities', {}):
            return media_best_guess

        # Guarded lookup: the original indexed 'extended_entities' outside
        # any try and crashed when the key was absent.
        media_items = data.get('extended_entities', {}).get('media', [])
        # Query both URL variants for each media item, as the original did.
        for url_key in ('media_url', 'expanded_url'):
            for media in media_items:
                try:
                    media_best_guess += self.get_media_guess_from_url(media[url_key]) + " "
                except Exception:
                    self.logger.exception("Error: Unable to extract best guess for this tweet : {}".format(data['id_str']))

        return media_best_guess

    def get_media_guess_from_tweet_id(self, tweet_id):
        """Fetch the tweet by id and return its concatenated media
        best-guess labels; "" when the tweet cannot be fetched."""
        try:
            tweet = self.api.get_status(tweet_id, tweet_mode="extended")
        except Exception:
            self.logger.exception("Error: No data available for specified ID {} ".format(tweet_id))
            return ""

        # Debug dump of the raw tweet payload.
        with open('tweet.json', 'w', encoding='utf8') as file:
            json.dump(tweet._json, file)
        return self.get_media_guess(tweet)

    def reformat_urls(self, tweet):
        """Separate consecutive URLs with spaces, and add https prior to
        pic.twitter links.

        NOTE(review): the replacements are cumulative — "https" also
        contains "http", so an https link gains extra leading spaces; the
        surplus whitespace is harmless for later processing.
        """
        tweet = tweet.replace("https", " https")
        tweet = tweet.replace("http", " http")
        tweet = tweet.replace("pic.twitter", " https://pic.twitter")
        return tweet

    def remove_handle_from_second_part(self, tweet):
        """Strip @handles from the part of the tweet after an em dash
        (typically a quoted-author attribution). Returns the tweet
        unchanged when no em dash is present or on failure."""
        try:
            if '—' in tweet:
                second_part = tweet.split('—')[1]
                new_second_part = re.sub(r"@[\w]*", " ", second_part)
                tweet = tweet.replace(second_part, new_second_part)
        except Exception:
            # Logger instead of print, consistent with the rest of the class.
            self.logger.warning("Cannot remove handle from second part for this tweet: %s", tweet)
        return tweet

    def get_tweet_id(self, tweet_url):
        """Extract the numeric tweet id from a URL of the form
        .../status/<id>[/...][?...]; returns "" on failure."""
        try:
            tweet_id = tweet_url.split('status/')[-1].split('?')[0].split('/')[0]
        except Exception:
            # Logger instead of print, consistent with the rest of the class.
            self.logger.warning("Error: cannot get the id out of this url: %s", tweet_url)
            tweet_id = ""
        return tweet_id

    def expand_tweet(self, tweet):
        """Rewrite a tweet's full_text into plain prose: @handles become
        display names, shortened links become page titles, and media links
        become reverse-image best guesses.

        `tweet` is the tweet's raw JSON dict (item access throughout).
        """
        tweet_text = self.reformat_urls(tweet['full_text'])

        # Replace every @handle with the account's display name (the handle
        # is simply dropped when the lookup fails and "" comes back).
        handle_pattern = re.compile(r"@[\w]*")
        for match in handle_pattern.finditer(tweet_text):
            user_handle = match.group()
            user_name = self.get_username_out_of_handle(user_handle)
            tweet_text = tweet_text.replace(user_handle, user_name)

        # Replace each shortened URL with its page title, unless the title
        # already appears in the text (avoids duplication).
        for item in tweet["entities"]['urls']:
            try:
                url = item['url']
                webpage_title = self.get_webpage_title(item['expanded_url'])
                # Trim trailing non-word characters from the title.
                webpage_title = re.sub(r'\W*$', '', webpage_title)

                if webpage_title in tweet_text:
                    webpage_title = ""
                tweet_text = tweet_text.replace(url, webpage_title)

            except Exception:
                self.logger.exception("Error: Unable to extract best guess for this tweet : {} ".format(tweet['id_str']))

        if 'media' not in tweet['entities']:
            return tweet_text

        # Replace media links with the reverse-image best guess.
        for media in tweet['entities']['media']:
            try:
                url = media["url"]
                media_best_guess = " , " + self.get_media_guess_from_url(media['expanded_url'])
                tweet_text = tweet_text.replace(url, media_best_guess)
            except Exception:
                self.logger.exception("Error: Unable to extract best guess for this tweet : {} ".format(tweet['id_str']))

        return tweet_text