""" This is the template for implementing the rankers for your search engine. You will be implementing WordCountCosineSimilarity, DirichletLM, TF-IDF, BM25, Pivoted Normalization, and your own ranker. """ from collections import Counter, defaultdict from indexing import InvertedIndex import math class Ranker: """ The ranker class is responsible for generating a list of documents for a given query, ordered by their scores using a particular relevance function (e.g., BM25). A Ranker can be configured with any RelevanceScorer. """ # TODO: Implement this class properly; this is responsible for returning a list of sorted relevant documents def __init__(self, index: InvertedIndex, document_preprocessor, stopwords: set[str], scorer: 'RelevanceScorer') -> None: """ Initializes the state of the Ranker object. TODO (HW3): Previous homeworks had you passing the class of the scorer to this function This has been changed as it created a lot of confusion. You should now pass an instantiated RelevanceScorer to this function. Args: index: An inverted index document_preprocessor: The DocumentPreprocessor to use for turning strings into tokens stopwords: The set of stopwords to use or None if no stopword filtering is to be done scorer: The RelevanceScorer object """ self.index = index self.tokenize = document_preprocessor.tokenize self.scorer = scorer self.stopwords = stopwords def query(self, query: str) -> list[tuple[int, float]]: """ Searches the collection for relevant documents to the query and returns a list of documents ordered by their relevance (most relevant first). Args: query: The query to search for Returns: A list of dictionary objects with keys "docid" and "score" where docid is a particular document in the collection and score is that document's relevance TODO (HW3): We are standardizing the query output of Ranker to match with L2RRanker.query and VectorRanker.query The query function should return a sorted list of tuples where each tuple has the first element as the document ID and the second element as the score of the document after the ranking process. """ # TODO: Tokenize the query and remove stopwords, if needed tokens = self.tokenize(query) query_parts = [token for token in tokens if token not in self.stopwords] if self.stopwords else tokens # TODO: Fetch a list of possible documents from the index and create a mapping from # a document ID to a dictionary of the counts of the query terms in that document. # You will pass the dictionary to the RelevanceScorer as input. doc_word_counts = defaultdict(Counter) query_word_counts = Counter(query_parts) for term in query_word_counts: postings = self.index.get_postings(term) for posting in postings: doc_word_counts[posting[0]][term] = posting[1] # TODO: Rank the documents using a RelevanceScorer (like BM25 from below classes) results = [] for docid in doc_word_counts: res = self.scorer.score(docid, doc_word_counts[docid], query_word_counts) if res: results.append((docid, res)) # TODO: Return the **sorted** results as format [{docid: 100, score:0.5}, {{docid: 10, score:0.2}}] results.sort(key=lambda x: x[1], reverse=True) return results class RelevanceScorer: """ This is the base interface for all the relevance scoring algorithm. It will take a document and attempt to assign a score to it. 
""" # TODO: Implement the functions in the child classes (WordCountCosineSimilarity, DirichletLM, BM25, # PivotedNormalization, TF_IDF) and not in this one def __init__(self, index: InvertedIndex, parameters) -> None: raise NotImplementedError def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int]) -> float: """ Returns a score for how relevance is the document for the provided query. Args: docid: The ID of the document doc_word_counts: A dictionary containing all words in the document and their frequencies. Words that have been filtered will be None. query_word_counts: A dictionary containing all words in the query and their frequencies. Words that have been filtered will be None. Returns: A score for how relevant the document is (Higher scores are more relevant.) """ raise NotImplementedError # TODO (HW1): Implement unnormalized cosine similarity on word count vectors class WordCountCosineSimilarity(RelevanceScorer): def __init__(self, index: InvertedIndex, parameters={}) -> None: self.index = index self.parameters = parameters def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int]) -> float: # 1. Find the dot product of the word count vector of the document and the word count vector of the query # 2. Return the score cwq = query_word_counts score = 0 flag = 0 for word in cwq: if word in doc_word_counts: flag = 1 score += cwq[word] * doc_word_counts[word] if not flag: return None return score # TODO (HW1): Implement DirichletLM class DirichletLM(RelevanceScorer): def __init__(self, index: InvertedIndex, parameters={'mu': 2000}) -> None: self.index = index self.parameters = parameters self.mu = parameters['mu'] def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int]) -> float: # 1. Get necessary information from index # 2. Compute additional terms to use in algorithm # 3. For all query_parts, compute score # 4. Return the score cwq = query_word_counts q_len = sum(cwq.values()) flag = 0 score = 0 for term in cwq: if term in doc_word_counts and docid in self.index.document_metadata: flag = 1 pwc = self.index.get_term_metadata(term)['count']/self.index.statistics['total_token_count'] first_part = cwq[term]*math.log(1+doc_word_counts[term]/(self.mu*pwc)) score+=first_part if docid in self.index.document_metadata: second_part = q_len*math.log(self.mu/(self.mu+self.index.document_metadata[docid]['length'])) score+=second_part if not flag: return None return score # TODO (HW1): Implement BM25 class BM25(RelevanceScorer): def __init__(self, index: InvertedIndex, parameters={'b': 0.75, 'k1': 1.2, 'k3': 8}) -> None: self.index = index self.b = parameters['b'] self.k1 = parameters['k1'] self.k3 = parameters['k3'] def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int])-> float: # 1. Get necessary information from index # 2. Find the dot product of the word count vector of the document and the word count vector of the query # 3. For all query parts, compute the TF and IDF to get a score # 4. 
    def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int]) -> float:
        # 1. Get the collection statistics needed by the formula
        stats = self.index.statistics
        avg_dl = stats['mean_document_length']
        N = stats['number_of_documents']
        doc_len = self.index.document_metadata[docid]['length']

        # 2. For every query term that also appears in the document,
        #    accumulate IDF * TF * QTF
        score = 0.0
        for term, qtf in query_word_counts.items():
            ctd = doc_word_counts.get(term, 0)
            if ctd == 0:
                continue
            df = self.index.get_term_metadata(term)['document_count']
            idf = math.log((N - df + 0.5) / (df + 0.5))
            tf = ((self.k1 + 1) * ctd) \
                / (self.k1 * (1 - self.b + self.b * doc_len / avg_dl) + ctd)
            qtf_part = ((self.k3 + 1) * qtf) / (self.k3 + qtf)
            score += idf * tf * qtf_part
        return score


# HW1: Pivoted length normalization
class PivotedNormalization(RelevanceScorer):
    def __init__(self, index: InvertedIndex, parameters={'b': 0.2}) -> None:
        self.index = index
        self.b = parameters['b']

    def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int]) -> float:
        # score(q, d) = sum over terms w shared by q and d of
        #   c(w, q)                                                  # QTF
        #   * (1 + ln(1 + ln(c(w, d)))) / (1 - b + b * |d| / avdl)   # normalized TF
        #   * ln((N + 1) / df(w))                                    # IDF
        stats = self.index.statistics
        avg_dl = stats['mean_document_length']
        N = stats['number_of_documents']
        doc_len = self.index.document_metadata[docid]['length']

        score = 0.0
        for term, qtf in query_word_counts.items():
            ctd = doc_word_counts.get(term, 0)
            if ctd == 0:
                continue
            df = self.index.get_term_metadata(term)['document_count']
            tf = (1 + math.log(1 + math.log(ctd))) \
                / (1 - self.b + self.b * doc_len / avg_dl)
            idf = math.log((N + 1) / df)
            score += qtf * tf * idf
        return score


# HW1: TF-IDF
class TF_IDF(RelevanceScorer):
    def __init__(self, index: InvertedIndex, parameters={}) -> None:
        self.index = index
        self.parameters = parameters

    def score(self, docid: int, doc_word_counts: dict[str, int], query_word_counts: dict[str, int]) -> float:
        # TF = ln(c(w, d) + 1) and IDF = 1 + ln(N / df(w)); each term shared by
        # the query and the document contributes TF * IDF, weighted by the
        # term's count in the query.
        N = self.index.statistics['number_of_documents']
        score = 0.0
        for term, qtf in query_word_counts.items():
            ctd = doc_word_counts.get(term, 0)
            if ctd == 0:
                continue
            df = self.index.get_term_metadata(term)['document_count']
            score += math.log(ctd + 1) * (1 + math.log(N / df)) * qtf
        return score
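

if __name__ == '__main__':
    # Minimal smoke test with a tiny hand-built stub index. This is a sketch,
    # not part of the assignment template: the stub implements only the pieces
    # of the InvertedIndex interface that the scorers above actually use
    # (get_postings, get_term_metadata, statistics, document_metadata), and
    # every number in it is made up.
    class _StubIndex:
        def __init__(self):
            # Five documents whose lengths sum to the total token count
            self.document_metadata = {1: {'length': 4}, 2: {'length': 3},
                                      3: {'length': 2}, 4: {'length': 5},
                                      5: {'length': 6}}
            self.statistics = {'number_of_documents': 5,
                               'mean_document_length': 4.0,
                               'total_token_count': 20}
            # Posting format assumed above: (docid, term_frequency)
            self._postings = {'search': [(1, 2), (2, 1)], 'engine': [(1, 1)]}
            self._term_meta = {'search': {'count': 3, 'document_count': 2},
                               'engine': {'count': 1, 'document_count': 1}}

        def get_postings(self, term):
            return self._postings.get(term, [])

        def get_term_metadata(self, term):
            return self._term_meta.get(term, {'count': 0, 'document_count': 0})

    class _StubPreprocessor:
        def tokenize(self, text):
            return text.lower().split()

    stub = _StubIndex()
    ranker = Ranker(stub, _StubPreprocessor(), stopwords=set(), scorer=BM25(stub))
    # Expect document 1 (contains both terms) to rank ahead of document 2
    print(ranker.query('search engine'))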