from langchain import PromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.chains.summarize import load_summarize_chain
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import DirectoryLoader
from wordcloud import WordCloud, STOPWORDS
import numpy as np
from langchain.embeddings import OpenAIEmbeddings
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import os
from langchain.docstore.document import Document
import re
from collections import Counter
# import nltk
from nltk.corpus import stopwords
from config import OPENAI_API_KEY
## API key provided by Yasir bhai, loaded from config.py
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
class Extract_Summary:
    """Summarises text (or a directory of .txt files) and extracts key information using LangChain."""

    def __init__(self, text_input, file_path=None, chunks=2000, chunking_strategy=None,
                 LLM_Model="gpt-3.5-turbo", temperature=1, top_p=None, top_k=None):
        self.chunks = chunks
        self.file_path = file_path
        self.text_input = text_input
        self.chunking_strategy = chunking_strategy
        self.LLM_Model = LLM_Model
        self.temperature = temperature
        self.top_p = top_p
        self.top_k = top_k
    def doc_summary(self, docs):
        # Return the total word count and the number of documents.
        # print(f'You have {len(docs)} documents')
        num_words = sum([len(doc.page_content.split(" ")) for doc in docs])
        # print(f"You have {num_words} words in documents")
        return num_words, len(docs)
    def load_docs(self):
        if self.file_path is not None:
            docs = DirectoryLoader(self.file_path, glob="**/*.txt").load()
        else:
            docs = Document(page_content=f"{self.text_input}", metadata={"source": "local"})
            docs = [docs]
            # docs = self.text_input
        tokens, documents_count = self.doc_summary(docs)
        if documents_count > 8 or tokens > 6000:  ## Add token checks as well; add model-availability checks.
            docs = self.chunk_docs(docs)  ## Handle large documents with more than 6000 tokens.
            docs = self.summarise_large_documents(docs)
            tokens, documents_count = self.doc_summary(docs)
        if tokens > 2000:
            docs = self.chunk_docs(docs)
            chain_type = 'map_reduce'
        else:
            chain_type = 'stuff'
        print("==" * 20)
        print(tokens)
        print(chain_type)
        return docs, chain_type
    ## Add an ensemble retriever for this as well.
    def summarise_large_documents(self, docs):
        print("==" * 20)
        print('Original docs size:', len(docs))
        embeddings = OpenAIEmbeddings()
        vectors = embeddings.embed_documents([x.page_content for x in docs])
        # Pick the number of clusters with the best silhouette score.
        n_clusters_range = range(2, 11)
        silhouette_scores = []
        for i in n_clusters_range:
            kmeans = KMeans(n_clusters=i, init='k-means++',
                            max_iter=300, n_init=10, random_state=0)
            kmeans.fit(vectors)
            score = silhouette_score(vectors, kmeans.labels_)
            silhouette_scores.append(score)
        optimal_n_clusters = n_clusters_range[np.argmax(silhouette_scores)]
        # n_clusters = 5
        kmeans = KMeans(n_clusters=optimal_n_clusters,
                        random_state=42).fit(vectors)
        # Keep only the document closest to each cluster centroid.
        closest_indices = []
        # Loop through the number of clusters you have
        for i in range(optimal_n_clusters):
            # Get the list of distances from that particular cluster centre
            distances = np.linalg.norm(
                vectors - kmeans.cluster_centers_[i], axis=1)
            # Find the list position of the closest one (argmin gives the smallest distance)
            closest_index = np.argmin(distances)
            # Append that position to the closest-indices list
            closest_indices.append(closest_index)
        sorted_indices = sorted(closest_indices)
        selected_docs = [docs[doc] for doc in sorted_indices]
        print('Selected docs size:', len(selected_docs))
        return selected_docs
    def chunk_docs(self, docs):
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunks,
            chunk_overlap=50,
            length_function=len,
            is_separator_regex=False,
        )
        splitted_document = text_splitter.split_documents(docs)
        return splitted_document
    def get_key_information_stuff(self):
        prompt_template = """
        Extract the key information from the text below. This key information can include people's names and their role/rank, locations, organizations, nationalities, religions,
        events (historical, social, sporting and naturally occurring events), products, addresses and emails, URLs, and dates and times. Provide the list of key information, with each
        item labelled with its corresponding category. If no key information is present for a category, add "Not mentioned" in the response.
        {text}
        """
        prompt = PromptTemplate(
            template=prompt_template, input_variables=['text'])
        return prompt
    def get_key_information_map_reduce(self):
        map_prompts = """
        Extract the key information from the text below. This key information can include people's names and their role/rank, locations, organizations, nationalities, religions,
        events (historical, social, sporting and naturally occurring events), products, addresses and emails, URLs, and dates and times. Provide the list of key information, with each
        item labelled with its corresponding category. If no key information is present for a category, add "Not mentioned" in the response.
        {text}
        """
        combine_prompt = """
        The text below contains key information that was extracted from a document. Your job is to combine the key information and return the result. This key information can include people's names and their role/rank,
        locations, organizations, nationalities, religions, events (historical, social, sporting and naturally occurring events), products,
        addresses and emails, URLs, and dates and times. Provide the list of key information, with each item labelled with its corresponding category.
        If no key information is present for a category, add "Not mentioned" in the response.
        {text}
        """
        map_template = PromptTemplate(template=map_prompts, input_variables=['text'])
        # combine_template = PromptTemplate(template=combine_prompt, input_variables=['Summary_type','Summary_strategy','Target_Person_type','Response_length','Writing_style','text'])
        combine_template = PromptTemplate(template=combine_prompt, input_variables=['text'])
        return map_template, combine_template
    def get_stuff_prompt(self):
        prompt_template = """
        Write a {Summary_type} and {Summary_strategy} summary for {Target_Person_type}. The length of the summary should be about {Response_length} words and the writing style should be {Writing_style}.
        Work from the text below: identify the most important topics based on their importance in the text, and base the summary on these important topics.
        {text}
        """
        # prompt = PromptTemplate.from_template(prompt_template, input_variables=['Summary_type','Summary_strategy','Target_Person_type','Response_length','Writing_style','text'])
        prompt = PromptTemplate(
            template=prompt_template, input_variables=['Summary_type', 'Summary_strategy', 'Target_Person_type', 'Response_length', 'Writing_style', 'text'])
        return prompt
    def define_prompts(self):
        map_prompts = """
        Identify the key topics in the following text. In your response, only include the most relevant and most important topics, together with a concise yet elaborative summary of the text below.
        Do not add every topic that you find. If you do not find any important topic, do not return anything in the response. Also provide an importance score out of 1 for each identified topic.
        Your response should consist of a summary of the text, a list of comma-separated topic names `Topic 1, Topic 2, Topic 3`,
        and a list of comma-separated importance scores for these topics `1, 0.5, 0.2`, formatted like this:
        Summary:
        blah blah blah
        Topic Names: Topic 1, Topic 2, Topic 3
        Importance Score: 1, 0.4, 0.3
        {text}
        """
        combine_prompt = """
        Here is a list of summaries, topic names and their respective importance scores that were extracted from a text.
        Your job is to provide the best possible summary based on the list of summaries below, using the most important topics according to their importance scores.
        Write a {Summary_type} and {Summary_strategy} summary for {Target_Person_type}. The length of the summary should be about {Response_length} words and the writing style should be {Writing_style}.
        {text}
        The output format should be as follows. Do not return multiple summaries; return only one combined summary for the summaries above.
        Summary:
        blah blah blah
        """
        map_template = PromptTemplate(template=map_prompts, input_variables=['text'])
        combine_template = PromptTemplate(
            template=combine_prompt, input_variables=['Summary_type', 'Summary_strategy', 'Target_Person_type', 'Response_length', 'Writing_style', 'text'])
        return map_template, combine_template
        # pass
    def define_chain(self, Summary_type, Summary_strategy,
                     Target_Person_type, Response_length, Writing_style, chain_type=None, key_information=False):
        docs, chain_type = self.load_docs()
        llm = ChatOpenAI(model='gpt-3.5-turbo', temperature=0)
        if chain_type == 'stuff':
            if key_information:
                prompt = self.get_key_information_stuff()
            else:
                prompt = self.get_stuff_prompt()
            chain = load_summarize_chain(
                llm=llm, chain_type='stuff', verbose=False, prompt=prompt)
        elif chain_type == 'map_reduce':
            if key_information:
                map_prompts, combine_prompt = self.get_key_information_map_reduce()
            else:
                map_prompts, combine_prompt = self.define_prompts()
            chain = load_summarize_chain(
                llm=llm, map_prompt=map_prompts, combine_prompt=combine_prompt, chain_type='map_reduce', verbose=False)
        # elif chain_type == 'refine':
        #     chain = load_summarize_chain(llm=llm, question_prompt=map_prompts,
        #                                  refine_prompt=combine_prompt, chain_type='refine', verbose=False)
        if not key_information:
            output = chain.run(Summary_type=Summary_type, Summary_strategy=Summary_strategy,
                               Target_Person_type=Target_Person_type, Response_length=Response_length,
                               Writing_style=Writing_style, input_documents=docs)
        else:
            output = chain.run(input_documents=docs)
        # self.create_wordcloud(output=output)
        # display(Markdown(f"Text: {docs}"))
        # display(Markdown(f"Summary Response: {output}"))
        return output
    def parse_key_information(self, text):
        lines = [line.strip() for line in text.split('\n') if line.strip()]
        # Initialize the dictionary to store information
        info_dict = {}
        current_category = None
        # Iterate through each line and process the information
        for line in lines:
            if re.match(r'^[A-Z][\w\s&/-]*:', line):
                current_category = line.rstrip(':')
                info_dict[current_category] = []
            else:
                if line != '- Not mentioned':
                    info_dict[current_category].append(line.replace('- ', ''))
        # Remove categories with no entries
        info_dict = {category: entries for category, entries in info_dict.items() if entries}
        return info_dict
    # def create_wordcloud(self, output):
    #     wc = WordCloud(stopwords=STOPWORDS, height=500, width=300)
    #     wc.generate(output)
    #     wc.to_file('WordCloud.png')
    def create_word_count(self, text):
        # Split the text into words and convert them to lowercase
        words = text.split()
        words = [word.lower() for word in words]
        # Get a list of English stop words
        stop_words = set(stopwords.words('english'))
        # Filter out stop words from the list of words
        filtered_words = [word for word in words if word not in stop_words]
        # Count the frequencies of each word
        word_counts = Counter(filtered_words)
        # Convert the Counter object to a dictionary
        word_count_dict = dict(word_counts)
        return word_count_dict
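
# Illustrative usage sketch for Extract_Summary (not part of the original module).
# It assumes a valid OPENAI_API_KEY is configured via config.py; `sample_text` and all
# argument values ("abstractive", "concise", etc.) are placeholders.
#
#     summariser = Extract_Summary(text_input=sample_text)
#     summary = summariser.define_chain(Summary_type="abstractive", Summary_strategy="concise",
#                                       Target_Person_type="general reader", Response_length=150,
#                                       Writing_style="formal", key_information=False)
#     key_info = summariser.define_chain(Summary_type=None, Summary_strategy=None,
#                                        Target_Person_type=None, Response_length=None,
#                                        Writing_style=None, key_information=True)
#     parsed = summariser.parse_key_information(key_info)
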
class AudioBookNarration:
    """Turns text (or a directory of .txt files) into an expressive narration suitable for audiobook generation."""

    def __init__(self, text_input, file_path=None, chunks=2000, chunking_strategy=None,
                 LLM_Model="gpt-3.5-turbo", temperature=1, top_p=None, top_k=None):
        self.chunks = chunks
        self.file_path = file_path
        self.text_input = text_input
        self.chunking_strategy = chunking_strategy
        self.LLM_Model = LLM_Model
        self.temperature = temperature
        self.top_p = top_p
        self.top_k = top_k
    def doc_summary(self, docs):
        # Return the total word count and the number of documents.
        # print(f'You have {len(docs)} documents')
        num_words = sum([len(doc.page_content.split(" ")) for doc in docs])
        # print(f"You have {num_words} words in documents")
        return num_words, len(docs)
    def load_docs(self):
        if self.file_path is not None:
            docs = DirectoryLoader(self.file_path, glob="**/*.txt").load()
        else:
            docs = Document(page_content=f"{self.text_input}", metadata={"source": "local"})
            docs = [docs]
            # docs = self.text_input
        tokens, documents_count = self.doc_summary(docs)
        if documents_count > 8 or tokens > 6000:  ## Add token checks as well; add model-availability checks.
            docs = self.chunk_docs(docs)  ## Handle large documents with more than 6000 tokens.
            docs = self.summarise_large_documents(docs)
            tokens, documents_count = self.doc_summary(docs)
        if tokens > 2000:
            docs = self.chunk_docs(docs)
            chain_type = 'map_reduce'
        else:
            chain_type = 'stuff'
        print("==" * 20)
        print(tokens)
        print(chain_type)
        return docs, chain_type
    ## Add an ensemble retriever for this as well.
    def summarise_large_documents(self, docs):
        print("==" * 20)
        print('Original docs size:', len(docs))
        embeddings = OpenAIEmbeddings()
        vectors = embeddings.embed_documents([x.page_content for x in docs])
        # Pick the number of clusters with the best silhouette score.
        n_clusters_range = range(2, 11)
        silhouette_scores = []
        for i in n_clusters_range:
            kmeans = KMeans(n_clusters=i, init='k-means++',
                            max_iter=300, n_init=10, random_state=0)
            kmeans.fit(vectors)
            score = silhouette_score(vectors, kmeans.labels_)
            silhouette_scores.append(score)
        optimal_n_clusters = n_clusters_range[np.argmax(silhouette_scores)]
        # n_clusters = 5
        kmeans = KMeans(n_clusters=optimal_n_clusters,
                        random_state=42).fit(vectors)
        # Keep only the document closest to each cluster centroid.
        closest_indices = []
        # Loop through the number of clusters you have
        for i in range(optimal_n_clusters):
            # Get the list of distances from that particular cluster centre
            distances = np.linalg.norm(
                vectors - kmeans.cluster_centers_[i], axis=1)
            # Find the list position of the closest one (argmin gives the smallest distance)
            closest_index = np.argmin(distances)
            # Append that position to the closest-indices list
            closest_indices.append(closest_index)
        sorted_indices = sorted(closest_indices)
        selected_docs = [docs[doc] for doc in sorted_indices]
        print('Selected docs size:', len(selected_docs))
        return selected_docs
    def chunk_docs(self, docs):
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunks,
            chunk_overlap=50,
            length_function=len,
            is_separator_regex=False,
        )
        splitted_document = text_splitter.split_documents(docs)
        return splitted_document
    def get_stuff_prompt(self):
        prompt_template = """
        Create a {narration_style} narration for the text below. This narration will be used for audiobook generation,
        so provide output that is verbose, easy to understand and full of expression.
        {text}
        """
        prompt = PromptTemplate(
            template=prompt_template, input_variables=['narration_style', 'text'])
        return prompt
    def define_prompts(self):
        map_prompts = """
        Create a {narration_style} narration for the text below. This narration will be used for audiobook generation,
        so provide output that is verbose, easy to understand and full of expression.
        {text}
        """
        combine_prompt = """
        Below is a list of texts that represent narrations of the original text.
        Your job is to combine these narrations into one verbose, easy to understand and expressive {narration_style} narration.
        {text}
        """
        map_template = PromptTemplate(template=map_prompts, input_variables=['narration_style', 'text'])
        combine_template = PromptTemplate(
            template=combine_prompt, input_variables=['narration_style', 'text'])
        return map_template, combine_template
        # pass
    def define_chain(self, narration_style=None, chain_type=None):
        docs, chain_type = self.load_docs()
        llm = ChatOpenAI(model='gpt-3.5-turbo', temperature=0)
        if chain_type == 'stuff':
            prompt = self.get_stuff_prompt()
            chain = load_summarize_chain(
                llm=llm, chain_type='stuff', verbose=False, prompt=prompt)
        elif chain_type == 'map_reduce':
            map_prompts, combine_prompt = self.define_prompts()
            chain = load_summarize_chain(
                llm=llm, map_prompt=map_prompts, combine_prompt=combine_prompt, chain_type='map_reduce', verbose=False)
        output = chain.run(narration_style=narration_style, input_documents=docs)
        # self.create_wordcloud(output=output)
        # display(Markdown(f"Text: {docs}"))
        # display(Markdown(f"Summary Response: {output}"))
        return output
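

# Minimal usage sketch for AudioBookNarration (an illustrative addition, not part of the
# original app). It assumes a valid OPENAI_API_KEY is available through config.py; the
# sample text and the "dramatic" narration style are placeholders.
if __name__ == "__main__":
    sample_text = "Alexander Graham Bell demonstrated the telephone in Boston in 1876."

    # load_docs() decides between the 'stuff' and 'map_reduce' chains based on document size,
    # so a short input like this will run the simple 'stuff' chain.
    narrator = AudioBookNarration(text_input=sample_text)
    narration = narrator.define_chain(narration_style="dramatic")
    print(narration)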