from typing import List from haystack.dataclasses import ChatMessage from pypdf import PdfReader from haystack.utils import Secret from haystack import Pipeline, Document, component from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter from haystack.components.writers import DocumentWriter from haystack.components.embedders import SentenceTransformersDocumentEmbedder, SentenceTransformersTextEmbedder from haystack.document_stores.in_memory import InMemoryDocumentStore from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever from haystack.components.builders import DynamicChatPromptBuilder from haystack.components.generators.chat import OpenAIChatGenerator, HuggingFaceTGIChatGenerator from haystack.document_stores.types import DuplicatePolicy SENTENCE_RETREIVER_MODEL = "sentence-transformers/all-MiniLM-L6-v2" MAX_TOKENS = 500 template = """ As a professional HR recruiter given the following information, answer the question shortly and concisely in 1 or 2 sentences. Context: {% for document in documents %} {{ document.content }} {% endfor %} Question: {{question}} Answer: """ @component class UploadedFileConverter: """ A component to convert uploaded PDF files to Documents """ @component.output_types(documents=List[Document]) def run(self, uploaded_file): pdf = PdfReader(uploaded_file) documents = [] # uploaded file name without .pdf at the end and with _ and page number at the end name = uploaded_file.name.rstrip('.PDF') + '_' for page in pdf.pages: documents.append( Document( content=page.extract_text(), meta={'name': name + f"_{page.page_number}"})) return {"documents": documents} def create_ingestion_pipeline(document_store): doc_embedder = SentenceTransformersDocumentEmbedder(model=SENTENCE_RETREIVER_MODEL) doc_embedder.warm_up() pipeline = Pipeline() pipeline.add_component("converter", UploadedFileConverter()) pipeline.add_component("cleaner", DocumentCleaner()) pipeline.add_component("splitter", DocumentSplitter(split_by="passage", split_length=100, split_overlap=10)) pipeline.add_component("embedder", doc_embedder) pipeline.add_component("writer", DocumentWriter(document_store=document_store, policy=DuplicatePolicy.OVERWRITE)) pipeline.connect("converter", "cleaner") pipeline.connect("cleaner", "splitter") pipeline.connect("splitter", "embedder") pipeline.connect("embedder", "writer") return pipeline def create_inference_pipeline(document_store, model_name, api_key): if model_name == "local LLM": generator = OpenAIChatGenerator(api_key=Secret.from_token(""), model=model_name, api_base_url="http://localhost:1234/v1", generation_kwargs={"max_tokens": MAX_TOKENS} ) elif "gpt" in model_name: generator = OpenAIChatGenerator(api_key=Secret.from_token(api_key), model=model_name, generation_kwargs={"max_tokens": MAX_TOKENS}, streaming_callback=lambda chunk: print(chunk.content, end="", flush=True), ) else: generator = HuggingFaceTGIChatGenerator(token=Secret.from_token(api_key), model=model_name, generation_kwargs={"max_new_tokens": MAX_TOKENS} ) pipeline = Pipeline() pipeline.add_component("text_embedder", SentenceTransformersTextEmbedder(model=SENTENCE_RETREIVER_MODEL)) pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store, top_k=3)) pipeline.add_component("prompt_builder", DynamicChatPromptBuilder(runtime_variables=["query", "documents"])) pipeline.add_component("llm", generator) pipeline.connect("text_embedder.embedding", "retriever.query_embedding") pipeline.connect("retriever.documents", "prompt_builder.documents") pipeline.connect("prompt_builder.prompt", "llm.messages") return pipeline class DocumentQAEngine: def __init__(self, model_name, api_key=None ): self.api_key = api_key self.model_name = model_name document_store = InMemoryDocumentStore() self.chunks = [] self.inference_pipeline = create_inference_pipeline(document_store, model_name, api_key) self.pdf_ingestion_pipeline = create_ingestion_pipeline(document_store) def ingest_pdf(self, uploaded_file): self.pdf_ingestion_pipeline.run({"converter": {"uploaded_file": uploaded_file}}) def inference(self, query, input_messages: List[dict]): system_message = ChatMessage.from_system( "You are a consultant answering questions about potential AI use cases based on the uploaded document. Please provide accurate, concise answers in 3-5 sentences, referencing both the document content and additional sources.") messages = [system_message] for message in input_messages: if message["role"] == "user": messages.append(ChatMessage.from_system(message["content"])) else: messages.append( ChatMessage.from_user(message["content"])) messages.append(ChatMessage.from_user(""" Relevant information from the uploaded documents: {% for doc in documents %} {{ doc.content }} {% endfor %} \nQuestion: {{query}} \nAnswer: """)) res = self.inference_pipeline.run(data={"text_embedder": {"text": query}, "prompt_builder": {"prompt_source": messages, "query": query }}) return res["llm"]["replies"][0].content