import openai
import pinecone
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.docstore.document import Document
from langchain.prompts import PromptTemplate
from langchain.memory import ConversationBufferMemory
import boto3
import os
from time import sleep
from dotenv import load_dotenv
import gradio as gr
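
# NOTE (assumption, versions are not pinned in this file): the calls below imply
# specific package versions: openai.ChatCompletion.create requires openai<1.0,
# pinecone.Pinecone / ServerlessSpec require pinecone-client>=3.0, and
# HuggingFaceEmbeddings comes from the langchain-community package.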

# Load environment variables
load_dotenv()

# Load the OpenAI, Pinecone, and AWS credentials from environment variables
openai.api_key = os.getenv("OPENAI_API_KEY")
pinecone_api_key = os.getenv("PINECONE_API_KEY")
aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")

# Create an S3 client to download the combined extracted text file
s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key,
    region_name='us-east-1'
)
bucket_name = 'amtrak-superliner-ai-poc'  # Replace with your S3 bucket name
txt_file_name = 'combined_extracted_text.txt'  # Name of the text file stored in S3
local_txt_path = f'/tmp/{txt_file_name}'  # Temporary location to store the file locally

# Download the text file from S3
s3_client.download_file(bucket_name, txt_file_name, local_txt_path)

# Load the extracted text from the text file
with open(local_txt_path, 'r') as f:
    doc = f.read()

# Split the document into smaller chunks (increase chunk size as needed)
text_splitter = CharacterTextSplitter(separator='\n', chunk_size=2000, chunk_overlap=500)
docs = [Document(page_content=doc)]
split_docs = text_splitter.split_documents(docs)

# Initialize the HuggingFace sentence-transformer model used for embeddings
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/msmarco-distilbert-base-v4")

# Create embeddings for the document chunks
doc_embeddings = [embedding_model.embed_query(doc.page_content) for doc in split_docs]

# Initialize the Pinecone client
pc = pinecone.Pinecone(api_key=pinecone_api_key)

# Create the Pinecone index if it doesn't exist
index_name = "amtrak-acela-ai-demo"
embedding_dim = 768  # Output dimension of msmarco-distilbert-base-v4
if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=embedding_dim,
        metric="cosine",
        spec=pinecone.ServerlessSpec(cloud="aws", region="us-east-1")
    )

# Connect to the Pinecone index
index = pc.Index(index_name)

# Upload document embeddings to Pinecone with metadata
for i, doc in enumerate(split_docs):
    index.upsert(vectors=[(str(i), doc_embeddings[i], {'content': doc.page_content})])

# Set up conversation memory
memory = ConversationBufferMemory()

# Define a prompt template for retrieval-augmented generation (RAG)
RAG_PROMPT_TEMPLATE = '''
Here is some important context that can help inform the Human's question:
{context}
Human: {human_input}
Please provide a specific and accurate answer based on the provided context.
Assistant:
'''
PROMPT = PromptTemplate.from_template(RAG_PROMPT_TEMPLATE)

def get_model_response(human_input, chat_history=None):  # chat_history argument is passed in by gr.ChatInterface
    try:
        # Step 1: Embed the user input
        query_embedding = embedding_model.embed_query(human_input)

        # Step 2: Query Pinecone using the embedding vector
        search_results = index.query(
            vector=query_embedding,
            top_k=5,
            include_metadata=True  # Ensures metadata is included in the results
        )

        # Step 3: Extract the relevant context (actual document content) from the search results
        context_list = []
        for ind, result in enumerate(search_results['matches']):
            document_content = result.get('metadata', {}).get('content', 'No content found')
            context_list.append(f"Document {ind + 1}: {document_content}")

        # Combine the context into a single string
        context_string = '\n\n'.join(context_list)

        # Step 4: Call the OpenAI ChatCompletion API (openai<1.0) for the response
        messages = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": f"Here is some context:\n{context_string}\n\nUser's question: {human_input}"}
        ]
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages,
            max_tokens=400,
            temperature=0.7
        )

        # Extract and return the model's output
        output_text = response['choices'][0]['message']['content'].strip()
        return output_text
    except Exception as e:
        return f"Error invoking model: {str(e)}"

# Gradio ChatInterface
gr_interface = gr.ChatInterface(
    fn=get_model_response,
    title="Amtrak Acela RMM Maintenance Assistant",
    description="Ask questions related to the RMM documents."
)

# Launch the Gradio app on Hugging Face Spaces
gr_interface.launch()