import copy
import json
import logging
import time
import traceback
import urllib.parse as en
import warnings
from itertools import zip_longest

import requests
import selenium.common.exceptions
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from unstructured.partition.html import partition_html

from llmsearch import site_stats

# this import style works in pycharm
from llmsearch import utilityV2 as ut

# this import style works on server + vs code
# import utils
# from llmsearch import google_search_concurrent as gs
# from llmsearch import meta as mt
# from llmsearch import utilityV2 as ut

logger = logging.getLogger("agent_logger")

# todo drop blocked pages > see og llmsearch code
# todo use the chatcondesemode query instead of the new gpt query

# Lower-case phrases that mark a scraped page as an error / bot-block page.
# They are matched against the lower-cased page text, so they must be
# lower-case themselves.
_BLOCKED_PAGE_MARKERS = (
    "an error has occurred",
    "permission to view this page",
    "403 error",
    "have been blocked",
    "too many requests",
)


def search(msg, query_phrase):
    """Search Google for ``msg`` / ``query_phrase`` and return scraped page texts.

    Args:
        msg: The original user statement; passed to ``search_google`` as the
            original query.
        query_phrase: A rewritten search phrase for the same request.

    Returns:
        Whatever ``search_google`` produced as its first element: a list of
        ``{"source", "url", "text"}`` dicts, or ``""`` when nothing was found
        or the search failed.

    Raises:
        KeyboardInterrupt: re-raised so the user can abort a running search.
    """
    try:
        # this call extracts keywords from the statement and rewrites it into a better search phrase with gpt3.5
        # query_phrase, keywords = ut.get_search_phrase_and_keywords(msg, [])
        google_text = ""
        try:
            logger.info(f"asking google {msg}; rephrased: {query_phrase}")
            google_text, *_ = search_google(msg, query_phrase)
        except Exception:
            # Best effort: a failed search is reported as an empty result.
            traceback.print_exc()

        logger.info("\n\nFinal response: ")
        for item in google_text:
            logger.info(
                f"\n##############################################################################################\nSource: {item['source']}"
            )
            logger.info(f"{item['text']}")
            logger.info(f"URL: {item['url']}")
        return google_text
    except KeyboardInterrupt:
        traceback.print_exc()
        raise
    except Exception:
        traceback.print_exc()
        return ""


# Define a function to make a single URL request and process the response
def process_url(url, timeout):
    """Load ``url`` in headless Chrome and extract its text.

    Args:
        url: Page to fetch.
        timeout: Page-load timeout handed to
            ``webdriver.set_page_load_timeout``.

    Returns:
        ``(text, url)``. ``text`` is ``""`` when the page load timed out, when
        any other error occurred, or when ``response_text_extract`` returned
        nothing.
    """
    start_time = time.time()
    site = ut.extract_site(url)
    result = ""
    # Pre-bind so the summary log below cannot hit an unbound name when the
    # driver fails before the page source is read.
    response = ""
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            options = Options()
            options.page_load_strategy = "eager"
            options.add_argument("--headless")
            with webdriver.Chrome(options=options) as dr:
                logger.info(f"*****setting page load timeout {timeout}")
                dr.set_page_load_timeout(timeout)
                try:
                    dr.get(url)
                    response = dr.page_source
                    result = response_text_extract(url=url, response=response)
                except selenium.common.exceptions.TimeoutException:
                    return "", url
    except Exception:
        traceback.print_exc()
        logger.info(f"{site} err")

    logger.info(f"Processed {site}: {len(response)} / {len(result)} {int((time.time() - start_time) * 1000)} ms")
    return result, url


