"""NewsAI Gradio application.

Provides three tabs:

* per-country news search through the SerpHouse API,
* a worldwide search that iterates over every configured country,
* an "AI reporter" tab that summarizes recent Hacker News stories.
"""

import concurrent.futures
import json
import os
import re
import time
from datetime import datetime, timedelta, timezone

import gradio as gr
import requests
from bs4 import BeautifulSoup
from huggingface_hub import InferenceClient

MAX_COUNTRY_RESULTS = 100  # maximum number of results on the per-country tab
MAX_GLOBAL_RESULTS = 1000  # maximum number of results on the worldwide tab
MAX_HN_STORIES = 100  # maximum number of Hacker News stories processed
REQUEST_TIMEOUT = 10  # seconds; keeps a stalled remote host from hanging the UI
GOOGLE_TRANSLATE_URL = "https://translate.googleapis.com/translate_a/single"


def create_article_components(max_results):
    """Create ``max_results`` hidden article slots inside the current layout.

    Must be called inside an active ``gr.Blocks`` context. Each slot is a
    dict holding the group and its title/image/snippet/info widgets plus the
    slot index.
    """
    article_components = []
    for i in range(max_results):
        with gr.Group(visible=False) as article_group:
            title = gr.Markdown()
            image = gr.Image(width=200, height=150)
            snippet = gr.Markdown()
            info = gr.Markdown()
        article_components.append({
            'group': article_group,
            'title': title,
            'image': image,
            'snippet': snippet,
            'info': info,
            'index': i,
        })
    return article_components


API_KEY = os.getenv("SERPHOUSE_API_KEY")
hf_client = InferenceClient(
    "CohereForAI/c4ai-command-r-plus-08-2024",
    token=os.getenv("HF_TOKEN"),
)

# Country name -> language code used for query translation and the API call.
COUNTRY_LANGUAGES = {
    "United States": "en",
    "United Kingdom": "en",
    "Taiwan": "zh-TW",  # Traditional Chinese
    "Canada": "en",
    "Australia": "en",
    "Germany": "de",
    "France": "fr",
    "Japan": "ja",
    # "South Korea": "ko",  (disabled; the UI text says Korea is excluded)
    "China": "zh",
    "India": "hi",
    "Brazil": "pt",
    "Mexico": "es",
    "Russia": "ru",
    "Italy": "it",
    "Spain": "es",
    "Netherlands": "nl",
    "Singapore": "en",
    "Hong Kong": "zh-HK",
    "Indonesia": "id",
    "Malaysia": "ms",
    "Philippines": "tl",
    "Thailand": "th",
    "Vietnam": "vi",
    "Belgium": "nl",
    "Denmark": "da",
    "Finland": "fi",
    "Ireland": "en",
    "Norway": "no",
    "Poland": "pl",
    "Sweden": "sv",
    "Switzerland": "de",
    "Austria": "de",
    "Czech Republic": "cs",
    "Greece": "el",
    "Hungary": "hu",
    "Portugal": "pt",
    "Romania": "ro",
    "Turkey": "tr",
    "Israel": "he",
    "Saudi Arabia": "ar",
    "United Arab Emirates": "ar",
    "South Africa": "en",
    "Argentina": "es",
    "Chile": "es",
    "Colombia": "es",
    "Peru": "es",
    "Venezuela": "es",
    "New Zealand": "en",
    "Bangladesh": "bn",
    "Pakistan": "ur",
    "Egypt": "ar",
    "Morocco": "ar",
    "Nigeria": "en",
    "Kenya": "sw",
    "Ukraine": "uk",
    "Croatia": "hr",
    "Slovakia": "sk",
    "Bulgaria": "bg",
    "Serbia": "sr",
    "Estonia": "et",
    "Latvia": "lv",
    "Lithuania": "lt",
    "Slovenia": "sl",
    "Luxembourg": "fr",
    "Malta": "mt",
    "Cyprus": "el",
    "Iceland": "is",
}

