# tregu0458's picture
# Update app.py
# cdd5157 verified
import gradio as gr
import os
import json
from pegasus import Pegasus
import sys
from io import StringIO
import threading
import time
import re
def _split_csv(raw):
    """Split a comma-separated string into stripped, non-empty items."""
    if not raw:
        return []
    return [item.strip() for item in raw.split(",") if item.strip()]


def _combined_output_filename(input_url):
    """Derive the name of the combined output file from the start URL.

    The domain's dots and the path's slashes are replaced with underscores,
    e.g. ``https://example.com/a/b`` -> ``combined_example_com_a_b.txt``.
    Falls back to ``combined_output.txt`` when no domain can be extracted.
    """
    domain_pattern = r'^(?:https?:\/\/)?(?:[^@\n]+@)?(?:www\.)?([^:\/\n]+)'
    input_url = input_url or ""
    # Extract the domain from the URL.
    matched = re.match(domain_pattern, input_url)
    if not matched:
        return "combined_output.txt"
    # Replace the dots of the domain and the slashes of the path with '_'.
    replaced_domain = matched.group(1).replace('.', '_')
    replaced_path = input_url[matched.end():].replace('/', '_')
    return "combined_" + replaced_domain + replaced_path + ".txt"


def pegasus_web_crawler(input_url, output_dir, exclude_selectors, include_domain, exclude_keywords, output_extension, dust_size, max_depth):
    """Crawl ``input_url`` with Pegasus and stream progress to the caller.

    This is a generator. While the crawl runs in a background thread it
    yields, roughly once per second, a 5-tuple
    ``(output_dir, "", combined_output_path, error_message, pegasus_output)``
    where ``pegasus_output`` is everything printed to stdout so far. When the
    crawl has finished it yields one last tuple whose second element is the
    concatenation of every file in ``<output_dir>/<include_domain>`` whose
    name ends with ``output_extension``; the same text is written to the
    combined output file inside ``output_dir``.

    If anything fails, a final ``(None, None, None, error_message,
    pegasus_output)`` tuple is yielded instead of raising.

    Args:
        input_url: URL at which the crawl starts.
        output_dir: Directory that receives the combined output file; the
            per-page files go into the ``include_domain`` subdirectory.
        exclude_selectors: Comma-separated selectors forwarded to Pegasus.
        include_domain: Domain forwarded to Pegasus; also the subdirectory name.
        exclude_keywords: Comma-separated keywords forwarded to Pegasus.
        output_extension: File-name suffix of the files to combine.
        dust_size: Forwarded to Pegasus unchanged.
        max_depth: Forwarded to Pegasus unchanged.
    """
    error_message = ""
    pegasus_output = ""
    combined_output_filename = _combined_output_filename(input_url)
    try:
        output_subdir = os.path.join(output_dir, include_domain)
        os.makedirs(output_subdir, exist_ok=True)
        combined_output_path = os.path.join(output_dir, combined_output_filename)
        # Create (or empty) the combined file up front so the path handed to
        # the UI exists while the crawl is still running.
        with open(combined_output_path, "w", encoding="utf-8") as file:
            file.write("")

        captured = StringIO()
        crawl_errors = []
        # Redirect standard output into the StringIO buffer.
        # NOTE(review): this swaps the process-wide sys.stdout, so concurrent
        # requests would share/clobber each other's capture.
        stdout_backup = sys.stdout
        sys.stdout = captured
        try:
            pegasus = Pegasus(
                output_dir=output_subdir,
                exclude_selectors=_split_csv(exclude_selectors),
                include_domain=include_domain,
                exclude_keywords=_split_csv(exclude_keywords),
                output_extension=output_extension,
                dust_size=dust_size,
                max_depth=max_depth,
            )

            def run_pegasus(url):
                # Exceptions raised in a worker thread never reach the caller
                # on their own, so record them for the error message.
                try:
                    pegasus.run(url)
                except Exception as exc:
                    crawl_errors.append(exc)

            thread = threading.Thread(target=run_pegasus, args=(input_url,))
            thread.start()
            while thread.is_alive():
                # getvalue() snapshots the buffer without truncating it, so no
                # output written concurrently by the crawler can be dropped.
                pegasus_output = captured.getvalue()
                yield output_dir, "", combined_output_path, error_message, pegasus_output
                # Refresh about once per second, but wake up as soon as the
                # crawl finishes.
                thread.join(timeout=1)
            thread.join()
        finally:
            # Always restore standard output, including when an exception is
            # raised or the generator is closed before the crawl completes.
            sys.stdout = stdout_backup
            pegasus_output = captured.getvalue()

        if crawl_errors:
            error_message = str(crawl_errors[0])

        # Sorted for a deterministic order; isfile() skips directories, which
        # would otherwise match when output_extension is empty.
        txt_files = sorted(
            f for f in os.listdir(output_subdir)
            if f.endswith(output_extension)
            and os.path.isfile(os.path.join(output_subdir, f))
        )
        parts = []
        for f in txt_files:
            # Assumes the crawler writes UTF-8 text files -- TODO confirm.
            with open(os.path.join(output_subdir, f), "r", encoding="utf-8") as file:
                parts.append(file.read())
        combined_text = "".join(parts)
        with open(combined_output_path, "w", encoding="utf-8") as file:
            file.write(combined_text)
        yield output_dir, combined_text, combined_output_path, error_message, pegasus_output
    except Exception as e:
        error_message = str(e)
        yield None, None, None, error_message, pegasus_output
# Gradio UI definition.
# Input components, listed in the same order as the positional parameters of
# pegasus_web_crawler.
crawler_inputs = [
    gr.Textbox(label="Input URL", placeholder="https://example.com"),
    gr.Textbox(label="Output Directory", value="output_directory"),
    gr.Textbox(label="Exclude Selectors (comma-separated)", value="header,footer,nav"),
    gr.Textbox(label="Include Domain", placeholder="example.com"),
    gr.Textbox(label="Exclude Keywords (comma-separated)", value="login,signup"),
    gr.Textbox(label="Output Extension", value=".txt"),
    gr.Number(label="Dust Size", value=500),
    gr.Number(label="Max Depth", value=1),
]

# Output components, one per element of the tuples yielded by
# pegasus_web_crawler.
crawler_outputs = [
    gr.FileExplorer(label="Output Directory", root_dir="./output_directory", interactive=True),
    gr.Textbox(label="Combined Text", show_copy_button=True),
    gr.File(label="Combined Output File"),
    gr.Textbox(label="Error Message"),
    gr.Textbox(label="Pegasus Output", lines=10),
]

# Pre-filled example rows; each row supplies one value per input component.
crawler_examples = [
    ["https://docs.dify.ai/features/workflow/node/code", "output_directory", "header,footer,nav", "docs.dify.ai", "login", ".txt", 500, 1],
    ["https://www.gradio.app/docs", "output_directory", "header,footer,nav", "gradio.app", "login", ".txt", 500, 1],
]

interface = gr.Interface(
    fn=pegasus_web_crawler,
    inputs=crawler_inputs,
    outputs=crawler_outputs,
    examples=crawler_examples,
    cache_examples=False,
    title="Pegasus Web Crawler",
    description="A web crawler tool based on Pegasus library.",
    article="This tool allows you to crawl a website using the Pegasus library. You can specify the starting URL, output directory, exclude selectors, include domain, exclude keywords, output extension, dust size, and maximum depth. The tool will crawl the website based on the provided parameters and generate text files for each page. It also combines all the text files into a single output file.",
)

# Launch the interface with queue-based output.
queued_interface = interface.queue()
queued_interface.launch()