from haystack import Document, Pipeline, component
from haystack_integrations.document_stores.qdrant import QdrantDocumentStore
from haystack_integrations.components.retrievers.qdrant import QdrantEmbeddingRetriever
from typing import List
from huggingface_hub import get_inference_endpoint, get_token
from datasets import load_dataset
from time import perf_counter
import gradio as gr
import shutil
import requests
import os
# Number of candidates fetched by the retriever, and how many of those the
# ranker is asked to keep.
RETRIEVER_TOP_K = 5
RANKER_TOP_K = 2

# Access token and endpoint URLs come from the environment; each one is None
# when the corresponding variable is not set.
HF_TOKEN = os.environ.get("HF_TOKEN")
RANKER_URL = os.environ.get("RANKER_URL")
EMBEDDER_URL = os.environ.get("EMBEDDER_URL")

# Handles on the two Inference Endpoints. They are used further down to poll
# each endpoint's status and to resume it when it is paused or scaled to zero.
EMBEDDER_IE = get_inference_endpoint(
    "fastrag-embedder",
    token=HF_TOKEN,
    namespace="optimum-intel",
)
RANKER_IE = get_inference_endpoint(
    "fastrag-ranker",
    token=HF_TOKEN,
    namespace="optimum-intel",
)
def check_inference_endpoints():
    """Refresh both Inference Endpoints and report any that are not ready.

    An endpoint whose status is ``paused`` or ``scaledToZero`` is asked to
    resume as a side effect of this call.

    Returns:
        One message per endpoint that is initializing, pending, paused or
        scaled to zero, joined with newlines; ``None`` when neither endpoint
        is in one of those states.
    """
    # Refresh both handles before inspecting them so the checks below see the
    # current status rather than the one cached at the previous call.
    EMBEDDER_IE.update()
    RANKER_IE.update()

    messages = []
    for label, endpoint in (("Embedder", EMBEDDER_IE), ("Ranker", RANKER_IE)):
        status = endpoint.status
        if status in ("initializing", "pending"):
            messages.append(
                f"{label} Inference Endpoint is {status}. "
                "Please wait a few seconds and try again."
            )
        elif status in ("paused", "scaledToZero"):
            messages.append(
                f"{label} Inference Endpoint is {status}. Resuming it. "
                "Please wait a few seconds and try again."
            )
            endpoint.resume()

    return "\n".join(messages) if messages else None
def post(url, payload, timeout=(10, 300)):
    """POST ``payload`` as JSON to ``url`` and return the decoded JSON reply.

    The request carries ``HF_TOKEN`` as a bearer token.

    Args:
        url: Address of the endpoint to call.
        payload: JSON-serialisable request body.
        timeout: ``(connect, read)`` timeout in seconds passed to
            ``requests.post``. The read timeout is deliberately generous
            because one request may carry a whole batch of documents; it only
            guards against a connection that never answers.

    Returns:
        The decoded JSON body. When the body is not valid JSON, a dict with a
        single ``"error"`` key is returned instead, so that callers testing
        ``"error" in response`` report the failure rather than crashing on a
        decoding exception.

    Raises:
        requests.exceptions.RequestException: If the request cannot be
            completed (connection failure, timeout, ...).
    """
    response = requests.post(
        url,
        json=payload,
        headers={"Authorization": f"Bearer {HF_TOKEN}"},
        timeout=timeout,
    )
    try:
        return response.json()
    except ValueError:
        # Every JSON decoding error raised by ``Response.json`` derives from
        # ValueError; translate it into the error shape callers already handle.
        return {
            "error": (
                "Endpoint returned a non-JSON response "
                f"(HTTP {response.status_code})."
            )
        }
def method_timer(method):
    """Decorate a method so each completed call prints how long it took.

    The printed line has the form
    ``<ClassName>.<method name> took <seconds> seconds``. Nothing is printed
    when the wrapped method raises.

    Args:
        method: The function to wrap; it is called with ``self`` as its first
            argument.

    Returns:
        A wrapper with the same call signature that returns whatever
        ``method`` returns and keeps its ``__name__`` and ``__doc__``.
    """
    # Imported here so the decorator does not depend on a module-level import.
    from functools import wraps

    @wraps(method)
    def timed(self, *args, **kw):
        start_time = perf_counter()
        result = method(self, *args, **kw)
        elapsed = perf_counter() - start_time
        print(f"{self.__class__.__name__}.{method.__name__} took {elapsed} seconds")
        return result

    return timed