# The API location is the country name itself, so derive the mapping from
# COUNTRY_LANGUAGES (same keys, same order) instead of maintaining a copy.
COUNTRY_LOCATIONS = {name: name for name in COUNTRY_LANGUAGES}

MAJOR_COUNTRIES = list(COUNTRY_LOCATIONS.keys())


def _google_translate(text, target_lang):
    """Translate ``text`` into ``target_lang`` and return the full result.

    The endpoint returns one segment per sentence; all segments are joined so
    multi-sentence input is not truncated to its first sentence.

    Raises:
        requests.RequestException: on network/HTTP failure.
        ValueError: when the response carries no translated text.
    """
    params = {
        "client": "gtx",
        "sl": "auto",
        "tl": target_lang,
        "dt": "t",
        "q": text,
    }
    response = requests.get(
        GOOGLE_TRANSLATE_URL, params=params, timeout=REQUEST_TIMEOUT
    )
    response.raise_for_status()
    segments = response.json()[0] or []
    translated = "".join(
        segment[0] for segment in segments if segment and segment[0]
    )
    if not translated:
        raise ValueError("empty translation response")
    return translated


def translate_query(query, country):
    """Translate ``query`` into the language of ``country``.

    ASCII-only queries and unknown countries are returned unchanged; any
    translation failure also falls back to the original query.
    """
    try:
        # English (ASCII) input is used as-is.
        if is_english(query):
            print(f"영어 검색어 감지 - 원본 사용: {query}")
            return query

        if country in COUNTRY_LANGUAGES:
            # Korean input is kept untouched when South Korea is selected.
            if country == "South Korea":
                print(f"한국 선택 - 원본 사용: {query}")
                return query

            target_lang = COUNTRY_LANGUAGES[country]
            print(f"번역 시도: {query} -> {country}({target_lang})")
            translated_text = _google_translate(query, target_lang)
            print(f"번역 완료: {query} -> {translated_text} ({country})")
            return translated_text

        return query
    except Exception as e:
        # Best effort: searching with the untranslated query beats failing.
        print(f"번역 오류: {str(e)}")
        return query


def translate_to_korean(text):
    """Translate ``text`` into Korean, returning ``text`` itself on failure."""
    try:
        return _google_translate(text, "ko")
    except Exception as e:
        print(f"한글 번역 오류: {str(e)}")
        return text


def is_english(text):
    """Return True when ``text`` (ignoring space, '-', '_') is pure ASCII."""
    stripped = text.replace(' ', '').replace('-', '').replace('_', '')
    return all(ord(char) < 128 for char in stripped)


def is_korean(text):
    """Return True when ``text`` contains at least one Hangul syllable."""
    return any('\uAC00' <= char <= '\uD7A3' for char in text)


def search_serphouse(query, country, page=1, num_result=10):
    """Query the SerpHouse live news endpoint for the last 24 hours.

    Args:
        query: search terms as typed by the user.
        country: key of ``COUNTRY_LOCATIONS`` / ``COUNTRY_LANGUAGES``.
        page: result page to request.
        num_result: number of results to request per page.

    Returns:
        ``{"results": <decoded JSON>, "translated_query": str}`` on success,
        or ``{"error": str, "translated_query": query}`` on request failure.
    """
    url = "https://api.serphouse.com/serp/live"

    now = datetime.now(timezone.utc)
    yesterday = now - timedelta(days=1)
    date_range = f"{yesterday.strftime('%Y-%m-%d')},{now.strftime('%Y-%m-%d')}"

    translated_query = translate_query(query, country)
    print(f"Original query: {query}")
    print(f"Translated query: {translated_query}")

    payload = {
        "data": {
            "q": translated_query,
            "domain": "google.com",
            "loc": COUNTRY_LOCATIONS.get(country, "United States"),
            "lang": COUNTRY_LANGUAGES.get(country, "en"),
            "device": "desktop",
            "serp_type": "news",
            "page": str(page),
            "num": str(num_result),
            "date_range": date_range,
            "sort_by": "date",
        }
    }
    headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "authorization": f"Bearer {API_KEY}",
    }

    try:
        response = requests.post(
            url, json=payload, headers=headers, timeout=REQUEST_TIMEOUT
        )
        print("Request payload:", json.dumps(payload, indent=2, ensure_ascii=False))
        print("Response status:", response.status_code)
        response.raise_for_status()
        return {"results": response.json(), "translated_query": translated_query}
    except requests.RequestException as e:
        return {"error": f"Error: {str(e)}", "translated_query": query}


