Spaces:
Sleeping
Sleeping
import os | |
import dotenv | |
import openai | |
import pinecone | |
from langchain.document_loaders import Docx2txtLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
import hashlib | |
from time import sleep | |
from helper import append_file | |
import json | |
## Configuration
# Load settings from the local .env file into the process environment
# before any of them are read.
dotenv.load_dotenv('.env')

# OpenAI credentials and the embedding model name.
openai.api_key = os.environ.get('OPENAI_API_KEY')
embedding_model = os.environ.get('EMBEDDING_ENGINE')

# Kept as the raw string; it is compared against 'True' where it is used.
debug_mode = os.environ.get('DEBUG')

# Folder and file name of the game document, plus the name of the file
# (inside the same folder) that records game ids.
file_path = os.environ.get('GAME_DOCS_FOLDER')
file_name = os.environ.get('GAME_DOCS_FILE')
game_index = os.environ.get('GAME_ID_INDEX')

# Pinecone credentials, environment and target index name.
pinecone_api_key = os.environ.get('PINECONE_API_KEY')
pinecone_env = os.environ.get('PINECONE_REGION')
pinecone_index = os.environ.get('PINECONE_INDEX')

pinecone.init(api_key=pinecone_api_key, environment=pinecone_env)

# Create the index only when it does not exist yet.
if pinecone_index not in pinecone.list_indexes():
    pinecone.create_index(
        pinecone_index,
        dimension=1536,
        metric="cosine",
        pods=1,
        pod_type="p1.x1",
    )
    # Short pause after creation -- presumably to give the new index time
    # to become available before it is opened below.
    sleep(3)

# Handle used by the rest of the module for upserts.
vector_db = pinecone.Index(pinecone_index)
def perform_embedding(doclist):
    """Embed each document chunk and build the Pinecone upsert payload.

    A 12-character game id is taken from the MD5 hex digest of the
    module-level ``file_name``. The id and file name are serialized as JSON
    and handed to ``append_file`` together with the path
    ``{file_path}/{game_index}`` (presumably appending a record to that
    file -- confirm against ``helper.append_file``).

    Every chunk's text is reduced to ASCII (non-ASCII characters are
    dropped) and embedded with ``openai.Embedding.create`` using
    ``embedding_model``. Chunks that are empty after the ASCII reduction are
    skipped, because the embeddings endpoint rejects an empty input string
    and a single such chunk would otherwise abort the whole run.

    Args:
        doclist: Iterable of objects exposing a ``page_content`` string.

    Returns:
        A list of ``(vector_id, embedding, metadata)`` tuples, where
        ``vector_id`` is ``"<game_id>-<chunk index>"`` and ``metadata``
        holds ``game_id``, ``split_count`` (the chunk's index in
        ``doclist``) and ``text`` (the ASCII-reduced chunk text).
    """
    # MD5 only serves to derive a short, stable identifier from the file
    # name; it is not used for anything security-sensitive.
    game_id = hashlib.md5(file_name.encode('utf-8')).hexdigest()[:12]
    append_file(
        f"{file_path}/{game_index}",
        json.dumps({"game_id": game_id, "game_file": file_name}),
    )

    payload = []
    for i, doc in enumerate(doclist):
        content = doc.page_content.encode(encoding='ASCII', errors='ignore').decode()
        if not content:
            # Nothing left to embed; the index i is still consumed so ids of
            # the remaining chunks match their position in doclist.
            continue
        response = openai.Embedding.create(model=embedding_model, input=content)
        vector = response['data'][0]['embedding']
        metadata = {'game_id': game_id, 'split_count': i, 'text': content}
        payload.append((f"{game_id}-{i}", vector, metadata))
    return payload
def load_split_document():
    """Load the configured Word document and split it into chunks.

    The document at ``file_path + "/" + file_name`` is read with
    ``Docx2txtLoader`` and split by a ``RecursiveCharacterTextSplitter``
    configured with ``chunk_size=1000`` and ``chunk_overlap=0``. When
    ``debug_mode`` equals the string ``'True'``, the number of resulting
    chunks is printed.

    Returns:
        The chunks returned by ``split_documents``.
    """
    source_path = file_path + "/" + file_name
    loaded = Docx2txtLoader(source_path).load()

    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    chunks = splitter.split_documents(loaded)

    if debug_mode == 'True':
        print(f"Total count of splits created: {len(chunks)}")
    return chunks
def upload_game_docs(batch_size=100):
    """Load, embed and upload the configured game document to Pinecone.

    The document is split by ``load_split_document``, embedded by
    ``perform_embedding`` and the resulting vectors are upserted into
    ``vector_db``. Vectors are sent in slices of at most ``batch_size``
    rather than in a single request, since one request carrying every
    vector of a large document can exceed Pinecone's request-size limit.
    When there is nothing to upload, no upsert request is made.

    Args:
        batch_size: Maximum number of vectors per upsert request.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")

    docs = load_split_document()
    payload = perform_embedding(docs)

    for start in range(0, len(payload), batch_size):
        vector_db.upsert(payload[start:start + batch_size])
# Script entry point: run the upload only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    upload_game_docs()