import copy import json import logging import os import time import traceback import urllib.parse as en import warnings from concurrent.futures import ThreadPoolExecutor from itertools import zip_longest import requests from zenrows import ZenRowsClient from llmsearch import utilityV2 as ut logger = logging.getLogger("agent_logger") def search(msg, query_phrase): try: # this call extracts keywords from the statement and rewrites it into a better search phrase with gpt3.5 # query_phrase, keywords = ut.get_search_phrase_and_keywords(msg, []) google_text = "" try: print(f"asking google {msg}; rephrased: {query_phrase}") google_text = search_google(msg, query_phrase) except: traceback.print_exc() print("\n\nFinal response: ") for item in google_text: print( f"\n##############################################################################################\nSource: {item['source']}" ) print(f"{item['text']}") print(f"URL: {item['url']}") return google_text except KeyboardInterrupt: traceback.print_exc() raise KeyboardInterrupt except: traceback.print_exc() return "" # Define a function to make a single URL request and process the response def process_url(url): processed_page = {} start_time = time.time() try: with warnings.catch_warnings(): warnings.simplefilter("ignore") try: client = ZenRowsClient(os.getenv('zenrows_api_key')) response = client.get(url) print(f'got response, status: {response.status_code}') result = response.text if len(result) > 0: if "an error has occurred" not in result.lower() and "permission to view this page" not in result.lower() and "403 ERROR" not in result.lower() and "have been blocked" not in result.lower() and "too many requests" not in result.lower(): processed_page = { "source": ut.extract_domain(url), "url": url, "text": result, } print(f"Processed {url}: {len(result)} {int((time.time() - start_time) * 1000)} ms") return processed_page except Exception: traceback.print_exc() return processed_page except Exception: traceback.print_exc() return processed_page def process_urls(urls): print(f"entering process urls: {len(urls)} found. {urls}") start_time = time.time() results = [] try: with ThreadPoolExecutor(max_workers=len(urls)) as pool: for result in pool.map(process_url, urls): results.append(result) except: traceback.print_exc() print( f"\n*****processed all urls {len(results)} {int(time.time() - start_time)} secs" ) return results def extract_subtext(text): return ut.reform(text) def request_google(query_phrase): print(f"***** search {query_phrase}") sort = "&sort=date-sdate:d:w" if "today" in query_phrase or "latest" in query_phrase: sort = "&sort=date-sdate:d:s" print(f"search for: {query_phrase}") google_query = en.quote(query_phrase) response = [] try: start_wall_time = time.time() url = ( "https://www.googleapis.com/customsearch/v1?key=" + ut.google_key + "&cx=" + ut.google_cx + "&num=4" + sort + "&q=" + google_query ) response = requests.get(url) response_json = json.loads(response.text) print(f"***** google search {int((time.time() - start_wall_time) * 10) / 10} sec") except: traceback.print_exc() return [] # see if we got anything useful from Google if "items" not in response_json.keys(): print("no return from google ...", response, response_json.keys()) return [] urls = [] for i in range(len(response_json["items"])): url = response_json["items"][i]["link"].lstrip().rstrip() site = ut.extract_site(url) if site not in ut.sites or ut.sites[site] == 1: # don't use these sources (reddit because it blocks bots) if "reddit" not in url and "youtube" not in url and "facebook" not in url: urls.append(url) return urls def search_google(original_query, query_phrase): full_text = "" try: # query google for recent info orig_phrase_urls = [] if len(original_query) > 0: orig_phrase_urls = request_google(original_query[: min(len(original_query), 128)]) gpt_phrase_urls = [] if len(query_phrase) > 0: gpt_phrase_urls = request_google(query_phrase) if len(orig_phrase_urls) == 0 and len(gpt_phrase_urls) == 0: return "", [], 0, [""], 0, [""] for url in orig_phrase_urls: if url in gpt_phrase_urls: gpt_phrase_urls.remove(url) # interleave both lists now that duplicates are removed urls = [ val for tup in zip_longest(orig_phrase_urls, gpt_phrase_urls) for val in tup if val is not None ] all_urls = copy.deepcopy(urls) start_wall_time = time.time() full_text = process_urls(all_urls) print(f"***** urls_processed {int((time.time() - start_wall_time) * 10) / 10} sec") except: traceback.print_exc() return full_text