import torch
import time
import pinecone
import pickle
import os
import numpy as np
import hashlib
import gradio as gr
from transformers.generation.stopping_criteria import StoppingCriteria, StoppingCriteriaList
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from torch import nn
from sentence_transformers.cross_encoder import CrossEncoder
from peft import PeftModel
from sentence_transformers import SentenceTransformer
from bs4 import BeautifulSoup
import requests
headers = {
"User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5) AppleWebKit 537.36 (KHTML, like Gecko) Chrome",
"Accept":"text/html,application/xhtml+xml,application/xml; q=0.9,image/webp,*/*;q=0.8",
'Cookie':'CONSENT=YES+cb.20210418-17-p0.it+FX+917; '
}
def google_search(text):
print(f"Google search on: {text}")
try:
site = requests.get(f'https://www.google.com/search?hl=en&q={text}', headers=headers)
main = BeautifulSoup(site.text, features="html.parser").select_one('#main').select('.VwiC3b.lyLwlc.yDYNvb.W8l4ac')
res = '\n\n'.join([m.get_text() for m in main])
except Exception as ex:
print(f"Error: {ex}")
res = ""
print(f"The result of the google search is: {res}")
return res
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
sentencetransformer_model = SentenceTransformer('sentence-transformers/multi-qa-mpnet-base-cos-v1')
pinecone.init(api_key=PINECONE_API_KEY, environment="gcp-starter")
CACHE_DIR = "./.cache"
INDEX_NAME = "k8s-semantic-search"
if not os.path.exists(CACHE_DIR):
os.makedirs(CACHE_DIR)
def cached(func):
def wrapper(*args, **kwargs):
SEP = "$|$"
cache_token = (
f"{func.__name__}{SEP}"
f"{SEP.join(str(arg) for arg in args)}{SEP}"
f"{SEP.join( str(key) + SEP * 2 + str(val) for key, val in kwargs.items())}"
)
hex_hash = hashlib.sha256(cache_token.encode()).hexdigest()
cache_filename: str = os.path.join(CACHE_DIR, f"{hex_hash}")
if os.path.exists(cache_filename):
with open(cache_filename, "rb") as cache_file:
return pickle.load(cache_file)
result = func(*args, **kwargs)
with open(cache_filename, "wb") as cache_file:
pickle.dump(result, cache_file)
return result
return wrapper
@cached
def create_embedding(text: str):
embed_text = sentencetransformer_model.encode(text)
return embed_text.tolist()
index = pinecone.Index(INDEX_NAME)
def query_from_pinecone(query, top_k=3):
embedding = create_embedding(query)
if not embedding:
return None
return index.query(vector=embedding, top_k=top_k, include_metadata=True).get("matches") # gets the metadata (text)
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-12-v2")
def get_results_from_pinecone(query, top_k=3, re_rank=True, verbose=True):
results_from_pinecone = query_from_pinecone(query, top_k=top_k)
if not results_from_pinecone:
return []
if verbose:
print("Query:", query)
final_results = []
if re_rank:
if verbose:
print("Document ID (Hash)\t\tRetrieval Score\tCE Score\tText")
sentence_combinations = [
[query, result_from_pinecone["metadata"]["text"]] for result_from_pinecone in results_from_pinecone
]
# Compute the similarity scores for these combinations
similarity_scores = cross_encoder.predict(sentence_combinations, activation_fct=nn.Sigmoid())
# Sort the scores in decreasing order
sim_scores_argsort = reversed(np.argsort(similarity_scores))
# Print the scores
for idx in sim_scores_argsort:
result_from_pinecone = results_from_pinecone[idx]
final_results.append(result_from_pinecone)
if verbose:
print(
f"{result_from_pinecone['id']}\t{result_from_pinecone['score']:.2f}\t{similarity_scores[idx]:.2f}\t{result_from_pinecone['metadata']['text'][:50]}"
)
return final_results
if verbose:
print("Document ID (Hash)\t\tRetrieval Score\tText")
for result_from_pinecone in results_from_pinecone:
final_results.append(result_from_pinecone)
if verbose:
print(
f"{result_from_pinecone['id']}\t{result_from_pinecone['score']:.2f}\t{result_from_pinecone['metadata']['text'][:50]}"
)
return final_results
def semantic_search(prompt):
final_results = get_results_from_pinecone(prompt, top_k=9, re_rank=True, verbose=True)
if not final_results:
return ""
return "\n\n".join(res["metadata"]["text"].strip() for res in final_results[:3])
base_model_id = "mistralai/Mistral-7B-Instruct-v0.1"
lora_model_id = "ComponentSoft/mistral-kubectl-instruct"
tokenizer = AutoTokenizer.from_pretrained(
lora_model_id,
padding_side="left",
add_eos_token=False,
add_bos_token=True,
)
tokenizer.pad_token = tokenizer.eos_token
bnb_config = BitsAndBytesConfig(
load_in_4bit=True, bnb_4bit_use_double_quant=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16
)
base_model = AutoModelForCausalLM.from_pretrained(
base_model_id,
quantization_config=bnb_config,
use_cache=True,
trust_remote_code=True,
)
model = PeftModel.from_pretrained(base_model, lora_model_id)
model.eval()
def create_stop_criterion(*args):
term_tokens = [torch.tensor(tokenizer.encode(term, add_special_tokens=False)).to("cuda") for term in args]
class CustomStopCriterion(StoppingCriteria):
def __call__(self, input_ids: torch.LongTensor, score: torch.FloatTensor, **kwargs):
return any(torch.equal(e, input_ids[0][-len(e) :]) for e in term_tokens)
return CustomStopCriterion()
eval_stop_criterion = create_stop_criterion("", "#End")
category_stop_criterion = create_stop_criterion("", "\n")
start_template = "### Answer:"
command_template = "# Command:"
end_template = "#End"
def text_to_text_generation(verbose, prompt):
prompt = prompt.strip()
is_kubectl_prompt = (
f"You are a helpful assistant who classifies prompts into three categories. [INST] Respond with 0 if it pertains to a 'kubectl' operation. This is an instruction that can be answered with a 'kubectl' action. Look for keywords like 'get', 'list', 'create', 'show', 'view', and other command-like words. This category is an instruction instead of a question. Respond with 1 only if the prompt is a question, and is about a definition related to Kubernetes, or non-action inquiries. Respond with 2 every other scenario, for example if the question is a general question, not related to Kubernetes or 'kubectl'.\n"
f"So for instance the following:\n"
f'text: "List all pods in Kubernetes"\n'
f"Would get a response:\n"
f"response (0/1/2): 0 [/INST] \n"
f'text: "{prompt}"'
f"response (0/1/2): "
)
model_input = tokenizer(is_kubectl_prompt, return_tensors="pt").to("cuda")
with torch.no_grad():
response = tokenizer.decode(
model.generate(
**model_input,
max_new_tokens=8,
pad_token_id=tokenizer.eos_token_id,
repetition_penalty=1.15,
stopping_criteria=StoppingCriteriaList([category_stop_criterion]),
)[0],
skip_special_tokens=True,
)
response = response[len(is_kubectl_prompt) :]
print(f'{" Query Start ":-^40}')
print("Classified as: " + response)
response_num = 0 if "0" in response else (1 if "1" in response else 2)
def generate(response_num, prompt, retriever, verbose):
match response_num:
case 0:
prompt = f"[INST] {prompt}\n Lets think step by step. [/INST] {start_template}"
case 1:
if retriever == "semantic_search":
retrieved_results = semantic_search(prompt)
prompt = f"You are a helpful kubernetes professional. [INST] Use the following documentation, if it is relevant to answer the question below. [/INST]\nDocumentation: {retrieved_results} \n [INST] Answer the following question: {prompt} [/INST]\nAnswer: "
elif retriever == "google_search":
retrieved_results = google_search(prompt)
prompt = f"You are a helpful kubernetes professional. [INST] Use the following documentation, if it is relevant to answer the question below. [/INST]\nDocumentation: {retrieved_results} \n [INST] Answer the following question: {prompt} [/INST]\nAnswer: "
else:
prompt = f"[INST] Answer the following question: {prompt} [/INST]\nAnswer: "
case _:
prompt = f"[INST] {prompt} [/INST]"
print("Query:")
print(prompt)
# Generate output
model_input = tokenizer(prompt, return_tensors="pt").to("cuda")
with torch.no_grad():
response = tokenizer.decode(
model.generate(
**model_input,
max_new_tokens=256,
pad_token_id=tokenizer.eos_token_id,
repetition_penalty=1.15,
stopping_criteria=StoppingCriteriaList([eval_stop_criterion]),
)[0],
skip_special_tokens=True,
)
decoded_prompt = tokenizer.decode(tokenizer(prompt).input_ids, skip_special_tokens=True)
start = (
response.index(start_template) + len(start_template) if start_template in response else len(decoded_prompt)
)
start = response.index(command_template) + len(command_template) if command_template in response else start
end = response.index(end_template) if end_template in response else len(response)
return response if verbose else response[start:end].strip()
true_response = generate(response_num, prompt, False, verbose)
true_response_semantic_search = generate(response_num, prompt, "semantic_search", verbose)
true_response_google_search = generate(response_num, prompt, "google_search", verbose)
print("Returned: " + true_response)
print(f'{" QUERY END ":-^40}')
match response_num:
case 0:
mode = "Kubectl"
case 1:
mode = "Kubernetes"
case _:
mode = "Normal"
return (
f"*Mode*: {mode}",
f"# Answer\n\n {true_response}",
f"# Answer with RAG\n\n {true_response_semantic_search}",
f"# Answer with Google search\n\n {true_response_google_search}"
)
iface = gr.Interface(
fn=text_to_text_generation,
inputs=[
gr.components.Checkbox(label="Verbose"),
gr.components.Text(placeholder="prompt here ...", label="Prompt"),
],
outputs=[
gr.components.Markdown(label="Mode"),
gr.components.Markdown(label="Answer Without Retriever"),
gr.components.Markdown(label="Answer With Retriever"),
gr.components.Markdown(label="Answer With Google search"),
],
allow_flagging="never",
)
iface.launch()