@component
class InferenceEndpointTextEmbedder:
    """Pipeline component that embeds one text by calling ``EMBEDDER_URL``."""

    @component.output_types(embedding=List[float])
    def run(self, text: str):
        """Embed ``text``.

        Returns:
            A dict whose ``"embedding"`` entry is taken from the endpoint's
            reply.
        """
        return self.request(text)

    @method_timer
    def request(self, text: str):
        """Send ``text`` to the embedder endpoint and unpack the reply.

        Raises:
            gr.Error: If the reply contains an ``"error"`` entry.
        """
        # "inputs" is sent empty next to the real field - presumably required
        # by the endpoint's request schema; TODO confirm.
        body = {"text": text, "inputs": ""}
        reply = post(EMBEDDER_URL, body)

        if "error" in reply:
            raise gr.Error(reply["error"])

        embedding = reply["embedding"]
        return {"embedding": embedding}
@component
class InferenceEndpointDocumentEmbedder:
    """Pipeline component that sends documents to ``EMBEDDER_URL``."""

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]):
        """Pass ``documents`` through the embedder endpoint.

        Returns:
            A dict whose ``"documents"`` entry holds the documents rebuilt
            from the endpoint's reply.
        """
        return self.request(documents)

    @method_timer
    def request(self, documents: List[Document]):
        """Post the serialised documents and rebuild them from the reply.

        Raises:
            gr.Error: If the reply contains an ``"error"`` entry.
        """
        serialised = [document.to_dict() for document in documents]
        # "inputs" is sent empty next to the real field - presumably required
        # by the endpoint's request schema; TODO confirm.
        reply = post(EMBEDDER_URL, {"documents": serialised, "inputs": ""})

        if "error" in reply:
            raise gr.Error(reply["error"])

        rebuilt = [Document.from_dict(item) for item in reply["documents"]]
        return {"documents": rebuilt}
@component
class InferenceEndpointRanker:
    """Pipeline component that sends a query and documents to ``RANKER_URL``."""

    def __init__(self, top_k: int):
        """Remember ``top_k``, which is forwarded with every ranking request."""
        self.top_k = top_k

    @component.output_types(documents=List[Document])
    def run(self, query: str, documents: List[Document]):
        """Rank ``documents`` against ``query``.

        Returns:
            A dict whose ``"documents"`` entry holds the documents rebuilt
            from the endpoint's reply.
        """
        return self.request(query, documents)

    @method_timer
    def request(self, query: str, documents: List[Document]):
        """Post the query, the serialised documents and ``top_k``.

        Raises:
            gr.Error: If the reply contains an ``"error"`` entry.
        """
        serialised = [document.to_dict() for document in documents]
        body = {
            "query": query,
            "documents": serialised,
            "top_k": self.top_k,
            # Sent empty next to the real fields - presumably required by the
            # endpoint's request schema; TODO confirm.
            "inputs": "",
        }
        reply = post(RANKER_URL, body)

        if "error" in reply:
            raise gr.Error(reply["error"])

        ranked = [Document.from_dict(item) for item in reply["documents"]]
        return {"documents": ranked}
# Reuse the Qdrant index stored under ./data/qdrant when it can be opened;
# otherwise create a fresh one.
# NOTE(review): embedding_dim=384 has to match the size of the vectors the
# embedder endpoint returns - confirm against the deployed model.
document_store = None

if os.path.exists("data/qdrant"):
    try:
        document_store = QdrantDocumentStore(
            path="./data/qdrant",
            return_embedding=True,
            recreate_index=False,
            embedding_dim=384,
        )
    except Exception as error:
        # Deliberate best-effort recovery: whatever made the existing index
        # unusable, report it, drop the directory and rebuild from scratch.
        print(f"Could not open the existing Qdrant index ({error!r}); rebuilding it.")
        shutil.rmtree("data/qdrant", ignore_errors=True)

if document_store is None:
    document_store = QdrantDocumentStore(
        path="./data/qdrant",
        return_embedding=True,
        recreate_index=True,
        embedding_dim=384,
    )

