"""
Based upon ImageCaptionLoader in LangChain version: langchain/document_loaders/image_captions.py

But accepts a preloaded model to avoid slowness in use and CUDA forking issues.

Loader that uses H2O DocTR OCR models to extract text from images.
"""

from typing import List, Union, Any, Tuple, Optional

import numpy as np
import requests
import torch
from langchain.docstore.document import Document
from langchain.document_loaders import ImageCaptionLoader

from doctr.utils.common_types import AbstractFile

from utils import get_device, clear_torch_cache, NullContext


class H2OOCRLoader(ImageCaptionLoader):
    """Loader that extracts text from images"""

    def __init__(self, path_images: Optional[Union[str, List[str]]] = None, layout_aware=False, gpu_id=None):
        super().__init__(path_images)
        self._ocr_model = None
        self.layout_aware = layout_aware
        # Fall back to GPU 0 unless a valid non-negative GPU index is given
        self.gpu_id = gpu_id if isinstance(gpu_id, int) and gpu_id >= 0 else 0

        self.device = 'cpu'
        self.set_context()

    def set_context(self):
        if get_device() == 'cuda':
            n_gpus = torch.cuda.device_count() if torch.cuda.is_available() else 0
            if n_gpus > 0:
                self.context_class = torch.device
                # gpu_id is always a non-negative int (see __init__)
                self.device = "cuda:%d" % self.gpu_id
            else:
                self.device = 'cpu'
        else:
            self.device = 'cpu'

    def load_model(self):
        try:
            # weasyprint is unused here; importing it fails fast if it is missing
            from weasyprint import HTML
            from doctr.models.zoo import ocr_predictor
        except ImportError:
            raise ValueError(
                "`doctr` package not found, please install with "
                "`pip install git+https://github.com/h2oai/doctr.git`."
            )
        if self._ocr_model:
            # Already loaded: just move the model to the target device
            self._ocr_model = self._ocr_model.to(self.device)
            return self
        self.set_context()
        self._ocr_model = ocr_predictor(det_arch="db_resnet50", reco_arch="crnn_efficientnetv2_mV2",
                                        pretrained=True).to(self.device)
        return self

    def unload_model(self):
        # Move each sub-model (detector, recognizer, full predictor) back to the
        # CPU when supported, freeing CUDA memory after each move
        if self._ocr_model and hasattr(self._ocr_model.det_predictor.model, 'cpu'):
            self._ocr_model.det_predictor.model.cpu()
            clear_torch_cache()
        if self._ocr_model and hasattr(self._ocr_model.reco_predictor.model, 'cpu'):
            self._ocr_model.reco_predictor.model.cpu()
            clear_torch_cache()
        if self._ocr_model and hasattr(self._ocr_model, 'cpu'):
            self._ocr_model.cpu()
            clear_torch_cache()

    def set_document_paths(self, document_paths: Union[str, List[str]]):
        """
        Set the image (or PDF) file paths to load from
        """
        if isinstance(document_paths, str):
            self.document_paths = [document_paths]
        else:
            self.document_paths = document_paths

    def load(self, prompt=None) -> List[Document]:
        # prompt is accepted for interface compatibility but is not used here
        if self._ocr_model is None:
            self.load_model()
        # NullContext must be instantiated so both branches yield a context manager
        context_class = torch.cuda.device(self.gpu_id) if 'cuda' in str(self.device) else NullContext()
        results = []
        with context_class:
            for document_path in self.document_paths:
                caption, metadata = self._get_captions_and_metadata(
                    model=self._ocr_model, document_path=document_path
                )
                doc = Document(page_content=" \n".join(caption), metadata=metadata)
                results.append(doc)

        return results

    @staticmethod
    def pad_resize_image(image):
        import cv2

        # Target canvas size expected by the OCR model
        L = 1024
        H = 1024

        Li, Hi = image.shape[1], image.shape[0]

        aspect_ratio_original = Li / Hi
        aspect_ratio_final = L / H

        if Li < L and Hi < H:
            # Smaller than the canvas in both dimensions: pad to center, no resize
            padding_x = (L - Li) // 2
            padding_y = (H - Hi) // 2
            image = cv2.copyMakeBorder(image, padding_y, padding_y, padding_x, padding_x,
                                       cv2.BORDER_CONSTANT, value=[0, 0, 0])
        elif Li > L and Hi > H:
            # Larger in both dimensions: shrink to fit, preserving aspect ratio
            if aspect_ratio_original < aspect_ratio_final:
                new_height = H
                new_width = int(H * aspect_ratio_original)
            else:
                new_width = L
                new_height = int(L / aspect_ratio_original)
            image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
        else:
            # Mixed case (one dimension larger, one smaller): resize to fit,
            # then pad to center on the canvas
            if aspect_ratio_original < aspect_ratio_final:
                new_height = H
                new_width = int(H * aspect_ratio_original)
            else:
                new_width = L
                new_height = int(L / aspect_ratio_original)
            image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
            padding_x = (L - new_width) // 2
            padding_y = (H - new_height) // 2
            image = cv2.copyMakeBorder(image, padding_y, padding_y, padding_x, padding_x,
                                       cv2.BORDER_CONSTANT, value=[0, 0, 0])
        return image
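
    # Worked example for pad_resize_image (hypothetical shapes, for illustration):
    #   >>> img = np.zeros((512, 2048, 3), dtype=np.uint8)   # H x W x C
    #   >>> H2OOCRLoader.pad_resize_image(img).shape
    #   (1024, 1024, 3)
    # The 4.0 aspect ratio exceeds the canvas ratio of 1.0, so the image is
    # resized to 1024x256 and then padded by 384 px on top and bottom.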

    def _get_captions_and_metadata(
            self, model: Any, document_path: str) -> Tuple[list, dict]:
        """
        Helper function for getting the captions and metadata of an image
        """
        try:
            from doctr.io import DocumentFile
        except ImportError:
            raise ValueError(
                "`doctr` package not found, please install with "
                "`pip install git+https://github.com/h2oai/doctr.git`."
            )
        try:
            if document_path.lower().endswith(".pdf"):
                # Render each PDF page to a numpy image (see read_pdf below)
                images = read_pdf(document_path)
            else:
                images = DocumentFile.from_images(document_path)
        except Exception:
            raise ValueError(f"Could not get image data for {document_path}")
        document_words = []
        shapes = []
        for image in images:
            # Record the shape before and after padding/resizing for the metadata
            shape0 = str(image.shape)
            image = self.pad_resize_image(image)
            shape1 = str(image.shape)

            ocr_output = model([image])
            page_words = []
            page_boxes = []
            for block_num, block in enumerate(ocr_output.pages[0].blocks):
                for line_num, line in enumerate(block.lines):
                    for word_num, word in enumerate(line.words):
                        if not (word.value or "").strip():
                            continue
                        page_words.append(word.value)
                        page_boxes.append(
                            [word.geometry[0][0], word.geometry[0][1], word.geometry[1][0], word.geometry[1][1]])
            if self.layout_aware:
                # Re-order words top-to-bottom, then rebuild whitespace so the
                # extracted text approximates the page layout
                ids = boxes_sort(page_boxes)
                texts = [page_words[i] for i in ids]
                text_boxes = [page_boxes[i] for i in ids]
                page_words = space_layout(texts=texts, boxes=text_boxes)
            else:
                page_words = " ".join(page_words)
            document_words.append(page_words)
            shapes.append(dict(shape0=shape0, shape1=shape1))
        metadata: dict = {"image_path": document_path, 'shape': str(shapes)}
        return document_words, metadata
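

# Example usage (a minimal sketch; assumes "page.png" exists and the h2oai doctr
# fork with its pretrained weights is installed):
#   >>> loader = H2OOCRLoader(layout_aware=True, gpu_id=0)
#   >>> loader.set_document_paths(["page.png"])
#   >>> docs = loader.load()        # one langchain Document per input file
#   >>> loader.unload_model()       # move models back to CPU and free CUDA memory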


def boxes_sort(boxes):
    """ Sort boxes from top to bottom by their top y-coordinate
    Params:
        boxes: [[x1, y1, x2, y2], [x1, y1, x2, y2], ...]
    Returns:
        the indices of `boxes` in sorted order
    """
    sorted_id = sorted(range(len(boxes)), key=lambda x: (boxes[x][1]))
    return sorted_id
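
# For example (made-up boxes), the second box starts higher on the page, so its
# index comes first:
#   >>> boxes_sort([[0.1, 0.5, 0.3, 0.6], [0.2, 0.1, 0.4, 0.2]])
#   [1, 0]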


def is_same_line(box1, box2):
    """ Whether two boxes lie on the same text line, i.e. each box's vertical
    midpoint falls strictly inside the other box's vertical extent
    Params:
        box1: [x1, y1, x2, y2]
        box2: [x1, y1, x2, y2]
    """
    box1_midy = (box1[1] + box1[3]) / 2
    box2_midy = (box2[1] + box2[3]) / 2

    return box2[1] < box1_midy < box2[3] and box1[1] < box2_midy < box1[3]
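
# For example (made-up boxes), two horizontally separated boxes whose vertical
# midpoints fall inside each other's extent are on the same line:
#   >>> is_same_line([0.0, 0.0, 0.2, 0.1], [0.3, 0.02, 0.5, 0.12])
#   True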


def union_box(box1, box2):
    """ Smallest box that covers both input boxes
    Params:
        box1: [x1, y1, x2, y2]
        box2: [x1, y1, x2, y2]
    """
    x1 = min(box1[0], box2[0])
    y1 = min(box1[1], box2[1])
    x2 = max(box1[2], box2[2])
    y2 = max(box1[3], box2[3])

    return [x1, y1, x2, y2]
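
# For example (made-up boxes):
#   >>> union_box([0.1, 0.2, 0.4, 0.3], [0.2, 0.1, 0.5, 0.25])
#   [0.1, 0.1, 0.5, 0.3]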


def space_layout(texts, boxes, threshold_show_spaces=8, threshold_char_width=0.02):
    line_boxes = []
    line_texts = []
    max_line_char_num = 0
    line_width = 0

    boxes = np.array(boxes)
    texts = np.array(texts)
    while len(boxes) > 0:
        # Take the first remaining box and pull out every box whose vertical
        # midpoint falls inside its vertical extent, i.e. the rest of its line
        box = boxes[0]
        mid = (boxes[:, 3] + boxes[:, 1]) / 2
        inline_boxes = np.logical_and(mid > box[1], mid < box[3])
        sorted_xs = np.argsort(boxes[inline_boxes][:, 0], axis=0)
        line_box = boxes[inline_boxes][sorted_xs]
        line_text = texts[inline_boxes][sorted_xs]
        boxes = boxes[~inline_boxes]
        texts = texts[~inline_boxes]

        line_boxes.append(line_box.tolist())
        line_texts.append(line_text.tolist())
        # Track the longest line; its width fixes the character-width estimate
        if len(" ".join(line_texts[-1])) > max_line_char_num:
            max_line_char_num = len(" ".join(line_texts[-1]))
            line_width = np.array(line_boxes[-1])
            line_width = line_width[:, 2].max() - line_width[:, 0].min()

    # Estimated width of a single character, in relative page coordinates
    char_width = (line_width / max_line_char_num) if max_line_char_num > 0 else 0
    if threshold_char_width == 0.0:
        if char_width == 0:
            char_width = 1
    else:
        if char_width <= threshold_char_width:
            char_width = threshold_char_width

    space_line_texts = []
    for i, line_box in enumerate(line_boxes):
        space_line_text = ""
        for j, box in enumerate(line_box):
            # How many character cells of indentation this word still needs
            left_char_num = int(box[0] / char_width)
            left_char_num = max((left_char_num - len(space_line_text)), 1)
            if left_char_num > threshold_show_spaces:
                # Compress long runs of spaces into an explicit gap marker
                space_line_text += f" <{left_char_num}> "
            else:
                space_line_text += " "
            space_line_text += line_texts[i][j]
        space_line_texts.append(space_line_text + "\n")

    return "".join(space_line_texts)


def read_pdf(
        file: AbstractFile,
        scale: float = 300 / 72,
        rgb_mode: bool = True,
        password: Optional[str] = None,
        **kwargs: Any,
) -> List[np.ndarray]:
    """Read a PDF file and convert it into images in numpy format

    >>> doc = read_pdf("path/to/your/doc.pdf")

    Args:
        file: the path to the PDF file
        scale: rendering scale (1 corresponds to 72dpi, so the default of 300 / 72 renders at 300dpi)
        rgb_mode: if True, the output will be RGB, otherwise BGR
        password: a password to unlock the document, if encrypted
        kwargs: additional parameters to :meth:`pypdfium2.PdfPage.render`

    Returns:
        the list of pages decoded as numpy ndarray of shape H x W x C
    """
    import pypdfium2 as pdfium

    pdf = pdfium.PdfDocument(file, password=password, autoclose=True)
    return [page.render(scale=scale, rev_byteorder=rgb_mode, **kwargs).to_numpy() for page in pdf]