def format_results_from_raw(response_data):
    """Convert a ``search_serphouse`` result into ``(error_message, articles)``.

    ``error_message`` is an empty string on success; ``articles`` is a list of
    dicts with index/title/link/snippet/channel/time/image_url/translated_query.
    """
    if "error" in response_data:
        # The stored message already carries the "Error: " prefix.
        return response_data["error"], []

    try:
        results = response_data["results"]
        translated_query = response_data["translated_query"]

        news_results = results.get('results', {}).get('results', {}).get('news', [])
        if not news_results:
            return "검색 결과가 없습니다.", []

        articles = [
            {
                "index": idx,
                "title": result.get("title", "제목 없음"),
                "link": result.get("url", result.get("link", "#")),
                "snippet": result.get("snippet", "내용 없음"),
                "channel": result.get("channel", result.get("source", "알 수 없음")),
                "time": result.get("time", result.get("date", "알 수 없는 시간")),
                "image_url": result.get("img", result.get("thumbnail", "")),
                "translated_query": translated_query,
            }
            for idx, result in enumerate(news_results, 1)
        ]
        return "", articles
    except Exception as e:
        return f"결과 처리 중 오류 발생: {str(e)}", []


def serphouse_search(query, country):
    """Search one country and return ``(error_message, articles)``."""
    response_data = search_serphouse(query, country)
    return format_results_from_raw(response_data)


# ---------------------------------------------------------------------------
# Hacker News helpers
# ---------------------------------------------------------------------------

def get_hn_item(item_id):
    """Fetch a single Hacker News item; return None when it cannot be loaded."""
    try:
        response = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json",
            timeout=REQUEST_TIMEOUT,
        )
        return response.json()
    except (requests.RequestException, ValueError):
        return None


def get_recent_stories():
    """Return up to ``MAX_HN_STORIES`` stories posted within the last 24 hours."""
    try:
        response = requests.get(
            "https://hacker-news.firebaseio.com/v0/newstories.json",
            timeout=REQUEST_TIMEOUT,
        )
        story_ids = response.json()

        recent_stories = []
        current_time = datetime.now().timestamp()
        day_ago = current_time - (24 * 60 * 60)

        for story_id in story_ids:
            story = get_hn_item(story_id)
            if story and 'time' in story and story['time'] > day_ago:
                recent_stories.append(story)
                if len(recent_stories) >= MAX_HN_STORIES:
                    break

        return recent_stories
    except Exception as e:
        print(f"Error fetching HN stories: {str(e)}")
        return []


def format_hn_time(timestamp):
    """Format a Unix timestamp as ``YYYY-MM-DD HH:MM:SS`` (local time)."""
    try:
        dt = datetime.fromtimestamp(timestamp)
        return dt.strftime("%Y-%m-%d %H:%M:%S")
    except (OverflowError, OSError, TypeError, ValueError):
        return "Unknown time"


def clean_text(text):
    """Strip HTML tags and collapse runs of whitespace into single spaces."""
    # Remove tags first so the gaps they leave are collapsed as well.
    text = re.sub(r'<[^>]+>', '', text)
    text = re.sub(r'\s+', ' ', text)
    return text.strip()


