# article_writer/google_search.py
import os
import time
from googleapiclient.discovery import build
import asyncio
import httpx
from bs4 import BeautifulSoup
from dotenv import load_dotenv
import html2text
import requests
import unicodedata
import fitz  # PyMuPDF
load_dotenv()
API_KEY = os.environ.get("GOOGLE_SEARCH_API_KEY")
CSE_KEY = os.environ.get("GOOGLE_SEARCH_CSE_ID")
# Number of pages to scrape
NUM_PAGES = 10
# load html2text and set up configs
h2t = html2text.HTML2Text()
h2t.body_width = 0  # No wrapping
h2t.ignore_links = True # Ignore hyperlinks
h2t.ignore_images = True # Ignore images
h2t.ignore_emphasis = True # Ignore emphasis
h2t.ignore_tables = False # Include tables
h2t.skip_internal_links = True # Skip internal links
h2t.skip_external_links = True # Skip external links
h2t.single_line_break = True # Use single line breaks
h2t.protect_links = True # Protect links from being split
h2t.default_image_alt = "[image]" # Default alt text for images
def clean_html(text):
text = h2t.handle(text)
text = unicodedata.normalize("NFKD", text).encode("ASCII", "ignore").decode("ASCII") # Remove non-ASCII characters
return text
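
# Illustrative sketch of the cleaning step (the exact markdown html2text emits
# depends on the installed version, so the output shown is an assumption):
#
#   clean_html("<p>Caf\u00e9 &amp; r\u00e9sum\u00e9</p>")
#   # -> roughly "Cafe & resume\n": accents are NFKD-decomposed and the
#   #    remaining non-ASCII bytes are dropped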
def build_results_beautifulsoup(url_list):
print("Starting to scrape URLs...")
start_time = time.perf_counter()
# scrape URLs in list
soups = asyncio.run(parallel_scrap(url_list))
scraping_time = time.perf_counter() - start_time
print(f"Scraping processing time: {scraping_time:.2f} seconds")
result_content = {}
count = 0
print("Starting to process each URL...")
for url, soup in zip(url_list, soups):
if count >= NUM_PAGES:
print(f"Reached the limit of {NUM_PAGES} pages. Stopping processing.")
break
if soup:
print(f"Processing URL: {url}")
text = clean_html(soup.text)
if len(text) > 500:
print(f"Adding content from URL: {url}, content length: {len(text)}")
result_content[url] = text
count += 1
else:
print(f"Skipped URL: {url}, content too short (length: {len(text)})")
else:
print(f"Skipped URL: {url}, no soup content available.")
print("Finished processing URLs.")
return result_content
def build_results_extractor(url_list):
try:
endpoint = "https://extractorapi.com/api/v1/extractor"
result_content = {}
count = 0
for url in url_list:
if count >= NUM_PAGES:
break
params = {"apikey": os.environ.get("EXTRACTOR_API_KEY"), "url": url}
r = requests.get(endpoint, params=params)
if r.status_code == 200:
text = r.json()["text"]
if len(text) > 500:
result_content[url] = text
count += 1
            elif r.status_code == 403:
                raise Exception("Error with API; using default implementation instead")
return result_content
except Exception as e:
print(e)
return build_results_beautifulsoup(url_list)
months = {
"January": "01",
"February": "02",
"March": "03",
"April": "04",
"May": "05",
"June": "06",
"July": "07",
"August": "08",
"September": "09",
"October": "10",
"November": "11",
"December": "12",
}
domain_list = ["com", "org", "net", "int", "edu", "gov", "mil"]
def build_date(year=2024, month="March", day=1):
    # Format as YYYYMMDD (zero-padded day).
    return f"{year}{months[month]}{day:02d}"
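
# Example: build_date(2024, "March", 5) -> "20240305". Callers presumably embed this
# in a Custom Search date-restrict sort string such as "date:r:20230305:20240305"
# (an assumption about how the sorted_date argument used below is built).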
async def get_url_data(url, client):
try:
r = await client.get(url)
if r.status_code == 200:
content_type = r.headers.get("Content-Type", "").lower()
# detect if pdf
if "application/pdf" in content_type or url.lower().endswith(".pdf"):
pdf_content = await extract_pdf_text(r.content)
return BeautifulSoup(pdf_content, "html.parser")
else:
return BeautifulSoup(r.content, "html.parser")
except Exception:
return None
async def extract_pdf_text(content):
try:
with fitz.open(stream=content, filetype="pdf") as doc:
text = ""
for page in doc:
text += page.get_text()
return f"<div>{text}</div>" # Wrap in a div to make it valid HTML
except Exception as e:
print(f"Error extracting PDF text: {str(e)}")
return "<div>Error extracting PDF text</div>"
async def parallel_scrap(urls):
async with httpx.AsyncClient(timeout=30) as client:
tasks = []
for url in urls:
tasks.append(get_url_data(url=url, client=client))
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
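
# Usage sketch: soups = asyncio.run(parallel_scrap(["https://example.com", "..."]))
# Each entry is a BeautifulSoup object, None (failed fetch or non-200 response),
# or, because gather() runs with return_exceptions=True, occasionally an exception
# instance.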
def scrap(urls):
    # Synchronous entry point: run the async scraper to completion for callers
    # that are not already inside an event loop.
    return asyncio.run(parallel_scrap(urls))
def google_search_urls(
text,
sorted_date,
domains_to_include,
api_key,
cse_id,
**kwargs,
):
service = build("customsearch", "v1", developerKey=api_key)
results = service.cse().list(q=text, cx=cse_id, sort=sorted_date, **kwargs).execute()
url_list = []
if "items" in results and len(results["items"]) > 0:
        for item in results["items"]:
            url = item["link"]
            # keep only URLs whose domain matches one of the user-selected domains;
            # if no domains were selected, keep everything
            if domains_to_include and not any(
                ("." + domain) in url for domain in domains_to_include
            ):
                continue
            if url not in url_list:
                url_list.append(url)
return url_list
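
# Usage sketch with illustrative values; the sort string follows the Custom Search
# date-restrict syntax, and extra CSE parameters (e.g. num) can be passed via **kwargs:
#
#   urls = google_search_urls(
#       "large language models",
#       "date:r:20230101:20240301",
#       ["com", "org"],
#       API_KEY,
#       CSE_KEY,
#       num=10,
#   )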
def google_search(
topic,
sorted_date,
domains_to_include,
):
api_key = os.environ.get("GOOGLE_SEARCH_API_KEY")
cse_id = os.environ.get("GOOGLE_SEARCH_CSE_ID")
start_time = time.perf_counter()
url_list = google_search_urls(
topic,
sorted_date,
domains_to_include,
api_key,
cse_id,
)
print("Google Search processing time: ", time.perf_counter() - start_time)
result_content = build_results_beautifulsoup(url_list)
return result_content
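
# Minimal manual-run sketch: assumes GOOGLE_SEARCH_API_KEY and GOOGLE_SEARCH_CSE_ID
# are set in .env; the query, date range, and domain list below are illustrative.
if __name__ == "__main__":
    content = google_search(
        "impact of remote work on productivity",
        "date:r:20230101:20240301",
        ["com", "org", "edu"],
    )
    for url, text in content.items():
        print(url, len(text))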