q-and-a-tool / utils /haystack.py
Henryk Borzymowski
support for switching between tasks and ui
b6208a3
raw
history blame
4.55 kB
import streamlit as st
from utils.config import document_store_configs, model_configs
from haystack import Pipeline
from haystack.schema import Answer
from haystack.document_stores import BaseDocumentStore
from haystack.document_stores import InMemoryDocumentStore, OpenSearchDocumentStore, WeaviateDocumentStore
from haystack.nodes import EmbeddingRetriever, FARMReader, PromptNode
from milvus_haystack import MilvusDocumentStore
#Use this file to set up your Haystack pipeline and querying
@st.cache_resource(show_spinner=False)
def start_document_store(type: str):
#This function starts the documents store of your choice based on your command line preference
if type == 'inmemory':
document_store = InMemoryDocumentStore(use_bm25=True, embedding_dim=384)
documents = [
{
'content': "Pi is a super dog",
'meta': {'name': "pi.txt"}
},
{
'content': "The revenue of siemens is 5 milion Euro",
'meta': {'name': "siemens.txt"}
},
]
document_store.write_documents(documents)
elif type == 'opensearch':
document_store = OpenSearchDocumentStore(scheme = document_store_configs['OPENSEARCH_SCHEME'],
username = document_store_configs['OPENSEARCH_USERNAME'],
password = document_store_configs['OPENSEARCH_PASSWORD'],
host = document_store_configs['OPENSEARCH_HOST'],
port = document_store_configs['OPENSEARCH_PORT'],
index = document_store_configs['OPENSEARCH_INDEX'],
embedding_dim = document_store_configs['OPENSEARCH_EMBEDDING_DIM'])
elif type == 'weaviate':
document_store = WeaviateDocumentStore(host = document_store_configs['WEAVIATE_HOST'],
port = document_store_configs['WEAVIATE_PORT'],
index = document_store_configs['WEAVIATE_INDEX'],
embedding_dim = document_store_configs['WEAVIATE_EMBEDDING_DIM'])
elif type == 'milvus':
document_store = MilvusDocumentStore(uri = document_store_configs['MILVUS_URI'],
index = document_store_configs['MILVUS_INDEX'],
embedding_dim = document_store_configs['MILVUS_EMBEDDING_DIM'],
return_embedding=True)
return document_store
# cached to make index and models load only at start
@st.cache_resource(show_spinner=False)
def start_haystack_extractive(_document_store: BaseDocumentStore):
retriever = EmbeddingRetriever(document_store=_document_store,
embedding_model=model_configs['EMBEDDING_MODEL'],
top_k=5)
_document_store.update_embeddings(retriever)
reader = FARMReader(model_name_or_path=model_configs['EXTRACTIVE_MODEL'])
pipe = Pipeline()
pipe.add_node(component=retriever, name="Retriever", inputs=["Query"])
pipe.add_node(component=reader, name="Reader", inputs=["Retriever"])
return pipe
@st.cache_resource(show_spinner=False)
def start_haystack_rag(_document_store: BaseDocumentStore):
retriever = EmbeddingRetriever(document_store=_document_store,
embedding_model=model_configs['EMBEDDING_MODEL'],
top_k=5)
_document_store.update_embeddings(retriever)
prompt_node = PromptNode(default_prompt_template="deepset/question-answering",
model_name_or_path=model_configs['GENERATIVE_MODEL'],
api_key=model_configs['OPENAI_KEY'])
pipe = Pipeline()
pipe.add_node(component=retriever, name="Retriever", inputs=["Query"])
pipe.add_node(component=prompt_node, name="PromptNode", inputs=["Retriever"])
return pipe
#@st.cache_data(show_spinner=True)
def query(_pipeline, question):
params = {}
results = _pipeline.run(question, params=params)
return results
def initialize_pipeline(task, document_store):
if task == 'extractive':
return start_haystack_extractive(document_store)
elif task == 'rag':
return start_haystack_rag(document_store)