# Index the dataset whenever the store holds no documents. Checking the count
# (rather than whether the store was just created) also repairs an index that
# an earlier start left empty, e.g. because embedding failed before the write.
if document_store.count_documents() == 0:
    dataset = load_dataset("bilgeyucel/seven-wonders")
    documents = [Document(**doc) for doc in dataset["train"]]
    documents_embedder = InferenceEndpointDocumentEmbedder()
    documents_with_embedding = documents_embedder.run(documents)["documents"]
    document_store.write_documents(documents_with_embedding)

print(
    "Number of embedded documents in DocumentStore:",
    document_store.count_documents(),
)
# Query pipeline. Data flows embedder -> retriever -> ranker: the embedder's
# output feeds the retriever, and the retrieved documents feed the ranker.
pipe = Pipeline()

# Build each component right before registering it; the registration order
# (retriever, embedder, ranker) is kept as is.
retriever = QdrantEmbeddingRetriever(
    top_k=RETRIEVER_TOP_K,
    document_store=document_store,
)
pipe.add_component("retriever", retriever)

embedder = InferenceEndpointTextEmbedder()
pipe.add_component("embedder", embedder)

ranker = InferenceEndpointRanker(top_k=RANKER_TOP_K)
pipe.add_component("ranker", ranker)

# Wire the components together.
pipe.connect("retriever", "ranker.documents")
pipe.connect("embedder", "retriever")

print(pipe)
def run(query: str) -> str:
    """Answer ``query`` with the documents returned by the ranking pipeline.

    Args:
        query: The question typed by the user.

    Returns:
        The endpoint status message when an Inference Endpoint is not ready;
        otherwise one ``ID`` / ``Score`` / ``Content`` entry per document
        returned by the ranker, concatenated into a single string (empty when
        no document comes back).
    """
    message = check_inference_endpoints()
    if message is not None:
        return f"\n{message}\n"

    pipe_output = pipe.run({"embedder": {"text": query}, "ranker": {"query": query}})

    # Format every ranked document; the template is applied per document so the
    # placeholders are actually filled in.
    # NOTE(review): the result is shown in an HTML component, where bare
    # newlines do not produce line breaks - confirm whether markup is wanted.
    return "".join(
        f"ID: {doc.id}\nScore: {doc.score}\nContent: {doc.content}\n"
        for doc in pipe_output["ranker"]["documents"]
    )


# Sample queries offered in the UI; the first one pre-fills the query box.
examples = [
    "Where is Gardens of Babylon?",
    "Why did people build Great Pyramid of Giza?",
    "What does Rhodes Statue look like?",
    "Why did people visit the Temple of Artemis?",
    "What is the importance of Colossus of Rhodes?",
    "What happened to the Tomb of Mausolus?",
    "How did Colossus of Rhodes collapse?",
]

input_text = gr.components.Textbox(
    label="Query", placeholder="Enter a query", value=examples[0], lines=1
)
output_html = gr.components.HTML(label="Documents")

# Build the web UI around `run` and start serving it.
gr.Interface(
    fn=run,
    inputs=input_text,
    outputs=output_html,
    examples=examples,
    cache_examples=False,
    allow_flagging="never",
    title="End-to-End Retrieval & Ranking with Hugging Face Inference Endpoints and Spaces",
    description="""## A [haystack](https://haystack.deepset.ai/) V2 pipeline with the following components
- Document Store: A [Qdrant document store](https://github.com/qdrant/qdrant) containing the [`seven-wonders` dataset](https://huggingface.co/datasets/bilgeyucel/seven-wonders), created on this Space's [persistent storage](https://huggingface.co/docs/hub/en/spaces-storage).
- Embedder: [Quantized FastRAG Embedder](https://huggingface.co/optimum-intel/fastrag-embedder) deployed on [Inference Endpoints](https://huggingface.co/docs/inference-endpoints/index) + Intel Sapphire Rapids CPU.
- Ranker: [Quantized FastRAG Retriever](https://huggingface.co/optimum-intel/fastrag-ranker) deployed on [Inference Endpoints](https://huggingface.co/docs/inference-endpoints/index) + Intel Sapphire Rapids CPU.

This Space is based on the optimizations demonstrated in the blog [CPU Optimized Embeddings with 🤗 Optimum Intel and fastRAG](https://huggingface.co/blog/intel-fast-embedding)
""",
).launch()