def get_article_content(url):
    """Scrape paragraph text from ``url`` (at most 4000 characters).

    Returns None for empty URLs, GitHub/Twitter links, and on any failure.
    """
    if not url or 'github.com' in url or 'twitter.com' in url:
        return None

    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'}
        response = requests.get(url, headers=headers, timeout=10)
        soup = BeautifulSoup(response.text, 'html.parser')

        # Drop elements that never contain article body text.
        for tag in soup(['script', 'style', 'nav', 'footer', 'header']):
            tag.decompose()

        paragraphs = soup.find_all('p')
        text = ' '.join(p.get_text() for p in paragraphs)
        text = clean_text(text)

        return text[:4000]  # cap the amount of text sent to the model
    except Exception as e:
        print(f"Scraping error for {url}: {str(e)}")
        return None


def generate_summary(text):
    """Summarize ``text`` with the hosted model; return None on failure."""
    if not text:
        return None

    prompt = """반드시 한글(한국어)로 작성하라. Please analyze and summarize the following text in 2-3 sentences.
Focus on the main points and key information:

Text: {text}

Summary:"""

    try:
        response = hf_client.text_generation(
            prompt.format(text=text),
            max_new_tokens=500,
            temperature=0.5,
            repetition_penalty=1.2,
        )
        return response
    except Exception as e:
        print(f"Summary generation error: {str(e)}")
        return None


def process_hn_story(story, progress=None):
    """Scrape, summarize and translate one story.

    Returns ``(story, summary)`` where ``summary`` is None whenever any step
    yields nothing or fails.
    """
    try:
        url = story.get('url')
        if not url:
            return story, None

        content = get_article_content(url)
        if not content:
            return story, None

        summary_en = generate_summary(content)
        if not summary_en:
            return story, None

        summary_ko = translate_to_korean(summary_en)
        return story, summary_ko
    except Exception as e:
        print(f"Story processing error: {str(e)}")
        return story, None


def _build_hn_outputs(status_text, processed_stories):
    """Build the full update list for the HN tab (status + every slot)."""
    outputs = [gr.update(value=status_text, visible=True)]
    for idx, _ in enumerate(hn_article_components):
        if idx < len(processed_stories):
            story, summary = processed_stories[idx]
            info = (
                f"\n**작성자:** {story.get('by', 'unknown')} | "
                f"**시간:** {format_hn_time(story.get('time', 0))} | "
                f"**점수:** {story.get('score', 0)} | "
                f"**댓글:** {len(story.get('kids', []))}개\n\n"
                f"**AI 요약:** {summary}\n"
            )
            outputs.extend([
                gr.update(visible=True),
                gr.update(value=f"### [{story.get('title', 'Untitled')}]({story.get('url', '#')})"),
                gr.update(value=info),
            ])
        else:
            outputs.extend([
                gr.update(visible=False),
                gr.update(),
                gr.update(),
            ])
    return outputs


def refresh_hn_stories():
    """Reload Hacker News stories, streaming results as they are summarized.

    Generator used as a Gradio event handler: each yielded list matches the
    ``hn_outputs`` component list (status message + 3 widgets per slot).
    """
    status_msg = "Hacker News 포스트를 가져오는 중..."
    # Start by hiding every slot.
    yield _build_hn_outputs(status_msg, [])

    stories = get_recent_stories()
    processed_count = 0
    processed_stories = []  # newest completed result first

    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [
            executor.submit(process_hn_story, story)
            for story in stories[:MAX_HN_STORIES]
        ]
        for future in concurrent.futures.as_completed(futures):
            story, summary = future.result()
            processed_count += 1
            if summary:
                processed_stories.insert(0, (story, summary))
            yield _build_hn_outputs(
                f"처리 중... ({processed_count}/{len(stories)})",
                processed_stories,
            )

    yield _build_hn_outputs(
        f"총 {len(processed_stories)}개의 포스트가 처리되었습니다.",
        processed_stories,
    )


# ---------------------------------------------------------------------------
# News search handlers
# ---------------------------------------------------------------------------

def _hidden_article_updates():
    """Return the five updates that hide one article slot."""
    return [
        gr.update(visible=False),
        gr.update(),
        gr.update(),
        gr.update(),
        gr.update(),
    ]


