|
import nltk |
|
# Fetch the NLTK "punkt_tab" resource when this module is imported.
# NOTE(review): presumably required by the tokenizer behind the BM25 sparse
# encoder loaded further down — confirm before removing.
nltk.download('punkt_tab')
|
|
|
import os |
|
from dotenv import load_dotenv |
|
import asyncio |
|
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect |
|
from fastapi.responses import HTMLResponse |
|
from fastapi.templating import Jinja2Templates |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from langchain.chains import create_history_aware_retriever, create_retrieval_chain |
|
from langchain.chains.combine_documents import create_stuff_documents_chain |
|
from langchain_community.chat_message_histories import ChatMessageHistory |
|
from langchain_core.chat_history import BaseChatMessageHistory |
|
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder |
|
from langchain_core.runnables.history import RunnableWithMessageHistory |
|
from pinecone import Pinecone |
|
from pinecone_text.sparse import BM25Encoder |
|
from langchain_huggingface import HuggingFaceEmbeddings |
|
from langchain_community.retrievers import PineconeHybridSearchRetriever |
|
from langchain.retrievers import ContextualCompressionRetriever |
|
from langchain_community.chat_models import ChatPerplexity |
|
from langchain.retrievers.document_compressors import CrossEncoderReranker |
|
from langchain_community.cross_encoders import HuggingFaceCrossEncoder |
|
from langchain_core.prompts import PromptTemplate |
|
import re |
|
|
|
|
|
# Read settings from the local ".env" file into the process environment,
# then snapshot the values this module uses as module-level constants.
load_dotenv(".env")

USER_AGENT = os.getenv("USER_AGENT")
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
SECRET_KEY = os.getenv("SECRET_KEY")
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")

# Session id used for websocket messages that do not carry their own.
SESSION_ID_DEFAULT = "abc123"

# os.environ only accepts str values: assigning None raises TypeError and
# would abort the import whenever a variable is missing. Re-export a value
# only when it is actually set.
if USER_AGENT is not None:
    os.environ["USER_AGENT"] = USER_AGENT
if GROQ_API_KEY is not None:
    os.environ["GROQ_API_KEY"] = GROQ_API_KEY
os.environ["TOKENIZERS_PARALLELISM"] = "true"
|
|
|
|
|
# ASGI application object; the routes below are registered on it.
app = FastAPI()

# Origins accepted by the CORS middleware ("*" means any origin).
origins = ["*"]

# NOTE(review): wildcard origins are combined with allow_credentials=True
# here — confirm this is intended; the FastAPI CORS documentation asks for
# explicit origins when credentials are allowed.
_cors_options = {
    "allow_origins": origins,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)

# Jinja2 templates are looked up in the "templates" directory.
templates = Jinja2Templates(directory="templates")
|
|
|
|
|
def initialize_pinecone(index_name: str):
    """Return a handle to the Pinecone index called *index_name*.

    A client is created with the module-level ``PINECONE_API_KEY``. Any
    exception raised while creating the client or opening the index is
    printed and then re-raised unchanged.
    """
    try:
        client = Pinecone(api_key=PINECONE_API_KEY)
        index = client.Index(index_name)
    except Exception as err:
        print(f"Error initializing Pinecone: {err}")
        raise
    return index
|
|
|
|
|
|
|
|
|
|
|
|
|
# Remote vector index plus the BM25 encoder state loaded from a local JSON file.
pinecone_index = initialize_pinecone("abu-dhabi-tourism-department")
bm25 = BM25Encoder().load("./abu-dhabi-culture-tourism.json")

# Dense embedding model; trust_remote_code is enabled in its model kwargs.
_embedding_options = {
    "model_name": "jinaai/jina-embeddings-v3",
    "model_kwargs": {"trust_remote_code": True},
}
embed_model = HuggingFaceEmbeddings(**_embedding_options)

