##################################################### ### DOCUMENT PROCESSOR [PDF READER UTILITIES] ##################################################### # Jonathan Wang # ABOUT: # This project creates an app to chat with PDFs. # This is the PDF READER UTILITIES. # It defines helper functions for the PDF reader, # such as getting Keywords or finding Contact Info. ##################################################### ### TODO Board: # Better Summarizer than T5, which has been stripped out? # Better keywords than the RAKE+YAKE fusion we're currently using? # Consider using GPE/GSP tagging with spacy to confirm mailing addresses? # Handle FigureCaption somehow. # Skip Header if it has a Page X or other page number construction. # Detect images that are substantially overlapping according to coordinates. # https://stackoverflow.com/questions/49897531/detect-overlapping-images-in-pil # Keep them in the following order: no confidence score, larger image, higher confidence score # Detect nodes whose text is substantially repeated at either the top or bottom of the page. # Utilize the coordinates to ignore the text on the top and bottom two lines. # Fix OCR issues with spell checking? # Remove images that are too small in size, and overlapping with text boxes. # Convert the List[BaseNode] -> List[BaseNode] functions into TransformComponents ##################################################### ### Imports from __future__ import annotations import difflib import re from collections import defaultdict from copy import deepcopy from typing import ( TYPE_CHECKING, List, Optional, Tuple, TypeVar, ) import rapidfuzz import regex from llama_index.core.schema import ( BaseNode, NodeRelationship, RelatedNodeInfo, ) if TYPE_CHECKING: from unstructured.documents import elements ##################################################### ### CODE GenericNode = TypeVar("GenericNode", bound=BaseNode) def clean_pdf_chunk(pdf_chunk: elements.Element) -> elements.Element: """Given a single element of text from a pdf read by Unstructured, clean its text.""" ### NOTE: Don't think it's work making this a separate TransformComponent. # We'd still need to clean bad characters from the reader. chunk_text = pdf_chunk.text if (len(chunk_text) > 0): # Clean any control characters which break the language detection for other parts of the reader. re_bad_chars = regex.compile(r"[\p{Cc}\p{Cs}]+") chunk_text = re_bad_chars.sub("", chunk_text) # Remove PDF citations text chunk_text = re.sub("\\(cid:\\d+\\)", "", chunk_text) # matches (cid:###) # Clean whitespace and broken paragraphs # chunk_text = clean_extra_whitespace(chunk_text) # chunk_text = group_broken_paragraphs(chunk_text) # Save cleaned text. pdf_chunk.text = chunk_text return pdf_chunk def clean_abbreviations(pdf_chunks: list[GenericNode]) -> list[GenericNode]: """Remove any common abbreviations in the text which can confuse the sentence model. Args: pdf_chunks (List[GenericNode]): List of llama-index nodes. Returns: List[GenericNode]: The nodes with cleaned text, abbreviations replaced. """ for pdf_chunk in pdf_chunks: text = getattr(pdf_chunk, "text", "") if (text == ""): continue # No. -> Number text = re.sub(r"\bNo\b\.\s", "Number", text, flags=re.IGNORECASE) # Fig. -> Figure text = re.sub(r"\bFig\b\.", "Figure", text, flags=re.IGNORECASE) # Eq. -> Equation text = re.sub(r"\bEq\b\.", "Equation", text, flags=re.IGNORECASE) # Mr. -> Mr text = re.sub(r"\bMr\b\.", "Mr", text, flags=re.IGNORECASE) # Mrs. -> Mrs text = re.sub(r"\bMrs\b\.", "Mrs", text, flags=re.IGNORECASE) # Dr. -> Dr text = re.sub(r"\bDr\b\.", "Dr", text, flags=re.IGNORECASE) # Jr. -> Jr text = re.sub(r"\bJr\b\.", "Jr", text, flags=re.IGNORECASE) # etc. -> etc text = re.sub(r"\betc\b\.", "etc", text, flags=re.IGNORECASE) pdf_chunk.text = text return pdf_chunks def _remove_chunk( pdf_chunks: list[GenericNode], chunk_index: int | None=None, chunk_id: str | None=None ) -> list[GenericNode]: """Given a list of chunks, remove the chunk at the given index or with the given id. Args: pdf_chunks (List[GenericNode]): The list of chunks. chunk_index (Optional[int]): The index of the chunk to remove. chunk_id (Optional[str]): The id of the chunk to remove. Returns: List[GenericNode]: The updated list of chunks, without the removed chunk. """ if (chunk_index is None and chunk_id is None): msg = "_remove_chunk: Either chunk_index or chunk_id must be set." raise ValueError(msg) # Convert chunk_id to chunk_index elif (chunk_index is None): chunk = next((c for c in pdf_chunks if c.node_id == chunk_id), None) if chunk is not None: chunk_index = pdf_chunks.index(chunk) else: msg = f"_remove_chunk: No chunk found with id {chunk_id}." raise ValueError(msg) elif (chunk_index < 0 or chunk_index >= len(pdf_chunks)): msg = f"_remove_chunk: Chunk {chunk_index} is out of range. Maximum index is {len(pdf_chunks) - 1}." raise ValueError(msg) # Update the previous-next node relationships around that index def _node_rel_prev_next(prev_node: GenericNode, next_node: GenericNode) -> tuple[GenericNode, GenericNode]: """Update pre-next node relationships between two nodes.""" prev_node.relationships[NodeRelationship.NEXT] = RelatedNodeInfo( node_id=next_node.node_id, metadata={"filename": next_node.metadata["filename"]} ) next_node.relationships[NodeRelationship.PREVIOUS] = RelatedNodeInfo( node_id=prev_node.node_id, metadata={"filename": prev_node.metadata["filename"]} ) return (prev_node, next_node) if (chunk_index > 0 and chunk_index < len(pdf_chunks) - 1): pdf_chunks[chunk_index - 1], pdf_chunks[chunk_index + 1] = _node_rel_prev_next(prev_node=pdf_chunks[chunk_index - 1], next_node=pdf_chunks[chunk_index + 1]) popped_chunk = pdf_chunks.pop(chunk_index) chunk_id = chunk_id or popped_chunk.node_id # Remove any references to the removed chunk in node relationships or metadata for node in pdf_chunks: node.relationships = {k: v for k, v in node.relationships.items() if v.node_id != chunk_id} node.metadata = {k: v for k, v in node.metadata.items() if ((isinstance(v, list) and (chunk_id in v)) or (v != chunk_id))} return pdf_chunks def _clean_overlap_text( text1: str, text2: str, combining_text: str=" ", min_length: int | None = 1, max_length: int | None = 50, overlap_threshold: float = 0.9 ) -> str: r"""Remove any overlapping text between two strings. Args: text1 (str): The first string. text2 (str): The second string. combining_text (str, optional): The text to combine the two strings with. Defaults to space (' '). Can also be \n. min_length (int, optional): The minimum length of the overlap. Defaults to 1. None is no minimum. max_length (int, optional): The maximum length of the overlap. Defaults to 50. None is no maximum. overlap_threshold (float, optional): The threshold for being an overlap. Defaults to 0.8. Returns: str: The strings combined with the overlap removed. """ for overlap_len in range(min(len(text1), len(text2), (max_length or len(text1))), ((min_length or 1)-1), -1): end_substring = text1[-overlap_len:] start_substring = text2[:overlap_len] similarity = difflib.SequenceMatcher(None, end_substring, start_substring).ratio() if (similarity >= overlap_threshold): return combining_text.join([text1[:-overlap_len], text2[overlap_len:]]).strip() return combining_text.join([text1, text2]).strip() def _combine_chunks(c1: GenericNode, c2: GenericNode) -> GenericNode: """Combine two chunks into one. Args: c1 (GenericNode): The first chunk. c2 (GenericNode): The second chunk. Returns: GenericNode: The combined chunk. """ # Metadata merging # Type merging text_types = ["NarrativeText", "ListItem", "Formula", "UncategorizedText", "Composite-TextOnly"] image_types = ["FigureCaption", "Image"] # things that make Image nodes. def _combine_chunks_type(c1_type: str, c2_type: str) -> str: """Combine the types of two chunks. Args: c1_type (str): The type of the first chunk. c2_type (str): The type of the second chunk. Returns: str: The type of the combined chunk. """ if (c1_type == c2_type): return c1_type elif (c1_type in text_types and c2_type in text_types): return "Composite-TextOnly" elif (c1_type in image_types and c2_type in image_types): return "Image" # Add caption to image else: return "Composite" c1_type = c1.metadata["type"] c2_type = c2.metadata["type"] c1.metadata["type"] = _combine_chunks_type(c1_type, c2_type) # All other metadata merging for k, v in c2.metadata.items(): if k not in c1.metadata: c1.metadata[k] = v # Merge lists elif k in ["page_number", 'page_name', 'languages', 'emphasized_text_contents', 'link_texts', 'link_urls']: if not isinstance(c1.metadata[k], list): c1.metadata[k] = list(c1.metadata[k]) if (v not in c1.metadata[k]): # Add to list, dedupe c1.metadata[k].extend(v) c1.metadata[k] = sorted(set(c1.metadata[k])) # Text merging c1_text = getattr(c1, "text", "") c2_text = getattr(c2, "text", "") if (c1_text == c2_text): # No duplicates. return c1 if (c1_text == "" or c2_text == ""): c1.text = c1_text + c2_text return c1 # Check if a sentence has been split between two chunks # Option 1: letters c1_text_last = c1_text[-1] # Check if c1_text_last has a lowercase letter, digit, or punctuation that doesn't end a sentence if (re.search(r'[\da-z\[\]\(\)\{\}\<\>\%\^\&\"\'\:\;\,\/\-\_\+\= \t\n\r]', c1_text_last)): # We can probably combine these two texts as if they were on the same line. c1.text = _clean_overlap_text(c1_text, c2_text, combining_text=" ") else: # We'll treat these as if they were on separate lines. c1.text = _clean_overlap_text(c1_text, c2_text, combining_text="\n") # NOTE: Relationships merging is handled in other functions, because it requires looking back at prior prior chunks. return c1 def dedupe_title_chunks(pdf_chunks: list[GenericNode]) -> list[GenericNode]: """Given a list of chunks, return a list of chunks without any title duplicates. Args: pdf_chunks (List[BaseNode]): The list of chunks to have titles deduped. Returns: List[BaseNode]: The deduped list of chunks. """ index = 0 while (index < len(pdf_chunks)): if ( (pdf_chunks[index].metadata["type"] in ("Title")) # is title and (index > 0) # is not first chunk and (pdf_chunks[index - 1].metadata["type"] in ("Title")) # previous chunk is also title ): # if (getattr(pdf_chunks[index], 'text', None) != getattr(pdf_chunks[index - 1], 'text', '')): # pdf_chunks[index].text = getattr(pdf_chunks[index - 1], 'text', '') + '\n' + getattr(pdf_chunks[index], 'text', '') pdf_chunks[index] = _combine_chunks(pdf_chunks[index - 1], pdf_chunks[index]) # NOTE: We'll remove the PRIOR title, since duplicates AND child relationships are built on the CURRENT title. # There shouldn't be any PARENT/CHILD relationships to the title that we are deleting, so this seems fine. pdf_chunks = _remove_chunk(pdf_chunks=pdf_chunks, chunk_index=index-1) # NOTE: don't need to shift index because we removed an element. else: # We don't care about any situations other than consecutive title chunks. index += 1 return (pdf_chunks) def combine_listitem_chunks(pdf_chunks: list[GenericNode]) -> list[GenericNode]: """Given a list of chunks, combine any adjacent chunks which are ListItems into one List. Args: pdf_chunks (List[GenericNode]): The list of chunks to combine. Returns: List[GenericNode]: The list of chunks with ListItems combined into one List chunk. """ index = 0 while (index < len(pdf_chunks)): if ( (pdf_chunks[index].metadata["type"] == "ListItem") # is list item and (index > 0) # is not first chunk and (pdf_chunks[index - 1].metadata["type"] == "ListItem") # previous chunk is also list item ): # Okay, we have a consecutive list item. Combine into one list. # NOTE: We'll remove the PRIOR list item, since duplicates AND child relationships are built on the CURRENT list item. # 1. Append prior list item's text to the current list item's text # pdf_chunks[index].text = getattr(pdf_chunks[index - 1], 'text', '') + '\n' + getattr(pdf_chunks[index], 'text', '') pdf_chunks[index] = _combine_chunks(pdf_chunks[index - 1], pdf_chunks[index]) # 2. Remove PRIOR list item pdf_chunks.pop(index - 1) # 3. Replace NEXT relationship from PRIOR list item with the later list item node ID, if prior prior node exists. if (index - 2 >= 0): pdf_chunks[index - 2].relationships[NodeRelationship.NEXT] = RelatedNodeInfo( node_id=pdf_chunks[index].node_id, metadata={"filename": pdf_chunks[index].metadata["filename"]} ) # 4. Replace PREVIOUS relationship from LATER list item with the prior prior node ID, if prior prior node exists. pdf_chunks[index].relationships[NodeRelationship.PREVIOUS] = RelatedNodeInfo( node_id=pdf_chunks[index - 2].node_id, metadata={"filename": pdf_chunks[index - 2].metadata['filename']} ) # NOTE: the PARENT/CHILD relationships should be the same as the previous list item, so this seems fine. else: # We don't care about any situations other than consecutive list item chunks. index += 1 return (pdf_chunks) def remove_header_footer_repeated( pdf_chunks_input: list[GenericNode], window_size: int = 3, fuzz_threshold: int = 80 ) -> list[GenericNode]: """Given a list of chunks, remove any header/footer chunks that are repeated across pages. Args: pdf_chunks (List[GenericNode]): The list of chunks to process. window_size (int): The number of chunks to consider at the beginning and end of each page. fuzz_threshold (int): The threshold for fuzzy matching of chunk texts. Returns: List[GenericNode]: The list of chunks with header/footer chunks removed. """ nodes_to_remove = set() # id's to remove. pdf_chunks = deepcopy(pdf_chunks_input) # Build a dictionary of chunks by page number chunks_by_page = defaultdict(list) for chunk in pdf_chunks: chunk_page_number = min(chunk.metadata["page_number"]) if isinstance(chunk.metadata["page_number"], list) else chunk.metadata["page_number"] chunks_by_page[chunk_page_number].append(chunk) # Get the first window_size and last window_size chunks on each page header_candidates = defaultdict(set) # hashmap of chunk text, and set of chunk ids with that text. footer_candidates = defaultdict(set) # hashmap of chunk text, and set of chunk ids with that text. page_number_regex = re.compile(r"(?:-|\( ?)?\b(?:page|p\.?(?:[pg](?:\b|\.)?)?)? ?(?:\d+|\b[ivxm]+\b)\.?(?: ?-|\))?\b", re.IGNORECASE) for chunks in chunks_by_page.values(): header_chunks = chunks[:window_size] footer_chunks = chunks[-window_size:] for chunk in header_chunks: chunk_text = getattr(chunk, "text", "") if chunk.metadata["type"] == "Header" and len(chunk_text) > 0: chunk_text_is_pagenum_only = page_number_regex.match(chunk_text) if chunk_text_is_pagenum_only and (len(chunk_text_is_pagenum_only.group(0)) == len(chunk_text)): # Full match! chunk.text = "Page Number Only" nodes_to_remove.add(chunk.node_id) elif chunk_text_is_pagenum_only and len(chunk_text_is_pagenum_only.group(0)) > 0: # Remove the page number content from the chunk text for this exercise chunk_text = page_number_regex.sub('', chunk_text) chunk.text = chunk_text if chunk.metadata["type"] not in ("Image", "Table") and len(chunk_text) > 0: header_candidates[chunk_text].add(chunk.node_id) for chunk in footer_chunks: chunk_text = getattr(chunk, "text", "") if chunk.metadata["type"] == "Footer" and len(chunk_text) > 0: chunk_text_is_pagenum_only = page_number_regex.match(chunk_text) if chunk_text_is_pagenum_only and (len(chunk_text_is_pagenum_only.group(0)) == len(chunk_text)): # Full match! chunk.text = "Page Number Only" nodes_to_remove.add(chunk.node_id) elif chunk_text_is_pagenum_only and len(chunk_text_is_pagenum_only.group(0)) > 0: # Remove the page number content from the chunk text for this exercise chunk_text = page_number_regex.sub('', chunk_text) chunk.text = chunk_text if chunk.metadata["type"] not in ("Image", "Table") and len(chunk_text) > 0: footer_candidates[chunk_text].add(chunk.node_id) # Identify any texts which are too similar to other header texts. header_texts = list(header_candidates.keys()) header_distance_matrix = rapidfuzz.process.cdist(header_texts, header_texts, scorer=rapidfuzz.fuzz.ratio, score_cutoff=fuzz_threshold) footer_texts = list(footer_candidates.keys()) footer_distance_matrix = rapidfuzz.process.cdist(footer_texts, footer_texts, scorer=rapidfuzz.fuzz.ratio, score_cutoff=fuzz_threshold) # Combine header candidates which are too similar to each other in the distance matrix for i in range(len(header_distance_matrix)-1): for j in range(i+1, len(header_distance_matrix)): if i == j: continue if header_distance_matrix[i][j] >= fuzz_threshold: header_candidates[header_texts[i]].update(header_candidates[header_texts[j]]) header_candidates[header_texts[j]].update(header_candidates[header_texts[i]]) for i in range(len(footer_distance_matrix)-1): for j in range(i+1, len(footer_distance_matrix)): if i == j: continue if footer_distance_matrix[i][j] >= fuzz_threshold: footer_candidates[footer_texts[i]].update(footer_candidates[footer_texts[j]]) footer_candidates[footer_texts[j]].update(footer_candidates[footer_texts[i]]) headers_to_remove = set() for chunk_ids in header_candidates.values(): if len(chunk_ids) > 1: headers_to_remove.update(chunk_ids) footers_to_remove = set() for chunk_ids in footer_candidates.values(): if len(chunk_ids) > 1: footers_to_remove.update(chunk_ids) nodes_to_remove = nodes_to_remove.union(headers_to_remove.union(footers_to_remove)) for node_id in nodes_to_remove: pdf_chunks = _remove_chunk(pdf_chunks=pdf_chunks, chunk_id=node_id) return pdf_chunks def remove_overlap_images(pdf_chunks: list[GenericNode]) -> list[GenericNode]: # TODO(Jonathan Wang): Implement this function to remove images which are completely overlapping each other # OR... get a better dang reader! raise NotImplementedError def chunk_by_header( pdf_chunks_in: list[GenericNode], combine_text_under_n_chars: int = 1024, multipage_sections: bool = True, # ) -> Tuple[List[GenericNode], List[GenericNode]]: ) -> list[GenericNode]: """Combine chunks together that are part of the same header and have similar meaning. Args: pdf_chunks (List[GenericNode]): List of chunks to be combined. Returns: List[GenericNode]: List of combined chunks. List[GenericNode]: List of original chunks, with node references updated. """ # TODO(Jonathan Wang): Handle semantic chunking between elements within a Header chunk. # TODO(Jonathan Wang): Handle splitting element chunks if they are over `max_characters` in length (does this ever really happen?) # TODO(Jonathan Wang): Handle relationships between nodes. pdf_chunks = deepcopy(pdf_chunks_in) output = [] id_to_index = {} index = 0 # Pass 1: Combine chunks together that are part of the same title chunk. while (index < len(pdf_chunks)): chunk = pdf_chunks[index] if (chunk.metadata["type"] in ["Header", "Footer", "Image", "Table"]): # These go immediately into the semantic title chunks and also reset the new node. # Let's add a newline to distinguish from any other content. if (chunk.metadata["type"] in ["Header", "Footer", "Table"]): chunk.text = getattr(chunk, "text", "") + "\n" output.append(chunk) index += 1 continue # Make a new node if we have a new title (or if we don't have a title). if ( chunk.metadata["type"] == "Title" ): # We're good, this node can stay as a TitleChunk. chunk.metadata['type'] = 'Composite' # if (not isinstance(chunk.metadata['page number'], list)): # chunk.metadata['page number'] = [chunk.metadata['page number']] # Let's add a newline to distinguish the title from the content. setattr(chunk, 'text', getattr(chunk, 'text', '') + "\n") output.append(chunk) id_to_index[chunk.id_] = len(output) - 1 index += 1 continue elif (chunk.metadata.get('parent_id', None) in id_to_index): # This chunk is part of the same title as a prior chunk. # Add this text into the prior title node. jndex = id_to_index[chunk.metadata['parent_id']] # if (not isinstance(output[jndex].metadata['page number'], list)): # output[jndex].metadata['page number'] = [chunk.metadata['page number']] output[jndex] = _combine_chunks(output[jndex], chunk) # output[jndex].text = getattr(output[jndex], 'text', '') + '\n' + getattr(chunk, 'text', '') # output[jndex].metadata['page number'] = list(set(output[jndex].metadata['page number'] + [chunk.metadata['page number']])) # output[jndex].metadata['languages'] = list(set(output[jndex].metadata['languages'] + chunk.metadata['languages'])) pdf_chunks.remove(chunk) continue elif ( (chunk.metadata.get('parent_id', None) is None) and ( len(getattr(chunk, 'text', '')) > combine_text_under_n_chars # big enough text section to stand alone or (len(id_to_index.keys()) <= 0) # no prior title ) ): # Okay, so either we don't have a title, or it was interrupted by an image / table. # This chunk can stay as a TextChunk. chunk.metadata['type'] = 'Composite-TextOnly' # if (not isinstance(chunk.metadata['page number'], list)): # chunk.metadata['page number'] = [chunk.metadata['page number']] output.append(chunk) id_to_index[chunk.id_] = len(output) - 1 index += 1 continue else: # Add the text to the prior node that isn't a table or image. jndex = len(output) - 1 while ( (jndex >= 0) and (output[jndex].metadata['type'] in ['Table', 'Image']) ): # for title_chunk in output: # print(f'''{title_chunk.id_}: {title_chunk.metadata['type']}, text: {title_chunk.text}, parent: {title_chunk.metadata['parent_id']}''') jndex -= 1 if (jndex < 0): raise Exception(f'''Prior title chunk not found: {index}, {chunk.metadata.get('parent_id', None)}''') # Add this text into the prior title node. # if (not isinstance(output[jndex].metadata['page number'], list)): # output[jndex].metadata['page number'] = [chunk.metadata['page number']] output[jndex] = _combine_chunks(output[jndex], chunk) # output[jndex].text = getattr(output[jndex], 'text', '') + ' ' + getattr(chunk, 'text', '') # output[jndex].metadata['page number'] = list(set(output[jndex].metadata['page number'] + [chunk.metadata['page number']])) # output[jndex].metadata['languages'] = list(set(output[jndex].metadata['languages'] + chunk.metadata['languages'])) pdf_chunks.remove(chunk) # TODO: Update relationships between nodes. continue return (output) ### TODO: # Merge images together that are substantially overlapping. # Favour image with no confidence score. (these come straight from pdf). # Favour the larger image over the smaller one. # Favour the image with higher confidence score. def merge_images() -> None: pass