def process_urls(urls):
    """Scrape URLs one at a time until the list or the time budget runs out.

    URLs are picked with ``site_stats.get_next`` and fetched sequentially with
    ``process_url``. Each tried URL is removed from ``urls`` (the caller's
    list is mutated). No new fetch is started once 8 seconds have elapsed.

    Args:
        urls: Mutable list of candidate URLs.

    Returns:
        ``(response, urls_used, tried_index, urls_tried)`` where ``response``
        is a list of ``{"source", "url", "text"}`` dicts for pages that do not
        look like error/block pages, ``urls_used`` and ``urls_tried`` are
        30-slot lists padded with ``""``, and ``tried_index`` is the number of
        URLs tried. Every exit path returns this same 4-tuple.
    """
    response = []
    logger.info("entering process urls")
    used_index = 0
    urls_used = ["" for _ in range(30)]
    tried_index = 0
    urls_tried = ["" for _ in range(30)]
    start_time = time.time()

    try:
        # no sense starting if not much time left
        while urls and time.time() - start_time < 8:
            recommendation = site_stats.get_next(urls, sample_unknown=True)
            # set timeout so we don't wait for a slow site forever
            timeout = 12 - int(time.time() - start_time)
            url = recommendation[1]
            result, url = process_url(url, timeout)
            urls_tried[tried_index] = url
            tried_index += 1
            urls.remove(url)
            logger.info(f"queued {ut.extract_site(url)}, {timeout}")

            if len(result) > 0:
                urls_used[used_index] = url
                used_index += 1
                logger.info(
                    f"adding {len(result)} chars from {ut.extract_site(url)} to {len(response)} prior responses"
                )
                lowered = result.lower()
                if not any(marker in lowered for marker in _BLOCKED_PAGE_MARKERS):
                    response.append(
                        {
                            "source": ut.extract_domain(url),
                            "url": url,
                            "text": result,
                        }
                    )

            if not urls or time.time() - start_time > 28:
                logger.info(
                    f"n****** exiting process urls early {len(response)} {int(time.time() - start_time)} secs\n"
                )
                # Same shape as the final return: callers unpack four values.
                return response, urls_used, tried_index, urls_tried
            time.sleep(0.5)
    except Exception:
        traceback.print_exc()

    logger.info(
        f"\n*****processed all urls {len(response)} {int(time.time() - start_time)} secs"
    )
    return response, urls_used, tried_index, urls_tried


def extract_subtext(text):
    """Return ``ut.reform(text)``; thin wrapper around the utility helper."""
    return ut.reform(text)


def request_google(query_phrase):
    """Query the Google Custom Search API and return usable result URLs.

    Args:
        query_phrase: Search phrase; URL-quoted before being sent.

    Returns:
        A list of result links. Links containing "reddit", "youtube" or
        "facebook" are dropped, as are links whose site is present in
        ``ut.sites`` with a value other than 1. Returns ``[]`` when the
        request fails or the reply has no ``"items"``.
    """
    logger.info(f"***** search {query_phrase}")
    sort = "&sort=date-sdate:d:w"
    if "today" in query_phrase or "latest" in query_phrase:
        sort = "&sort=date-sdate:d:s"
    # logger.info(f"search for: {query_phrase}")
    google_query = en.quote(query_phrase)
    try:
        start_wall_time = time.time()
        url = (
            "https://www.googleapis.com/customsearch/v1?key="
            + ut.google_key
            + "&cx="
            + ut.google_cx
            + "&num=4"
            + sort
            + "&q="
            + google_query
        )
        # Bound the wait so a stalled connection cannot hang the search.
        response = requests.get(url, timeout=10)
        response_json = json.loads(response.text)
        logger.info(f"***** google search {int((time.time() - start_wall_time) * 10) / 10} sec")
    except Exception:
        traceback.print_exc()
        return []

    # see if we got anything useful from google
    if "items" not in response_json:
        logger.info("no return from google ... %s %s", response, list(response_json))
        return []

    urls = []
    for item in response_json["items"]:
        url = item["link"].strip()
        site = ut.extract_site(url)
        if site not in ut.sites or ut.sites[site] == 1:
            # don't use these sources (reddit because it blocks bots)
            if "reddit" not in url and "youtube" not in url and "facebook" not in url:
                urls.append(url)
    return urls


