import os
import pickle
import re
import time
from typing import List, Union
from urllib.parse import urlparse, urljoin

import faiss
import requests
from bs4 import BeautifulSoup
from langchain.agents import ConversationalAgent, Tool, AgentExecutor, LLMSingleActionAgent, AgentOutputParser
from langchain.chains import ConversationalRetrievalChain, LLMChain
# gpt-3.5-turbo is a chat-completion model, so it needs the ChatOpenAI wrapper
# rather than the completion-only OpenAI class.
from langchain.chat_models import ChatOpenAI
from langchain.docstore.document import Document
from langchain.embeddings import OpenAIEmbeddings
from langchain.memory import ConversationBufferWindowMemory
from langchain.prompts import BaseChatPromptTemplate
from langchain.schema import AgentAction, AgentFinish, HumanMessage
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores.faiss import FAISS
from PyPDF2 import PdfReader
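# Note (assumption, not shown in the original code): both ChatOpenAI and
# OpenAIEmbeddings read the API key from the OPENAI_API_KEY environment
# variable, so it must be set before this module is loaded.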
book_url = 'https://g.co/kgs/2VFC7u'
book_file = "Book.pdf"
url = 'https://makerlab.illinois.edu/'
pickle_file = "open_ai.pkl"
index_file = "open_ai.index"

gpt_3_5 = ChatOpenAI(model_name='gpt-3.5-turbo', temperature=0)
embeddings = OpenAIEmbeddings()

chat_history = []
# output_key tells the memory which key to store, since the agent executors
# below return intermediate steps alongside the final answer.
memory = ConversationBufferWindowMemory(memory_key="chat_history", output_key="output")

gpt_3_5_index = None

class CustomOutputParser(AgentOutputParser):
    def parse(self, llm_output: str) -> Union[AgentAction, AgentFinish]:
        # Check if the agent replied without using tools
        if "AI:" in llm_output:
            return AgentFinish(return_values={"output": llm_output.split("AI:")[-1].strip()},
                               log=llm_output)
        # Check if the agent should finish
        if "Final Answer:" in llm_output:
            return AgentFinish(
                # Return values is generally always a dictionary with a single `output` key
                # It is not recommended to try anything else at the moment :)
                return_values={"output": llm_output.split("Final Answer:")[-1].strip()},
                log=llm_output,
            )
        # Parse out the action and action input
        regex = r"Action: (.*?)[\n]*Action Input:[\s]*(.*)"
        match = re.search(regex, llm_output, re.DOTALL)
        if not match:
            raise ValueError(f"Could not parse LLM output: `{llm_output}`")
        action = match.group(1).strip()
        action_input = match.group(2)
        # Return the action and action input
        return AgentAction(tool=action, tool_input=action_input.strip(" ").strip('"'), log=llm_output)

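# For reference, the parser above expects the model to emit tool calls in the
# format requested by the prompt template, e.g. (hypothetical model output):
#
#   Thought: Do I need to use a tool? Yes
#   Action: Vectorstore
#   Action Input: What 3D printers does Makerlab have?
#
# and direct replies prefixed with "AI:".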
# Set up a prompt template
class CustomPromptTemplate(BaseChatPromptTemplate):
    # The template to use
    template: str
    # The list of tools available
    tools: List[Tool]

    def format_messages(self, **kwargs) -> List[HumanMessage]:
        # Get the intermediate steps (AgentAction, Observation tuples)
        # and format them into an agent scratchpad
        intermediate_steps = kwargs.pop("intermediate_steps")
        thoughts = ""
        for action, observation in intermediate_steps:
            thoughts += action.log
            thoughts += f"\nObservation: {observation}\nThought: "
        # Set the agent_scratchpad variable to that value
        kwargs["agent_scratchpad"] = thoughts
        # Create a tools variable from the list of tools provided
        kwargs["tools"] = "\n".join([f"{tool.name}: {tool.description}" for tool in self.tools])
        # Create a list of tool names for the tools provided
        kwargs["tool_names"] = ", ".join([tool.name for tool in self.tools])
        formatted = self.template.format(**kwargs)
        return [HumanMessage(content=formatted)]

def get_search_index():
    global gpt_3_5_index
    if os.path.isfile(pickle_file) and os.path.isfile(index_file) and os.path.getsize(pickle_file) > 0:
        # Load the vectorstore from the pickle file and reattach the raw FAISS
        # index, which is stored in its own file (see create_index below)
        with open(pickle_file, "rb") as f:
            search_index = pickle.load(f)
        search_index.index = faiss.read_index(index_file)
    else:
        search_index = create_index()
    gpt_3_5_index = search_index

def create_index():
    source_chunks = create_chunk_documents()
    search_index = search_index_from_docs(source_chunks)
    # Raw faiss indexes are SWIG objects and cannot be pickled, so save the
    # index to its own file, pickle the vectorstore without it, then reattach
    faiss.write_index(search_index.index, index_file)
    search_index.index = None
    with open(pickle_file, "wb") as f:
        pickle.dump(search_index, f)
    search_index.index = faiss.read_index(index_file)
    return search_index

def create_chunk_documents():
    sources = fetch_data_for_embeddings(url, book_file, book_url)
    splitter = CharacterTextSplitter(separator=" ", chunk_size=800, chunk_overlap=0)
    # Build a new list rather than mutating the one being iterated over:
    # drop empty chunks and re-split any chunk that is still too large
    source_chunks = []
    for chunk in splitter.split_documents(sources):
        if not chunk.page_content:
            continue
        if len(chunk.page_content) >= 1000:
            source_chunks.extend(splitter.split_documents([chunk]))
        else:
            source_chunks.append(chunk)
    return source_chunks

def fetch_data_for_embeddings(url, book_file, book_url):
    sources = get_website_data(url)
    sources.extend(get_document_data(book_file, book_url))
    return sources

def get_website_data(index_url):
    # Get all page paths from the index page
    paths = get_paths(index_url)
    # Filter out invalid links and join them with the base URL
    links = get_links(index_url, paths)
    return get_content_from_links(links, index_url)


def get_content_from_links(links, index_url):
    content_list = []
    for link in set(links):
        if link.startswith(index_url):
            page_data = requests.get(link).content
            soup = BeautifulSoup(page_data, "html.parser")
            # Get page content
            content = soup.get_text(separator="\n")
            # Get page metadata
            metadata = {"source": link}
            content_list.append(Document(page_content=content, metadata=metadata))
            # Pause between requests to be polite to the server
            time.sleep(1)
    return content_list

def get_paths(index_url):
    index_data = requests.get(index_url).content
    soup = BeautifulSoup(index_data, "html.parser")
    paths = set([a.get('href') for a in soup.find_all('a', href=True)])
    return paths


def get_links(index_url, paths):
    links = []
    for path in paths:
        # Use a local name so the module-level `url` is not shadowed
        link = urljoin(index_url, path)
        parsed_url = urlparse(link)
        if parsed_url.scheme in ["http", "https"] and "squarespace" not in parsed_url.netloc:
            links.append(link)
    return links

def get_document_data(book_file, book_url):
    document_list = []
    with open(book_file, 'rb') as f:
        pdf_reader = PdfReader(f)
        for page in pdf_reader.pages:
            page_text = page.extract_text()
            metadata = {"source": book_url}
            document_list.append(Document(page_content=page_text, metadata=metadata))
    return document_list

def search_index_from_docs(source_chunks):
    # Create the FAISS index from the chunked documents
    search_index = FAISS.from_texts(
        [doc.page_content for doc in source_chunks],
        embeddings,
        metadatas=[doc.metadata for doc in source_chunks],
    )
    return search_index

def get_qa_chain(gpt_3_5_index):
    global gpt_3_5
    return ConversationalRetrievalChain.from_llm(
        gpt_3_5, chain_type="stuff", get_chat_history=get_chat_history,
        retriever=gpt_3_5_index.as_retriever(), return_source_documents=True, verbose=True)


def get_chat_history(inputs) -> str:
    res = []
    for human, ai in inputs:
        res.append(f"Human:{human}\nAI:{ai}")
    return "\n".join(res)

def generate_answer(question) -> str:
    global chat_history, gpt_3_5_index
    gpt_3_5_chain = get_qa_chain(gpt_3_5_index)
    result = gpt_3_5_chain(
        {"question": question, "chat_history": chat_history, "vectordbkwargs": {"search_distance": 0.8}})
    # Keep only the most recent exchange in the chain's chat history
    chat_history = [(question, result["answer"])]
    sources = []
    for document in result['source_documents']:
        sources.append(document.metadata['source'])
    source = ',\n'.join(set(sources))
    return result['answer'] + '\nSOURCES: ' + source

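# Example of calling the retrieval chain directly, bypassing the agent
# (hypothetical question; get_search_index() must run first so that
# gpt_3_5_index is populated):
#
#   get_search_index()
#   print(generate_answer("What equipment does Makerlab have?"))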
def get_agent_chain(prompt, tools):
    global gpt_3_5
    llm_chain = LLMChain(llm=gpt_3_5, prompt=prompt)
    # ConversationalAgent takes tool *names* via allowed_tools; the Tool
    # objects themselves are passed to the AgentExecutor below
    agent = ConversationalAgent(llm_chain=llm_chain, allowed_tools=[tool.name for tool in tools])
    agent_chain = AgentExecutor.from_agent_and_tools(agent=agent, tools=tools, verbose=True, memory=memory,
                                                     return_intermediate_steps=True)
    return agent_chain

def get_prompt_and_tools():
    tools = get_tools()

    prefix = """Have a conversation with a human, answering the following questions as best you can.
Always try to use the Vectorstore tool first.
Your name is Makerlab Bot because you are a personal assistant for Makerlab. You have access to the following tools:"""
    suffix = """Begin! If you use any tool, ALWAYS return a "SOURCES" part in your answer.

{chat_history}
Question: {input}
{agent_scratchpad}
SOURCES:"""
    prompt = ConversationalAgent.create_prompt(
        tools,
        prefix=prefix,
        suffix=suffix,
        input_variables=["input", "chat_history", "agent_scratchpad"]
    )
    return prompt, tools

def get_tools():
    tools = [
        Tool(
            name="Vectorstore",
            func=generate_answer,
            description="useful for when you need to answer questions about Makerlab or 3D printing.",
            return_direct=True
        )]
    return tools

def get_custom_agent(prompt, tools):
    llm_chain = LLMChain(llm=gpt_3_5, prompt=prompt)
    output_parser = CustomOutputParser()
    tool_names = [tool.name for tool in tools]
    agent = LLMSingleActionAgent(
        llm_chain=llm_chain,
        output_parser=output_parser,
        stop=["\nObservation:"],
        allowed_tools=tool_names
    )
    agent_executor = AgentExecutor.from_agent_and_tools(agent=agent, tools=tools, verbose=True, memory=memory,
                                                        return_intermediate_steps=True)
    return agent_executor

def get_prompt_and_tools_for_custom_agent():
    template = """
Have a conversation with a human, answering the following questions as best you can.
Always try to use the Vectorstore tool first.
Your name is Makerlab Bot because you are a personal assistant for Makerlab. You have access to the following tools:

{tools}

To answer the new input, use the following format:

New Input: the input question you must answer
Thought: Do I need to use a tool? Yes
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question. SOURCES: the sources you referred to when finding the final answer

When you have a response to say to the Human and DO NOT need to use a tool:
1. DO NOT return "SOURCES" if you did not use any tool.
2. You MUST use this format:

```
Thought: Do I need to use a tool? No
AI: [your response here]
```

Begin! Remember to speak as a personal assistant when giving your final answer.
ALWAYS return a "SOURCES" part in your answer if you used any tool.

Previous conversation history:
{chat_history}

New input: {input}
{agent_scratchpad}
SOURCES:"""
    tools = get_tools()
    prompt = CustomPromptTemplate(
        template=template,
        tools=tools,
        # This omits the `agent_scratchpad`, `tools`, and `tool_names` variables
        # because those are generated dynamically, and includes `intermediate_steps`
        # because it is needed to build the scratchpad
        input_variables=["input", "intermediate_steps", "chat_history"]
    )
    return prompt, tools
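

# Minimal usage sketch (an addition, not part of the original Space): builds
# the index, constructs the custom agent, and asks one hypothetical question.
# Assumes OPENAI_API_KEY is set and Book.pdf is present; in the Space itself, a
# UI layer would presumably drive these calls instead. The executor is invoked
# as a callable rather than via .run() because it returns both "output" and
# "intermediate_steps" keys.
if __name__ == "__main__":
    get_search_index()
    prompt, tools = get_prompt_and_tools_for_custom_agent()
    agent_executor = get_custom_agent(prompt, tools)
    result = agent_executor({"input": "What does Makerlab offer?"})
    print(result["output"])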