def _article_updates(article, korean_summary, show_country=False):
    """Return the five updates (group, title, image, snippet, info) for a slot."""
    image_url = article.get('image_url', '')
    # Inline data: URIs are skipped; only real URLs are shown.
    if image_url and not image_url.startswith('data:image'):
        image_update = gr.update(value=image_url, visible=True)
    else:
        image_update = gr.update(value=None, visible=False)

    if show_country:
        info = (
            f"**출처:** {article['channel']} | "
            f"**국가:** {article['source_country']} | "
            f"**시간:** {article['time']}"
        )
    else:
        info = f"**출처:** {article['channel']} | **시간:** {article['time']}"

    return [
        gr.update(visible=True),
        gr.update(value=f"### [{article['title']}]({article['link']})"),
        image_update,
        gr.update(value=f"**요약:** {article['snippet']}\n\n**한글 요약:** {korean_summary}"),
        gr.update(value=info),
    ]


def search_and_display(query, country, articles_state, progress=gr.Progress()):
    """Handle the per-country search button.

    Returns one update per entry of ``search_outputs``: status, translated
    query, error message, five widgets per article slot, and the new state.
    """
    progress(0, desc="검색어 번역 중...")
    translated_query = translate_query(query, country)
    if translated_query != query:
        translated_display = f"**원본 검색어:** {query}\n**번역된 검색어:** {translated_query}"
    else:
        translated_display = f"**검색어:** {query}"

    progress(0.2, desc="검색 시작...")
    error_message, articles = serphouse_search(query, country)

    progress(0.5, desc="결과 처리 중...")
    outputs = [
        # The handler is not a generator, so the status line ends up cleared.
        gr.update(value="", visible=False),
        gr.update(value=translated_display, visible=True),
    ]

    if error_message:
        outputs.append(gr.update(value=error_message, visible=True))
        for _ in article_components:
            outputs.extend(_hidden_article_updates())
        articles_state = []
    else:
        outputs.append(gr.update(value="", visible=False))
        total_articles = len(articles)
        for idx, _ in enumerate(article_components):
            if idx < total_articles:
                # Report progress only for real articles so it stays <= 1.0.
                progress(
                    (idx + 1) / total_articles,
                    desc=f"결과 표시 중... {idx + 1}/{total_articles}",
                )
                article = articles[idx]
                korean_summary = translate_to_korean(article['snippet'])
                outputs.extend(_article_updates(article, korean_summary))
            else:
                outputs.extend(_hidden_article_updates())
        articles_state = articles

    progress(1.0, desc="완료!")
    outputs.append(articles_state)
    return outputs