# Hybrid retriever built from the dense embeddings, the BM25 sparse encoder
# and the Pinecone index, configured with top_k=10 and alpha=0.5.
retriever = PineconeHybridSearchRetriever(
    index=pinecone_index,
    embeddings=embed_model,
    sparse_encoder=bm25,
    alpha=0.5,
    top_k=10,
)
|
|
|
|
|
# Chat model shared by the question-rewriting step and the answering step.
# NOTE(review): the value read from GROQ_API_KEY is passed as the Perplexity
# API key — confirm the environment variable really holds a Perplexity key.
llm = ChatPerplexity(
    model="llama-3.1-sonar-large-128k-chat",
    pplx_api_key=GROQ_API_KEY,
    temperature=0,
    max_tokens=512,
    max_retries=2,
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# System instruction for the rewriting step: turn the latest user message
# into a standalone question using the chat history, without answering it.
contextualize_q_system_prompt = """Given a chat history and the latest user question \
which might reference context in the chat history, formulate a standalone question \
which can be understood without the chat history. Do NOT answer the question, \
just reformulate it if needed and otherwise return it as is.
"""

# Message layout: system instruction, prior turns, then the new user input.
contextualize_q_prompt = ChatPromptTemplate.from_messages([
    ("system", contextualize_q_system_prompt),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
])

# Retriever that is built from the LLM, the hybrid retriever and the
# rewriting prompt above.
history_aware_retriever = create_history_aware_retriever(
    llm,
    retriever,
    contextualize_q_prompt,
)
|
|
|
|
|
# System instruction for the answering step. It has two template slots:
# {language} (answer language) and {context} (the retrieved documents).
qa_system_prompt = """ You are a highly skilled information retrieval assistant. Use the following context to answer questions effectively.
If you don't know the answer, state that you don't know.
YOUR ANSWER SHOULD BE IN '{language}' LANGUAGE.
When responding to queries, follow these guidelines:
1. Provide Clear Answers:
- You have to answer in that language based on the given language of the answer. If it is English, answer it in English; if it is Arabic, you should answer it in Arabic.
- Ensure the response directly addresses the query with accurate and relevant information.
- Do not give long answers. Provide detailed but concise responses.

2. Formatting for Readability:
- Provide the entire response in proper markdown format.
- Use structured Markdown elements such as headings, subheadings, lists, tables, and links.
- Use emphasis on headings, important texts, and phrases.

3. Proper Citations:
- Always use inline citations with embedded source URLs.
- The inline citations should be in the format [1], [2], etc.
- DO NOT INCLUDE THE 'References' SECTION IN THE RESPONSE.

FOLLOW ALL THE GIVEN INSTRUCTIONS, FAILURE TO DO SO WILL RESULT IN THE TERMINATION OF THE CHAT.

== CONTEXT ==
{context}
"""

# Message layout: system instruction, prior turns, then the new user input.
qa_prompt = ChatPromptTemplate.from_messages([
    ("system", qa_system_prompt),
    MessagesPlaceholder("chat_history"),
    ("human", "{input}"),
])
|
|
|
# Each retrieved document is rendered as its text followed by its "source"
# metadata value before being placed into the {context} slot.
document_prompt = PromptTemplate(
    template="{page_content} \n\n Source: {source}",
    input_variables=["page_content", "source"],
)

# Chain that formats the documents with document_prompt and answers with
# qa_prompt using the shared LLM.
question_answer_chain = create_stuff_documents_chain(
    llm,
    qa_prompt,
    document_prompt=document_prompt,
)

# Full pipeline: history-aware retrieval feeding the answering chain.
rag_chain = create_retrieval_chain(
    history_aware_retriever,
    question_answer_chain,
)
|
|
|
|
|
# In-memory chat histories, keyed by session id.
store = {}


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the chat history for *session_id*, creating it on first use.

    Histories are kept in the module-level ``store`` dict, so the same
    object is returned for repeated calls with the same id.
    """
    try:
        return store[session_id]
    except KeyError:
        history = store[session_id] = ChatMessageHistory()
        return history
|
|
|
|
|
# Wrap the RAG chain so every call reads and updates the per-session chat
# history returned by get_session_history.
#
# The previous `language_message_key="language"` argument was dropped: it is
# not a RunnableWithMessageHistory parameter and had no effect. The
# "language" value still reaches the prompt because it is a key of the input
# dict passed by the caller.
conversational_rag_chain = RunnableWithMessageHistory(
    rag_chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="chat_history",
    output_messages_key="answer",
)
|
|
|
|
|
|
|
def _select_sources(context, answer):
    """Map citation labels to source URLs for one streamed answer.

    *context* is the sequence of retrieved documents (each exposing
    ``metadata["source"]``) and *answer* is the complete answer text.
    Documents whose 1-based position is cited in the answer as ``[n]`` are
    returned keyed by that label. When the answer cites none of them, the
    distinct sources of all documents are returned instead, numbered
    sequentially from ``[1]``.
    """
    # A set gives O(1) membership tests for the per-document check below.
    cited = {int(number) for number in re.findall(r'\[(\d+)\]', answer)}
    sources = {}
    backup = {}
    for position, doc in enumerate(context, start=1):
        source = doc.metadata["source"]
        if position in cited:
            sources[f"[{position}]"] = source
        elif source not in backup.values():
            backup[f"[{len(backup) + 1}]"] = source
    return sources or backup


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Serve the chat over a websocket.

    Each incoming JSON message is read for the keys ``question``,
    ``language`` and ``session_id``. The answer is streamed back as a series
    of ``{'response': <chunk>}`` messages, followed by one
    ``{'sources': {...}}`` message when documents were retrieved. Failures
    while answering are reported to the client as a ``response`` message and
    the connection stays open. The history of the last used session id is
    removed from ``store`` when the handler exits.
    """
    await websocket.accept()
    print(f"Client connected: {websocket.client}")
    session_id = None
    try:
        while True:
            data = await websocket.receive_json()
            question = data.get('question')
            # A missing/None language used to raise TypeError on the "in"
            # test and kill the connection; it now takes the same path as
            # any other tag that does not contain "en".
            language_tag = data.get('language') or ""
            language = "English" if "en" in language_tag else "Arabic"
            # NOTE(review): clients that send no session_id all share
            # SESSION_ID_DEFAULT and therefore one history — confirm intended.
            session_id = data.get('session_id', SESSION_ID_DEFAULT)

            try:
                answer_parts = []
                context = []
                async for chunk in conversational_rag_chain.astream(
                    {"input": question, 'language': language},
                    config={"configurable": {"session_id": session_id}},
                ):
                    if "context" in chunk:
                        context = chunk['context']
                    if "answer" in chunk:
                        answer_parts.append(chunk['answer'])
                        await websocket.send_json({'response': chunk['answer']})

                if context:
                    sources = _select_sources(context, "".join(answer_parts))
                    await websocket.send_json({'sources': sources})
            except WebSocketDisconnect:
                # A disconnect while streaming is not an answering error:
                # let the outer handler deal with it instead of trying to
                # send an error message on a closed socket.
                raise
            except Exception as e:
                print(f"Error during message handling: {e}")
                # NOTE(review): the raw exception text is sent to the client;
                # consider whether internal details should be exposed.
                await websocket.send_json({'response': "Something went wrong, Please try again." + str(e)})
    except WebSocketDisconnect:
        print(f"Client disconnected: {websocket.client}")
    finally:
        # Runs on every exit path (not only a clean disconnect) so a failed
        # connection does not leave its history behind in `store`.
        if session_id:
            store.pop(session_id, None)
|
|
|
|
|
@app.get("/", response_class=HTMLResponse)
async def read_index(request: Request):
    """Render the ``chat.html`` template for the site root."""
    template_context = {"request": request}
    return templates.TemplateResponse("chat.html", template_context)
|
|