import gradio as gr import os import json from pegasus import Pegasus import sys from io import StringIO import threading import time import re def pegasus_web_crawler(input_url, output_dir, exclude_selectors, include_domain, exclude_keywords, output_extension, dust_size, max_depth): error_message = "" pegasus_output = "" domain_pattern = r'^(?:https?:\/\/)?(?:[^@\n]+@)?(?:www\.)?([^:\/\n]+)' # URLからドメインを抽出 matched = re.match(domain_pattern, input_url) if matched: domain = matched.group(1) # ドメインのドットを '_' に置換 replaced_domain = domain.replace('.', '_') # パスのスラッシュを '_' に置換 replaced_path = re.sub(r'/', '_', input_url[matched.end():]) # 置換後のドメインとパスを結合 replaced_url = "combined_" + replaced_domain + replaced_path else: replaced_url = "combined_output" combined_output_filename = replaced_url + ".txt" try: output_subdir = os.path.join(output_dir, include_domain) os.makedirs(output_subdir, exist_ok=True) combined_output_path = os.path.join(output_dir, combined_output_filename) with open(combined_output_path, "w") as file: file.write("") # 標準出力をStringIOにリダイレクト stdout_backup = sys.stdout sys.stdout = StringIO() exclude_selectors = exclude_selectors.split(",") if exclude_selectors else [] exclude_keywords = exclude_keywords.split(",") if exclude_keywords else [] pegasus = Pegasus( output_dir=output_subdir, exclude_selectors=exclude_selectors, include_domain=include_domain, exclude_keywords=exclude_keywords, output_extension=output_extension, dust_size=dust_size, max_depth=max_depth, ) def run_pegasus(url): pegasus.run(url) thread = threading.Thread(target=run_pegasus, args=(input_url,)) thread.start() while thread.is_alive(): sys.stdout.seek(0) pegasus_output = sys.stdout.read() sys.stdout.truncate(0) sys.stdout.seek(0) yield output_dir, "", combined_output_path, error_message, pegasus_output time.sleep(1) # 1秒間隔で出力を更新 thread.join() # StringIOから最終的な出力を取得 sys.stdout.seek(0) pegasus_output = sys.stdout.read() # 標準出力を元に戻す sys.stdout = stdout_backup txt_files = [f for f in os.listdir(output_subdir) if f.endswith(output_extension)] combined_text = "" for f in txt_files: with open(os.path.join(output_subdir, f), "r") as file: combined_text += file.read() with open(combined_output_path, "w") as file: file.write(combined_text) yield output_dir, combined_text, combined_output_path, error_message, pegasus_output except Exception as e: error_message = str(e) yield None, None, None, error_message, pegasus_output # Define Gradio interface interface = gr.Interface( fn=pegasus_web_crawler, inputs=[ gr.Textbox(label="Input URL", placeholder="https://example.com"), gr.Textbox(label="Output Directory", value="output_directory"), gr.Textbox(label="Exclude Selectors (comma-separated)", value="header,footer,nav"), gr.Textbox(label="Include Domain", placeholder="example.com"), gr.Textbox(label="Exclude Keywords (comma-separated)", value="login,signup"), gr.Textbox(label="Output Extension", value=".txt"), gr.Number(label="Dust Size", value=500), gr.Number(label="Max Depth", value=1), ], outputs=[ gr.FileExplorer(label="Output Directory", root_dir="./output_directory", interactive=True), gr.Textbox(label="Combined Text", show_copy_button=True), gr.File(label="Combined Output File"), gr.Textbox(label="Error Message"), gr.Textbox(label="Pegasus Output", lines=10) ], examples=[ ["https://docs.dify.ai/features/workflow/node/code", "output_directory", "header,footer,nav", "docs.dify.ai", "login", ".txt", 500, 1], ["https://www.gradio.app/docs", "output_directory", "header,footer,nav", "gradio.app", "login", ".txt", 500, 1], ], cache_examples=False, title="Pegasus Web Crawler", description="A web crawler tool based on Pegasus library.", article="This tool allows you to crawl a website using the Pegasus library. You can specify the starting URL, output directory, exclude selectors, include domain, exclude keywords, output extension, dust size, and maximum depth. The tool will crawl the website based on the provided parameters and generate text files for each page. It also combines all the text files into a single output file." ) # Launch the interface with queue-based output interface.queue().launch()