Autodoc-Lifter / pdf_reader_utils.py
Jonathan Wang
initial commit
89cbc4d
#####################################################
### DOCUMENT PROCESSOR [PDF READER UTILITIES]
#####################################################
# Jonathan Wang
# ABOUT:
# This project creates an app to chat with PDFs.
# This is the PDF READER UTILITIES.
# It defines helper functions for the PDF reader,
# such as getting Keywords or finding Contact Info.
#####################################################
### TODO Board:
# Better Summarizer than T5, which has been stripped out?
# Better keywords than the RAKE+YAKE fusion we're currently using?
# Consider using GPE/GSP tagging with spacy to confirm mailing addresses?
# Handle FigureCaption somehow.
# Skip Header if it has a Page X or other page number construction.
# Detect images that are substantially overlapping according to coordinates.
# https://stackoverflow.com/questions/49897531/detect-overlapping-images-in-pil
# Keep them in the following order: no confidence score, larger image, higher confidence score
# Detect nodes whose text is substantially repeated at either the top or bottom of the page.
# Utilize the coordinates to ignore the text on the top and bottom two lines.
# Fix OCR issues with spell checking?
# Remove images that are too small in size, and overlapping with text boxes.
# Convert the List[BaseNode] -> List[BaseNode] functions into TransformComponents
#####################################################
### Imports
from __future__ import annotations
import difflib
import re
from collections import defaultdict
from copy import deepcopy
from typing import (
TYPE_CHECKING,
List,
Optional,
Tuple,
TypeVar,
)
import rapidfuzz
import regex
from llama_index.core.schema import (
BaseNode,
NodeRelationship,
RelatedNodeInfo,
)
if TYPE_CHECKING:
from unstructured.documents import elements
#####################################################
### CODE
GenericNode = TypeVar("GenericNode", bound=BaseNode)
def clean_pdf_chunk(pdf_chunk: elements.Element) -> elements.Element:
"""Given a single element of text from a pdf read by Unstructured, clean its text."""
### NOTE: Don't think it's work making this a separate TransformComponent.
# We'd still need to clean bad characters from the reader.
chunk_text = pdf_chunk.text
if (len(chunk_text) > 0):
# Clean any control characters which break the language detection for other parts of the reader.
re_bad_chars = regex.compile(r"[\p{Cc}\p{Cs}]+")
chunk_text = re_bad_chars.sub("", chunk_text)
# Remove PDF citations text
chunk_text = re.sub("\\(cid:\\d+\\)", "", chunk_text) # matches (cid:###)
# Clean whitespace and broken paragraphs
# chunk_text = clean_extra_whitespace(chunk_text)
# chunk_text = group_broken_paragraphs(chunk_text)
# Save cleaned text.
pdf_chunk.text = chunk_text
return pdf_chunk
def clean_abbreviations(pdf_chunks: list[GenericNode]) -> list[GenericNode]:
"""Remove any common abbreviations in the text which can confuse the sentence model.
Args:
pdf_chunks (List[GenericNode]): List of llama-index nodes.
Returns:
List[GenericNode]: The nodes with cleaned text, abbreviations replaced.
"""
for pdf_chunk in pdf_chunks:
text = getattr(pdf_chunk, "text", "")
if (text == ""):
continue
# No. -> Number
text = re.sub(r"\bNo\b\.\s", "Number", text, flags=re.IGNORECASE)
# Fig. -> Figure
text = re.sub(r"\bFig\b\.", "Figure", text, flags=re.IGNORECASE)
# Eq. -> Equation
text = re.sub(r"\bEq\b\.", "Equation", text, flags=re.IGNORECASE)
# Mr. -> Mr
text = re.sub(r"\bMr\b\.", "Mr", text, flags=re.IGNORECASE)
# Mrs. -> Mrs
text = re.sub(r"\bMrs\b\.", "Mrs", text, flags=re.IGNORECASE)
# Dr. -> Dr
text = re.sub(r"\bDr\b\.", "Dr", text, flags=re.IGNORECASE)
# Jr. -> Jr
text = re.sub(r"\bJr\b\.", "Jr", text, flags=re.IGNORECASE)
# etc. -> etc
text = re.sub(r"\betc\b\.", "etc", text, flags=re.IGNORECASE)
pdf_chunk.text = text
return pdf_chunks
def _remove_chunk(
pdf_chunks: list[GenericNode],
chunk_index: int | None=None,
chunk_id: str | None=None
) -> list[GenericNode]:
"""Given a list of chunks, remove the chunk at the given index or with the given id.
Args:
pdf_chunks (List[GenericNode]): The list of chunks.
chunk_index (Optional[int]): The index of the chunk to remove.
chunk_id (Optional[str]): The id of the chunk to remove.
Returns:
List[GenericNode]: The updated list of chunks, without the removed chunk.
"""
if (chunk_index is None and chunk_id is None):
msg = "_remove_chunk: Either chunk_index or chunk_id must be set."
raise ValueError(msg)
# Convert chunk_id to chunk_index
elif (chunk_index is None):
chunk = next((c for c in pdf_chunks if c.node_id == chunk_id), None)
if chunk is not None:
chunk_index = pdf_chunks.index(chunk)
else:
msg = f"_remove_chunk: No chunk found with id {chunk_id}."
raise ValueError(msg)
elif (chunk_index < 0 or chunk_index >= len(pdf_chunks)):
msg = f"_remove_chunk: Chunk {chunk_index} is out of range. Maximum index is {len(pdf_chunks) - 1}."
raise ValueError(msg)
# Update the previous-next node relationships around that index
def _node_rel_prev_next(prev_node: GenericNode, next_node: GenericNode) -> tuple[GenericNode, GenericNode]:
"""Update pre-next node relationships between two nodes."""
prev_node.relationships[NodeRelationship.NEXT] = RelatedNodeInfo(
node_id=next_node.node_id,
metadata={"filename": next_node.metadata["filename"]}
)
next_node.relationships[NodeRelationship.PREVIOUS] = RelatedNodeInfo(
node_id=prev_node.node_id,
metadata={"filename": prev_node.metadata["filename"]}
)
return (prev_node, next_node)
if (chunk_index > 0 and chunk_index < len(pdf_chunks) - 1):
pdf_chunks[chunk_index - 1], pdf_chunks[chunk_index + 1] = _node_rel_prev_next(prev_node=pdf_chunks[chunk_index - 1], next_node=pdf_chunks[chunk_index + 1])
popped_chunk = pdf_chunks.pop(chunk_index)
chunk_id = chunk_id or popped_chunk.node_id
# Remove any references to the removed chunk in node relationships or metadata
for node in pdf_chunks:
node.relationships = {k: v for k, v in node.relationships.items() if v.node_id != chunk_id}
node.metadata = {k: v for k, v in node.metadata.items() if ((isinstance(v, list) and (chunk_id in v)) or (v != chunk_id))}
return pdf_chunks
def _clean_overlap_text(
text1: str,
text2: str,
combining_text: str=" ",
min_length: int | None = 1,
max_length: int | None = 50,
overlap_threshold: float = 0.9
) -> str:
r"""Remove any overlapping text between two strings.
Args:
text1 (str): The first string.
text2 (str): The second string.
combining_text (str, optional): The text to combine the two strings with. Defaults to space (' '). Can also be \n.
min_length (int, optional): The minimum length of the overlap. Defaults to 1. None is no minimum.
max_length (int, optional): The maximum length of the overlap. Defaults to 50. None is no maximum.
overlap_threshold (float, optional): The threshold for being an overlap. Defaults to 0.8.
Returns:
str: The strings combined with the overlap removed.
"""
for overlap_len in range(min(len(text1), len(text2), (max_length or len(text1))), ((min_length or 1)-1), -1):
end_substring = text1[-overlap_len:]
start_substring = text2[:overlap_len]
similarity = difflib.SequenceMatcher(None, end_substring, start_substring).ratio()
if (similarity >= overlap_threshold):
return combining_text.join([text1[:-overlap_len], text2[overlap_len:]]).strip()
return combining_text.join([text1, text2]).strip()
def _combine_chunks(c1: GenericNode, c2: GenericNode) -> GenericNode:
"""Combine two chunks into one.
Args:
c1 (GenericNode): The first chunk.
c2 (GenericNode): The second chunk.
Returns:
GenericNode: The combined chunk.
"""
# Metadata merging
# Type merging
text_types = ["NarrativeText", "ListItem", "Formula", "UncategorizedText", "Composite-TextOnly"]
image_types = ["FigureCaption", "Image"] # things that make Image nodes.
def _combine_chunks_type(c1_type: str, c2_type: str) -> str:
"""Combine the types of two chunks.
Args:
c1_type (str): The type of the first chunk.
c2_type (str): The type of the second chunk.
Returns:
str: The type of the combined chunk.
"""
if (c1_type == c2_type):
return c1_type
elif (c1_type in text_types and c2_type in text_types):
return "Composite-TextOnly"
elif (c1_type in image_types and c2_type in image_types):
return "Image" # Add caption to image
else:
return "Composite"
c1_type = c1.metadata["type"]
c2_type = c2.metadata["type"]
c1.metadata["type"] = _combine_chunks_type(c1_type, c2_type)
# All other metadata merging
for k, v in c2.metadata.items():
if k not in c1.metadata:
c1.metadata[k] = v
# Merge lists
elif k in ["page_number", 'page_name', 'languages', 'emphasized_text_contents', 'link_texts', 'link_urls']:
if not isinstance(c1.metadata[k], list):
c1.metadata[k] = list(c1.metadata[k])
if (v not in c1.metadata[k]):
# Add to list, dedupe
c1.metadata[k].extend(v)
c1.metadata[k] = sorted(set(c1.metadata[k]))
# Text merging
c1_text = getattr(c1, "text", "")
c2_text = getattr(c2, "text", "")
if (c1_text == c2_text):
# No duplicates.
return c1
if (c1_text == "" or c2_text == ""):
c1.text = c1_text + c2_text
return c1
# Check if a sentence has been split between two chunks
# Option 1: letters
c1_text_last = c1_text[-1]
# Check if c1_text_last has a lowercase letter, digit, or punctuation that doesn't end a sentence
if (re.search(r'[\da-z\[\]\(\)\{\}\<\>\%\^\&\"\'\:\;\,\/\-\_\+\= \t\n\r]', c1_text_last)):
# We can probably combine these two texts as if they were on the same line.
c1.text = _clean_overlap_text(c1_text, c2_text, combining_text=" ")
else:
# We'll treat these as if they were on separate lines.
c1.text = _clean_overlap_text(c1_text, c2_text, combining_text="\n")
# NOTE: Relationships merging is handled in other functions, because it requires looking back at prior prior chunks.
return c1
def dedupe_title_chunks(pdf_chunks: list[GenericNode]) -> list[GenericNode]:
"""Given a list of chunks, return a list of chunks without any title duplicates.
Args:
pdf_chunks (List[BaseNode]): The list of chunks to have titles deduped.
Returns:
List[BaseNode]: The deduped list of chunks.
"""
index = 0
while (index < len(pdf_chunks)):
if (
(pdf_chunks[index].metadata["type"] in ("Title")) # is title
and (index > 0) # is not first chunk
and (pdf_chunks[index - 1].metadata["type"] in ("Title")) # previous chunk is also title
):
# if (getattr(pdf_chunks[index], 'text', None) != getattr(pdf_chunks[index - 1], 'text', '')):
# pdf_chunks[index].text = getattr(pdf_chunks[index - 1], 'text', '') + '\n' + getattr(pdf_chunks[index], 'text', '')
pdf_chunks[index] = _combine_chunks(pdf_chunks[index - 1], pdf_chunks[index])
# NOTE: We'll remove the PRIOR title, since duplicates AND child relationships are built on the CURRENT title.
# There shouldn't be any PARENT/CHILD relationships to the title that we are deleting, so this seems fine.
pdf_chunks = _remove_chunk(pdf_chunks=pdf_chunks, chunk_index=index-1)
# NOTE: don't need to shift index because we removed an element.
else:
# We don't care about any situations other than consecutive title chunks.
index += 1
return (pdf_chunks)
def combine_listitem_chunks(pdf_chunks: list[GenericNode]) -> list[GenericNode]:
"""Given a list of chunks, combine any adjacent chunks which are ListItems into one List.
Args:
pdf_chunks (List[GenericNode]): The list of chunks to combine.
Returns:
List[GenericNode]: The list of chunks with ListItems combined into one List chunk.
"""
index = 0
while (index < len(pdf_chunks)):
if (
(pdf_chunks[index].metadata["type"] == "ListItem") # is list item
and (index > 0) # is not first chunk
and (pdf_chunks[index - 1].metadata["type"] == "ListItem") # previous chunk is also list item
):
# Okay, we have a consecutive list item. Combine into one list.
# NOTE: We'll remove the PRIOR list item, since duplicates AND child relationships are built on the CURRENT list item.
# 1. Append prior list item's text to the current list item's text
# pdf_chunks[index].text = getattr(pdf_chunks[index - 1], 'text', '') + '\n' + getattr(pdf_chunks[index], 'text', '')
pdf_chunks[index] = _combine_chunks(pdf_chunks[index - 1], pdf_chunks[index])
# 2. Remove PRIOR list item
pdf_chunks.pop(index - 1)
# 3. Replace NEXT relationship from PRIOR list item with the later list item node ID, if prior prior node exists.
if (index - 2 >= 0):
pdf_chunks[index - 2].relationships[NodeRelationship.NEXT] = RelatedNodeInfo(
node_id=pdf_chunks[index].node_id,
metadata={"filename": pdf_chunks[index].metadata["filename"]}
)
# 4. Replace PREVIOUS relationship from LATER list item with the prior prior node ID, if prior prior node exists.
pdf_chunks[index].relationships[NodeRelationship.PREVIOUS] = RelatedNodeInfo(
node_id=pdf_chunks[index - 2].node_id,
metadata={"filename": pdf_chunks[index - 2].metadata['filename']}
)
# NOTE: the PARENT/CHILD relationships should be the same as the previous list item, so this seems fine.
else:
# We don't care about any situations other than consecutive list item chunks.
index += 1
return (pdf_chunks)
def remove_header_footer_repeated(
pdf_chunks_input: list[GenericNode],
window_size: int = 3,
fuzz_threshold: int = 80
) -> list[GenericNode]:
"""Given a list of chunks, remove any header/footer chunks that are repeated across pages.
Args:
pdf_chunks (List[GenericNode]): The list of chunks to process.
window_size (int): The number of chunks to consider at the beginning and end of each page.
fuzz_threshold (int): The threshold for fuzzy matching of chunk texts.
Returns:
List[GenericNode]: The list of chunks with header/footer chunks removed.
"""
nodes_to_remove = set() # id's to remove.
pdf_chunks = deepcopy(pdf_chunks_input)
# Build a dictionary of chunks by page number
chunks_by_page = defaultdict(list)
for chunk in pdf_chunks:
chunk_page_number = min(chunk.metadata["page_number"]) if isinstance(chunk.metadata["page_number"], list) else chunk.metadata["page_number"]
chunks_by_page[chunk_page_number].append(chunk)
# Get the first window_size and last window_size chunks on each page
header_candidates = defaultdict(set) # hashmap of chunk text, and set of chunk ids with that text.
footer_candidates = defaultdict(set) # hashmap of chunk text, and set of chunk ids with that text.
page_number_regex = re.compile(r"(?:-|\( ?)?\b(?:page|p\.?(?:[pg](?:\b|\.)?)?)? ?(?:\d+|\b[ivxm]+\b)\.?(?: ?-|\))?\b", re.IGNORECASE)
for chunks in chunks_by_page.values():
header_chunks = chunks[:window_size]
footer_chunks = chunks[-window_size:]
for chunk in header_chunks:
chunk_text = getattr(chunk, "text", "")
if chunk.metadata["type"] == "Header" and len(chunk_text) > 0:
chunk_text_is_pagenum_only = page_number_regex.match(chunk_text)
if chunk_text_is_pagenum_only and (len(chunk_text_is_pagenum_only.group(0)) == len(chunk_text)):
# Full match!
chunk.text = "Page Number Only"
nodes_to_remove.add(chunk.node_id)
elif chunk_text_is_pagenum_only and len(chunk_text_is_pagenum_only.group(0)) > 0:
# Remove the page number content from the chunk text for this exercise
chunk_text = page_number_regex.sub('', chunk_text)
chunk.text = chunk_text
if chunk.metadata["type"] not in ("Image", "Table") and len(chunk_text) > 0:
header_candidates[chunk_text].add(chunk.node_id)
for chunk in footer_chunks:
chunk_text = getattr(chunk, "text", "")
if chunk.metadata["type"] == "Footer" and len(chunk_text) > 0:
chunk_text_is_pagenum_only = page_number_regex.match(chunk_text)
if chunk_text_is_pagenum_only and (len(chunk_text_is_pagenum_only.group(0)) == len(chunk_text)):
# Full match!
chunk.text = "Page Number Only"
nodes_to_remove.add(chunk.node_id)
elif chunk_text_is_pagenum_only and len(chunk_text_is_pagenum_only.group(0)) > 0:
# Remove the page number content from the chunk text for this exercise
chunk_text = page_number_regex.sub('', chunk_text)
chunk.text = chunk_text
if chunk.metadata["type"] not in ("Image", "Table") and len(chunk_text) > 0:
footer_candidates[chunk_text].add(chunk.node_id)
# Identify any texts which are too similar to other header texts.
header_texts = list(header_candidates.keys())
header_distance_matrix = rapidfuzz.process.cdist(header_texts, header_texts, scorer=rapidfuzz.fuzz.ratio, score_cutoff=fuzz_threshold)
footer_texts = list(footer_candidates.keys())
footer_distance_matrix = rapidfuzz.process.cdist(footer_texts, footer_texts, scorer=rapidfuzz.fuzz.ratio, score_cutoff=fuzz_threshold)
# Combine header candidates which are too similar to each other in the distance matrix
for i in range(len(header_distance_matrix)-1):
for j in range(i+1, len(header_distance_matrix)):
if i == j:
continue
if header_distance_matrix[i][j] >= fuzz_threshold:
header_candidates[header_texts[i]].update(header_candidates[header_texts[j]])
header_candidates[header_texts[j]].update(header_candidates[header_texts[i]])
for i in range(len(footer_distance_matrix)-1):
for j in range(i+1, len(footer_distance_matrix)):
if i == j:
continue
if footer_distance_matrix[i][j] >= fuzz_threshold:
footer_candidates[footer_texts[i]].update(footer_candidates[footer_texts[j]])
footer_candidates[footer_texts[j]].update(footer_candidates[footer_texts[i]])
headers_to_remove = set()
for chunk_ids in header_candidates.values():
if len(chunk_ids) > 1:
headers_to_remove.update(chunk_ids)
footers_to_remove = set()
for chunk_ids in footer_candidates.values():
if len(chunk_ids) > 1:
footers_to_remove.update(chunk_ids)
nodes_to_remove = nodes_to_remove.union(headers_to_remove.union(footers_to_remove))
for node_id in nodes_to_remove:
pdf_chunks = _remove_chunk(pdf_chunks=pdf_chunks, chunk_id=node_id)
return pdf_chunks
def remove_overlap_images(pdf_chunks: list[GenericNode]) -> list[GenericNode]:
# TODO(Jonathan Wang): Implement this function to remove images which are completely overlapping each other
# OR... get a better dang reader!
raise NotImplementedError
def chunk_by_header(
pdf_chunks_in: list[GenericNode],
combine_text_under_n_chars: int = 1024,
multipage_sections: bool = True,
# ) -> Tuple[List[GenericNode], List[GenericNode]]:
) -> list[GenericNode]:
"""Combine chunks together that are part of the same header and have similar meaning.
Args:
pdf_chunks (List[GenericNode]): List of chunks to be combined.
Returns:
List[GenericNode]: List of combined chunks.
List[GenericNode]: List of original chunks, with node references updated.
"""
# TODO(Jonathan Wang): Handle semantic chunking between elements within a Header chunk.
# TODO(Jonathan Wang): Handle splitting element chunks if they are over `max_characters` in length (does this ever really happen?)
# TODO(Jonathan Wang): Handle relationships between nodes.
pdf_chunks = deepcopy(pdf_chunks_in)
output = []
id_to_index = {}
index = 0
# Pass 1: Combine chunks together that are part of the same title chunk.
while (index < len(pdf_chunks)):
chunk = pdf_chunks[index]
if (chunk.metadata["type"] in ["Header", "Footer", "Image", "Table"]):
# These go immediately into the semantic title chunks and also reset the new node.
# Let's add a newline to distinguish from any other content.
if (chunk.metadata["type"] in ["Header", "Footer", "Table"]):
chunk.text = getattr(chunk, "text", "") + "\n"
output.append(chunk)
index += 1
continue
# Make a new node if we have a new title (or if we don't have a title).
if (
chunk.metadata["type"] == "Title"
):
# We're good, this node can stay as a TitleChunk.
chunk.metadata['type'] = 'Composite'
# if (not isinstance(chunk.metadata['page number'], list)):
# chunk.metadata['page number'] = [chunk.metadata['page number']]
# Let's add a newline to distinguish the title from the content.
setattr(chunk, 'text', getattr(chunk, 'text', '') + "\n")
output.append(chunk)
id_to_index[chunk.id_] = len(output) - 1
index += 1
continue
elif (chunk.metadata.get('parent_id', None) in id_to_index):
# This chunk is part of the same title as a prior chunk.
# Add this text into the prior title node.
jndex = id_to_index[chunk.metadata['parent_id']]
# if (not isinstance(output[jndex].metadata['page number'], list)):
# output[jndex].metadata['page number'] = [chunk.metadata['page number']]
output[jndex] = _combine_chunks(output[jndex], chunk)
# output[jndex].text = getattr(output[jndex], 'text', '') + '\n' + getattr(chunk, 'text', '')
# output[jndex].metadata['page number'] = list(set(output[jndex].metadata['page number'] + [chunk.metadata['page number']]))
# output[jndex].metadata['languages'] = list(set(output[jndex].metadata['languages'] + chunk.metadata['languages']))
pdf_chunks.remove(chunk)
continue
elif (
(chunk.metadata.get('parent_id', None) is None)
and (
len(getattr(chunk, 'text', '')) > combine_text_under_n_chars # big enough text section to stand alone
or (len(id_to_index.keys()) <= 0) # no prior title
)
):
# Okay, so either we don't have a title, or it was interrupted by an image / table.
# This chunk can stay as a TextChunk.
chunk.metadata['type'] = 'Composite-TextOnly'
# if (not isinstance(chunk.metadata['page number'], list)):
# chunk.metadata['page number'] = [chunk.metadata['page number']]
output.append(chunk)
id_to_index[chunk.id_] = len(output) - 1
index += 1
continue
else:
# Add the text to the prior node that isn't a table or image.
jndex = len(output) - 1
while (
(jndex >= 0)
and (output[jndex].metadata['type'] in ['Table', 'Image'])
):
# for title_chunk in output:
# print(f'''{title_chunk.id_}: {title_chunk.metadata['type']}, text: {title_chunk.text}, parent: {title_chunk.metadata['parent_id']}''')
jndex -= 1
if (jndex < 0):
raise Exception(f'''Prior title chunk not found: {index}, {chunk.metadata.get('parent_id', None)}''')
# Add this text into the prior title node.
# if (not isinstance(output[jndex].metadata['page number'], list)):
# output[jndex].metadata['page number'] = [chunk.metadata['page number']]
output[jndex] = _combine_chunks(output[jndex], chunk)
# output[jndex].text = getattr(output[jndex], 'text', '') + ' ' + getattr(chunk, 'text', '')
# output[jndex].metadata['page number'] = list(set(output[jndex].metadata['page number'] + [chunk.metadata['page number']]))
# output[jndex].metadata['languages'] = list(set(output[jndex].metadata['languages'] + chunk.metadata['languages']))
pdf_chunks.remove(chunk)
# TODO: Update relationships between nodes.
continue
return (output)
### TODO:
# Merge images together that are substantially overlapping.
# Favour image with no confidence score. (these come straight from pdf).
# Favour the larger image over the smaller one.
# Favour the image with higher confidence score.
def merge_images() -> None:
pass