Spaces:
Building
Building
import gradio as gr | |
import requests | |
import json | |
import os | |
from datetime import datetime, timedelta | |
from bs4 import BeautifulSoup # μΉ νμ΄μ§μμ ν μ€νΈλ₯Ό μΆμΆνκΈ° μν΄ μ¬μ© | |
from huggingface_hub import InferenceClient # LLM μ¬μ©μ μν΄ νμ | |
# νμν ν¨ν€μ§ μ€μΉ (νμν κ²½μ° μ£Όμμ μ κ±°νκ³ μ€ν) | |
# !pip install bs4 huggingface_hub | |
# νκ²½ λ³μμμ API ν€ κ°μ Έμ€κΈ° (API ν€λ μμ νκ² κ΄λ¦¬λμ΄μΌ ν©λλ€) | |
API_KEY = os.getenv("SERPHOUSE_API_KEY") # λ³ΈμΈμ SerpHouse API ν€λ₯Ό νκ²½ λ³μλ‘ μ€μ νμΈμ. | |
HF_TOKEN = os.getenv("HF_TOKEN") # Hugging Face API ν ν°μ νκ²½ λ³μλ‘ μ€μ νμΈμ. | |
MAJOR_COUNTRIES = [ | |
"United States", "United Kingdom", "Canada", "Australia", "Germany", | |
"France", "Japan", "South Korea", "China", "India", | |
"Brazil", "Mexico", "Russia", "Italy", "Spain", | |
"Netherlands", "Sweden", "Switzerland", "Norway", "Denmark", | |
"Finland", "Belgium", "Austria", "New Zealand", "Ireland", | |
"Singapore", "Hong Kong", "Israel", "United Arab Emirates", "Saudi Arabia", | |
"South Africa", "Turkey", "Egypt", "Poland", "Czech Republic", | |
"Hungary", "Greece", "Portugal", "Argentina", "Chile", | |
"Colombia", "Peru", "Venezuela", "Thailand", "Malaysia", | |
"Indonesia", "Philippines", "Vietnam", "Pakistan", "Bangladesh" | |
] | |
def search_serphouse(query, country, page=1, num_result=100): | |
url = "https://api.serphouse.com/serp/live" | |
now = datetime.utcnow() | |
yesterday = now - timedelta(days=1) | |
date_range = f"{yesterday.strftime('%Y-%m-%d')},{now.strftime('%Y-%m-%d')}" | |
payload = { | |
"data": { | |
"q": query, | |
"domain": "google.com", | |
"loc": country, | |
"lang": "en", | |
"device": "desktop", | |
"serp_type": "news", | |
"page": str(page), | |
"verbatim": "1", | |
"num": str(num_result), | |
"date_range": date_range | |
} | |
} | |
headers = { | |
"accept": "application/json", | |
"content-type": "application/json", | |
"authorization": f"Bearer {API_KEY}" | |
} | |
try: | |
response = requests.post(url, json=payload, headers=headers) | |
response.raise_for_status() | |
return response.json() | |
except requests.RequestException as e: | |
error_msg = f"Error: {str(e)}" | |
if response.text: | |
error_msg += f"\nResponse content: {response.text}" | |
return {"error": error_msg} | |
def format_results_from_raw(results): | |
try: | |
if isinstance(results, dict) and "error" in results: | |
return "Error: " + results["error"], [] | |
if not isinstance(results, dict): | |
raise ValueError("κ²°κ³Όκ° μ¬μ νμμ΄ μλλλ€.") | |
# 'results' ν€ λ΄λΆμ ꡬ쑰 νμΈ (μ€μ²©λ 'results' μ²λ¦¬) | |
if 'results' in results: | |
results_content = results['results'] | |
if 'results' in results_content: | |
results_content = results_content['results'] | |
# 'news' ν€ νμΈ | |
if 'news' in results_content: | |
news_results = results_content['news'] | |
else: | |
news_results = [] | |
else: | |
news_results = [] | |
else: | |
news_results = [] | |
if not news_results: | |
return "κ²μ κ²°κ³Όκ° μμ΅λλ€.", [] | |
articles = [] | |
for idx, result in enumerate(news_results, 1): | |
title = result.get("title", "μ λͺ© μμ") | |
link = result.get("url", result.get("link", "#")) | |
snippet = result.get("snippet", "λ΄μ© μμ") | |
channel = result.get("channel", result.get("source", "μ μ μμ")) | |
time = result.get("time", result.get("date", "μ μ μλ μκ°")) | |
image_url = result.get("img", result.get("thumbnail", "")) | |
articles.append({ | |
"index": idx, | |
"title": title, | |
"link": link, | |
"snippet": snippet, | |
"channel": channel, | |
"time": time, | |
"image_url": image_url | |
}) | |
return "", articles | |
except Exception as e: | |
error_message = f"κ²°κ³Ό μ²λ¦¬ μ€ μ€λ₯ λ°μ: {str(e)}" | |
return "Error: " + error_message, [] | |
def serphouse_search(query, country): | |
# νμ΄μ§μ κ²°κ³Ό μμ κΈ°λ³Έκ°μ μ€μ ν©λλ€. | |
page = 1 | |
num_result = 100 | |
results = search_serphouse(query, country, page, num_result) | |
error_message, articles = format_results_from_raw(results) | |
return error_message, articles | |
# LLM μ€μ | |
hf_client = InferenceClient("CohereForAI/c4ai-command-r-plus-08-2024", token=HF_TOKEN) | |
def summarize_article(url): | |
try: | |
# μΉ νμ΄μ§μμ ν μ€νΈ μΆμΆ | |
response = requests.get(url) | |
response.raise_for_status() | |
soup = BeautifulSoup(response.text, 'html.parser') | |
# λͺ¨λ ν μ€νΈλ₯Ό μΆμΆ (κ°λ¨ν μμ) | |
text = ' '.join([p.get_text() for p in soup.find_all('p')]) | |
if not text.strip(): | |
return "κΈ°μ¬ λ΄μ©μ κ°μ Έμ¬ μ μμ΅λλ€." | |
# μμ½ μμ± | |
prompt = f"λ€μ μμ΄ κΈ°μ¬λ₯Ό νκ΅μ΄λ‘ 3λ¬Έμ₯μΌλ‘ μμ½νμΈμ:\n{text}" | |
summary = hf_client.text_generation(prompt, max_new_tokens=500) | |
return summary | |
except Exception as e: | |
return f"μμ½ μ€ μ€λ₯ λ°μ: {str(e)}" | |
css = """ | |
footer { | |
visibility: hidden; | |
} | |
""" | |
# Gradio μΈν°νμ΄μ€ κ΅¬μ± | |
with gr.Blocks(css=css, title="NewsAI μλΉμ€") as iface: | |
gr.Markdown("κ²μμ΄λ₯Ό μ λ ₯νκ³ μνλ κ΅κ°λ₯Ό μ ννλ©΄, κ²μμ΄μ μΌμΉνλ 24μκ° μ΄λ΄ λ΄μ€λ₯Ό μ΅λ 100κ° μΆλ ₯ν©λλ€.") | |
with gr.Column(): | |
with gr.Row(): | |
query = gr.Textbox(label="κ²μμ΄") | |
country = gr.Dropdown(MAJOR_COUNTRIES, label="κ΅κ°", value="South Korea") | |
search_button = gr.Button("κ²μ") | |
output_table = gr.HTML() | |
summary_output = gr.Markdown(visible=False) | |
def search_and_display(query, country): | |
error_message, articles = serphouse_search(query, country) | |
if error_message: | |
return f"<p>{error_message}</p>", gr.update(visible=False) | |
else: | |
# κΈ°μ¬ λͺ©λ‘μ HTML ν μ΄λΈλ‘ μμ± | |
table_html = "<table border='1' style='width:100%; text-align:left;'><tr><th>λ²νΈ</th><th>μ λͺ©</th><th>μΆμ²</th><th>μκ°</th><th>λΆμ</th></tr>" | |
for article in articles: | |
# κ° κΈ°μ¬μ λν΄ λ²νΌμ ν΄λΉνλ JavaScript μ½λλ₯Ό μ½μ | |
analyze_button = f"""<button onclick="analyzeArticle('{article['link']}')">λΆμ</button>""" | |
row = f""" | |
<tr> | |
<td>{article['index']}</td> | |
<td><a href="{article['link']}" target="_blank">{article['title']}</a></td> | |
<td>{article['channel']}</td> | |
<td>{article['time']}</td> | |
<td>{analyze_button}</td> | |
</tr> | |
""" | |
table_html += row | |
table_html += "</table>" | |
# JavaScript ν¨μ μ μ | |
js_code = """ | |
<script> | |
function analyzeArticle(url) { | |
// Gradioμ handle_functionμ μ¬μ©νμ¬ Python ν¨μ νΈμΆ | |
gradioApp().querySelector('#article_url_input textarea').value = url; | |
gradioApp().querySelector('#analyze_button').click(); | |
} | |
</script> | |
""" | |
full_html = table_html + js_code | |
return full_html, gr.update(visible=True, value="") # summary_output μ΄κΈ°ν | |
def analyze_article(url): | |
summary = summarize_article(url) | |
return summary | |
article_url_input = gr.Textbox(visible=False, elem_id="article_url_input") | |
analyze_button = gr.Button("λΆμ", visible=False, elem_id="analyze_button") | |
search_button.click( | |
search_and_display, | |
inputs=[query, country], | |
outputs=[output_table, summary_output] | |
) | |
analyze_button.click( | |
analyze_article, | |
inputs=[article_url_input], | |
outputs=[summary_output] | |
) | |
iface.launch(auth=("gini", "pick")) | |