|
import os |
|
import feedparser |
|
import urllib.parse |
|
import newspaper |
|
import functools |
|
from transformers import pipeline, BartForConditionalGeneration, BartTokenizer |
|
from sentence_transformers import SentenceTransformer, util |
|
from datetime import datetime |
|
from concurrent.futures import ThreadPoolExecutor |
|
import pandas as pd |
|
import time |
|
import sys |
|
import gradio as gr |
|
from flask import Flask, send_file |
|
|
|
|
|
sentiment_analysis = pipeline("sentiment-analysis", model="ProsusAI/finbert") |
|
|
|
|
|
sentence_model = SentenceTransformer("all-MiniLM-L6-v2") |
|
|
|
|
|
bart_model_name = "facebook/bart-large-cnn" |
|
bart_model = BartForConditionalGeneration.from_pretrained(bart_model_name) |
|
bart_tokenizer = BartTokenizer.from_pretrained(bart_model_name) |
|
|
|
|
|
article_cache = {} |
|
|
|
def fetch_article(url): |
|
"""Fetch article text from URL.""" |
|
if url not in article_cache: |
|
article = newspaper.Article(url) |
|
article.download() |
|
article.parse() |
|
article_cache[url] = article.text |
|
return article_cache[url] |
|
|
|
def fetch_and_analyze_news_entry(entry, company_name, company_ticker, location): |
|
"""Fetch and analyze sentiment for a single news entry.""" |
|
title = entry.title |
|
url = entry.link |
|
domain = urllib.parse.urlparse(url).netloc |
|
publishing_date = entry.published_parsed |
|
|
|
|
|
try: |
|
label, score = analyze_sentiment(title) |
|
sentiment_label = "Positive" if label == "positive" else "Negative" if label == "negative" else "Neutral" |
|
except Exception as e: |
|
print(f"Error analyzing sentiment for title: {title}. Error: {e}") |
|
sentiment_label = "Unknown" |
|
|
|
try: |
|
|
|
article_text = fetch_article(url) |
|
except Exception as e: |
|
print(f"Error fetching article at URL: {url}. Skipping article.") |
|
return { |
|
"title": title, |
|
"url": url, |
|
"domain": domain, |
|
"location": location, |
|
"publishing_date": datetime.fromtimestamp(time.mktime(publishing_date)).strftime("%Y-%m-%d %H:%M:%S"), |
|
"sentiment": sentiment_label, |
|
"detailed_summary": "Paywall Detected", |
|
"similarity_score": calculate_similarity(company_name, company_ticker, title) |
|
} |
|
|
|
|
|
detailed_summary = news_detailed(article_text) |
|
|
|
|
|
similarity_score = calculate_similarity(company_name, company_ticker, title) |
|
|
|
return { |
|
"title": title, |
|
"url": url, |
|
"domain": domain, |
|
"location": location, |
|
"publishing_date": datetime.fromtimestamp(time.mktime(publishing_date)).strftime("%Y-%m-%d %H:%M:%S"), |
|
"sentiment": sentiment_label, |
|
"detailed_summary": detailed_summary, |
|
"similarity_score": similarity_score |
|
} |
|
|
|
def fetch_and_analyze_news(company_name, company_ticker, event_name, start_date=None, end_date=None, location=None, num_news=5, include_domains=None, exclude_domains=None): |
|
"""Fetch and analyze news entries.""" |
|
|
|
query_name = f"{company_name} {event_name} {location}" |
|
|
|
|
|
if start_date and end_date: |
|
query_name += f" after:{start_date} before:{end_date}" |
|
|
|
|
|
if include_domains: |
|
include_domains_query = " OR ".join(f"site:{domain.strip()}" for domain in include_domains) |
|
query_name += f" {include_domains_query}" |
|
|
|
if exclude_domains: |
|
exclude_domains_query = " ".join(f"-site:{domain.strip()}" for domain in exclude_domains) |
|
query_name += f" {exclude_domains_query}" |
|
|
|
encoded_query_name = urllib.parse.quote(query_name) |
|
rss_url_name = f"https://news.google.com/rss/search?q={encoded_query_name}" |
|
|
|
|
|
feed_name = feedparser.parse(rss_url_name) |
|
news_entries_name = feed_name.entries[:num_news] |
|
|
|
analyzed_news_name = [] |
|
|
|
|
|
with ThreadPoolExecutor() as executor: |
|
analyze_news_entry_func = functools.partial(fetch_and_analyze_news_entry, company_name=company_name, company_ticker=company_ticker, location=location) |
|
analyzed_news_name = list(executor.map(analyze_news_entry_func, news_entries_name)) |
|
|
|
return analyzed_news_name |
|
|
|
def news_detailed(article_text, max_length=250): |
|
"""Generate detailed news summary using BART model.""" |
|
inputs = bart_tokenizer([article_text], max_length=max_length, truncation=True, return_tensors="pt") |
|
summary_ids = bart_model.generate(inputs["input_ids"], num_beams=4, max_length=max_length, length_penalty=2.0, early_stopping=True) |
|
detailed_summary = bart_tokenizer.decode(summary_ids[0], skip_special_tokens=True) |
|
return detailed_summary |
|
|
|
def calculate_similarity(company_name, company_ticker, title, threshold=0.4): |
|
"""Calculate sentence similarity.""" |
|
company_name_prefix = f"News Regarding {company_name}" |
|
|
|
embeddings_company_name = sentence_model.encode([company_name_prefix], convert_to_tensor=True) |
|
embeddings_title = sentence_model.encode([title], convert_to_tensor=True) |
|
|
|
similarity_score_company_name = util.pytorch_cos_sim(embeddings_company_name, embeddings_title).item() |
|
|
|
weighted_similarity_score = similarity_score_company_name |
|
|
|
return weighted_similarity_score |
|
|
|
def analyze_sentiment(title): |
|
print("Analyzing sentiment...") |
|
|
|
result = sentiment_analysis(title) |
|
|
|
labels = result[0]['label'] |
|
scores = result[0]['score'] |
|
print("Sentiment analyzed successfully.") |
|
return labels, scores |
|
|
|
def find_similar_news_titles(above_threshold_news, similarity_threshold=0.9): |
|
similar_news = [] |
|
titles = [news['title'] for news in above_threshold_news] |
|
embeddings = sentence_model.encode(titles, convert_to_tensor=True) |
|
cosine_scores = util.pytorch_cos_sim(embeddings, embeddings) |
|
|
|
for i in range(len(titles)): |
|
for j in range(i + 1, len(titles)): |
|
if cosine_scores[i][j] >= similarity_threshold: |
|
similar_news.append({ |
|
"title_1": titles[i], |
|
"title_2": titles[j], |
|
"similarity_score": cosine_scores[i][j].item() |
|
}) |
|
return similar_news |
|
|
|
def fetch_news(company_name, company_ticker, event_name, start_date, end_date, location, num_news, include_domains, exclude_domains): |
|
analyzed_news_name = fetch_and_analyze_news(company_name, company_ticker, event_name, start_date, end_date, location, num_news, include_domains, exclude_domains) |
|
|
|
above_threshold_news = [news for news in analyzed_news_name if news is not None and news['similarity_score'] >= 0.3] |
|
below_threshold_news = [news for news in analyzed_news_name if news is not None and news['similarity_score'] < 0.3] |
|
|
|
similar_news = find_similar_news_titles(above_threshold_news, similarity_threshold=0.9) |
|
|
|
above_threshold_df = pd.DataFrame(above_threshold_news) |
|
below_threshold_df = pd.DataFrame(below_threshold_news) |
|
similar_news_df = pd.DataFrame(similar_news) |
|
|
|
file_name = f"{company_name}_News_Data.xlsx" |
|
file_path = os.path.join("/tmp", file_name) |
|
|
|
with pd.ExcelWriter(file_path) as writer: |
|
above_threshold_df.to_excel(writer, sheet_name='Above_Threshold', index=False) |
|
below_threshold_df.to_excel(writer, sheet_name='Below_Threshold', index=False) |
|
similar_news_df.to_excel(writer, sheet_name='Similar_News', index=False) |
|
|
|
|
|
return file_path |
|
|
|
|
|
inputs = [ |
|
gr.Textbox(label="Company Name"), |
|
gr.Textbox(label="Company Ticker"), |
|
gr.Textbox(label="Event Name"), |
|
gr.Textbox(label="Start Date (optional)"), |
|
gr.Textbox(label="End Date (optional)"), |
|
gr.Textbox(label="Location (optional)"), |
|
gr.Number(label="Number of News to Fetch"), |
|
gr.Textbox(label="Include Domains (comma-separated)", placeholder="e.g., example.com,example.org"), |
|
gr.Textbox(label="Exclude Domains (comma-separated)", placeholder="e.g., example.net,example.info") |
|
] |
|
|
|
outputs = gr.Markdown(label="Output Message") |
|
|
|
interface = gr.Interface( |
|
fn=fetch_news, |
|
inputs=inputs, |
|
outputs=outputs, |
|
title="Company News Analyzer", |
|
description="Fetch and analyze news articles based on company and event details." |
|
) |
|
|
|
|
|
app = Flask(__name__) |
|
|
|
|
|
@app.route('/download/<file_name>') |
|
def download(file_name): |
|
file_path = os.path.join("/tmp", file_name) |
|
return send_file(file_path, as_attachment=True) |
|
|
|
|
|
@app.route('/') |
|
def serve_interface(): |
|
return interface.launch() |
|
|
|
if __name__ == "__main__": |
|
app.run(debug=True) |
|
|