import os
import sys

# Make the project root importable before pulling in local modules.
sys.path.insert(0, os.getcwd())

import json
import re

import pandas as pd
import requests
import tweepy
import urlexpander

import global_variables as gb
import global_utils as utils


class PreProcessor():
    """Expand a tweet into plain, searchable text.

    Replaces @handles with user display names, link URLs with the linked
    page's title, and media URLs with a "best guess" label obtained from a
    local reverse-image-search service (expected at
    http://localhost:5000/search).
    """

    def __init__(self) -> None:
        self.logger = utils.get_index_preprocessor_logger()
        self.api = self.get_api()

    def expand_url_using_requests(self, url):
        """Resolve ``url`` through its redirect chain with a HEAD request.

        Fallback used when urlexpander fails. Returns the final URL, or ""
        on any error (timeout, connection failure, malformed URL).
        """
        try:
            # A Session recycles connections across the redirect hops.
            with requests.Session() as session:
                resp = session.head(url, allow_redirects=True, timeout=10)
                return resp.url
        except Exception:
            return ""

    def expand_url(self, shortened_url):
        """Expand a shortened URL: urlexpander first, requests as fallback.

        urlexpander signals failure by embedding an error marker in the
        returned string rather than raising, so both markers are checked.
        Returns the expanded URL, or "" if every strategy fails.
        """
        CLIENT_ERROR = "__CLIENT_ERROR__"
        CONNECTIONPOOL_ERROR = "__CONNECTIONPOOL_ERROR__"
        expanded_url = ""
        try:
            expanded_url = urlexpander.expand(shortened_url)
            if CLIENT_ERROR in expanded_url:
                # Second attempt with the plain-requests fallback.
                expanded_url = self.expand_url_using_requests(shortened_url)
                if CLIENT_ERROR in expanded_url:
                    # Lazy %-style args so the URL actually reaches the log.
                    self.logger.warning("Client error while expanding url: %s", shortened_url)
                    expanded_url = ""
            if CONNECTIONPOOL_ERROR in expanded_url:
                expanded_url = self.expand_url_using_requests(shortened_url)
                if CONNECTIONPOOL_ERROR in expanded_url:
                    self.logger.warning("CONNECTION POOL error while expanding url: %s", shortened_url)
                    expanded_url = ""
        except Exception as e:
            self.logger.warning("Cannot expand this url {} for this reason \n {} ".format(shortened_url, e))
            expanded_url = ""
        return expanded_url

    def get_webpage_title(self, expanded_url):
        """Return the HTML title of ``expanded_url``.

        Returns "" when the title is missing, the fetch fails, or the page
        looks like a (soft) 404.
        """
        PAGE_404 = "page 404"
        PAGE_NOT_FOUND = "Page not found"
        title = ""
        try:
            meta = urlexpander.html_utils.get_webpage_meta(expanded_url)
            title = meta["title"]
            if title is None:
                title = ""
            # Treat "page not found"-style titles as no title at all.
            if PAGE_NOT_FOUND.lower() in title.lower() or PAGE_404 in title:
                self.logger.warning("Page not found for this url: %s", expanded_url)
                title = ""
        except Exception as e:
            self.logger.warning("Cannot find the title for this url {} for this reason \n {} ".format(expanded_url, e))
            title = ""
        return title

    def get_api(self,):
        """Authenticate with OAuth 1.0a and return a ``tweepy.API`` handle.

        NOTE(review): ``cf`` is not imported anywhere in this file, so this
        method raises NameError as written. It is presumably a credentials
        module (perhaps ``global_variables``) — confirm and add the import.
        """
        consumer_key = cf.consumer_key
        consumer_secret = cf.consumer_secret
        access_token = cf.access_token
        access_token_secret = cf.access_token_secret
        # Perform authentication and build the API object.
        auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
        auth.set_access_token(access_token, access_token_secret)
        api = tweepy.API(auth)
        return api

    def get_username_out_of_handle(self, user_handle):
        """Map a "@handle" to the account's display name; "" on failure."""
        user_name = ""
        try:
            # Normalise the handle: lowercase, strip '@' and spaces.
            user_handle = user_handle.lower().replace('@', '').replace(' ', '')
            user = self.api.get_user(user_handle)
            user_name = user.name
        except Exception as e:
            self.logger.warning("Cannot get username for this handle {} for this reason \n {} ".format(user_handle, e))
            user_name = ""
        return user_name

    def get_media_guess_from_url(self, image_url):
        """Ask the local reverse-image-search service for a "best guess"
        label describing the media at ``image_url``.

        Returns "" on any failure or when the guess is a lone generic word
        that would add nothing to a search query.
        """
        try:
            server_url = "http://localhost:5000/search"
            data = {"image_url": image_url}
            headers = {'Content-type': 'application/json'}
            response = requests.post(server_url, headers=headers, data=json.dumps(data))
            json_response = response.json()
            best_guess = json_response['best_guess']
            # A single generic word is not worth adding to the query.
            if best_guess in ("language", "event"):
                best_guess = ""
        except Exception as e:
            self.logger.warning("cannot get the best guess for this image url: {} for this reason \n {} ".format(image_url, e))
            best_guess = ""
        return best_guess

    def get_media_guess(self, tweet):
        """Concatenate best-guess labels for every media item on ``tweet``.

        Scans ``extended_entities.media`` twice: once via ``media_url``
        (images) and once via ``expanded_url`` (covers videos). Returns the
        labels joined with spaces; "" when the tweet carries no media.

        NOTE(review): the tweet is read as an attribute (``tweet.entities``)
        and as a mapping (``tweet[...]``) — confirm the object supports both.
        """
        media_best_guess = ""
        if 'media' in tweet.entities:
            for media in tweet['extended_entities']['media']:
                try:
                    media_url = media['media_url']  # image URL
                    media_best_guess = media_best_guess + self.get_media_guess_from_url(media_url) + " "
                except Exception:
                    self.logger.exception("Error: Unable to extract best guess for this tweet : {}".format(tweet['id_str']))
            for media in tweet['extended_entities']['media']:
                try:
                    media_url = media['expanded_url']  # in case there is a video
                    media_best_guess = media_best_guess + self.get_media_guess_from_url(media_url) + " "
                except Exception:
                    self.logger.exception("Error: Unable to extract best guess for this tweet: {}".format(tweet['id_str']))
        return media_best_guess

    def get_media_guess_from_tweet_id(self, tweet_id):
        """Fetch a tweet by id and return its media best-guess text.

        Side effect: dumps the raw tweet payload to ./tweet.json (debug aid).
        Returns "" when the tweet cannot be fetched.
        """
        try:
            tweet = self.api.get_status(tweet_id, tweet_mode="extended")
        except Exception:
            self.logger.exception("Error: No data available for specified ID {} ".format(tweet_id))
            return ""
        # Debug dump of the raw payload for offline inspection.
        with open('tweet.json', 'w', encoding='utf8') as file:
            json.dump(tweet._json, file)
        media_best_guess = self.get_media_guess(tweet)
        return media_best_guess

    def reformat_urls(self, tweet):
        """Separate consecutive URLs with spaces and prefix bare pic.twitter
        links with https://.

        Replacing "http" alone covers both http and https links; the
        previous version replaced "https" and then "http", which left two
        spaces in front of every https URL.
        """
        tweet = tweet.replace("http", " http")
        tweet = tweet.replace("pic.twitter", " https://pic.twitter")
        return tweet

    def remove_handle_from_second_part(self, tweet):
        """Strip @handles from the text after an em-dash ('—') separator.

        The display name for any handle in that part is assumed to already
        be present in the text, so the raw handle is redundant.
        """
        try:
            if '—' in tweet:
                second_part = tweet.split('—')[1]
                # @\w+ (not @\w*): a bare '@' is not a handle.
                new_second_part = re.sub(r"@\w+", " ", second_part)
                tweet = tweet.replace(second_part, new_second_part)
        except Exception:
            self.logger.warning("Cannot remove handle from second part for this tweet: %s", tweet)
        return tweet

    def get_tweet_id(self, tweet_url):
        """Extract the numeric tweet id from a status URL.

        1. Take everything after 'status/';
        2. drop any query string after '?';
        3. keep the segment before the first '/'.
        Returns "" when the URL cannot be parsed.
        """
        try:
            tweet_id = tweet_url.split('status/')[-1].split('?')[0].split('/')[0]
        except Exception:
            self.logger.warning("Error: cannot get the id out of this url: %s", tweet_url)
            tweet_id = ""
        return tweet_id

    def expand_tweet(self, tweet):
        """Produce a searchable plain-text expansion of ``tweet``.

        Steps: reformat URLs for readability, replace @handles with display
        names, replace link URLs with page titles, replace media URLs with
        reverse-image best guesses. Returns the expanded text.
        """
        tweet_text = tweet['full_text']
        # 1. Format URLs in a readable way. The display name for the handle
        #    in the quoted second part already exists in the text.
        tweet_text = self.reformat_urls(tweet_text)
        # tweet = remove_handle_from_second_part(tweet)
        # 2. Replace handles with their names. @\w+ (not @\w*) so a lone '@'
        #    is never treated as a handle and stripped from the whole text.
        handle_pattern = re.compile(r"@\w+")
        for match in handle_pattern.finditer(tweet_text):
            user_handle = match.group()  # the matched "@handle" string
            user_name = self.get_username_out_of_handle(user_handle)
            tweet_text = tweet_text.replace(user_handle, user_name)
        # 3. Replace URLs with their page titles.
        for item in tweet["entities"]['urls']:
            try:
                url = item['url']
                expanded_url = item['expanded_url']
                webpage_title = self.get_webpage_title(expanded_url)
                webpage_title = re.sub(r'\W*$', '', webpage_title)  # trim trailing punctuation
                # expanded_url = self.expand_url(url)
                if webpage_title in tweet_text:  # avoid repeating a title already present
                    webpage_title = ""
                tweet_text = tweet_text.replace(url, webpage_title)
            except Exception:
                self.logger.exception("Error: Unable to extract best guess for this tweet : {} ".format(tweet['id_str']))
        if 'media' not in tweet['entities']:  # no images or videos
            return tweet_text
        # 4. Replace images/videos with their best-guess labels.
        for media in tweet['entities']['media']:
            try:
                url = media["url"]
                expanded_url = media['expanded_url']  # the image/video URL
                media_best_guess = " , " + self.get_media_guess_from_url(expanded_url)
                tweet_text = tweet_text.replace(url, media_best_guess)
            except Exception:
                self.logger.exception("Error: Unable to extract best guess for this tweet : {} ".format(tweet['id_str']))
        # tweet[gb.EXPANDED_TEXT] = tweet_text
        return tweet_text