def response_text_extract(url, response):
    """Extract readable text from an HTML page source.

    Args:
        url: Source URL; URLs ending in "pdf" are skipped.
        response: HTML text of the page, or ``None``.

    Returns:
        The extracted text, or ``""`` when the URL is a PDF, ``response`` is
        ``None``, or fewer than 8 non-whitespace-trimmed characters were
        extracted.
    """
    extract_text = ""
    if not url.endswith("pdf") and response is not None:
        elements = partition_html(text=response)
        logger.info('\n***** elements')
        # NOTE(review): as written this replace is a no-op; presumably it was
        # meant to collapse double spaces -- confirm against upstream.
        str_elements = [str(e).replace(" ", " ") for e in elements]
        extract_text = ''.join(extract_subtext(str_elements))
        logger.info(
            f"***** unstructured found {len(elements)} elements, {sum([len(str(e)) for e in elements])} raw chars, {len(extract_text)} extract"
        )

    if len(extract_text.strip()) < 8:
        return ""
    return extract_text


def extract_items_from_numbered_list(text):
    """Collect the items of a numbered list into one space-separated string.

    Each line is stripped of leading dots/spaces/tabs. Lines longer than four
    characters that start with a digit have up to two leading digits (and the
    following ". ") removed and are appended, each followed by a space.

    Args:
        text: Newline-separated text.

    Returns:
        The concatenated items, or ``""`` when no line qualifies.
    """
    items = []
    for candidate in text.split("\n"):
        candidate = candidate.lstrip(". \t")
        if len(candidate) > 4 and candidate[0].isdigit():
            candidate = candidate[1:].lstrip(". ")
            # strip second digit if more than 10 items
            if len(candidate) > 4 and candidate[0].isdigit():
                candidate = candidate[1:].lstrip(". ")
            logger.info("E {}".format(candidate))
            items.append(candidate + " ")
    return "".join(items)


def search_google(original_query, query_phrase):
    """Run both queries against Google and scrape the merged result URLs.

    Args:
        original_query: The user's original text; only its first 128
            characters are sent to Google.
        query_phrase: The rewritten search phrase.

    Returns:
        ``(full_text, all_urls, urls_used, tried_index, urls_tried)``.
        ``full_text`` is the list produced by ``process_urls``, or ``""`` when
        neither query returned URLs or an error occurred. Every exit path
        returns five values.
    """
    all_urls = []
    urls_used = []
    urls_tried = []
    tried_index = 0
    full_text = ""
    try:
        # query google for recent info
        orig_phrase_urls = []
        if len(original_query) > 0:
            orig_phrase_urls = request_google(original_query[:128])
        gpt_phrase_urls = []
        if len(query_phrase) > 0:
            gpt_phrase_urls = request_google(query_phrase)
        if len(orig_phrase_urls) == 0 and len(gpt_phrase_urls) == 0:
            # Same five-slot shape as the normal return below.
            return "", [], [""], 0, [""]

        for url in orig_phrase_urls:
            if url in gpt_phrase_urls:
                gpt_phrase_urls.remove(url)

        # interleave both lists now that duplicates are removed
        urls = [
            val
            for tup in zip_longest(orig_phrase_urls, gpt_phrase_urls)
            for val in tup
            if val is not None
        ]
        # NOTE(review): process_urls removes each URL it tries from the list
        # it is given, so all_urls ends up holding only the untried URLs.
        all_urls = copy.deepcopy(urls)

        # initialize scan of google urls
        start_wall_time = time.time()
        full_text, urls_used, tried_index, urls_tried = process_urls(all_urls)
        logger.info(f"***** urls_processed {int((time.time() - start_wall_time) * 10) / 10} sec")
        logger.info("return from url processsing")
    except Exception:
        traceback.print_exc()
    return full_text, all_urls, urls_used, tried_index, urls_tried