Spaces:
Build error
Build error
import os | |
os.environ["TOKENIZERS_PARALLELISM"] = "false" | |
from PIL import Image, ImageDraw | |
import traceback | |
import gradio as gr | |
from gradio import processing_utils | |
import torch | |
from docquery import pipeline | |
from docquery.document import load_bytes, load_document, ImageDocument | |
from docquery.ocr_reader import get_ocr_reader | |
def ensure_list(x): | |
if isinstance(x, list): | |
return x | |
else: | |
return [x] | |
CHECKPOINTS = { | |
"LayoutLMv1 for Invoices 🧾": "impira/layoutlm-invoices", | |
} | |
PIPELINES = {} | |
def construct_pipeline(task, model): | |
global PIPELINES | |
if model in PIPELINES: | |
return PIPELINES[model] | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
ret = pipeline(task=task, model=CHECKPOINTS[model], device=device) | |
PIPELINES[model] = ret | |
return ret | |
def run_pipeline(model, question, document, top_k): | |
pipeline = construct_pipeline("document-question-answering", model) | |
return pipeline(question=question, **document.context, top_k=top_k) | |
# TODO: Move into docquery | |
# TODO: Support words past the first page (or window?) | |
def lift_word_boxes(document, page): | |
return document.context["image"][page][1] | |
def expand_bbox(word_boxes): | |
if len(word_boxes) == 0: | |
return None | |
min_x, min_y, max_x, max_y = zip(*[x[1] for x in word_boxes]) | |
min_x, min_y, max_x, max_y = [min(min_x), min(min_y), max(max_x), max(max_y)] | |
return [min_x, min_y, max_x, max_y] | |
# LayoutLM boxes are normalized to 0, 1000 | |
def normalize_bbox(box, width, height, padding=0.005): | |
min_x, min_y, max_x, max_y = [c / 1000 for c in box] | |
if padding != 0: | |
min_x = max(0, min_x - padding) | |
min_y = max(0, min_y - padding) | |
max_x = min(max_x + padding, 1) | |
max_y = min(max_y + padding, 1) | |
return [min_x * width, min_y * height, max_x * width, max_y * height] | |
EXAMPLES = [ | |
[ | |
"invoice.png", | |
"Invoice 1", | |
], | |
[ | |
"contract.jpeg", | |
"What is the purchase amount?", | |
], | |
[ | |
"statement.png", | |
"What are net sales for 2020?", | |
], | |
] | |
QUESTION_FILES = {} | |
FIELDS = { | |
"Vendor Name": ["Vendor Name - Logo?", "Vendor Name - Address?"], | |
"Vendor Address": ["Vendor Address?"], | |
"Invoice Number": ["Invoice Number?"], | |
"Invoice Date": ["Invoice Date?"], | |
"Due Date": ["Due Date?"], | |
"Subtotal": ["Subtotal?"], | |
"Total Tax": ["Total Tax?"], | |
"Invoice Total": ["Invoice Total?"], | |
"Amount Due": ["Amount Due?"], | |
"Payment Terms": ["Payment Terms?"], | |
} | |
def empty_table(fields): | |
return {"value": [[name, None] for name in fields.keys()], "interactive": False} | |
def process_document(document, fields, model, error=None): | |
if document is not None and error is None: | |
preview, json_output, table = process_fields(document, fields, model) | |
return ( | |
document, | |
fields, | |
preview, | |
gr.update(visible=True), | |
gr.update(visible=False, value=None), | |
json_output, | |
table, | |
) | |
else: | |
return ( | |
None, | |
fields, | |
None, | |
gr.update(visible=False), | |
gr.update(visible=True, value=error) if error is not None else None, | |
None, | |
gr.update(**empty_table(fields)), | |
) | |
def process_path(path, fields, model): | |
error = None | |
document = None | |
if path: | |
try: | |
document = load_document(path) | |
except Exception as e: | |
traceback.print_exc() | |
error = str(e) | |
return process_document(document, fields, model, error) | |
def process_upload(file, fields, model): | |
return process_path(file.name if file else None, fields, model) | |
colors = ["#64A087", "green", "black"] | |
def annotate_page(prediction, pages, document): | |
if "word_ids" in prediction: | |
image = pages[prediction["page"]] | |
draw = ImageDraw.Draw(image, "RGBA") | |
word_boxes = lift_word_boxes(document, prediction["page"]) | |
x1, y1, x2, y2 = normalize_bbox( | |
expand_bbox([word_boxes[i] for i in prediction["word_ids"]]), | |
image.width, | |
image.height, | |
) | |
draw.rectangle(((x1, y1), (x2, y2)), fill=(0, 255, 0, int(0.4 * 255))) | |
def process_question( | |
question, document, img_gallery, model, fields, output, output_table | |
): | |
if not question or document is None: | |
return None, None, None | |
text_value = None | |
pages = [processing_utils.decode_base64_to_image(p) for p in img_gallery] | |
prediction = run_pipeline(model, question, document, 1) | |
annotate_page(prediction, pages, document) | |
field_name = question.rstrip("?") | |
fields = {field_name: [question], **fields} | |
output = {field_name: prediction, **output} | |
table = [[field_name, prediction.get("answer")]] + output_table.values.tolist() | |
return ( | |
gr.update(visible=True, value=pages), | |
fields, | |
output, | |
gr.update(value=table, interactive=False), | |
) | |
def process_fields(document, fields, model=list(CHECKPOINTS.keys())[0]): | |
pages = [x.copy().convert("RGB") for x in document.preview] | |
ret = {} | |
table = [] | |
for (field_name, questions) in fields.items(): | |
answers = [run_pipeline(model, q, document, top_k=1) for q in questions] | |
answers.sort(key=lambda x: -x.get("score", 0) if x else 0) | |
top = answers[0] | |
annotate_page(top, pages, document) | |
ret[field_name] = top | |
table.append([field_name, top.get("answer") if top is not None else None]) | |
return ( | |
gr.update(visible=True, value=pages), | |
gr.update(visible=True, value=ret), | |
gr.update(visible=True, value=table), | |
) | |
def load_example_document(img, title, fields, model): | |
document = None | |
if img is not None: | |
if title in QUESTION_FILES: | |
document = load_document(QUESTION_FILES[title]) | |
else: | |
document = ImageDocument(Image.fromarray(img), ocr_reader=get_ocr_reader()) | |
return process_document(document, fields, model) | |
CSS = """ | |
#question input { | |
font-size: 16px; | |
} | |
#url-textbox, #question-textbox { | |
padding: 0 !important; | |
} | |
#short-upload-box .w-full { | |
min-height: 10rem !important; | |
} | |
/* I think something like this can be used to re-shape | |
* the table | |
*/ | |
/* | |
.gr-samples-table tr { | |
display: inline; | |
} | |
.gr-samples-table .p-2 { | |
width: 100px; | |
} | |
*/ | |
#select-a-file { | |
width: 100%; | |
} | |
#file-clear { | |
padding-top: 2px !important; | |
padding-bottom: 2px !important; | |
padding-left: 8px !important; | |
padding-right: 8px !important; | |
margin-top: 10px; | |
} | |
.gradio-container .gr-button-primary { | |
background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%); | |
border: 1px solid #B0DCCC; | |
border-radius: 8px; | |
color: #1B8700; | |
} | |
.gradio-container.dark button#submit-button { | |
background: linear-gradient(180deg, #CDF9BE 0%, #AFF497 100%); | |
border: 1px solid #B0DCCC; | |
border-radius: 8px; | |
color: #1B8700 | |
} | |
table.gr-samples-table tr td { | |
border: none; | |
outline: none; | |
} | |
table.gr-samples-table tr td:first-of-type { | |
width: 0%; | |
} | |
div#short-upload-box div.absolute { | |
display: none !important; | |
} | |
gradio-app > div > div > div > div.w-full > div, .gradio-app > div > div > div > div.w-full > div { | |
gap: 0px 2%; | |
} | |
gradio-app div div div div.w-full, .gradio-app div div div div.w-full { | |
gap: 0px; | |
} | |
gradio-app h2, .gradio-app h2 { | |
padding-top: 10px; | |
} | |
#answer { | |
overflow-y: scroll; | |
color: white; | |
background: #666; | |
border-color: #666; | |
font-size: 20px; | |
font-weight: bold; | |
} | |
#answer span { | |
color: white; | |
} | |
#answer textarea { | |
color:white; | |
background: #777; | |
border-color: #777; | |
font-size: 18px; | |
} | |
#url-error input { | |
color: red; | |
} | |
""" | |
with gr.Blocks(css=CSS) as demo: | |
gr.Markdown("# DocQuery: Document Query Engine") | |
gr.Markdown( | |
"DocQuery (created by [Impira](https://impira.com)) uses LayoutLMv1 fine-tuned on an invoice dataset" | |
" as well as DocVQA and SQuAD, which boot its general comprehension skills. The model is an enhanced" | |
" QA architecture that supports selecting blocks of text which may be non-consecutive, which is a major" | |
" issue when dealing with invoice documents (e.g. addresses)." | |
" To use it, simply upload an image or PDF invoice and the model will predict values for several fields." | |
" You can also create additional fields by simply typing in a question." | |
" DocQuery is available on [Github](https://github.com/impira/docquery)." | |
) | |
document = gr.Variable() | |
fields = gr.Variable(value={**FIELDS}) | |
example_question = gr.Textbox(visible=False) | |
example_image = gr.Image(visible=False) | |
with gr.Row(equal_height=True): | |
with gr.Column(): | |
with gr.Row(): | |
gr.Markdown("## 1. Select an invoice", elem_id="select-a-file") | |
img_clear_button = gr.Button( | |
"Clear", variant="secondary", elem_id="file-clear", visible=False | |
) | |
image = gr.Gallery(visible=False) | |
with gr.Row(equal_height=True): | |
with gr.Column(): | |
with gr.Row(): | |
url = gr.Textbox( | |
show_label=False, | |
placeholder="URL", | |
lines=1, | |
max_lines=1, | |
elem_id="url-textbox", | |
) | |
submit = gr.Button("Get") | |
url_error = gr.Textbox( | |
visible=False, | |
elem_id="url-error", | |
max_lines=1, | |
interactive=False, | |
label="Error", | |
) | |
gr.Markdown("— or —") | |
upload = gr.File(label=None, interactive=True, elem_id="short-upload-box") | |
gr.Examples( | |
examples=EXAMPLES, | |
inputs=[example_image, example_question], | |
) | |
with gr.Column() as col: | |
with gr.Tabs(): | |
with gr.TabItem("Table"): | |
output_table = gr.Dataframe( | |
headers=["Field", "Value"], **empty_table(fields.value) | |
) | |
with gr.TabItem("JSON"): | |
output = gr.JSON(label="Output", visible=False) | |
gr.Markdown("## 2. Ask a question") | |
model = gr.Radio( | |
choices=list(CHECKPOINTS.keys()), | |
value=list(CHECKPOINTS.keys())[0], | |
label="Model", | |
visible=False, | |
) | |
with gr.Row(): | |
question = gr.Textbox( | |
label="Question", | |
show_label=False, | |
placeholder="e.g. What is the invoice number?", | |
lines=1, | |
max_lines=1, | |
elem_id="question-textbox", | |
) | |
clear_button = gr.Button("Clear", variant="secondary", visible=False) | |
submit_button = gr.Button( | |
"Add", variant="primary", elem_id="submit-button" | |
) | |
for cb in [img_clear_button, clear_button]: | |
cb.click( | |
lambda _: ( | |
gr.update(visible=False, value=None), # image | |
None, # document | |
# {**FIELDS}, # fields | |
gr.update(visible=False, value=None), # output | |
# gr.update(**empty_table(FIELDS)), # output_table | |
# gr.update(visible=False), | |
None, | |
None, | |
None, | |
gr.update(visible=False, value=None), | |
None, | |
), | |
inputs=clear_button, | |
outputs=[ | |
image, | |
document, | |
# fields, | |
output, | |
# output_table, | |
# img_clear_button, | |
example_image, | |
upload, | |
url, | |
url_error, | |
question, | |
], | |
) | |
submit_outputs = [ | |
document, | |
fields, | |
image, | |
img_clear_button, | |
url_error, | |
output, | |
output_table, | |
] | |
upload.change( | |
fn=process_upload, | |
inputs=[upload, fields, model], | |
outputs=submit_outputs, | |
) | |
submit.click( | |
fn=process_path, | |
inputs=[url, fields, model], | |
outputs=submit_outputs, | |
) | |
question.submit( | |
fn=process_question, | |
inputs=[question, document, image, model, fields, output, output_table], | |
outputs=[image, fields, output, output_table], | |
) | |
submit_button.click( | |
process_question, | |
inputs=[question, document, model], | |
outputs=[image, output, output_table], | |
) | |
model.change( | |
process_question, | |
inputs=[question, document, model], | |
outputs=[image, output, output_table], | |
) | |
example_image.change( | |
fn=load_example_document, | |
inputs=[example_image, example_question, fields, model], | |
outputs=submit_outputs, | |
) | |
if __name__ == "__main__": | |
demo.launch(enable_queue=False) | |