Spaces:
Sleeping
Sleeping
from typing import List | |
from haystack.dataclasses import ChatMessage | |
from pypdf import PdfReader | |
from haystack.utils import Secret | |
from haystack import Pipeline, Document, component | |
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter | |
from haystack.components.writers import DocumentWriter | |
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder | |
from haystack.document_stores.in_memory import InMemoryDocumentStore | |
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever | |
from haystack.components.builders import DynamicChatPromptBuilder | |
from haystack.components.generators.chat import OpenAIChatGenerator, HuggingFaceTGIChatGenerator | |
from haystack.document_stores.types import DuplicatePolicy | |
SENTENCE_RETREIVER_MODEL = "sentence-transformers/all-MiniLM-L6-v2" | |
MAX_TOKENS = 500 | |
template = """ | |
As a professional HR recruiter given the following information, answer the question shortly and concisely in 1 or 2 sentences. | |
Context: | |
{% for document in documents %} | |
{{ document.content }} | |
{% endfor %} | |
Question: {{question}} | |
Answer: | |
""" | |
class UploadedFileConverter: | |
""" | |
A component to convert uploaded PDF files to Documents | |
""" | |
def run(self, uploaded_file): | |
pdf = PdfReader(uploaded_file) | |
documents = [] | |
# uploaded file name without .pdf at the end and with _ and page number at the end | |
name = uploaded_file.name.rstrip('.PDF') + '_' | |
for page in pdf.pages: | |
documents.append( | |
Document( | |
content=page.extract_text(), | |
meta={'name': name + f"_{page.page_number}"})) | |
return {"documents": documents} | |
def create_ingestion_pipeline(document_store): | |
doc_embedder = SentenceTransformersDocumentEmbedder(model=SENTENCE_RETREIVER_MODEL) | |
doc_embedder.warm_up() | |
pipeline = Pipeline() | |
pipeline.add_component("converter", UploadedFileConverter()) | |
pipeline.add_component("cleaner", DocumentCleaner()) | |
pipeline.add_component("splitter", | |
DocumentSplitter(split_by="passage", split_length=100, split_overlap=10)) | |
pipeline.add_component("embedder", doc_embedder) | |
pipeline.add_component("writer", | |
DocumentWriter(document_store=document_store, policy=DuplicatePolicy.OVERWRITE)) | |
pipeline.connect("converter", "cleaner") | |
pipeline.connect("cleaner", "splitter") | |
pipeline.connect("splitter", "embedder") | |
pipeline.connect("embedder", "writer") | |
return pipeline | |
def create_inference_pipeline(document_store, model_name, api_key): | |
if model_name == "local LLM": | |
generator = OpenAIChatGenerator(api_key=Secret.from_token("<local LLM doesn't need an API key>"), | |
model=model_name, | |
api_base_url="http://localhost:1234/v1", | |
generation_kwargs={"max_tokens": MAX_TOKENS} | |
) | |
elif "gpt" in model_name: | |
generator = OpenAIChatGenerator(api_key=Secret.from_token(api_key), model=model_name, | |
generation_kwargs={"max_tokens": MAX_TOKENS}, | |
streaming_callback=lambda chunk: print(chunk.content, end="", flush=True), | |
) | |
else: | |
generator = HuggingFaceTGIChatGenerator(token=Secret.from_token(api_key), model=model_name, | |
generation_kwargs={"max_new_tokens": MAX_TOKENS} | |
) | |
pipeline = Pipeline() | |
pipeline.add_component("text_embedder", | |
SentenceTransformersTextEmbedder(model=SENTENCE_RETREIVER_MODEL)) | |
pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store, top_k=3)) | |
pipeline.add_component("prompt_builder", | |
DynamicChatPromptBuilder(runtime_variables=["query", "documents"])) | |
pipeline.add_component("llm", generator) | |
pipeline.connect("text_embedder.embedding", "retriever.query_embedding") | |
pipeline.connect("retriever.documents", "prompt_builder.documents") | |
pipeline.connect("prompt_builder.prompt", "llm.messages") | |
return pipeline | |
class DocumentQAEngine: | |
def __init__(self, | |
model_name, | |
api_key=None | |
): | |
self.api_key = api_key | |
self.model_name = model_name | |
document_store = InMemoryDocumentStore() | |
self.chunks = [] | |
self.inference_pipeline = create_inference_pipeline(document_store, model_name, api_key) | |
self.pdf_ingestion_pipeline = create_ingestion_pipeline(document_store) | |
def ingest_pdf(self, uploaded_file): | |
self.pdf_ingestion_pipeline.run({"converter": {"uploaded_file": uploaded_file}}) | |
def inference(self, query, input_messages: List[dict]): | |
system_message = ChatMessage.from_system( | |
"You are a professional HR recruiter that answers questions based on the content of the uploaded CV. in 1 or 2 sentences.") | |
messages = [system_message] | |
for message in input_messages: | |
if message["role"] == "user": | |
messages.append(ChatMessage.from_system(message["content"])) | |
else: | |
messages.append( | |
ChatMessage.from_user(message["content"])) | |
messages.append(ChatMessage.from_user(""" | |
Relevant information from the uploaded CV: | |
{% for doc in documents %} | |
{{ doc.content }} | |
{% endfor %} | |
\nQuestion: {{query}} | |
\nAnswer: | |
""")) | |
res = self.inference_pipeline.run(data={"text_embedder": {"text": query}, | |
"prompt_builder": {"prompt_source": messages, | |
"query": query | |
}}) | |
return res["llm"]["replies"][0].content | |