q-and-a-tool / document_qa_engine.py
hkoppen
Fix streaming error
d513adc unverified
from typing import List
from haystack.dataclasses import ChatMessage
from pypdf import PdfReader
from haystack.utils import Secret
from haystack import Pipeline, Document, component
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import DynamicChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator, HuggingFaceTGIChatGenerator
from haystack.document_stores.types import DuplicatePolicy
SENTENCE_RETREIVER_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
MAX_TOKENS = 500
template = """
As a professional HR recruiter given the following information, answer the question shortly and concisely in 1 or 2 sentences.
Context:
{% for document in documents %}
{{ document.content }}
{% endfor %}
Question: {{question}}
Answer:
"""
@component
class UploadedFileConverter:
"""
A component to convert uploaded PDF files to Documents
"""
@component.output_types(documents=List[Document])
def run(self, uploaded_file):
pdf = PdfReader(uploaded_file)
documents = []
# uploaded file name without .pdf at the end and with _ and page number at the end
name = uploaded_file.name.rstrip('.PDF') + '_'
for page in pdf.pages:
documents.append(
Document(
content=page.extract_text(),
meta={'name': name + f"_{page.page_number}"}))
return {"documents": documents}
def create_ingestion_pipeline(document_store):
doc_embedder = SentenceTransformersDocumentEmbedder(model=SENTENCE_RETREIVER_MODEL)
doc_embedder.warm_up()
pipeline = Pipeline()
pipeline.add_component("converter", UploadedFileConverter())
pipeline.add_component("cleaner", DocumentCleaner())
pipeline.add_component("splitter",
DocumentSplitter(split_by="passage", split_length=100, split_overlap=10))
pipeline.add_component("embedder", doc_embedder)
pipeline.add_component("writer",
DocumentWriter(document_store=document_store, policy=DuplicatePolicy.OVERWRITE))
pipeline.connect("converter", "cleaner")
pipeline.connect("cleaner", "splitter")
pipeline.connect("splitter", "embedder")
pipeline.connect("embedder", "writer")
return pipeline
def create_inference_pipeline(document_store, model_name, api_key):
if model_name == "local LLM":
generator = OpenAIChatGenerator(api_key=Secret.from_token("<local LLM doesn't need an API key>"),
model=model_name,
api_base_url="http://localhost:1234/v1",
generation_kwargs={"max_tokens": MAX_TOKENS}
)
elif "gpt" in model_name:
generator = OpenAIChatGenerator(api_key=Secret.from_token(api_key), model=model_name,
generation_kwargs={"max_tokens": MAX_TOKENS},
streaming_callback=lambda chunk: print(chunk.content, end="", flush=True),
)
else:
generator = HuggingFaceTGIChatGenerator(token=Secret.from_token(api_key), model=model_name,
generation_kwargs={"max_new_tokens": MAX_TOKENS}
)
pipeline = Pipeline()
pipeline.add_component("text_embedder",
SentenceTransformersTextEmbedder(model=SENTENCE_RETREIVER_MODEL))
pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store, top_k=3))
pipeline.add_component("prompt_builder",
DynamicChatPromptBuilder(runtime_variables=["query", "documents"]))
pipeline.add_component("llm", generator)
pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "llm.messages")
return pipeline
class DocumentQAEngine:
def __init__(self,
model_name,
api_key=None
):
self.api_key = api_key
self.model_name = model_name
document_store = InMemoryDocumentStore()
self.chunks = []
self.inference_pipeline = create_inference_pipeline(document_store, model_name, api_key)
self.pdf_ingestion_pipeline = create_ingestion_pipeline(document_store)
def ingest_pdf(self, uploaded_file):
self.pdf_ingestion_pipeline.run({"converter": {"uploaded_file": uploaded_file}})
def inference(self, query, input_messages: List[dict]):
system_message = ChatMessage.from_system(
"You are a professional HR recruiter that answers questions based on the content of the uploaded CV. in 1 or 2 sentences.")
messages = [system_message]
for message in input_messages:
if message["role"] == "user":
messages.append(ChatMessage.from_system(message["content"]))
else:
messages.append(
ChatMessage.from_user(message["content"]))
messages.append(ChatMessage.from_user("""
Relevant information from the uploaded CV:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}
\nQuestion: {{query}}
\nAnswer:
"""))
res = self.inference_pipeline.run(data={"text_embedder": {"text": query},
"prompt_builder": {"prompt_source": messages,
"query": query
}})
return res["llm"]["replies"][0].content