def search_global(query, articles_state_global):
    """Handle the worldwide search button, streaming results per country.

    Generator: each yielded list matches ``global_search_outputs`` (status,
    query display, five widgets per slot, state).
    """
    status_msg = "전세계 검색을 시작합니다..."
    all_results = []
    unique_results = []  # defined up front so the final status never fails
    # Snippet -> Korean translation. Slots are re-rendered after every
    # country, so each snippet is translated at most once per search.
    translation_cache = {}

    outputs = [
        gr.update(value=status_msg, visible=True),
        gr.update(value=f"**검색어:** {query}", visible=True),
    ]
    for _ in global_article_components:
        outputs.extend(_hidden_article_updates())
    outputs.append([])
    yield outputs

    total_countries = len(COUNTRY_LOCATIONS)
    for country_idx, country in enumerate(COUNTRY_LOCATIONS, 1):
        try:
            status_msg = f"{country} 검색 중... ({country_idx}/{total_countries} 국가)"
            outputs[0] = gr.update(value=status_msg, visible=True)
            yield outputs

            error_message, articles = serphouse_search(query, country)
            if error_message or not articles:
                continue

            for article in articles:
                article['source_country'] = country
            all_results.extend(articles)

            # NOTE(review): 'time' is whatever string the API returned, so
            # this is a plain string sort — confirm it orders as intended.
            sorted_results = sorted(
                all_results, key=lambda x: x.get('time', ''), reverse=True
            )

            # De-duplicate by link, keeping the first occurrence.
            seen_urls = set()
            unique_results = []
            for article in sorted_results:
                url = article.get('link', '')
                if url not in seen_urls:
                    seen_urls.add(url)
                    unique_results.append(article)
            unique_results = unique_results[:MAX_GLOBAL_RESULTS]

            outputs = [
                gr.update(
                    value=f"{country_idx}/{total_countries} 국가 검색 완료\n현재까지 발견된 뉴스: {len(unique_results)}건",
                    visible=True,
                ),
                gr.update(value=f"**검색어:** {query}", visible=True),
            ]
            for slot_idx, _ in enumerate(global_article_components):
                if slot_idx < len(unique_results):
                    article = unique_results[slot_idx]
                    snippet = article['snippet']
                    if snippet not in translation_cache:
                        translation_cache[snippet] = translate_to_korean(snippet)
                    outputs.extend(
                        _article_updates(
                            article, translation_cache[snippet], show_country=True
                        )
                    )
                else:
                    outputs.extend(_hidden_article_updates())
            outputs.append(unique_results)
            yield outputs
        except Exception as e:
            # One failing country must not abort the worldwide search.
            print(f"Error searching {country}: {str(e)}")
            continue

    final_status = f"검색 완료! 총 {len(unique_results)}개의 뉴스가 발견되었습니다."
    outputs[0] = gr.update(value=final_status, visible=True)
    yield outputs


css = """
footer {visibility: hidden;}
#status_area {
    background: rgba(255, 255, 255, 0.9); /* 약간 투명한 흰색 배경 */
    padding: 15px;
    border-bottom: 1px solid #ddd;
    margin-bottom: 20px;
    box-shadow: 0 2px 5px rgba(0,0,0,0.1); /* 부드러운 그림자 효과 */
}
#results_area {
    padding: 10px;
    margin-top: 10px;
}
/* 탭 스타일 개선 */
.tabs {
    border-bottom: 2px solid #ddd !important;
    margin-bottom: 20px !important;
}
.tab-nav {
    border-bottom: none !important;
    margin-bottom: 0 !important;
}
.tab-nav button {
    font-weight: bold !important;
    padding: 10px 20px !important;
}
.tab-nav button.selected {
    border-bottom: 2px solid #1f77b4 !important; /* 선택된 탭 강조 */
    color: #1f77b4 !important;
}
/* 검색 상태 메시지 스타일 */
#status_area .markdown-text {
    font-size: 1.1em;
    color: #2c3e50;
    padding: 10px 0;
}
/* 검색 결과 컨테이너 스타일 */
.group {
    border: 1px solid #eee;
    padding: 15px;
    margin-bottom: 15px;
    border-radius: 5px;
    background: white;
}
/* 검색 버튼 스타일 */
.primary-btn {
    background: #1f77b4 !important;
    border: none !important;
}
/* 검색어 입력창 스타일 */
.textbox {
    border: 1px solid #ddd !important;
    border-radius: 4px !important;
}
"""

