|
import os
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List
|
|
import pandas as pd
|
|
import tweepy
|
|
import praw
|
|
import googleapiclient.discovery
|
|
import pytumblr
|
|
from gnews import GNews
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
import time
|
|
import math
|
|
|
|
|
|
class DataFetch:
|
|
def __init__(self):
|
|
|
|
self.end_date = datetime.now()
|
|
self.start_date = self.end_date - timedelta(days=1)
|
|
|
|
|
|
self.tumblr_client = pytumblr.TumblrRestClient(
|
|
os.getenv("TUMBLR_CONSUMER_KEY"),
|
|
os.getenv("TUMBLR_CONSUMER_SECRET"),
|
|
os.getenv("TUMBLR_OAUTH_TOKEN"),
|
|
os.getenv("TUMBLR_OAUTH_SECRET")
|
|
)
|
|
|
|
twitter_auth = tweepy.OAuthHandler(os.getenv("TWITTER_API_KEY"), os.getenv("TWITTER_API_SECRET"))
|
|
twitter_auth.set_access_token(os.getenv("TWITTER_ACCESS_TOKEN"), os.getenv("TWITTER_ACCESS_TOKEN_SECRET"))
|
|
self.twitter_api = tweepy.API(twitter_auth)
|
|
|
|
self.reddit = praw.Reddit(
|
|
client_id=os.getenv("REDDIT_CLIENT_ID"),
|
|
client_secret=os.getenv("REDDIT_CLIENT_SECRET"),
|
|
user_agent="Sentiment Analysis Bot 1.0"
|
|
)
|
|
|
|
self.youtube = googleapiclient.discovery.build("youtube", "v3", developerKey=os.getenv("YOUTUBE_API_KEY"))
|
|
|
|
def load_company_list(self, file_path: str) -> List[str]:
|
|
self.company_list = pd.read_csv(file_path)['company_ticker'].tolist()
|
|
|
|
def collect_data(self) -> List[Dict]:
|
|
all_data = []
|
|
|
|
for company in self.company_list:
|
|
print(f"{company}:")
|
|
all_data.extend(self._collect_social_media_data(company))
|
|
all_data.extend(self._collect_news_data(company))
|
|
|
|
return all_data
|
|
|
|
def _collect_social_media_data(self, query: str) -> List[Dict]:
|
|
social_data = []
|
|
|
|
print("Collecting Reddit Data")
|
|
social_data.extend(self.collect_reddit_data(query))
|
|
|
|
print("Collecting YouTube Data")
|
|
social_data.extend(self.collect_youtube_data(query))
|
|
|
|
print("Collecting Tumblr Data")
|
|
social_data.extend(self.collect_tumblr_data(query))
|
|
|
|
return social_data
|
|
|
|
def _collect_news_data(self, query: str) -> List[Dict]:
|
|
news_data = []
|
|
|
|
print("Collecting Google News Data")
|
|
news_data.extend(self.collect_google_news(query))
|
|
|
|
print("Collecting Financial Times Data")
|
|
news_data.extend(self.collect_financial_times(query))
|
|
|
|
print("Collecting Bloomberg Data")
|
|
news_data.extend(self.collect_bloomberg(query))
|
|
|
|
print("Collecting Reuters Data")
|
|
news_data.extend(self.collect_reuters(query))
|
|
|
|
print("Collecting WSJ Data")
|
|
|
|
|
|
print("Collecting Serper Data - StockNews, Yahoo Finance, Insider Monkey, Investor's Business Daily, etc.")
|
|
news_data.extend(self.search_news(query))
|
|
|
|
return news_data
|
|
|
|
def collect_tumblr_data(self, query: str) -> List[Dict]:
|
|
posts = self.tumblr_client.tagged(query)
|
|
return [{"platform": "Tumblr", "company": query, "page_content": {
|
|
"title": post["blog"]["title"], "content": post["blog"]["description"]}} for post in posts]
|
|
|
|
def collect_twitter_data(self, query: str) -> List[Dict]:
|
|
tweets = []
|
|
for tweet in tweepy.Cursor(self.twitter_api.search_tweets, q=query, lang="en",
|
|
since=self.start_date, until=self.end_date).items(100):
|
|
tweets.append(tweet._json)
|
|
return [{"platform": "Twitter", "company": query, "page_content": tweet} for tweet in tweets]
|
|
|
|
def collect_reddit_data(self, query: str) -> List[Dict]:
|
|
posts = []
|
|
subreddit = self.reddit.subreddit("all")
|
|
for post in subreddit.search(query, sort="new", time_filter="day"):
|
|
post_date = datetime.fromtimestamp(post.created_utc)
|
|
if self.start_date <= post_date <= self.end_date:
|
|
posts.append({"platform": "Reddit", "company": query, "page_content": {
|
|
"title": post.title, "content": post.selftext}})
|
|
return posts
|
|
|
|
def collect_youtube_data(self, query: str) -> List[Dict]:
|
|
request = self.youtube.search().list(
|
|
q=query, type="video", part="id,snippet", maxResults=50,
|
|
publishedAfter=self.start_date.isoformat() + "Z", publishedBefore=self.end_date.isoformat() + "Z"
|
|
)
|
|
response = request.execute()
|
|
return [{"platform": "YouTube", "company": query, "page_content": {
|
|
"title": item["snippet"]["title"], "content": item["snippet"]["description"]}} for item in response['items']]
|
|
|
|
def collect_google_news(self, query: str) -> List[Dict]:
|
|
google_news = GNews(language='en', country='US', start_date=self.start_date, end_date=self.end_date)
|
|
articles = google_news.get_news(query)
|
|
return [{"platform": "Google News", "company": query, "page_content": {
|
|
"title": article["title"], "content": article["description"]}} for article in articles]
|
|
|
|
def collect_financial_times(self, query: str) -> List[Dict]:
|
|
url = f"https://www.ft.com/search?q={query}&dateTo={self.end_date.strftime('%Y-%m-%d')}&dateFrom={self.start_date.strftime('%Y-%m-%d')}"
|
|
response = requests.get(url)
|
|
soup = BeautifulSoup(response.content, 'html.parser')
|
|
articles = soup.find_all('div', class_='o-teaser__content')
|
|
return [{"platform": "Financial Times", "company": query, "page_content": {
|
|
"title": a.find('div', class_='o-teaser__heading').text.strip(),
|
|
"content": a.find('p', class_='o-teaser__standfirst').text.strip() if a.find('p', class_='o-teaser__standfirst') else ''
|
|
}} for a in articles]
|
|
|
|
def collect_bloomberg(self, query: str) -> List[Dict]:
|
|
url = f"https://www.bloomberg.com/search?query={query}"
|
|
response = requests.get(url)
|
|
soup = BeautifulSoup(response.content, 'html.parser')
|
|
articles = soup.find_all('div', class_='storyItem__aaf871c1')
|
|
return [{"platform": "Bloomberg", "company": query, "page_content": {
|
|
"title": a.find('a', class_='headline__3a97424d').text.strip(),
|
|
"content": a.find('p', class_='summary__483358e1').text.strip() if a.find('p', class_='summary__483358e1') else ''
|
|
}} for a in articles]
|
|
|
|
def collect_reuters(self, query: str) -> List[Dict]:
|
|
articles = []
|
|
base_url = "https://www.reuters.com/site-search/"
|
|
page = 1
|
|
while True:
|
|
url = f"{base_url}?blob={query}&page={page}"
|
|
response = requests.get(url)
|
|
soup = BeautifulSoup(response.content, 'html.parser')
|
|
results = soup.find_all('li', class_='search-result__item')
|
|
if not results:
|
|
break
|
|
for result in results:
|
|
date_elem = result.find('time', class_='search-result__timestamp')
|
|
if date_elem:
|
|
date = datetime.strptime(date_elem['datetime'], "%Y-%m-%dT%H:%M:%SZ")
|
|
if self.start_date <= date <= self.end_date:
|
|
articles.append({"platform": "Reuters", "company": query, "page_content": {
|
|
"title": result.find('h3', class_='search-result__headline').text.strip(),
|
|
"content": result.find('p', class_='search-result__excerpt').text.strip()
|
|
}})
|
|
elif date < self.start_date:
|
|
return articles
|
|
page += 1
|
|
time.sleep(1)
|
|
return articles
|
|
|
|
def collect_wsj(self, query: str) -> List[Dict]:
|
|
articles = []
|
|
base_url = "https://www.wsj.com/search"
|
|
page = 1
|
|
while True:
|
|
params = {
|
|
'query': query, 'isToggleOn': 'true', 'operator': 'AND', 'sort': 'date-desc',
|
|
'duration': 'custom', 'startDate': self.start_date.strftime('%Y/%m/%d'),
|
|
'endDate': self.end_date.strftime('%Y/%m/%d'), 'page': page
|
|
}
|
|
response = requests.get(base_url, params=params)
|
|
soup = BeautifulSoup(response.content, 'html.parser')
|
|
results = soup.find_all('article', class_='WSJTheme--story--XB4V2mLz')
|
|
if not results:
|
|
break
|
|
for result in results:
|
|
date_elem = result.find('p', class_='WSJTheme--timestamp--22sfkNDv')
|
|
if date_elem:
|
|
date = datetime.strptime(date_elem.text.strip(), "%B %d, %Y")
|
|
if self.start_date <= date <= self.end_date:
|
|
articles.append({"platform": "Wall Street Journal", "company": query, "page_content": {
|
|
"title": result.find('h3', class_='WSJTheme--headline--unZqjb45').text.strip(),
|
|
"content": result.find('p', class_='WSJTheme--summary--lmOXEsbN').text.strip()
|
|
}})
|
|
elif date < self.start_date:
|
|
return articles
|
|
page += 1
|
|
time.sleep(1)
|
|
return articles
|
|
|
|
def search_news(self, query: str,cnt=300) -> List[Dict]:
|
|
articles = []
|
|
num_results = cnt
|
|
|
|
headers = {
|
|
"X-API-KEY": os.getenv("SERP_API_KEY"),
|
|
"Content-Type": "application/json"
|
|
}
|
|
payload = {"q": f"{query} company news",
|
|
"num": num_results,
|
|
"dateRestrict": 14
|
|
}
|
|
response = requests.post(
|
|
"https://google.serper.dev/news",
|
|
headers=headers,
|
|
json=payload
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
results = response.json().get("news", [])
|
|
for result in results:
|
|
articles.append({"platform": result["source"], "company": query, "page_content": {
|
|
"title": result["title"],
|
|
"content": result["snippet"]
|
|
}})
|
|
return articles
|
|
|
|
|
|
if __name__ == "__main__":
|
|
analyzer = DataFetch("company_list.csv")
|
|
data = analyzer.collect_data()
|
|
|
|
|