import gradio as gr
import os
import json
from pegasus import Pegasus
import sys
from io import StringIO
import threading
import time
import re

def pegasus_web_crawler(input_url, output_dir, exclude_selectors, include_domain, exclude_keywords, output_extension, dust_size, max_depth):
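    """Crawl input_url with Pegasus and stream progress back to the Gradio UI.

    Runs as a generator: while the crawl is in progress it yields the
    crawler's captured stdout as partial output, and when the crawl finishes
    it yields the combined text of all crawled pages (or an error message if
    the crawl fails).
    """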
    error_message = ""
    pegasus_output = ""

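    # Build a filesystem-safe base name for the combined output file from the URL.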
    domain_pattern = r'^(?:https?:\/\/)?(?:[^@\n]+@)?(?:www\.)?([^:\/\n]+)'
    # Extract the domain from the URL
    matched = re.match(domain_pattern, input_url)
    if matched:
        domain = matched.group(1)
        # Replace dots in the domain with '_'
        replaced_domain = domain.replace('.', '_')
        # Replace slashes in the path with '_'
        replaced_path = re.sub(r'/', '_', input_url[matched.end():])
        # Join the sanitized domain and path
        replaced_url = "combined_" + replaced_domain + replaced_path
    else:
        replaced_url = "combined_output"
    combined_output_filename = replaced_url + ".txt"

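    # Prepare the output locations before crawling: a per-domain subdirectory
    # for individual page files and an (initially empty) combined output file.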
    try:
        output_subdir = os.path.join(output_dir, include_domain)
        os.makedirs(output_subdir, exist_ok=True)
        combined_output_path = os.path.join(output_dir, combined_output_filename)
        with open(combined_output_path, "w") as file:
            file.write("")

        # Redirect stdout to a StringIO buffer so crawler logs can be captured
        stdout_backup = sys.stdout
        sys.stdout = StringIO()

        exclude_selectors = exclude_selectors.split(",") if exclude_selectors else []
        exclude_keywords = exclude_keywords.split(",") if exclude_keywords else []

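        # Configure the Pegasus crawler with the options supplied from the UI.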
        pegasus = Pegasus(
            output_dir=output_subdir,
            exclude_selectors=exclude_selectors,
            include_domain=include_domain,
            exclude_keywords=exclude_keywords,
            output_extension=output_extension,
            dust_size=dust_size,
            max_depth=max_depth,
        )

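        # Run the crawl in a background thread so this generator can keep
        # yielding progress updates to the interface while the crawl is running.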
        def run_pegasus(url):
            pegasus.run(url)

        thread = threading.Thread(target=run_pegasus, args=(input_url,))
        thread.start()

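        # Poll the redirected stdout buffer while the crawler thread is alive
        # and stream whatever it has printed so far back to the interface.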
        while thread.is_alive():
            sys.stdout.seek(0)
            pegasus_output = sys.stdout.read()
            sys.stdout.truncate(0)
            sys.stdout.seek(0)
            yield output_dir, "", combined_output_path, error_message, pegasus_output
            time.sleep(1)  # refresh the streamed output once per second

        thread.join()

        # Read the final crawler output from the StringIO buffer
        sys.stdout.seek(0)
        pegasus_output = sys.stdout.read()

        # Restore the original stdout
        sys.stdout = stdout_backup

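        # Merge every crawled page file with the chosen extension into the
        # single combined output file.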
        txt_files = [f for f in os.listdir(output_subdir) if f.endswith(output_extension)]
        combined_text = ""
        for f in txt_files:
            with open(os.path.join(output_subdir, f), "r") as file:
                combined_text += file.read()

        with open(combined_output_path, "w") as file:
            file.write(combined_text)

        yield output_dir, combined_text, combined_output_path, error_message, pegasus_output

    except Exception as e:
        # Restore stdout in case the failure happened while it was redirected
        sys.stdout = sys.__stdout__
        error_message = str(e)
        yield None, None, None, error_message, pegasus_output

# Define Gradio interface
interface = gr.Interface(
    fn=pegasus_web_crawler,
    inputs=[
        gr.Textbox(label="Input URL", placeholder="https://example.com"),
        gr.Textbox(label="Output Directory", value="output_directory"),
        gr.Textbox(label="Exclude Selectors (comma-separated)", value="header,footer,nav"),
        gr.Textbox(label="Include Domain", placeholder="example.com"),
        gr.Textbox(label="Exclude Keywords (comma-separated)", value="login,signup"),
        gr.Textbox(label="Output Extension", value=".txt"),
        gr.Number(label="Dust Size", value=500),
        gr.Number(label="Max Depth", value=1),
    ],
    outputs=[
        gr.FileExplorer(label="Output Directory", root_dir="./output_directory", interactive=True),
        gr.Textbox(label="Combined Text", show_copy_button=True),
        gr.File(label="Combined Output File"),
        gr.Textbox(label="Error Message"),
        gr.Textbox(label="Pegasus Output", lines=10)
    ],
    examples=[
        ["https://docs.dify.ai/features/workflow/node/code", "output_directory", "header,footer,nav", "docs.dify.ai", "login", ".txt", 500, 1],
        ["https://www.gradio.app/docs", "output_directory", "header,footer,nav", "gradio.app", "login", ".txt", 500, 1],
    ],
    cache_examples=False,
    title="Pegasus Web Crawler",
    description="A web crawler tool based on the Pegasus library.",
    article="This tool crawls a website using the Pegasus library. You can specify the starting URL, output directory, exclude selectors, include domain, exclude keywords, output extension, dust size, and maximum depth. The tool crawls the site with those parameters, generates a text file for each page, and combines all of the page files into a single output file."
)

# Launch the interface with queue-based output
interface.queue().launch()