# Spaces: Sleeping  (page-header residue from extraction; not part of the program)
import streamlit as st | |
import logging | |
from streamlit_chat import message | |
from langchain.chains import ConversationalRetrievalChain | |
from langchain.embeddings import HuggingFaceEmbeddings, CacheBackedEmbeddings, HuggingFaceInstructEmbeddings | |
from langchain.llms import LlamaCpp | |
from langchain.vectorstores import FAISS | |
from langchain.memory import ConversationBufferMemory | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.document_loaders import PyPDFLoader | |
from langchain.storage import LocalFileStore | |
from langchain.llms import HuggingFaceHub | |
from langchain.embeddings import HuggingFaceInstructEmbeddings | |
from datetime import datetime | |
import os | |
import tempfile | |
import requests # Import requests here | |
# Timestamp captured once, when this module is executed.
# NOTE(review): not referenced anywhere in the visible source - confirm before removing.
now = datetime.now()
# Embeddings object built with the library's default arguments at module load.
# NOTE(review): also not referenced in the visible source; main() builds its own
# HuggingFaceEmbeddings instance with an explicit model name.
underlying_embeddings = HuggingFaceEmbeddings()
def initialize_session_state():
    """Seed ``st.session_state`` with the chat defaults that are still missing.

    Keys created (only when absent, so existing conversation data is kept):

    * ``'history'``   - list of ``(question, answer)`` tuples passed to the chain.
    * ``'generated'`` - bot messages shown in the transcript, starting with a greeting.
    * ``'past'``      - user messages shown in the transcript, starting with a greeting.
    """
    # The greeting emojis are written as escapes so that the literals survive
    # any re-encoding of this file (they had previously degraded to mojibake).
    defaults = {
        'history': [],
        'generated': ["Hello! Ask me anything about \U0001F917"],  # hugging face
        'past': ["Hey! \U0001F44B"],  # waving hand
    }
    for key, default in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default
def conversation_chat(query, chain, history):
    """Ask ``chain`` a question and record the exchange in ``history``.

    Args:
        query: The user's question text.
        chain: Callable accepting a dict with ``"question"`` and
            ``"chat_history"`` keys and returning a mapping with an
            ``"answer"`` entry.
        history: Mutable list of ``(question, answer)`` tuples; the new
            exchange is appended to it in place.

    Returns:
        The value stored under ``"answer"`` in the chain's result.
    """
    payload = {"question": query, "chat_history": history}
    answer = chain(payload)["answer"]
    history.append((query, answer))
    return answer
def cache_checker(question, question_cache, chain):
    """Return the answer to ``question``, reusing a cached answer when present.

    Args:
        question: The question text; also used as the cache key.
        question_cache: Mutable mapping of question text to answer text. A
            newly computed answer is stored in it.
        chain: Callable accepting ``{"question": ...}`` and returning a
            mapping with an ``"answer"`` entry. Only called on a cache miss.

    Returns:
        The answer text, either from the cache or freshly computed.
    """
    logging.info("I'm here")
    if question in question_cache:
        # The cache holds the answer text itself, not the chain's result
        # mapping, so it must be returned as-is rather than indexed again.
        logging.info("Response retrieved from cache.")
        return question_cache[question]

    # Cache miss: perform the Q&A operation and remember the answer.
    answer = chain({"question": question})["answer"]
    question_cache[question] = answer
    logging.info("Response computed and cached.")
    return answer
def display_chat_history(chain):
    """Render the question form and the chat transcript, answering new questions.

    A submitted question is answered once: from the per-chain question cache
    when the same text was asked before, otherwise by calling
    ``conversation_chat``. The exchange is then appended to the ``'past'`` and
    ``'generated'`` lists in ``st.session_state`` and the whole transcript is
    drawn.

    Args:
        chain: The conversational chain used to answer questions. It is also
            used (by identity) to decide when the question cache must be reset.

    Expects ``'history'``, ``'past'`` and ``'generated'`` to already exist in
    ``st.session_state``.
    """
    reply_container = st.container()
    container = st.container()

    # Streamlit re-runs the script on every interaction, so a local dict would
    # start empty each time and never produce a hit. Keep the cache in session
    # state, and start a fresh one whenever a different chain object is passed
    # in so answers from another document set are not reused. The chain itself
    # is stored so the identity comparison cannot be fooled by a recycled id.
    if ('question_cache' not in st.session_state
            or st.session_state['question_cache_chain'] is not chain):
        st.session_state['question_cache_chain'] = chain
        st.session_state['question_cache'] = {}
    question_cache = st.session_state['question_cache']

    with container:
        with st.form(key='my_form', clear_on_submit=True):
            user_input = st.text_input("Question:", placeholder="Ask about your PDF", key='input')
            submit_button = st.form_submit_button(label='Send')

        if submit_button and user_input:
            if user_input in question_cache:
                output = question_cache[user_input]
                # Keep the chat history consistent with what the user sees.
                st.session_state['history'].append((user_input, output))
                st.info("Response retrieved from cache.")
            else:
                with st.spinner('Generating response...'):
                    # Single chain invocation; it also appends to 'history'.
                    output = conversation_chat(user_input, chain, st.session_state['history'])
                question_cache[user_input] = output
                st.info("Response computed.")

            # Display the response
            st.write("Response:", output)
            st.session_state['past'].append(user_input)
            st.session_state['generated'].append(output)

    if st.session_state['generated']:
        with reply_container:
            exchanges = zip(st.session_state['past'], st.session_state['generated'])
            for i, (question, answer) in enumerate(exchanges):
                message(question, is_user=True, key=str(i) + '_user', avatar_style="thumbs")
                message(answer, key=str(i), avatar_style="fun-emoji")
