import re
import os
import datetime
from typing import Type, Dict, List, Tuple
import time
from itertools import compress
import pandas as pd
import numpy as np
# Model packages
import torch.cuda
from threading import Thread
from transformers import pipeline, TextIteratorStreamer
# Alternative model sources
#from dataclasses import asdict, dataclass
# Langchain functions
from langchain.prompts import PromptTemplate
from langchain_community.vectorstores import FAISS
from langchain_community.retrievers import SVMRetriever
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
# For keyword extraction (not currently used)
#import nltk
#nltk.download('wordnet')
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from nltk.stem import WordNetLemmatizer
#from nltk.stem.snowball import SnowballStemmer
from keybert import KeyBERT
# For Name Entity Recognition model
#from span_marker import SpanMarkerModel # Not currently used
# For BM25 retrieval
import bm25s
import Stemmer
#from gensim.corpora import Dictionary
#from gensim.models import TfidfModel, OkapiBM25Model
#from gensim.similarities import SparseMatrixSimilarity
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
from chatfuncs.prompts import instruction_prompt_template_alpaca, instruction_prompt_mistral_orca, instruction_prompt_phi3, instruction_prompt_llama3, instruction_prompt_qwen
import gradio as gr
torch.cuda.empty_cache()
PandasDataFrame = Type[pd.DataFrame]
embeddings = None # global variable setup
vectorstore = None # global variable setup
model_type = None # global variable setup
max_memory_length = 0 # How long should the memory of the conversation last?
full_text = "" # Define dummy source text (full text) just to enable highlight function to load
model = [] # Define empty list for model functions to run
tokenizer = [] # Define empty list for model functions to run
## Highlight text constants
hlt_chunk_size = 12
hlt_strat = [" ", ". ", "! ", "? ", ": ", "\n\n", "\n", ", "]
hlt_overlap = 4
## Initialise NER model ##
ner_model = []#SpanMarkerModel.from_pretrained("tomaarsen/span-marker-mbert-base-multinerd") # Not currently used
## Initialise keyword model ##
# Used to pull out keywords from chat history to add to user queries behind the scenes
kw_model = pipeline("feature-extraction", model="sentence-transformers/all-MiniLM-L6-v2")
# Currently set gpu_layers to 0 even with cuda due to persistent bugs in implementation with cuda
if torch.cuda.is_available():
torch_device = "cuda"
gpu_layers = 100
else:
torch_device = "cpu"
gpu_layers = 0
print("Running on device:", torch_device)
threads = 8 #torch.get_num_threads()
print("CPU threads:", threads)
# Qwen 2 0.5B (small, fast) Model parameters
temperature: float = 0.1
top_k: int = 3
top_p: float = 1
repetition_penalty: float = 1.15
flan_alpaca_repetition_penalty: float = 1.3
last_n_tokens: int = 64
max_new_tokens: int = 1024
seed: int = 42
reset: bool = False
stream: bool = True
threads: int = threads
batch_size:int = 256
context_length:int = 2048
sample = True
class CtransInitConfig_gpu:
def __init__(self,
last_n_tokens=last_n_tokens,
seed=seed,
n_threads=threads,
n_batch=batch_size,
n_ctx=4096,
n_gpu_layers=gpu_layers):
self.last_n_tokens = last_n_tokens
self.seed = seed
self.n_threads = n_threads
self.n_batch = n_batch
self.n_ctx = n_ctx
self.n_gpu_layers = n_gpu_layers
# self.stop: list[str] = field(default_factory=lambda: [stop_string])
def update_gpu(self, new_value):
self.n_gpu_layers = new_value
class CtransInitConfig_cpu(CtransInitConfig_gpu):
def __init__(self):
super().__init__()
self.n_gpu_layers = 0
gpu_config = CtransInitConfig_gpu()
cpu_config = CtransInitConfig_cpu()
class CtransGenGenerationConfig:
def __init__(self, temperature=temperature,
top_k=top_k,
top_p=top_p,
repeat_penalty=repetition_penalty,
seed=seed,
stream=stream,
max_tokens=max_new_tokens
):
self.temperature = temperature
self.top_k = top_k
self.top_p = top_p
self.repeat_penalty = repeat_penalty
self.seed = seed
self.max_tokens=max_tokens
self.stream = stream
def update_temp(self, new_value):
self.temperature = new_value
# Vectorstore funcs
def docs_to_faiss_save(docs_out:PandasDataFrame, embeddings=embeddings):
print(f"> Total split documents: {len(docs_out)}")
vectorstore_func = FAISS.from_documents(documents=docs_out, embedding=embeddings)
'''
#with open("vectorstore.pkl", "wb") as f:
#pickle.dump(vectorstore, f)
'''
#if Path(save_to).exists():
# vectorstore_func.save_local(folder_path=save_to)
#else:
# os.mkdir(save_to)
# vectorstore_func.save_local(folder_path=save_to)
global vectorstore
vectorstore = vectorstore_func
out_message = "Document processing complete"
#print(out_message)
#print(f"> Saved to: {save_to}")
return out_message
# Prompt functions
def base_prompt_templates(model_type = "Qwen 2 0.5B (small, fast)"):
#EXAMPLE_PROMPT = PromptTemplate(
# template="\nCONTENT:\n\n{page_content}\n\nSOURCE: {source}\n\n",
# input_variables=["page_content", "source"],
#)
CONTENT_PROMPT = PromptTemplate(
template="{page_content}\n\n",#\n\nSOURCE: {source}\n\n",
input_variables=["page_content"]
)
# The main prompt:
if model_type == "Qwen 2 0.5B (small, fast)":
INSTRUCTION_PROMPT=PromptTemplate(template=instruction_prompt_qwen, input_variables=['question', 'summaries'])
elif model_type == "Phi 3.5 Mini (larger, slow)":
INSTRUCTION_PROMPT=PromptTemplate(template=instruction_prompt_phi3, input_variables=['question', 'summaries'])
return INSTRUCTION_PROMPT, CONTENT_PROMPT
def write_out_metadata_as_string(metadata_in):
metadata_string = [f"{' '.join(f'{k}: {v}' for k, v in d.items() if k != 'page_section')}" for d in metadata_in] # ['metadata']
return metadata_string
def generate_expanded_prompt(inputs: Dict[str, str], instruction_prompt, content_prompt, extracted_memory, vectorstore, embeddings, relevant_flag = True, out_passages = 2): # ,
question = inputs["question"]
chat_history = inputs["chat_history"]
print("relevant_flag in generate_expanded_prompt:", relevant_flag)
if relevant_flag == True:
new_question_kworded = adapt_q_from_chat_history(question, chat_history, extracted_memory) # new_question_keywords,
docs_keep_as_doc, doc_df, docs_keep_out = hybrid_retrieval(new_question_kworded, vectorstore, embeddings, k_val = 25, out_passages = out_passages, vec_score_cut_off = 0.85, vec_weight = 1, bm25_weight = 1, svm_weight = 1)
else:
new_question_kworded = question
doc_df = pd.DataFrame()
docs_keep_as_doc = []
docs_keep_out = []
if (not docs_keep_as_doc) | (doc_df.empty):
sorry_prompt = """Say 'Sorry, there is no relevant information to answer this question.'"""
return sorry_prompt, "No relevant sources found.", new_question_kworded
# Expand the found passages to the neighbouring context
print("Doc_df columns:", doc_df.columns)
if 'meta_url' in doc_df.columns:
file_type = determine_file_type(doc_df['meta_url'][0])
else:
file_type = determine_file_type(doc_df['source'][0])
# Only expand passages if not tabular data
if (file_type != ".csv") & (file_type != ".xlsx"):
docs_keep_as_doc, doc_df = get_expanded_passages(vectorstore, docs_keep_out, width=3)
# Build up sources content to add to user display
doc_df['meta_clean'] = write_out_metadata_as_string(doc_df["metadata"]) # [f"{' '.join(f'{k}: {v}' for k, v in d.items() if k != 'page_section')}" for d in doc_df['metadata']]
# Remove meta text from the page content if it already exists there
doc_df['page_content_no_meta'] = doc_df.apply(lambda row: row['page_content'].replace(row['meta_clean'] + ". ", ""), axis=1)
doc_df['content_meta'] = doc_df['meta_clean'].astype(str) + ".
" + doc_df['page_content_no_meta'].astype(str)
#modified_page_content = [f" Document {i+1} - {word}" for i, word in enumerate(doc_df['page_content'])]
modified_page_content = [f" Document {i+1} - {word}" for i, word in enumerate(doc_df['content_meta'])]
docs_content_string = '
'.join(modified_page_content)
sources_docs_content_string = '
'.join(doc_df['content_meta'])#.replace(" "," ")#.strip()
instruction_prompt_out = instruction_prompt.format(question=new_question_kworded, summaries=docs_content_string)
print('Final prompt is: ')
print(instruction_prompt_out)
return instruction_prompt_out, sources_docs_content_string, new_question_kworded
def create_full_prompt(user_input, history, extracted_memory, vectorstore, embeddings, model_type, out_passages, api_model_choice=None, api_key=None, relevant_flag = True):
#if chain_agent is None:
# history.append((user_input, "Please click the button to submit the Huggingface API key before using the chatbot (top right)"))
# return history, history, "", ""
print("\n==== date/time: " + str(datetime.datetime.now()) + " ====")
history = history or []
if api_model_choice and api_model_choice != "None":
print("API model choice detected")
if api_key:
print("API key detected")
return history, "", None, relevant_flag
else:
return history, "", None, relevant_flag
# Create instruction prompt
instruction_prompt, content_prompt = base_prompt_templates(model_type=model_type)
if not user_input.strip():
user_input = "No user input found"
relevant_flag = False
else:
relevant_flag = True
print("User input:", user_input)
instruction_prompt_out, docs_content_string, new_question_kworded =\
generate_expanded_prompt({"question": user_input, "chat_history": history}, #vectorstore,
instruction_prompt, content_prompt, extracted_memory, vectorstore, embeddings, relevant_flag, out_passages)
history.append(user_input)
print("Output history is:", history)
print("Final prompt to model is:",instruction_prompt_out)
return history, docs_content_string, instruction_prompt_out, relevant_flag
# Chat functions
import boto3
import json
from chatfuncs.helper_functions import get_or_create_env_var
# ResponseObject class for AWS Bedrock calls
class ResponseObject:
def __init__(self, text, usage_metadata):
self.text = text
self.usage_metadata = usage_metadata
max_tokens = 4096
AWS_DEFAULT_REGION = get_or_create_env_var('AWS_DEFAULT_REGION', 'eu-west-2')
print(f'The value of AWS_DEFAULT_REGION is {AWS_DEFAULT_REGION}')
bedrock_runtime = boto3.client('bedrock-runtime', region_name=AWS_DEFAULT_REGION)
def call_aws_claude(prompt: str, system_prompt: str, temperature: float, max_tokens: int, model_choice: str) -> ResponseObject:
"""
This function sends a request to AWS Claude with the following parameters:
- prompt: The user's input prompt to be processed by the model.
- system_prompt: A system-defined prompt that provides context or instructions for the model.
- temperature: A value that controls the randomness of the model's output, with higher values resulting in more diverse responses.
- max_tokens: The maximum number of tokens (words or characters) in the model's response.
- model_choice: The specific model to use for processing the request.
The function constructs the request configuration, invokes the model, extracts the response text, and returns a ResponseObject containing the text and metadata.
"""
prompt_config = {
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": max_tokens,
"top_p": 0.999,
"temperature":temperature,
"system": system_prompt,
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
],
}
],
}
body = json.dumps(prompt_config)
modelId = model_choice
accept = "application/json"
contentType = "application/json"
request = bedrock_runtime.invoke_model(
body=body, modelId=modelId, accept=accept, contentType=contentType
)
# Extract text from request
response_body = json.loads(request.get("body").read())
text = response_body.get("content")[0].get("text")
response = ResponseObject(
text=text,
usage_metadata=request['ResponseMetadata']
)
# Now you can access both the text and metadata
#print("Text:", response.text)
print("Metadata:", response.usage_metadata)
return response
def produce_streaming_answer_chatbot(history,
full_prompt,
model_type,
temperature=temperature,
relevant_query_bool=True,
max_new_tokens=max_new_tokens,
sample=sample,
repetition_penalty=repetition_penalty,
top_p=top_p,
top_k=top_k
):
#print("Model type is: ", model_type)
#if not full_prompt.strip():
# if history is None:
# history = []
# return history
if relevant_query_bool == False:
out_message = [("","No relevant query found. Please retry your question")]
history.append(out_message)
yield history
return
if model_type == "Qwen 2 0.5B (small, fast)":
# Get the model and tokenizer, and tokenize the user text.
model_inputs = tokenizer(text=full_prompt, return_tensors="pt", return_attention_mask=False).to(torch_device)
# Start generation on a separate thread, so that we don't block the UI. The text is pulled from the streamer
# in the main thread. Adds timeout to the streamer to handle exceptions in the generation thread.
streamer = TextIteratorStreamer(tokenizer, timeout=120., skip_prompt=True, skip_special_tokens=True)
generate_kwargs = dict(
model_inputs,
streamer=streamer,
max_new_tokens=max_new_tokens,
do_sample=sample,
repetition_penalty=repetition_penalty,
top_p=top_p,
temperature=temperature,
top_k=top_k
)
#print(generate_kwargs)
t = Thread(target=model.generate, kwargs=generate_kwargs)
t.start()
# Pull the generated text from the streamer, and update the model output.
start = time.time()
NUM_TOKENS=0
print('-'*4+'Start Generation'+'-'*4)
history[-1][1] = ""
for new_text in streamer:
try:
if new_text == None: new_text = ""
history[-1][1] += new_text
NUM_TOKENS+=1
yield history
except Exception as e:
print(f"Error during text generation: {e}")
time_generate = time.time() - start
print('\n')
print('-'*4+'End Generation'+'-'*4)
print(f'Num of generated tokens: {NUM_TOKENS}')
print(f'Time for complete generation: {time_generate}s')
print(f'Tokens per secound: {NUM_TOKENS/time_generate}')
print(f'Time per token: {(time_generate/NUM_TOKENS)*1000}ms')
elif model_type == "Phi 3.5 Mini (larger, slow)":
#tokens = model.tokenize(full_prompt)
gen_config = CtransGenGenerationConfig()
gen_config.update_temp(temperature)
print(vars(gen_config))
# Pull the generated text from the streamer, and update the model output.
start = time.time()
NUM_TOKENS=0
print('-'*4+'Start Generation'+'-'*4)
output = model(
full_prompt, **vars(gen_config))
history[-1][1] = ""
for out in output:
if "choices" in out and len(out["choices"]) > 0 and "text" in out["choices"][0]:
history[-1][1] += out["choices"][0]["text"]
NUM_TOKENS+=1
yield history
else:
print(f"Unexpected output structure: {out}")
time_generate = time.time() - start
print('\n')
print('-'*4+'End Generation'+'-'*4)
print(f'Num of generated tokens: {NUM_TOKENS}')
print(f'Time for complete generation: {time_generate}s')
print(f'Tokens per secound: {NUM_TOKENS/time_generate}')
print(f'Time per token: {(time_generate/NUM_TOKENS)*1000}ms')
elif model_type == "anthropic.claude-3-haiku-20240307-v1:0" or model_type == "anthropic.claude-3-sonnet-20240229-v1:0":
system_prompt = "You are answering questions from the user based on source material. Respond with short, factually correct answers."
try:
print("Calling AWS Claude model")
response = call_aws_claude(full_prompt, system_prompt, temperature, max_tokens, model_type)
except Exception as e:
# If fails, try again after 10 seconds in case there is a throttle limit
print(e)
try:
out_message = "API limit hit - waiting 30 seconds to retry."
print(out_message)
time.sleep(30)
response = call_aws_claude(full_prompt, system_prompt, temperature, max_tokens, model_type)
except Exception as e:
print(e)
return "", history
# Update the conversation history with the new prompt and response
history.append({'role': 'user', 'parts': [full_prompt]})
history.append({'role': 'assistant', 'parts': [response.text]})
# Print the updated conversation history
#print("conversation_history:", conversation_history)
return response, history
# Chat helper functions
def adapt_q_from_chat_history(question, chat_history, extracted_memory, keyword_model=""):#keyword_model): # new_question_keywords,
chat_history_str, chat_history_first_q, chat_history_first_ans, max_memory_length = _get_chat_history(chat_history)
if chat_history_str:
# Keyword extraction is now done in the add_inputs_to_history function
#remove_q_stopwords(str(chat_history_first_q) + " " + str(chat_history_first_ans))
new_question_kworded = str(extracted_memory) + ". " + question #+ " " + new_question_keywords
#extracted_memory + " " + question
else:
new_question_kworded = question #new_question_keywords
#print("Question output is: " + new_question_kworded)
return new_question_kworded
def determine_file_type(file_path):
"""
Determine the file type based on its extension.
Parameters:
file_path (str): Path to the file.
Returns:
str: File extension (e.g., '.pdf', '.docx', '.txt', '.html').
"""
return os.path.splitext(file_path)[1].lower()
def create_doc_df(docs_keep_out):
# Extract content and metadata from 'winning' passages.
content=[]
meta=[]
meta_url=[]
page_section=[]
score=[]
doc_df = pd.DataFrame()
for item in docs_keep_out:
content.append(item[0].page_content)
meta.append(item[0].metadata)
meta_url.append(item[0].metadata['source'])
file_extension = determine_file_type(item[0].metadata['source'])
if (file_extension != ".csv") & (file_extension != ".xlsx"):
page_section.append(item[0].metadata['page_section'])
else: page_section.append("")
score.append(item[1])
# Create df from 'winning' passages
doc_df = pd.DataFrame(list(zip(content, meta, page_section, meta_url, score)),
columns =['page_content', 'metadata', 'page_section', 'meta_url', 'score'])
docs_content = doc_df['page_content'].astype(str)
doc_df['full_url'] = "https://" + doc_df['meta_url']
return doc_df
def hybrid_retrieval(new_question_kworded, vectorstore, embeddings, k_val, out_passages,
vec_score_cut_off, vec_weight, bm25_weight, svm_weight): # ,vectorstore, embeddings
#vectorstore=globals()["vectorstore"]
#embeddings=globals()["embeddings"]
doc_df = pd.DataFrame()
docs = vectorstore.similarity_search_with_score(new_question_kworded, k=k_val)
print("Docs from similarity search:")
print(docs)
# Keep only documents with a certain score
docs_len = [len(x[0].page_content) for x in docs]
docs_scores = [x[1] for x in docs]
# Only keep sources that are sufficiently relevant (i.e. similarity search score below threshold below)
score_more_limit = pd.Series(docs_scores) < vec_score_cut_off
docs_keep = list(compress(docs, score_more_limit))
if not docs_keep:
return [], pd.DataFrame(), []
# Only keep sources that are at least 100 characters long
length_more_limit = pd.Series(docs_len) >= 100
docs_keep = list(compress(docs_keep, length_more_limit))
if not docs_keep:
return [], pd.DataFrame(), []
docs_keep_as_doc = [x[0] for x in docs_keep]
docs_keep_length = len(docs_keep_as_doc)
if docs_keep_length == 1:
content=[]
meta_url=[]
score=[]
for item in docs_keep:
content.append(item[0].page_content)
meta_url.append(item[0].metadata['source'])
score.append(item[1])
# Create df from 'winning' passages
doc_df = pd.DataFrame(list(zip(content, meta_url, score)),
columns =['page_content', 'meta_url', 'score'])
docs_content = doc_df['page_content'].astype(str)
docs_url = doc_df['meta_url']
return docs_keep_as_doc, doc_df, docs_content, docs_url
# Check for if more docs are removed than the desired output
if out_passages > docs_keep_length:
out_passages = docs_keep_length
k_val = docs_keep_length
vec_rank = [*range(1, docs_keep_length+1)]
vec_score = [(docs_keep_length/x)*vec_weight for x in vec_rank]
print("Number of documents remaining: ", docs_keep_length)
# 2nd level check using BM25s package to do keyword search on retrieved passages.
content_keep=[]
for item in docs_keep:
content_keep.append(item[0].page_content)
# Prepare Corpus (Tokenized & Optional Stemming)
corpus = [doc.lower() for doc in content_keep]
#stemmer = SnowballStemmer("english", ignore_stopwords=True) # NLTK stemming not compatible
stemmer = Stemmer.Stemmer("english")
corpus_tokens = bm25s.tokenize(corpus, stopwords="en", stemmer=stemmer)
# Create and Index with BM25s
retriever = bm25s.BM25()
retriever.index(corpus_tokens)
# Query Processing (Stemming applied consistently if used above)
query_tokens = bm25s.tokenize(new_question_kworded.lower(), stemmer=stemmer)
results, scores = retriever.retrieve(query_tokens, corpus=corpus, k=len(corpus)) # Retrieve all docs
for i in range(results.shape[1]):
doc, score = results[0, i], scores[0, i]
print(f"Rank {i+1} (score: {score:.2f}): {doc}")
#print("BM25 results:", results)
#print("BM25 scores:", scores)
# Rank Calculation (Custom Logic for Your BM25 Score)
bm25_rank = list(range(1, len(results[0]) + 1))
#bm25_rank = results[0]#.tolist()[0] # Since you have a single query
bm25_score = [(docs_keep_length / (rank + 1)) * bm25_weight for rank in bm25_rank]
# +1 to avoid division by 0 for rank 0
# Result Ordering (Using the calculated ranks)
pairs = list(zip(bm25_rank, docs_keep_as_doc))
pairs.sort()
bm25_result = [value for rank, value in pairs]
# 3rd level check on retrieved docs with SVM retriever
# Check the type of the embeddings object
embeddings_type = type(embeddings)
print("Type of embeddings object:", embeddings_type)
print("embeddings:", embeddings)
from langchain_huggingface import HuggingFaceEmbeddings
#hf_embeddings = HuggingFaceEmbeddings(**embeddings)
hf_embeddings = embeddings
svm_retriever = SVMRetriever.from_texts(content_keep, hf_embeddings, k = k_val)
svm_result = svm_retriever.invoke(new_question_kworded)
svm_rank=[]
svm_score = []
for vec_item in docs_keep:
x = 0
for svm_item in svm_result:
x = x + 1
if svm_item.page_content == vec_item[0].page_content:
svm_rank.append(x)
svm_score.append((docs_keep_length/x)*svm_weight)
## Calculate final score based on three ranking methods
final_score = [a + b + c for a, b, c in zip(vec_score, bm25_score, svm_score)]
final_rank = [sorted(final_score, reverse=True).index(x)+1 for x in final_score]
# Force final_rank to increment by 1 each time
final_rank = list(pd.Series(final_rank).rank(method='first'))
#print("final rank: " + str(final_rank))
#print("out_passages: " + str(out_passages))
best_rank_index_pos = []
for x in range(1,out_passages+1):
try:
best_rank_index_pos.append(final_rank.index(x))
except IndexError: # catch the error
pass
# Adjust best_rank_index_pos to
best_rank_pos_series = pd.Series(best_rank_index_pos)
docs_keep_out = [docs_keep[i] for i in best_rank_index_pos]
# Keep only 'best' options
docs_keep_as_doc = [x[0] for x in docs_keep_out]
# Make df of best options
doc_df = create_doc_df(docs_keep_out)
print("doc_df:",doc_df)
print("docs_keep_as_doc:",docs_keep_as_doc)
print("docs_keep_out:", docs_keep_out)
return docs_keep_as_doc, doc_df, docs_keep_out
def get_expanded_passages(vectorstore, docs, width):
"""
Extracts expanded passages based on given documents and a width for context.
Parameters:
- vectorstore: The primary data source.
- docs: List of documents to be expanded.
- width: Number of documents to expand around a given document for context.
Returns:
- expanded_docs: List of expanded Document objects.
- doc_df: DataFrame representation of expanded_docs.
"""
from collections import defaultdict
def get_docs_from_vstore(vectorstore):
vector = vectorstore.docstore._dict
return list(vector.items())
def extract_details(docs_list):
docs_list_out = [tup[1] for tup in docs_list]
content = [doc.page_content for doc in docs_list_out]
meta = [doc.metadata for doc in docs_list_out]
return ''.join(content), meta[0], meta[-1]
def get_parent_content_and_meta(vstore_docs, width, target):
#target_range = range(max(0, target - width), min(len(vstore_docs), target + width + 1))
target_range = range(max(0, target), min(len(vstore_docs), target + width + 1)) # Now only selects extra passages AFTER the found passage
parent_vstore_out = [vstore_docs[i] for i in target_range]
content_str_out, meta_first_out, meta_last_out = [], [], []
for _ in parent_vstore_out:
content_str, meta_first, meta_last = extract_details(parent_vstore_out)
content_str_out.append(content_str)
meta_first_out.append(meta_first)
meta_last_out.append(meta_last)
return content_str_out, meta_first_out, meta_last_out
def merge_dicts_except_source(d1, d2):
merged = {}
for key in d1:
if key != "source":
merged[key] = str(d1[key]) + " to " + str(d2[key])
else:
merged[key] = d1[key] # or d2[key], based on preference
return merged
def merge_two_lists_of_dicts(list1, list2):
return [merge_dicts_except_source(d1, d2) for d1, d2 in zip(list1, list2)]
# Step 1: Filter vstore_docs
vstore_docs = get_docs_from_vstore(vectorstore)
doc_sources = {doc.metadata['source'] for doc, _ in docs}
vstore_docs = [(k, v) for k, v in vstore_docs if v.metadata.get('source') in doc_sources]
# Step 2: Group by source and proceed
vstore_by_source = defaultdict(list)
for k, v in vstore_docs:
vstore_by_source[v.metadata['source']].append((k, v))
expanded_docs = []
for doc, score in docs:
search_source = doc.metadata['source']
#if file_type == ".csv" | file_type == ".xlsx":
# content_str, meta_first, meta_last = get_parent_content_and_meta(vstore_by_source[search_source], 0, search_index)
#else:
search_section = doc.metadata['page_section']
parent_vstore_meta_section = [doc.metadata['page_section'] for _, doc in vstore_by_source[search_source]]
search_index = parent_vstore_meta_section.index(search_section) if search_section in parent_vstore_meta_section else -1
content_str, meta_first, meta_last = get_parent_content_and_meta(vstore_by_source[search_source], width, search_index)
meta_full = merge_two_lists_of_dicts(meta_first, meta_last)
expanded_doc = (Document(page_content=content_str[0], metadata=meta_full[0]), score)
expanded_docs.append(expanded_doc)
doc_df = pd.DataFrame()
doc_df = create_doc_df(expanded_docs) # Assuming you've defined the 'create_doc_df' function elsewhere
return expanded_docs, doc_df
def highlight_found_text(search_text: str, full_text: str, hlt_chunk_size:int=hlt_chunk_size, hlt_strat:List=hlt_strat, hlt_overlap:int=hlt_overlap) -> str:
"""
Highlights occurrences of search_text within full_text.
Parameters:
- search_text (str): The text to be searched for within full_text.
- full_text (str): The text within which search_text occurrences will be highlighted.
Returns:
- str: A string with occurrences of search_text highlighted.
Example:
>>> highlight_found_text("world", "Hello, world! This is a test. Another world awaits.")
'Hello, world! This is a test. Another world awaits.'
"""
def extract_text_from_input(text, i=0):
if isinstance(text, str):
return text.replace(" ", " ").strip()
elif isinstance(text, list):
return text[i][0].replace(" ", " ").strip()
else:
return ""
def extract_search_text_from_input(text):
if isinstance(text, str):
return text.replace(" ", " ").strip()
elif isinstance(text, list):
return text[-1][1].replace(" ", " ").strip()
else:
return ""
full_text = extract_text_from_input(full_text)
search_text = extract_search_text_from_input(search_text)
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=hlt_chunk_size,
separators=hlt_strat,
chunk_overlap=hlt_overlap,
)
sections = text_splitter.split_text(search_text)
found_positions = {}
for x in sections:
text_start_pos = 0
while text_start_pos != -1:
text_start_pos = full_text.find(x, text_start_pos)
if text_start_pos != -1:
found_positions[text_start_pos] = text_start_pos + len(x)
text_start_pos += 1
# Combine overlapping or adjacent positions
sorted_starts = sorted(found_positions.keys())
combined_positions = []
if sorted_starts:
current_start, current_end = sorted_starts[0], found_positions[sorted_starts[0]]
for start in sorted_starts[1:]:
if start <= (current_end + 10):
current_end = max(current_end, found_positions[start])
else:
combined_positions.append((current_start, current_end))
current_start, current_end = start, found_positions[start]
combined_positions.append((current_start, current_end))
# Construct pos_tokens
pos_tokens = []
prev_end = 0
for start, end in combined_positions:
if end-start > 15: # Only combine if there is a significant amount of matched text. Avoids picking up single words like 'and' etc.
pos_tokens.append(full_text[prev_end:start])
pos_tokens.append('' + full_text[start:end] + '')
prev_end = end
pos_tokens.append(full_text[prev_end:])
return "".join(pos_tokens)
# # Chat history functions
def clear_chat(chat_history_state, sources, chat_message, current_topic):
chat_history_state = []
sources = ''
chat_message = ''
current_topic = ''
return chat_history_state, sources, chat_message, current_topic
def _get_chat_history(chat_history: List[Tuple[str, str]], max_memory_length:int = max_memory_length): # Limit to last x interactions only
if (not chat_history) | (max_memory_length == 0):
chat_history = []
if len(chat_history) > max_memory_length:
chat_history = chat_history[-max_memory_length:]
#print(chat_history)
first_q = ""
first_ans = ""
for human_s, ai_s in chat_history:
first_q = human_s
first_ans = ai_s
#print("Text to keyword extract: " + first_q + " " + first_ans)
break
conversation = ""
for human_s, ai_s in chat_history:
human = f"Human: " + human_s
ai = f"Assistant: " + ai_s
conversation += "\n" + "\n".join([human, ai])
return conversation, first_q, first_ans, max_memory_length
def add_inputs_answer_to_history(user_message, history, current_topic):
if history is None:
history = [("","")]
#history.append((user_message, [-1]))
chat_history_str, chat_history_first_q, chat_history_first_ans, max_memory_length = _get_chat_history(history)
# Only get the keywords for the first question and response, or do it every time if over 'max_memory_length' responses in the conversation
if (len(history) == 1) | (len(history) > max_memory_length):
#print("History after appending is:")
#print(history)
first_q_and_first_ans = str(chat_history_first_q) + " " + str(chat_history_first_ans)
#ner_memory = remove_q_ner_extractor(first_q_and_first_ans)
keywords = keybert_keywords(first_q_and_first_ans, n = 8, kw_model=kw_model)
#keywords.append(ner_memory)
# Remove duplicate words while preserving order
ordered_tokens = set()
result = []
for word in keywords:
if word not in ordered_tokens:
ordered_tokens.add(word)
result.append(word)
extracted_memory = ' '.join(result)
else: extracted_memory=current_topic
print("Extracted memory is:")
print(extracted_memory)
return history, extracted_memory
# Keyword functions
def remove_q_stopwords(question): # Remove stopwords from question. Not used at the moment
# Prepare keywords from question by removing stopwords
text = question.lower()
# Remove numbers
text = re.sub('[0-9]', '', text)
tokenizer = RegexpTokenizer(r'\w+')
text_tokens = tokenizer.tokenize(text)
#text_tokens = word_tokenize(text)
tokens_without_sw = [word for word in text_tokens if not word in stopwords]
# Remove duplicate words while preserving order
ordered_tokens = set()
result = []
for word in tokens_without_sw:
if word not in ordered_tokens:
ordered_tokens.add(word)
result.append(word)
new_question_keywords = ' '.join(result)
return new_question_keywords
def remove_q_ner_extractor(question):
predict_out = ner_model.predict(question)
predict_tokens = [' '.join(v for k, v in d.items() if k == 'span') for d in predict_out]
# Remove duplicate words while preserving order
ordered_tokens = set()
result = []
for word in predict_tokens:
if word not in ordered_tokens:
ordered_tokens.add(word)
result.append(word)
new_question_keywords = ' '.join(result).lower()
return new_question_keywords
def apply_lemmatize(text, wnl=WordNetLemmatizer()):
def prep_for_lemma(text):
# Remove numbers
text = re.sub('[0-9]', '', text)
print(text)
tokenizer = RegexpTokenizer(r'\w+')
text_tokens = tokenizer.tokenize(text)
#text_tokens = word_tokenize(text)
return text_tokens
tokens = prep_for_lemma(text)
def lem_word(word):
if len(word) > 3: out_word = wnl.lemmatize(word)
else: out_word = word
return out_word
return [lem_word(token) for token in tokens]
def keybert_keywords(text, n, kw_model):
tokens_lemma = apply_lemmatize(text)
lemmatised_text = ' '.join(tokens_lemma)
keywords_text = KeyBERT(model=kw_model).extract_keywords(lemmatised_text, stop_words='english', top_n=n,
keyphrase_ngram_range=(1, 1))
keywords_list = [item[0] for item in keywords_text]
return keywords_list
# Gradio functions
def turn_off_interactivity(user_message, history):
return gr.update(value="", interactive=False), history + [[user_message, None]]
def restore_interactivity():
return gr.update(interactive=True)
def update_message(dropdown_value):
return gr.Textbox(value=dropdown_value)
def hide_block():
return gr.Radio(visible=False)
# Vote function
def vote(data: gr.LikeData, chat_history, instruction_prompt_out, model_type):
import os
import pandas as pd
chat_history_last = str(str(chat_history[-1][0]) + " - " + str(chat_history[-1][1]))
response_df = pd.DataFrame(data={"thumbs_up":data.liked,
"chosen_response":data.value,
"input_prompt":instruction_prompt_out,
"chat_history":chat_history_last,
"model_type": model_type,
"date_time": pd.Timestamp.now()}, index=[0])
if data.liked:
print("You upvoted this response: " + data.value)
if os.path.isfile("thumbs_up_data.csv"):
existing_thumbs_up_df = pd.read_csv("thumbs_up_data.csv")
thumbs_up_df_concat = pd.concat([existing_thumbs_up_df, response_df], ignore_index=True).drop("Unnamed: 0",axis=1, errors="ignore")
thumbs_up_df_concat.to_csv("thumbs_up_data.csv")
else:
response_df.to_csv("thumbs_up_data.csv")
else:
print("You downvoted this response: " + data.value)
if os.path.isfile("thumbs_down_data.csv"):
existing_thumbs_down_df = pd.read_csv("thumbs_down_data.csv")
thumbs_down_df_concat = pd.concat([existing_thumbs_down_df, response_df], ignore_index=True).drop("Unnamed: 0",axis=1, errors="ignore")
thumbs_down_df_concat.to_csv("thumbs_down_data.csv")
else:
response_df.to_csv("thumbs_down_data.csv")