with gr.Blocks(theme="Nymbo/Nymbo_Theme", css=css, title="NewsAI 서비스") as iface:
    with gr.Tabs():
        # Per-country tab
        with gr.Tab("국가별"):
            gr.Markdown("검색어를 입력하고 원하는 국가(한국 제외)를 선택하면, 검색어와 일치하는 24시간 이내 뉴스를 최대 100개 출력합니다.")
            gr.Markdown("국가 선택후 검색어에 '한글'을 입력하면 현지 언어로 번역되어 검색합니다. 예: 'Taiwan' 국가 선택후 '삼성' 입력시 '三星'으로 자동 검색")

            with gr.Column():
                with gr.Row():
                    query = gr.Textbox(label="검색어")
                    country = gr.Dropdown(MAJOR_COUNTRIES, label="국가", value="United States")

                status_message = gr.Markdown("", visible=True)
                translated_query_display = gr.Markdown(visible=False)
                search_button = gr.Button("검색", variant="primary")

                progress = gr.Progress()
                articles_state = gr.State([])

                article_components = create_article_components(MAX_COUNTRY_RESULTS)

        # Worldwide tab
        with gr.Tab("전세계"):
            gr.Markdown("검색어를 입력하면 67개국(한국 제외) 전체에 대해 국가별로 구분하여 24시간 이내 뉴스가 최대 1000개 순차 출력됩니다.")
            gr.Markdown("국가 선택후 검색어에 '한글'을 입력하면 현지 언어로 번역되어 검색합니다. 예: 'Taiwan' 국가 선택후 '삼성' 입력시 '三星'으로 자동 검색")

            with gr.Column():
                with gr.Column(elem_id="status_area"):
                    with gr.Row():
                        query_global = gr.Textbox(label="검색어")
                        search_button_global = gr.Button("전세계 검색", variant="primary")
                    status_message_global = gr.Markdown("")
                    translated_query_display_global = gr.Markdown("")

                with gr.Column(elem_id="results_area"):
                    articles_state_global = gr.State([])
                    global_article_components = create_article_components(MAX_GLOBAL_RESULTS)

        # AI reporter tab
        with gr.Tab("AI 리포터"):
            gr.Markdown("지난 24시간 동안의 Hacker News 포스트를 AI가 요약하여 보여줍니다.")

            with gr.Column():
                refresh_button = gr.Button("새로고침", variant="primary")
                status_message_hn = gr.Markdown("")

                with gr.Column(elem_id="hn_results_area"):
                    hn_articles_state = gr.State([])
                    # HN slots have no image/snippet, so they are built inline.
                    hn_article_components = []
                    for i in range(MAX_HN_STORIES):
                        with gr.Group(visible=False) as article_group:
                            title = gr.Markdown()
                            info = gr.Markdown()
                        hn_article_components.append({
                            'group': article_group,
                            'title': title,
                            'info': info,
                            'index': i,
                        })

    # Per-country tab wiring. The third output is a hidden Markdown that
    # receives the error message, if any.
    search_outputs = [
        status_message,
        translated_query_display,
        gr.Markdown(visible=False),
    ]
    for comp in article_components:
        search_outputs.extend([
            comp['group'], comp['title'], comp['image'],
            comp['snippet'], comp['info'],
        ])
    search_outputs.append(articles_state)

    search_button.click(
        search_and_display,
        inputs=[query, country, articles_state],
        outputs=search_outputs,
        show_progress=True,
    )

    # Worldwide tab wiring
    global_search_outputs = [
        status_message_global,
        translated_query_display_global,
    ]
    for comp in global_article_components:
        global_search_outputs.extend([
            comp['group'], comp['title'], comp['image'],
            comp['snippet'], comp['info'],
        ])
    global_search_outputs.append(articles_state_global)

    search_button_global.click(
        search_global,
        inputs=[query_global, articles_state_global],
        outputs=global_search_outputs,
    )

    # AI reporter tab wiring
    hn_outputs = [status_message_hn]
    for comp in hn_article_components:
        hn_outputs.extend([
            comp['group'],
            comp['title'],
            comp['info'],
        ])

    refresh_button.click(
        refresh_hn_stories,
        outputs=hn_outputs,
    )

# SECURITY: the login credentials used to be hard-coded here. They can now be
# overridden through APP_AUTH_USER / APP_AUTH_PASSWORD; the defaults are kept
# only for backward compatibility and should be rotated and removed.
iface.launch(
    server_name="0.0.0.0",
    server_port=7860,
    share=False,  # no public share link
    auth=(
        os.getenv("APP_AUTH_USER", "it1"),
        os.getenv("APP_AUTH_PASSWORD", "chosun1"),
    ),
    ssl_verify=False,  # SSL verification disabled (if required)
    show_error=True,  # surface errors in the UI
)