# Page-scrape header, kept as a comment so the module remains valid Python:
#   author: timeki
#   commit d562d38: "Rerank documents and force summary for policy makers"
#   file size: 3.75 kB
# https://github.com/langchain-ai/langchain/issues/8623
import pandas as pd
from langchain_core.retrievers import BaseRetriever
from langchain_core.vectorstores import VectorStoreRetriever
from langchain_core.documents.base import Document
from langchain_core.vectorstores import VectorStore
from langchain_core.callbacks.manager import CallbackManagerForRetrieverRun
from typing import List
from pydantic import Field
def _add_metadata_and_score(docs: List[tuple]) -> List[Document]:
    r"""Fold each hit's similarity score and cleaned text into its metadata.

    Args:
        docs: ``(document, score)`` pairs. Each document must expose a
            ``page_content`` string and a ``metadata`` dict.

    Returns:
        The same document objects, in input order, with the scores dropped
        from the pairs. Every document is modified in place:

        * ``page_content`` has each ``"\r\n"`` replaced by a single space;
        * ``metadata["similarity_score"]`` is set to the paired score;
        * ``metadata["content"]`` is set to the cleaned ``page_content``;
        * ``metadata["page_number"]`` is converted to ``int`` and
          incremented by one.

    Raises:
        KeyError: If a document's metadata has no ``"page_number"`` entry.

    Note:
        The update is not idempotent: passing the same document through
        this function twice increments ``page_number`` twice.
    """
    docs_with_metadata = []
    for doc, score in docs:
        doc.page_content = doc.page_content.replace("\r\n", " ")
        doc.metadata["similarity_score"] = score
        doc.metadata["content"] = doc.page_content
        # Presumably the stored page index is 0-based and the displayed one
        # is 1-based -- TODO confirm against the indexing pipeline.
        doc.metadata["page_number"] = int(doc.metadata["page_number"]) + 1
        docs_with_metadata.append(doc)
    return docs_with_metadata
class ClimateQARetriever(BaseRetriever):
    """Retriever that runs three filtered searches against one vector store.

    For a query it searches, in order: summary text chunks
    (``report_type`` in ``["SPM"]``), text chunks from every other report
    type, and image chunks. The three result sets are returned separately.

    Fields:
        vectorstore: Store queried through ``similarity_search_with_score``.
        sources: Source names used as a filter when ``reports`` is empty.
            Must be a non-empty list drawn from "IPCC", "IPBES", "IPOS".
        reports: Report short names; when non-empty they replace the
            ``sources`` filter.
        threshold: Summary hits whose score is not strictly greater than
            this value are discarded.
        k_summary: Number of summary chunks requested.
        k_total: Overall budget; must be strictly greater than ``k_summary``.
        namespace: Not read by any code in this class.
        min_size: Summary and full-report chunks whose text is not strictly
            longer than this many characters are discarded.
    """

    vectorstore: VectorStore
    sources: list = ["IPCC", "IPBES", "IPOS"]
    reports: list = []
    threshold: float = 0.6
    k_summary: int = 3
    k_total: int = 10
    # These two defaults previously ended with a trailing comma, which made
    # them one-element tuples and broke the ``len(...) > self.min_size`` test.
    namespace: str = "vectors"
    min_size: int = 200

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> dict:
        """Search summaries, full reports and images for ``query``.

        Args:
            query: Text passed to every similarity search.
            run_manager: Accepted for the retriever interface; not used here.

        Returns:
            A dict (not a flat list of documents) with three keys:

            * ``"docs_summaries"``: summary chunks above ``threshold`` and
              longer than ``min_size``;
            * ``"docs_full"``: chunks from non-summary reports longer than
              ``min_size`` (no score threshold is applied);
            * ``"docs_images"``: image chunks, neither score- nor
              length-filtered.

            Every returned document has been enriched by
            ``_add_metadata_and_score``.

        Raises:
            AssertionError: If ``sources`` is not a non-empty list of known
                source names, or if ``k_total <= k_summary``.
        """
        # Every source must be one of IPCC, IPBES or IPOS.
        assert isinstance(self.sources, list)
        assert self.sources
        assert all(x in ["IPCC", "IPBES", "IPOS"] for x in self.sources)
        assert self.k_total > self.k_summary, "k_total should be greater than k_summary"

        # Explicit report names take precedence over the source filter.
        filters = {}
        if self.reports:
            filters["short_name"] = {"$in": self.reports}
        else:
            filters["source"] = {"$in": self.sources}

        # 1) Summary chunks, kept only when they score above the threshold.
        filters_summaries = {
            **filters,
            "chunk_type": "text",
            "report_type": {"$in": ["SPM"]},
        }
        docs_summaries = self.vectorstore.similarity_search_with_score(
            query=query, filter=filters_summaries, k=self.k_summary
        )
        docs_summaries = [x for x in docs_summaries if x[1] > self.threshold]

        # 2) Full-report chunks fill whatever part of the k_total budget the
        #    retained summaries did not use.
        filters_full = {
            **filters,
            "chunk_type": "text",
            "report_type": {"$nin": ["SPM"]},
        }
        k_full = self.k_total - len(docs_summaries)
        docs_full = self.vectorstore.similarity_search_with_score(
            query=query, filter=filters_full, k=k_full
        )

        # 3) Image chunks, requested with the same k as the full-report search.
        filters_image = {
            **filters,
            "chunk_type": "image",
        }
        docs_images = self.vectorstore.similarity_search_with_score(
            query=query, filter=filters_image, k=k_full
        )

        # Move scores into metadata; from here on the lists hold documents,
        # not (document, score) pairs.
        docs_summaries = _add_metadata_and_score(docs_summaries)
        docs_full = _add_metadata_and_score(docs_full)
        docs_images = _add_metadata_and_score(docs_images)

        # Drop text chunks that are too short; images are left untouched.
        docs_summaries = [x for x in docs_summaries if len(x.page_content) > self.min_size]
        docs_full = [x for x in docs_full if len(x.page_content) > self.min_size]

        return {
            "docs_summaries": docs_summaries,
            "docs_full": docs_full,
            "docs_images": docs_images,
        }