# Spaces: Sleeping
import os | |
import pdf2image | |
import pytesseract | |
import streamlit as st | |
from langchain_community.vectorstores import FAISS | |
from langchain_core.messages import AIMessage, HumanMessage | |
from langchain_core.output_parsers import StrOutputParser | |
from langchain_core.prompts import ChatPromptTemplate | |
from langchain_openai.chat_models.azure import ChatOpenAI | |
from langchain_openai.embeddings.azure import OpenAIEmbeddings | |
from langchain_text_splitters import RecursiveCharacterTextSplitter | |
from s3bucket import upload_to_s3 | |
# Module-wide retriever used by get_response(); it starts as None and is
# assigned by the __main__ guard and by append_to_vector_db().
RETRIEVER = None
# Path handed to FAISS save_local()/load_local() for the persisted index.
vector_database_name = "Adina_Vector_Database"
# Folder where uploaded PDFs are written before being passed to upload_to_s3().
temp_pdf_folder = "temp-pdf-files"
def delete_temp_files():
    """Remove every regular file inside ``temp_pdf_folder``.

    Does nothing when the folder does not exist. Entries that are not
    regular files (for example sub-directories) are skipped, because
    ``os.remove`` cannot delete them and would raise.
    """
    if not os.path.isdir(temp_pdf_folder):
        return
    for item in os.listdir(temp_pdf_folder):
        file_path = os.path.join(temp_pdf_folder, item)
        if os.path.isfile(file_path):
            os.remove(file_path)
def extract_text(file):
    """Return the OCR text of an uploaded PDF.

    Args:
        file: Uploaded file object exposing ``type`` and ``getvalue()``.

    Returns:
        The concatenated OCR output of every page image, or ``None`` (after
        showing a Streamlit error) when ``file.type`` is not
        ``"application/pdf"``.
    """
    if file.type != "application/pdf":
        st.error("Invalid file type. Please upload pdf file.")
        return None

    # Each PDF page is rendered to an image and then run through Tesseract.
    page_images = pdf2image.convert_from_bytes(file.getvalue())
    return "".join(pytesseract.image_to_string(page) for page in page_images)
def load_and_split(file):
    """Store an uploaded file, pass it to ``upload_to_s3`` and split its text.

    The file is written to ``temp_pdf_folder``, handed to ``upload_to_s3``,
    and its text is obtained with ``extract_text``. The temporary folder is
    emptied afterwards, including when the upload or extraction fails.

    Args:
        file: Uploaded file object exposing ``name``, ``type`` and
            ``getvalue()``.

    Returns:
        A list of documents, each carrying ``{"file_name": file.name}`` as
        metadata. The list is empty when no text could be extracted
        (previously this path raised ``UnboundLocalError``).
    """
    os.makedirs(temp_pdf_folder, exist_ok=True)
    local_filepath = os.path.join(temp_pdf_folder, file.name)

    try:
        with open(local_filepath, "wb") as f:
            f.write(file.getvalue())
        upload_to_s3(file_path=local_filepath, file_name=file.name)
        text = extract_text(file)
    finally:
        # The local copy is only needed for the upload; never leave it behind.
        delete_temp_files()

    if not text:
        return []

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000, chunk_overlap=200
    )
    texts = text_splitter.split_text(text)
    return text_splitter.create_documents(
        texts=texts,
        metadatas=[{"file_name": file.name} for _ in texts],
    )
def initialize_vector_db():
    """Create a FAISS store from a single seed text, save it, and return it.

    The store is saved under ``vector_database_name``.
    """
    seed_texts = ["Adina Cosmetic Ingredients"]
    embeddings = OpenAIEmbeddings()
    store = FAISS.from_texts(seed_texts, embeddings)
    store.save_local(vector_database_name)
    return store
def load_vector_db():
    """Return the saved FAISS store, creating a fresh one if none exists.

    When nothing exists at ``vector_database_name`` the store is built by
    ``initialize_vector_db()``; otherwise it is loaded from that path.
    """
    if not os.path.exists(vector_database_name):
        return initialize_vector_db()

    # NOTE(review): loading is done with allow_dangerous_deserialization=True;
    # confirm the files at this path can only be written by this application.
    return FAISS.load_local(
        vector_database_name,
        OpenAIEmbeddings(),
        allow_dangerous_deserialization=True,
    )
def append_to_vector_db(docs: "list | None" = None):
    """Embed ``docs``, merge them into the saved store and refresh RETRIEVER.

    Args:
        docs: Documents to add. ``None`` or an empty list is a no-op: the
            saved store and the module-level ``RETRIEVER`` are left
            untouched, since there is nothing to embed.

    Side effects:
        Saves the merged store under ``vector_database_name`` and rebinds
        the module-level ``RETRIEVER`` to a retriever over the merged store.
    """
    global RETRIEVER

    # A mutable default ([]) was replaced by None, and an empty batch is
    # rejected up front instead of being passed to FAISS.from_documents.
    if not docs:
        return

    existing_vector_db = load_vector_db()
    new_vector_db = FAISS.from_documents(docs, OpenAIEmbeddings())
    existing_vector_db.merge_from(new_vector_db)
    existing_vector_db.save_local(vector_database_name)
    RETRIEVER = existing_vector_db.as_retriever()
