{ "nbformat": 4, "nbformat_minor": 0, "metadata": { "colab": { "provenance": [] }, "kernelspec": { "name": "python3", "display_name": "Python 3" }, "language_info": { "name": "python" } }, "cells": [ { "cell_type": "markdown", "source": [ "このスクリプトは、Pegasusライブラリを使用してウェブサイトをクロールするためのGradioインターフェースを提供します。\n", "ユーザーは、開始URL、出力ディレクトリ、除外セレクター、含めるドメイン、除外キーワード、出力拡張子、ダストサイズ、\n", "および最大深度を指定できます。このツールは、提供されたパラメータに基づいてウェブサイトをクロールし、\n", "各ページのテキストファイルを生成します。また、すべてのテキストファイルを単一の出力ファイルに結合します。\n", "\n", "## Functions:\n", "- pegasus_web_crawler(input_url, output_dir, exclude_selectors, include_domain, exclude_keywords, output_extension, dust_size, max_depth):\n", "PegasusライブラリとGradioインターフェースを使用してウェブサイトをクロールするメイン関数。\n", "入力パラメータに基づいてクロールを実行し、各ページのテキストファイルを生成し、単一の出力ファイルに結合します。\n", "\n", "## Interface:\n", "- Gradioインターフェースは、ユーザーが必要なパラメータを入力し、クロール結果を表示できるようにします。\n", "- 入力:\n", " - Input URL: クロールを開始するURL\n", " - Output Directory: 出力ファイルを保存するディレクトリ\n", " - Exclude Selectors: 除外するCSSセレクター(カンマ区切り)\n", " - Include Domain: クロール対象に含めるドメイン\n", " - Exclude Keywords: 除外するキーワード(カンマ区切り)\n", " - Output Extension: 出力ファイルの拡張子\n", " - Dust Size: ダストサイズ(クロールの深さを制御)\n", " - Max Depth: 最大クロール深度\n", "- 出力:\n", " - Output Directory: 出力ファイルが保存されたディレクトリ\n", " - Combined Text: すべてのテキストファイルを結合した内容\n", " - Combined Output File: 結合された出力ファイル\n", " - Error Message: エラーが発生した場合のエラーメッセージ\n", " - Pegasus Output: Pegasusライブラリからの出力\n", "\n", "## Examples:\n", "- インターフェースには、ユーザーが試すことができるサンプル入力が含まれています。\n", "\n", "## Note:\n", "- このスクリプトは、PegasusライブラリとGradioインターフェースに依存しています。\n", "- クロールされたウェブサイトのテキストコンテンツは、指定された出力ディレクトリに保存されます。\n" ], "metadata": { "id": "YXCKIOTMnPEj" } }, { "cell_type": "markdown", "source": [ "利用規約はSunwood-ai-labs様の[pegasus](https://github.com/Sunwood-ai-labs/PEGASUS/tree/v0.2.4)に準拠します。\n", "\n" ], "metadata": { "id": "-PDP78o-nrop" } }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "SREcB8tZkcGV" }, "outputs": [], "source": [ "!pip install pegasus-surf==0.2.4\n", "!pip install gradio" ] }, { "cell_type": "code", "source": [ "import gradio as gr\n", "import os\n", "import json\n", "from pegasus import Pegasus\n", "import sys\n", "from io import StringIO\n", "import threading\n", "import time\n", "import re\n", "\n", "def pegasus_web_crawler(input_url, output_dir, exclude_selectors, include_domain, exclude_keywords, output_extension, dust_size, max_depth):\n", " error_message = \"\"\n", " pegasus_output = \"\"\n", "\n", " domain_pattern = r'^(?:https?:\\/\\/)?(?:[^@\\n]+@)?(?:www\\.)?([^:\\/\\n]+)'\n", " # URLからドメインを抽出\n", " matched = re.match(domain_pattern, input_url)\n", " if matched:\n", " domain = matched.group(1)\n", " # ドメインのドットを '_' に置換\n", " replaced_domain = domain.replace('.', '_')\n", " # パスのスラッシュを '_' に置換\n", " replaced_path = re.sub(r'/', '_', input_url[matched.end():])\n", " # 置換後のドメインとパスを結合\n", " replaced_url = \"combined_\" + replaced_domain + replaced_path\n", " else:\n", " replaced_url = \"combined_output\"\n", " combined_output_filename = replaced_url + \".txt\"\n", "\n", " try:\n", " output_subdir = os.path.join(output_dir, include_domain)\n", " os.makedirs(output_subdir, exist_ok=True)\n", " combined_output_path = os.path.join(output_dir, combined_output_filename)\n", " with open(combined_output_path, \"w\") as file:\n", " file.write(\"\")\n", "\n", " # 標準出力をStringIOにリダイレクト\n", " stdout_backup = sys.stdout\n", " sys.stdout = StringIO()\n", "\n", " exclude_selectors = exclude_selectors.split(\",\") if exclude_selectors else []\n", " exclude_keywords = exclude_keywords.split(\",\") if exclude_keywords else []\n", "\n", " pegasus = Pegasus(\n", " output_dir=output_subdir,\n", " exclude_selectors=exclude_selectors,\n", " include_domain=include_domain,\n", " exclude_keywords=exclude_keywords,\n", " output_extension=output_extension,\n", " dust_size=dust_size,\n", " max_depth=max_depth,\n", " )\n", "\n", " def run_pegasus(url):\n", " pegasus.run(url)\n", "\n", " thread = threading.Thread(target=run_pegasus, args=(input_url,))\n", " thread.start()\n", "\n", " while thread.is_alive():\n", " sys.stdout.seek(0)\n", " pegasus_output = sys.stdout.read()\n", " sys.stdout.truncate(0)\n", " sys.stdout.seek(0)\n", " yield output_dir, \"\", combined_output_path, error_message, pegasus_output\n", " time.sleep(1) # 1秒間隔で出力を更新\n", "\n", " thread.join()\n", "\n", " # StringIOから最終的な出力を取得\n", " sys.stdout.seek(0)\n", " pegasus_output = sys.stdout.read()\n", "\n", " # 標準出力を元に戻す\n", " sys.stdout = stdout_backup\n", "\n", " txt_files = [f for f in os.listdir(output_subdir) if f.endswith(output_extension)]\n", " combined_text = \"\"\n", " for f in txt_files:\n", " with open(os.path.join(output_subdir, f), \"r\") as file:\n", " combined_text += file.read()\n", "\n", " with open(combined_output_path, \"w\") as file:\n", " file.write(combined_text)\n", "\n", " yield output_dir, combined_text, combined_output_path, error_message, pegasus_output\n", "\n", " except Exception as e:\n", " error_message = str(e)\n", " yield None, None, None, error_message, pegasus_output\n", "\n", "# Define Gradio interface\n", "interface = gr.Interface(\n", " fn=pegasus_web_crawler,\n", " inputs=[\n", " gr.Textbox(label=\"Input URL\", placeholder=\"https://example.com\"),\n", " gr.Textbox(label=\"Output Directory\", value=\"output_directory\"),\n", " gr.Textbox(label=\"Exclude Selectors (comma-separated)\", value=\"header,footer,nav\"),\n", " gr.Textbox(label=\"Include Domain\", placeholder=\"example.com\"),\n", " gr.Textbox(label=\"Exclude Keywords (comma-separated)\", value=\"login,signup\"),\n", " gr.Textbox(label=\"Output Extension\", value=\".txt\"),\n", " gr.Number(label=\"Dust Size\", value=500),\n", " gr.Number(label=\"Max Depth\", value=1),\n", " ],\n", " outputs=[\n", " gr.FileExplorer(label=\"Output Directory\", root_dir=\"./output_directory\", interactive=True),\n", " gr.Textbox(label=\"Combined Text\", show_copy_button=True),\n", " gr.File(label=\"Combined Output File\"),\n", " gr.Textbox(label=\"Error Message\"),\n", " gr.Textbox(label=\"Pegasus Output\", lines=10)\n", " ],\n", " examples=[\n", " [\"https://docs.dify.ai/features/workflow/node/code\", \"output_directory\", \"header,footer,nav\", \"docs.dify.ai\", \"login\", \".txt\", 500, 1],\n", " [\"https://www.gradio.app/docs\", \"output_directory\", \"header,footer,nav\", \"gradio.app\", \"login\", \".txt\", 500, 1],\n", " ],\n", " cache_examples=False,\n", " title=\"tregu0458/web_crawler_powered_by_pegasus\",\n", " description=\"A web crawler tool based on Pegasus library.\",\n", " article=\"This tool allows you to crawl a website using the Pegasus library. You can specify the starting URL, output directory, exclude selectors, include domain, exclude keywords, output extension, dust size, and maximum depth. The tool will crawl the website based on the provided parameters and generate text files for each page. It also combines all the text files into a single output file.\"\n", ")\n", "\n", "# Launch the interface with queue-based output\n", "interface.queue().launch()" ], "metadata": { "id": "obkt-hyXlu3x" }, "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## 苦労した点\n", "- Fileのアウトプットをするためには、空のファイルをおいておくとうまくいった。queueだから?\n", "- pegasusのprintをgradio上に表示させたかったが、断念。ロゴだけ表示された\n", "- バージョンによって、pegasusの仕様が異なるので注意" ], "metadata": { "id": "pLIvh2bpoZnA" } }, { "cell_type": "markdown", "source": [], "metadata": { "id": "9ZyT_uVLoYmP" } }, { "cell_type": "code", "source": [ "import os\n", "import json\n", "from pegasus import Pegasus\n", "import sys\n", "from io import StringIO\n", "import threading\n", "import time\n", "import re\n", "\n", "def pegasus_web_crawler(input_url, output_dir, exclude_selectors, include_domain, exclude_keywords, output_extension, dust_size, max_depth):\n", " error_message = \"\"\n", " pegasus_output = \"\"\n", "\n", " domain_pattern = r'^(?:https?:\\/\\/)?(?:[^@\\n]+@)?(?:www\\.)?([^:\\/\\n]+)'\n", " # URLからドメインを抽出\n", " matched = re.match(domain_pattern, input_url)\n", " if matched:\n", " domain = matched.group(1)\n", " # ドメインのドットを '_' に置換\n", " replaced_domain = domain.replace('.', '_')\n", " # パスのスラッシュを '_' に置換\n", " replaced_path = re.sub(r'/', '_', input_url[matched.end():])\n", " # 置換後のドメインとパスを結合\n", " replaced_url = \"combined_\" + replaced_domain + replaced_path\n", " else:\n", " replaced_url = \"combined_output\"\n", " combined_output_filename = replaced_url + \".txt\"\n", "\n", " try:\n", " output_subdir = os.path.join(output_dir, include_domain)\n", " os.makedirs(output_subdir, exist_ok=True)\n", " combined_output_path = os.path.join(output_dir, combined_output_filename)\n", " with open(combined_output_path, \"w\") as file:\n", " file.write(\"\")\n", "\n", "\n", " exclude_selectors = exclude_selectors.split(\",\") if exclude_selectors else []\n", " exclude_keywords = exclude_keywords.split(\",\") if exclude_keywords else []\n", "\n", " pegasus = Pegasus(\n", " output_dir=output_subdir,\n", " exclude_selectors=exclude_selectors,\n", " include_domain=include_domain,\n", " exclude_keywords=exclude_keywords,\n", " output_extension=output_extension,\n", " dust_size=dust_size,\n", " max_depth=max_depth,\n", " )\n", "\n", " pegasus.run(input_url)\n", "\n", "\n", " txt_files = [f for f in os.listdir(output_subdir) if f.endswith(output_extension)]\n", " combined_text = \"\"\n", " for f in txt_files:\n", " with open(os.path.join(output_subdir, f), \"r\") as file:\n", " combined_text += file.read()\n", "\n", " with open(combined_output_path, \"w\") as file:\n", " file.write(combined_text)\n", "\n", " return output_dir, combined_text, combined_output_path, error_message, pegasus_output\n", "\n", " except Exception as e:\n", " error_message = str(e)\n", " return None, None, None, error_message, pegasus_output\n", "\n", "\n", "output_dir, combined_text, combined_output_path, error_message, pegasus_output = pegasus_web_crawler(\"https://docs.dify.ai/features/workflow/node/code\", \"output_directory\", \"header,footer,nav\", \"docs.dify.ai\", \"login\", \".txt\", 500, 1)" ], "metadata": { "id": "ESIQoHYDw0Cx" }, "execution_count": null, "outputs": [] } ] }