import json
import linecache
import logging
import os
import re
import traceback
import tracemalloc

import nltk
import openai
from tenacity import (
    Retrying,
    retry,
    stop_after_attempt,
    stop_after_delay,
    wait_random_exponential,
)

# from agents.utils import read_file

logger = logging.getLogger("agent_logger")

openai.api_key = os.getenv("gpt_api_key")
# paid and ad free
google_key = os.getenv("google_search_api_key")
# cx: the identifier of the Programmable Search Engine.
google_cx = os.getenv("google_cx_api_key")

GOOGLE = "google"
USER = "user"
ASSISTANT = "assistant"
MODEL = "gpt-3.5-turbo"

sites = {}  # initialize dictionary of sites used
new_sites = {}  # initialize dictionary of new sites used
try:
    with open("sites", "r") as f:
        sites = json.loads(f.read())
except Exception:
    print("Failed to read sites.")


# for experimenting with Vicuna
def display_top(snapshot, key_type="lineno", limit=10):
    # log the top memory-allocating source lines from a tracemalloc snapshot
    snapshot = snapshot.filter_traces(
        (
            tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
            tracemalloc.Filter(False, "<unknown>"),
        )
    )
    top_stats = snapshot.statistics(key_type)
    logger.info("Top %s lines" % limit)
    for index, stat in enumerate(top_stats[:limit], 1):
        frame = stat.traceback[0]
        logger.info(
            "#%s: %s:%s: %.1f KiB"
            % (index, frame.filename, frame.lineno, stat.size / 1024)
        )
        line = linecache.getline(frame.filename, frame.lineno).strip()
        if line:
            logger.info("    %s" % line)
    other = top_stats[limit:]
    if other:
        size = sum(stat.size for stat in other)
        logger.info("%s other: %.1f KiB" % (len(other), size / 1024))
    total = sum(stat.size for stat in top_stats)
    logger.info("Total allocated size: %.1f KiB" % (total / 1024))


class turn:
    def __init__(self, role="assistant", message="", tldr="", source="", keywords=None):
        self.role = role
        self.message = message
        self.tldr = tldr
        self.source = source
        self.keywords = keywords if keywords is not None else []

    def __str__(self):
        s = ""
        if self.role is not None and len(self.role) > 0:
            s = s + "r: " + self.role
        if self.message is not None and len(self.message) > 0:
            s = s + " m: " + self.message
        if self.source is not None and len(self.source) > 0:
            s = s + " s: " + self.source
        if self.tldr is not None and len(self.tldr) > 0:
            s = s + " tldr: " + self.tldr
        return s

    def is_google_turn(self):
        return self.source is not None and self.source == GOOGLE

    def is_user_turn(self):
        return self.source is not None and self.source == USER

    def is_assistant_turn(self):
        return self.source is not None and self.source == ASSISTANT


# @retry(wait=wait_random_exponential(min=1, max=2), stop=(stop_after_delay(15) | stop_after_attempt(2)))
def chatCompletion_with_backoff(**kwargs):
    return openai.chat.completions.create(**kwargs)


def ask_gpt(model, gpt_message, max_tokens, temp, top_p):
    completion = None
    try:
        completion = openai.chat.completions.create(
            model=model,
            messages=gpt_message,
            max_tokens=max_tokens,
            temperature=temp,
            top_p=top_p,
        )
    except Exception:
        traceback.print_exc()
    if completion is not None:
        response = completion.choices[0].message.content.lstrip(" ,:.")
        logger.info(response)
        return response
    else:
        logger.info("no response")
        return None


def ask_gpt_with_retries(model, gpt_message, tokens, temp, timeout, tries):
    retryer = Retrying(stop=(stop_after_delay(timeout) | stop_after_attempt(tries)))
    r = retryer(
        ask_gpt,
        model=model,
        gpt_message=gpt_message,
        max_tokens=tokens,
        temp=temp,
        top_p=1,
    )
    return r
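
# Minimal usage sketch (hypothetical prompt text; requires a valid gpt_api_key in the
# environment, and the call returns None if the request fails):
#
#   reply = ask_gpt_with_retries(
#       MODEL,
#       [{"role": "user", "content": "Say hello."}],
#       tokens=16,
#       temp=0.0,
#       timeout=6,
#       tries=2,
#   )
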

INFORMATION_QUERY = "information query"
INTENTS = []


def find_intent(response):
    global INTENTS, INFORMATION_QUERY
    for intent in INTENTS:
        if intent in response.lower():
            return intent
    return INFORMATION_QUERY


def find_query(response):
    search_query_phrase = response
    phrase_index = response.lower().find("phrase:")
    quoted_strings = []
    if phrase_index < 0:
        phrase_index = 0
    else:
        phrase_index += len("phrase:")
    quoted_strings = re.findall(r'"([^"]*)"', search_query_phrase[phrase_index:])
    if len(quoted_strings) == 0:
        quoted_strings = re.findall(r"'([^']*)'", search_query_phrase[phrase_index:])
    if len(quoted_strings) > 0:
        # logger.info(quoted_strings)
        phrase = quoted_strings[0]
        return phrase, response[response.find(phrase) + len(phrase) + 1:]
    else:
        logger.info("no quoted text, returning original query string: %s", response)
        # logger.info(response)
        return "", response


def find_keywords(response, query_phrase, orig_phrase):
    # keywords include those suggested by gpt and any remaining words from the query phrase with len > 4
    keywords = []
    quoted_strings = re.findall(r'"([^"]*)"', query_phrase)
    quoted_strings2 = re.findall(r'"([^"]*)"', orig_phrase)
    remainder = query_phrase
    k_index = response.lower().find("keyword")
    if k_index > 0:
        keyword_string = response[k_index + len("keyword"):]
        nm_index = keyword_string.find("Named-Entities:")
        if nm_index > 0:
            keyword_string = keyword_string[:nm_index].rstrip()
        # logger.info(keyword_string)
        c_index = keyword_string.find(":")
        keyword_string = keyword_string[c_index + 1:]
        candidates = keyword_string.split(",")
        for keyword in candidates:
            keyword = keyword.strip(":,.\t\n").lstrip(" ")
            if len(keyword) > 3 or keyword[0:1].isupper():
                keywords.append(keyword)
        return keywords
    return keywords


# Note: prefix and suffix are not defined in this module. This does not raise at runtime
# because names are only resolved when the function is called, and importing them here
# creates an infinite import loop, so don't touch this.
def split_interaction(interaction):
    qs = interaction.find(prefix)
    rs = interaction.find(suffix)
    if qs >= 0 and rs >= 0:
        query = interaction[len(prefix): rs].lstrip()
        response = interaction[rs + len(suffix):].lstrip()
        return query, response
    else:
        logger.info("can't parse %s", interaction)
        return "", ""


def findnth(haystack, needle, n):
    # return the index of the (n+1)th occurrence of needle in haystack, or -1
    parts = haystack.split(needle, n + 1)
    if len(parts) <= n + 1:
        return -1
    return len(haystack) - len(parts[-1]) - len(needle)


def extract_site(url):
    # return the second-level domain name, e.g. "example" for https://www.example.com/page
    site = ""
    base = findnth(url, "/", 2)
    if base > 2:
        parts = url[:base].split(".")
        if len(parts) > 1:
            site = parts[-2]
    site = site.replace("https://", "")
    site = site.replace("http://", "")
    return site


def extract_domain(url):
    # return the registered domain, e.g. "example.com" for https://www.example.com/page
    domain = ""
    base = findnth(url, "/", 2)
    if base > 2:
        parts = url[:base].split(".")
        if len(parts) > 1:
            domain = parts[-2] + "." + parts[-1]
    domain = domain.replace("https://", "")
    domain = domain.replace("http://", "")
    return domain


def part_of_keyword(word, keywords):
    for keyword in keywords:
        if word in keyword:
            return True
    return False


keyword_prompt = (
    "Perform two tasks on the following text. First, rewrite the text as an effective "
    "google search phrase. Second, analyze the text and list the keywords and "
    'named-entities found. Return the result as: Phrase: ""\nKeywords: \nNamed-Entities: '
)
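
# Hypothetical helper, not part of the original module: sketches how find_query() and
# find_keywords() above decompose a reply in the shape requested by keyword_prompt.
# The reply text below is invented for illustration.
def _example_parse_model_reply():
    reply = (
        'Phrase: "Forspoken review Final Fantasy similarity"\n'
        "Keywords: Forspoken, review, similarity\n"
        "Named-Entities: Forspoken, Final Fantasy VII"
    )
    phrase, remainder = find_query(reply)
    keywords = find_keywords(remainder, phrase, reply)
    return phrase, keywords
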

def get_search_phrase_and_keywords(query_string, chat_history):
    gpt_message = [
        {"role": "user", "content": keyword_prompt},
        {"role": "user", "content": "Text\n" + query_string},
        {"role": "assistant", "content": "Phrase:"},
    ]
    response_text = ""
    # for role in gpt_message:
    #     logger.info(role)
    response_text = ask_gpt_with_retries(
        "gpt-3.5-turbo", gpt_message, tokens=150, temp=0.3, timeout=6, tries=2
    )
    logger.info(response_text)
    # useful function to make the search query more optimal, for future explainability studies
    # consider returning the query phrase and keywords to the user
    query_phrase, remainder = find_query(response_text)
    logger.info("PHRASE:" + query_phrase)
    # logger.info(remainder)
    keywords = find_keywords(remainder, query_phrase, query_string)
    logger.info("KEYWORDS:" + ", ".join(keywords))
    return query_phrase, keywords


def reform(elements):
    # reformulates text extracted from a webpage by unstructured.partition_html
    # into larger keyword-rankable chunks
    texts = []  # a list of text strings, each of at most ~500 chars, split on '\n' when an element must be broken up
    paragraphs = []
    total_elem_len = 0
    for element in elements:
        text = str(element)
        total_elem_len += len(text)
        if len(text) < 4:
            continue
        elif len(text) < 500:
            texts.append(text)
        else:
            subtexts = text.split("\n")
            for subtext in subtexts:
                if len(subtext) < 500:
                    texts.append(subtext)
                else:
                    texts.extend(nltk.sent_tokenize(subtext))

    # now reassemble shorter texts into chunks
    paragraph = ""
    for text in texts:
        if len(text) + len(paragraph) < 500:
            paragraph += " " + text
        else:
            if len(paragraph) > 0:  # start a new paragraph
                paragraphs.append(paragraph)
                paragraph = ""
            paragraph += text
    if len(paragraph) > 0:
        paragraphs.append(paragraph + ".\n")

    # logger.info(f'\n***** reform elements in {len(elements)}, paragraphs out {len(paragraphs)}')
    total_pp_len = 0
    for paragraph in paragraphs:
        total_pp_len += len(paragraph)
    if total_pp_len > 1.2 * total_elem_len:
        logger.info(
            f"******** reform out > reform in. out: {total_pp_len}, in: {total_elem_len}"
        )
    return paragraphs


def get_actions(text):
    # look for actions in the response
    action_indices = re.finditer("Action:", text)  # Action: [search, ask] (query)
    actions = []
    edited_response = text
    for action_index in action_indices:
        action = text[action_index.span()[1]:]
        agent = None
        query = None
        query_start = action.find("(")
        if query_start > 0:
            agent = action[:query_start].strip()
            query_end = action[query_start + 1:].find(")")
            if query_end > 0:
                query = action[query_start + 1: query_start + 1 + query_end]
                action = text[
                    action_index.start(): action_index.span()[1] + query_start + query_end + 2
                ]
        if agent is None or query is None:
            logger.info(
                "can't parse action, skipping %s",
                text[action_index.start(): action_index.start() + 48],
            )
            continue
        actions.append([agent, query, action])
        edited_response = edited_response.replace(action, "")
    return actions


if __name__ == "__main__":
    get_search_phrase_and_keywords(
        "Would I like the video game Forspoken, given that I like Final Fantasy VII?",
        [],
    )
    # logger.info(query_vicuna("what is 5 * 3?"))
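    # Hypothetical extra check, not part of the original script: exercise the
    # "Action: agent(query)" parser in get_actions() on an invented response string.
    sample_actions = get_actions('I will look that up. Action: search("Forspoken reviews")')
    logger.info("parsed actions: %s", sample_actions)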