def create_conversational_chain(vector_store):
    """Assemble a ``ConversationalRetrievalChain`` on top of ``vector_store``.

    The chain combines a local ``LlamaCpp`` model, a retriever obtained from
    ``vector_store`` with ``k=2``, and a ``ConversationBufferMemory`` stored
    under the ``"chat_history"`` key.

    Args:
        vector_store: Object exposing ``as_retriever(search_kwargs=...)``.

    Returns:
        The chain produced by ``ConversationalRetrievalChain.from_llm``.
    """
    # Settings for the local model file.
    llm_settings = {
        "streaming": True,
        "model_path": "mistral-7b-instruct-v0.1.Q2_K.gguf",
        "temperature": 0.75,
        "top_p": 1,
        "verbose": True,
        "n_ctx": 4096,
    }
    local_llm = LlamaCpp(**llm_settings)

    # A hosted alternative was previously sketched here and left disabled:
    # HuggingFaceHub(repo_id="HuggingFaceH4/zephyr-7b-beta") with
    # temperature 0.75, top_p 0.99 and max_length 4096.

    chat_memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
    top_two_retriever = vector_store.as_retriever(search_kwargs={"k": 2})

    return ConversationalRetrievalChain.from_llm(
        llm=local_llm,
        chain_type='stuff',
        retriever=top_two_retriever,
        memory=chat_memory,
    )
def _uploads_fingerprint(uploaded_files):
    """Return a hex digest identifying the uploads by name and content."""
    import hashlib  # local import: only this helper needs it

    digest = hashlib.sha256()
    for file in uploaded_files:
        payload = file.getvalue()
        # Name and length are framed so adjacent files cannot blur together.
        digest.update(f"{file.name}\0{len(payload)}\0".encode("utf-8"))
        digest.update(payload)
    return digest.hexdigest()


def _load_pdf_documents(uploaded_files):
    """Parse every uploaded PDF with ``PyPDFLoader`` and return the documents."""
    documents = []
    for file in uploaded_files:
        file_extension = os.path.splitext(file.name)[1].lower()
        if file_extension != ".pdf":
            st.sidebar.warning(f"Skipping unsupported file: {file.name}")
            continue

        # PyPDFLoader is given a filesystem path, so spill the upload to disk.
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file.write(file.getvalue())
            temp_file_path = temp_file.name
        try:
            documents.extend(PyPDFLoader(temp_file_path).load())
        finally:
            # Remove the temp file even when parsing fails.
            os.remove(temp_file_path)
    return documents


def _build_chain(uploaded_files):
    """Index the uploaded PDFs and return a chain, or ``None`` if nothing to index."""
    documents = _load_pdf_documents(uploaded_files)
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    text_chunks = text_splitter.split_documents(documents)
    if not text_chunks:
        return None

    # Create embeddings
    embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2",
                                       model_kwargs={'device': 'cpu'})
    # Create cache-backed embeddings, persisted under ./cache/
    cache_store = LocalFileStore("./cache/")
    cached_embeddings = CacheBackedEmbeddings.from_bytes_store(
        embeddings, cache_store, namespace="embeddings")
    # Create vector store
    vector_store = FAISS.from_documents(text_chunks, embedding=cached_embeddings)
    # Create the chain object
    return create_conversational_chain(vector_store)


def main():
    """Run the app: collect PDF uploads, build the chain once, then show the chat.

    Streamlit re-executes the script on every widget interaction. The chain
    (vector store, model and conversation memory) is therefore kept in
    ``st.session_state`` and rebuilt only when the set of uploaded files
    changes, instead of being recreated - and its memory wiped - on every
    question.
    """
    # Initialize session state
    initialize_session_state()
    st.title("Medbot :books:")
    st.sidebar.title("Document Processing")
    uploaded_files = st.sidebar.file_uploader("Upload files", accept_multiple_files=True)
    if not uploaded_files:
        return

    fingerprint = _uploads_fingerprint(uploaded_files)
    chain_is_current = (
        'chain' in st.session_state
        and 'chain_fingerprint' in st.session_state
        and st.session_state['chain_fingerprint'] == fingerprint
    )
    if not chain_is_current:
        chain = _build_chain(uploaded_files)
        if chain is None:
            # Building a FAISS index from zero chunks would fail; stop early.
            st.warning("No readable PDF content found in the uploaded files.")
            return
        st.session_state['chain'] = chain
        st.session_state['chain_fingerprint'] = fingerprint

    display_chat_history(st.session_state['chain'])
# Start the app only when this file is executed as the main script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()