kenken999's picture
updatest
14dc68f
import os
import csv
import shutil
import openai
import pandas as pd
import numpy as np
from transformers import GPT2TokenizerFast
from dotenv import load_dotenv
import time
# Heavily derived from OpenAi's cookbook example
load_dotenv()
# the dir is the ./playground directory
REPOSITORY_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), "playground")
class Embeddings:
def __init__(self, workspace_path: str):
self.workspace_path = workspace_path
openai.api_key = os.getenv("OPENAI_API_KEY", "")
self.DOC_EMBEDDINGS_MODEL = f"text-embedding-ada-002"
self.QUERY_EMBEDDINGS_MODEL = f"text-embedding-ada-002"
self.SEPARATOR = "\n* "
self.tokenizer = GPT2TokenizerFast.from_pretrained("gpt2")
self.separator_len = len(self.tokenizer.tokenize(self.SEPARATOR))
def compute_repository_embeddings(self):
try:
playground_data_path = os.path.join(self.workspace_path, 'playground_data')
# Delete the contents of the playground_data directory but not the directory itself
# This is to ensure that we don't have any old data lying around
for filename in os.listdir(playground_data_path):
file_path = os.path.join(playground_data_path, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
print(f"Failed to delete {file_path}. Reason: {str(e)}")
except Exception as e:
print(f"Error: {str(e)}")
# extract and save info to csv
info = self.extract_info(REPOSITORY_PATH)
self.save_info_to_csv(info)
df = pd.read_csv(os.path.join(self.workspace_path, 'playground_data\\repository_info.csv'))
df = df.set_index(["filePath", "lineCoverage"])
self.df = df
context_embeddings = self.compute_doc_embeddings(df)
self.save_doc_embeddings_to_csv(context_embeddings, df, os.path.join(self.workspace_path, 'playground_data\\doc_embeddings.csv'))
try:
self.document_embeddings = self.load_embeddings(os.path.join(self.workspace_path, 'playground_data\\doc_embeddings.csv'))
except:
pass
# Extract information from files in the repository in chunks
# Return a list of [filePath, lineCoverage, chunkContent]
def extract_info(self, REPOSITORY_PATH):
# Initialize an empty list to store the information
info = []
LINES_PER_CHUNK = 60
# Iterate through the files in the repository
for root, dirs, files in os.walk(REPOSITORY_PATH):
for file in files:
file_path = os.path.join(root, file)
# Read the contents of the file
with open(file_path, "r", encoding="utf-8") as f:
try:
contents = f.read()
except:
continue
# Split the contents into lines
lines = contents.split("\n")
# Ignore empty lines
lines = [line for line in lines if line.strip()]
# Split the lines into chunks of LINES_PER_CHUNK lines
chunks = [
lines[i:i+LINES_PER_CHUNK]
for i in range(0, len(lines), LINES_PER_CHUNK)
]
# Iterate through the chunks
for i, chunk in enumerate(chunks):
# Join the lines in the chunk back into a single string
chunk = "\n".join(chunk)
# Get the first and last line numbers
first_line = i * LINES_PER_CHUNK + 1
last_line = first_line + len(chunk.split("\n")) - 1
line_coverage = (first_line, last_line)
# Add the file path, line coverage, and content to the list
info.append((os.path.join(root, file), line_coverage, chunk))
# Return the list of information
return info
def save_info_to_csv(self, info):
# Open a CSV file for writing
os.makedirs(os.path.join(self.workspace_path, "playground_data"), exist_ok=True)
with open(os.path.join(self.workspace_path, 'playground_data\\repository_info.csv'), "w", newline="") as csvfile:
# Create a CSV writer
writer = csv.writer(csvfile)
# Write the header row
writer.writerow(["filePath", "lineCoverage", "content"])
# Iterate through the info
for file_path, line_coverage, content in info:
# Write a row for each chunk of data
writer.writerow([file_path, line_coverage, content])
def get_relevant_code_chunks(self, task_description: str, task_context: str):
query = task_description + "\n" + task_context
most_relevant_document_sections = self.order_document_sections_by_query_similarity(query, self.document_embeddings)
selected_chunks = []
for _, section_index in most_relevant_document_sections:
try:
document_section = self.df.loc[section_index]
selected_chunks.append(self.SEPARATOR + document_section['content'].replace("\n", " "))
if len(selected_chunks) >= 2:
break
except:
pass
return selected_chunks
def get_embedding(self, text: str, model: str) -> list[float]:
result = openai.Embedding.create(
model=model,
input=text
)
return result["data"][0]["embedding"]
def get_doc_embedding(self, text: str) -> list[float]:
return self.get_embedding(text, self.DOC_EMBEDDINGS_MODEL)
def get_query_embedding(self, text: str) -> list[float]:
return self.get_embedding(text, self.QUERY_EMBEDDINGS_MODEL)
def compute_doc_embeddings(self, df: pd.DataFrame) -> dict[tuple[str, str], list[float]]:
"""
Create an embedding for each row in the dataframe using the OpenAI Embeddings API.
Return a dictionary that maps between each embedding vector and the index of the row that it corresponds to.
"""
embeddings = {}
for idx, r in df.iterrows():
# Wait one second before making the next call to the OpenAI Embeddings API
# print("Waiting one second before embedding next row\n")
time.sleep(1)
embeddings[idx] = self.get_doc_embedding(r.content.replace("\n", " "))
return embeddings
def save_doc_embeddings_to_csv(self, doc_embeddings: dict, df: pd.DataFrame, csv_filepath: str):
# Get the dimensionality of the embedding vectors from the first element in the doc_embeddings dictionary
if len(doc_embeddings) == 0:
return
EMBEDDING_DIM = len(list(doc_embeddings.values())[0])
# Create a new dataframe with the filePath, lineCoverage, and embedding vector columns
embeddings_df = pd.DataFrame(columns=["filePath", "lineCoverage"] + [f"{i}" for i in range(EMBEDDING_DIM)])
# Iterate over the rows in the original dataframe
for idx, _ in df.iterrows():
# Get the embedding vector for the current row
embedding = doc_embeddings[idx]
# Create a new row in the embeddings dataframe with the filePath, lineCoverage, and embedding vector values
row = [idx[0], idx[1]] + embedding
embeddings_df.loc[len(embeddings_df)] = row
# Save the embeddings dataframe to a CSV file
embeddings_df.to_csv(csv_filepath, index=False)
def vector_similarity(self, x: list[float], y: list[float]) -> float:
return np.dot(np.array(x), np.array(y))
def order_document_sections_by_query_similarity(self, query: str, contexts: dict[(str, str), np.array]) -> list[(float, (str, str))]:
"""
Find the query embedding for the supplied query, and compare it against all of the pre-calculated document embeddings
to find the most relevant sections.
Return the list of document sections, sorted by relevance in descending order.
"""
query_embedding = self.get_query_embedding(query)
document_similarities = sorted([
(self.vector_similarity(query_embedding, doc_embedding), doc_index) for doc_index, doc_embedding in contexts.items()
], reverse=True)
return document_similarities
def load_embeddings(self, fname: str) -> dict[tuple[str, str], list[float]]:
df = pd.read_csv(fname, header=0)
max_dim = max([int(c) for c in df.columns if c != "filePath" and c != "lineCoverage"])
return {
(r.filePath, r.lineCoverage): [r[str(i)] for i in range(max_dim + 1)] for _, r in df.iterrows()
}