def create_embeddings(files: "list | None" = None):
    """Process each uploaded file and add its documents to the vector store.

    Args:
        files: Uploaded file objects (each exposing ``name``). ``None`` or an
            empty list is a no-op.

    Every handled file name is appended to
    ``st.session_state.last_uploaded_files``. Files that yield no documents
    are recorded there as well, so they are not selected again by a caller
    that filters on that list.
    """
    for file in files or []:
        docs = load_and_split(file)
        if docs:
            append_to_vector_db(docs=docs)
            print(file.name, "processed successfully.")
        else:
            # Nothing to embed: skip the vector store instead of failing.
            st.warning(f"No text could be extracted from {file.name}.")
        st.session_state.last_uploaded_files.append(file.name)
def get_response(user_query, chat_history):
    """Stream the assistant's answer to ``user_query``.

    Args:
        user_query: The question typed by the user.
        chat_history: Previous conversation messages, inserted into the
            prompt as-is.

    Returns:
        The stream returned by ``chain.stream(...)`` for the
        prompt -> chat model -> string parser chain.

    The module-level ``RETRIEVER`` is created from ``load_vector_db()`` on
    first use if it has not been set yet, so the function no longer fails
    with ``AttributeError`` on ``None``.
    """
    global RETRIEVER
    if RETRIEVER is None:
        RETRIEVER = load_vector_db().as_retriever()

    docs = RETRIEVER.invoke(user_query)

    # Spelling fixed in the prompt: "Cosmetic", "answer", "retrieved".
    template = """
Your name is ADINA, who provides helpful information about Adina Cosmetic Ingredients.
<rules>
- Answer the question based on the retrieved information only.
- If the question can not be answered, simply say you can not answer it.
- Avoid mentioning that you are answering based on retrieved information.
</rules>
Execute the below mandatory considerations when responding to the inquiries:
--- Tone - Respectful, Patient, and Encouraging:
Maintain a tone that is not only polite but also encouraging. Positive language can help build confidence, especially when they are trying to learn something new.
Be mindful of cultural references or idioms that may not be universally understood or may date back to a different era, ensuring relatability.
--- Clarity - Simple, Direct, and Unambiguous:
Avoid abbreviations, slang, or colloquialisms that might be confusing. Stick to standard language.
Use bullet points or numbered lists to break down instructions or information, which can aid in comprehension.
--- Structure - Organized, Consistent, and Considerate:
Include relevant examples or analogies that relate to experiences common in their lifetime, which can aid in understanding complex topics.
--- Empathy and Understanding - Compassionate and Responsive:
Recognize and validate their feelings or concerns. Phrases like, “It’s completely normal to find this challenging,” can be comforting.
Be aware of the potential need for more frequent repetition or rephrasing of information for clarity.
Answer the following questions considering the history of the conversation and retrieved information.
Chat history: {chat_history}
retrieved information: {retrieved_info}
User question: {user_question}
"""
    prompt = ChatPromptTemplate.from_template(template)
    llm = ChatOpenAI(model="gpt-3.5-turbo-0125", streaming=True)
    chain = prompt | llm | StrOutputParser()
    return chain.stream(
        {
            "chat_history": chat_history,
            "retrieved_info": docs,
            "user_question": user_query,
        }
    )
def main():
    """Render the Streamlit chat page and index newly uploaded PDFs.

    On each run: configure the page, make sure the session state holds the
    upload list and chat history, replay the stored messages, answer a new
    user message if one was entered, then embed any uploaded file whose
    name is not yet in ``st.session_state.last_uploaded_files``.
    """
    st.set_page_config(page_title="Adina Cosmetic Ingredients", page_icon="")
    st.title("Adina Cosmetic Ingredients")

    state = st.session_state
    if "last_uploaded_files" not in state:
        state.last_uploaded_files = []
    if "chat_history" not in state:
        state.chat_history = [
            AIMessage(content="Hello, I am Adina. How can I help you?"),
        ]

    # Replay the conversation so far; other message types are not shown.
    for message in state.chat_history:
        if isinstance(message, AIMessage):
            role = "AI"
        elif isinstance(message, HumanMessage):
            role = "Human"
        else:
            continue
        with st.chat_message(role):
            st.write(message.content)

    user_query = st.chat_input("Type your message here...")
    if user_query:
        state.chat_history.append(HumanMessage(content=user_query))

        with st.chat_message("Human"):
            st.markdown(user_query)

        with st.chat_message("AI"):
            answer_stream = get_response(
                user_query=user_query, chat_history=state.chat_history
            )
            response = st.write_stream(answer_stream)

        state.chat_history.append(AIMessage(content=response))

    uploaded_files = st.sidebar.file_uploader(
        label="Upload files", type="pdf", accept_multiple_files=True
    )

    # Only files whose names have not been recorded yet are embedded.
    to_be_vectorised_files = []
    for item in uploaded_files:
        if item.name not in state.last_uploaded_files:
            to_be_vectorised_files.append(item)

    if to_be_vectorised_files:
        create_embeddings(to_be_vectorised_files)
if __name__ == "__main__":
    # Bind the module-level retriever to the saved (or freshly created)
    # vector store before the page is rendered.
    RETRIEVER = load_vector_db().as_retriever()
    main()