import concurrent.futures
import copy
import json
import logging
import time
import traceback
import urllib.parse as en
import warnings
from itertools import zip_longest
from urllib.request import urlopen

import requests
import selenium.common.exceptions
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from unstructured.partition.html import partition_html

from llmsearch import site_stats
# this import style works in pycharm
from llmsearch import utilityV2 as ut
# this import style works on server + vs code
# import utils
# from llmsearch import google_search_concurrent as gs
# from llmsearch import meta as mt
# from llmsearch import utilityV2 as ut
logger = logging.getLogger("agent_logger")

# todo drop blocked pages > see og llmsearch code
# todo use the chatcondesemode query instead of the new gpt query

def search(msg, query_phrase):
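    """Top-level entry point: search Google for msg (rephrased as query_phrase), scrape
    the result pages, and return a list of {source, url, text} dicts ("" on failure)."""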
    try:
        # this call extracts keywords from the statement and rewrites it into a better search phrase with gpt3.5
        # query_phrase, keywords = ut.get_search_phrase_and_keywords(msg, [])
        google_text = ""
        try:
            logger.info(f"asking google {msg}; rephrased: {query_phrase}")
            google_text, urls_all, index, urls_used, tried_index, urls_tried = search_google(msg, query_phrase)
        except Exception:
            traceback.print_exc()
        logger.info("\n\nFinal response: ")
        for item in google_text:
            logger.info(
                f"\n##############################################################################################\nSource: {item['source']}"
            )
            logger.info(f"{item['text']}")
            logger.info(f"URL: {item['url']}")
        return google_text
    except KeyboardInterrupt:
        traceback.print_exc()
        raise
    except Exception:
        traceback.print_exc()
        return ""

# Define a function to make a single URL request and process the response
def process_url(query_phrase, url, timeout):
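    """Fetch one search-result page with headless Chrome and return (extracted_text, url);
    the text comes back empty on a page-load timeout or scrape error."""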
    start_time = time.time()
    site = ut.extract_site(url)
    result = ""
    response = ""  # keep defined for the log line below even if the page never loads
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            options = Options()
            options.page_load_strategy = "eager"
            options.add_argument("--headless")
            with webdriver.Chrome(options=options) as dr:
                logger.info(f"*****setting page load timeout {timeout}")
                dr.set_page_load_timeout(timeout)
                try:
                    dr.get(url)
                    response = dr.page_source
                    result = response_text_extract(url=url, response=response)
                except selenium.common.exceptions.TimeoutException:
                    return "", url
    except Exception:
        traceback.print_exc()
        logger.info(f"{site} err")
    logger.info(f"Processed {site}: {len(response)} / {len(result)} chars in {int((time.time() - start_time) * 1000)} ms")
    return result, url

def process_urls(query_phrase, urls):
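    """Scrape the candidate urls concurrently with a thread pool, collecting extracted
    page text until enough has been gathered or the time budget runs out. Returns
    (response_list, used_index, urls_used, tried_index, urls_tried)."""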
    # Scan the candidate urls with a pool of worker threads
    response = []
    logger.info("entering process urls")
    full_text = ""
    used_index = 0
    urls_used = ["" for i in range(30)]
    tried_index = 0
    urls_tried = ["" for i in range(30)]
    start_time = time.time()
    in_process = []
    processed = []
    google_futures = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=11) as executor:
        # initialize scan of google urls
        while True:
            try:
                while (
                    len(urls) > 0
                    # no sense starting if not much time left
                    and len(full_text) < 4800
                    and len(in_process) < 10
                    and time.time() - start_time < 8
                ):
                    recommendation = site_stats.get_next(urls, sample_unknown=True)
                    # set timeout so we don't wait for a slow site forever
                    timeout = 12 - int(time.time() - start_time)
                    url = recommendation[1]
                    future = executor.submit(process_url, query_phrase, url, timeout)
                    google_futures.append(future)
                    in_process.append(future)
                    urls_tried[tried_index] = url
                    tried_index += 1
                    urls.remove(url)
                    logger.info(f"queued {ut.extract_site(url)}, {timeout}")
                # Process the responses as they arrive
                for future in list(in_process):
                    if future.done():
                        result, url = future.result()
                        processed.append(future)
                        in_process.remove(future)
                        if len(result) > 0:
                            urls_used[used_index] = url
                            used_index += 1
                            logger.info(
                                f"adding {len(result)} chars from {ut.extract_site(url)} to {len(response)} prior responses"
                            )
                            # skip results that are just block pages or error notices
                            block_markers = (
                                "an error has occurred",
                                "permission to view this page",
                                "403 error",
                                "have been blocked",
                                "too many requests",
                            )
                            if not any(marker in result.lower() for marker in block_markers):
                                response.append(
                                    {
                                        "source": ut.extract_domain(url),
                                        "url": url,
                                        "text": result,
                                    }
                                )
                if (len(urls) == 0 and len(in_process) == 0) or (time.time() - start_time > 28):
                    executor.shutdown(wait=False)
                    logger.info(
                        f"\n****** exiting process urls early {len(response)} {int(time.time() - start_time)} secs\n"
                    )
                    return response, used_index, urls_used, tried_index, urls_tried
                time.sleep(0.5)
            except Exception:
                # give up on any unexpected error and return what has been collected so far
                traceback.print_exc()
                executor.shutdown(wait=False)
                logger.info(
                    f"\n*****processed all urls {len(response)} {int(time.time() - start_time)} secs"
                )
                return response, used_index, urls_used, tried_index, urls_tried

def extract_subtext(text):
    return ut.reform(text)

def request_google(query_phrase):
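    """Query the Google Custom Search API for query_phrase and return a list of candidate
    result urls, skipping sites known to block scraping (reddit, youtube, facebook)."""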
    logger.info(f"***** search {query_phrase}")
    sort = "&sort=date-sdate:d:w"
    if "today" in query_phrase or "latest" in query_phrase:
        sort = "&sort=date-sdate:d:s"
    # logger.info(f"search for: {query_phrase}")
    google_query = en.quote(query_phrase)
    response = []
    try:
        start_wall_time = time.time()
        url = (
            "https://www.googleapis.com/customsearch/v1?key="
            + ut.google_key
            + "&cx="
            + ut.google_cx
            + "&num=4"
            + sort
            + "&q="
            + google_query
        )
        response = requests.get(url)
        response_json = json.loads(response.text)
        logger.info(f"***** google search {int((time.time() - start_wall_time) * 10) / 10} sec")
    except Exception:
        traceback.print_exc()
        return []
    # see if we got anything useful from google
    if "items" not in response_json.keys():
        logger.info(f"no return from google ... {response} {response_json.keys()}")
        return []
    urls = []
    for i in range(len(response_json["items"])):
        url = response_json["items"][i]["link"].strip()
        site = ut.extract_site(url)
        if site not in ut.sites or ut.sites[site] == 1:
            # don't use these sources (reddit because it blocks bots)
            if "reddit" not in url and "youtube" not in url and "facebook" not in url:
                urls.append(url)
    return urls

def response_text_extract(url, response):
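    """Partition the page html with unstructured, clean up the element text, and return
    the concatenated extract; pdf urls and near-empty extracts yield an empty string."""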
    extract_text = ""
    # pdf urls are skipped; only html responses are parsed
    if not url.endswith("pdf") and response is not None:
        elements = partition_html(text=response)
        str_elements = []
        logger.info('\n***** elements')
        for e in elements:
            stre = str(e).replace("  ", " ")  # collapse doubled spaces
            str_elements.append(stre)
        extract_text = ''.join(extract_subtext(str_elements))
        logger.info(
            f"***** unstructured found {len(elements)} elements, {sum([len(str(e)) for e in elements])} raw chars, {len(extract_text)} extract"
        )
    if len(extract_text.strip()) < 8:
        return ""
    else:
        return extract_text

def extract_items_from_numbered_list(text):
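    """Strip the leading item numbers from a numbered list (one item per line) and
    return the item texts joined into a single space-separated string."""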
    items = ""
    elements = text.split("\n")
    for candidate in elements:
        candidate = candidate.lstrip(". \t")
        if len(candidate) > 4 and candidate[0].isdigit():
            candidate = candidate[1:].lstrip(". ")
            if (
                len(candidate) > 4 and candidate[0].isdigit()
            ):  # strip second digit if more than 10 items
                candidate = candidate[1:].lstrip(". ")
            logger.info("E {}".format(candidate))
            items += candidate + " "
    return items

def search_google(original_query, query_phrase):
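    """Search Google for both the original query and the rephrased query_phrase,
    interleave the de-duplicated result urls, scrape them with process_urls, and
    return (full_text, all_urls, index, urls_used, tried_index, urls_tried)."""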
    all_urls = []
    urls_used = []
    urls_tried = []
    index = 0
    tried_index = 0
    full_text = ""
    try:  # query google for recent info
        extract_query = ""
        orig_phrase_urls = []
        if len(original_query) > 0:
            orig_phrase_urls = request_google(original_query[: min(len(original_query), 128)])
            extract_query = original_query[: min(len(original_query), 128)]
        gpt_phrase_urls = []
        if len(query_phrase) > 0:
            gpt_phrase_urls = request_google(query_phrase)
            extract_query = query_phrase  # prefer more succinct query phrase if available
        if len(orig_phrase_urls) == 0 and len(gpt_phrase_urls) == 0:
            return "", [], 0, [""], 0, [""]
        for url in orig_phrase_urls:
            if url in gpt_phrase_urls:
                gpt_phrase_urls.remove(url)
        # interleave both lists now that duplicates are removed
        urls = [
            val
            for tup in zip_longest(orig_phrase_urls, gpt_phrase_urls)
            for val in tup
            if val is not None
        ]
        all_urls = copy.deepcopy(urls)
        # initialize scan of google urls
        start_wall_time = time.time()
        full_text, index, urls_used, tried_index, urls_tried = process_urls(extract_query, all_urls)
        logger.info(f"***** urls_processed {int((time.time() - start_wall_time) * 10) / 10} sec")
        logger.info("return from url processing")
    except Exception:
        traceback.print_exc()
    return full_text, all_urls, index, urls_used, tried_index, urls_tried
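

if __name__ == "__main__":
    # Minimal usage sketch (assumptions: ut.google_key and ut.google_cx are configured in
    # llmsearch.utilityV2, Chrome plus a matching chromedriver are installed, and the
    # example question below is purely illustrative).
    logging.basicConfig(level=logging.INFO)
    question = "what is the latest long-term-support Linux kernel release?"  # hypothetical query
    # The raw question doubles as the search phrase here; normally a condensed
    # query_phrase would be supplied (see the commented-out
    # ut.get_search_phrase_and_keywords call in search()).
    results = search(question, question)
    for item in results:
        print(item["url"], len(item["text"]))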