Spaces:
Running
Running
| # import gradio as gr | |
| # import json | |
| # import os | |
| # import tempfile | |
| # import img2pdf | |
| # import glob | |
| # import shutil | |
| # from img2pdf import Rotation | |
| # from pathlib import Path | |
| # print("--- DEBUG: Current Working Directory ---") | |
| # print(os.getcwd()) | |
| # print("--- DEBUG: Files in Root ---") | |
| # print(os.listdir('.')) | |
| # # ============================== | |
| # # PIPELINE IMPORT | |
| # # ============================== | |
| # # try: | |
| # # from working_yolo_pipeline import run_document_pipeline, DEFAULT_LAYOUTLMV3_MODEL_PATH, WEIGHTS_PATH | |
| # # except ImportError: | |
| # # print("Warning: 'working_yolo_pipeline.py' not found. Using dummy paths.") | |
| # try: | |
| # from working_yolo_pipeline import run_document_pipeline, DEFAULT_LAYOUTLMV3_MODEL_PATH, WEIGHTS_PATH | |
| # except Exception as e: # Catch ALL exceptions | |
| # print(f"Warning: Failed to import pipeline: {e}") | |
| # import traceback | |
| # traceback.print_exc() # Show the actual error | |
| # def run_document_pipeline(*args): | |
| # return {"error": "Placeholder pipeline function called."} | |
| # DEFAULT_LAYOUTLMV3_MODEL_PATH = "./models/layoutlmv3_model" | |
| # WEIGHTS_PATH = "./weights/yolo_weights.pt" | |
| # def process_file(uploaded_files, layoutlmv3_model_path=None): | |
| # """ | |
| # Robust handler for multiple or single file uploads. | |
| # Returns the final JSON and a LIST of all intermediate JSON files (OCR, Predictions, BIO). | |
| # """ | |
| # if uploaded_files is None: | |
| # return "β Error: No files uploaded.", None | |
| # if not isinstance(uploaded_files, list): | |
| # file_list = [uploaded_files] | |
| # else: | |
| # file_list = uploaded_files | |
| # if len(file_list) == 0: | |
| # return "β Error: Empty file list.", None | |
| # # 1. Resolve all file paths safely | |
| # resolved_paths = [] | |
| # for f in file_list: | |
| # try: | |
| # if isinstance(f, dict) and "path" in f: | |
| # resolved_paths.append(f["path"]) | |
| # elif hasattr(f, 'path'): | |
| # resolved_paths.append(f.path) | |
| # else: | |
| # resolved_paths.append(str(f)) | |
| # except Exception as e: | |
| # print(f"Error resolving path for {f}: {e}") | |
| # if not resolved_paths: | |
| # return "β Error: Could not resolve file paths.", None | |
| # # 2. Determine if we should merge into a single PDF | |
| # first_file = Path(resolved_paths[0]) | |
| # is_image = first_file.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp', '.webp', '.tiff'] | |
| # try: | |
| # if len(resolved_paths) > 1 or is_image: | |
| # print(f"π¦ Converting {len(resolved_paths)} image(s) to a single PDF...") | |
| # temp_pdf = tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") | |
| # with open(temp_pdf.name, "wb") as f_out: | |
| # f_out.write(img2pdf.convert(resolved_paths, rotation=Rotation.ifvalid)) | |
| # processing_path = temp_pdf.name | |
| # else: | |
| # processing_path = resolved_paths[0] | |
| # # 3. Standard Pipeline Checks | |
| # final_model_path = layoutlmv3_model_path or DEFAULT_LAYOUTLMV3_MODEL_PATH | |
| # if not os.path.exists(final_model_path): | |
| # return f"β Error: Model not found at {final_model_path}", None | |
| # # 4. Call the pipeline | |
| # print(f"π Starting pipeline for: {processing_path}") | |
| # result = run_document_pipeline(processing_path, final_model_path) | |
| # # 5. SCRAPE FOR INTERMEDIATE FILES | |
| # # We look for all .json files in /tmp/ created during this run | |
| # base_name = Path(processing_path).stem | |
| # # This matches common patterns like /tmp/pipeline_run_... or filenames in /tmp/ | |
| # search_patterns = [ | |
| # f"/tmp/pipeline_run_{base_name}*/*.json", | |
| # f"/tmp/*{base_name}*.json" | |
| # ] | |
| # all_intermediate_jsons = [] | |
| # for pattern in search_patterns: | |
| # all_intermediate_jsons.extend(glob.glob(pattern)) | |
| # # Remove duplicates while preserving order | |
| # all_intermediate_jsons = list(dict.fromkeys(all_intermediate_jsons)) | |
| # # 6. Prepare Final Output for Display | |
| # if result is None or (isinstance(result, list) and len(result) == 0): | |
| # display_text = "β οΈ Pipeline failed at Step 3 (BIO Decoding).\nDownload the intermediate JSONs below to inspect OCR and Model Predictions." | |
| # else: | |
| # display_text = json.dumps(result, indent=2, ensure_ascii=False) | |
| # # If the final result succeeded, save it to a temp file so it can be downloaded too | |
| # temp_final = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json', prefix='final_result_') | |
| # json.dump(result, temp_final, indent=2, ensure_ascii=False) | |
| # temp_final.close() | |
| # all_intermediate_jsons.append(temp_final.name) | |
| # return display_text, all_intermediate_jsons | |
| # except Exception as e: | |
| # import traceback | |
| # traceback.print_exc() | |
| # return f"β Error: {str(e)}", None | |
| # # def visualize_detections(uploaded_files): | |
| # # """Shows the first uploaded image with YOLO bounding boxes""" | |
| # # if not uploaded_files: | |
| # # return None | |
| # # # Get first file path | |
| # # file_path = uploaded_files[0] if isinstance(uploaded_files, list) else uploaded_files | |
| # # if isinstance(file_path, dict): | |
| # # file_path = file_path["path"] | |
| # # import cv2 | |
| # # from ultralytics import YOLO | |
| # # # Load image | |
| # # img = cv2.imread(str(file_path)) | |
| # # if img is None: | |
| # # return None | |
| # # # Run YOLO | |
| # # model = YOLO(WEIGHTS_PATH) | |
| # # results = model.predict(source=img, conf=0.2, imgsz=640, verbose=False) | |
| # # # Draw boxes | |
| # # for box in results[0].boxes: | |
| # # class_id = int(box.cls[0]) | |
| # # class_name = model.names[class_id] | |
| # # if class_name in ['figure', 'equation']: | |
| # # x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy()) | |
| # # color = (0, 255, 0) if class_name == 'figure' else (255, 0, 0) | |
| # # cv2.rectangle(img, (x1, y1), (x2, y2), color, 2) | |
| # # cv2.putText(img, f"{class_name} {box.conf[0]:.2f}", | |
| # # (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2) | |
| # # # Save and return | |
| # # temp_path = tempfile.NamedTemporaryFile(delete=False, suffix=".png").name | |
| # # cv2.imwrite(temp_path, img) | |
| # # return temp_path | |
| # # # ============================== | |
| # # # GRADIO INTERFACE | |
| # # # ============================== | |
| # # with gr.Blocks(title="Document Analysis Pipeline") as demo: | |
| # # gr.Markdown("# π Full Pipeline Analysis") | |
| # # gr.Markdown("### π Intermediate File Recovery Active") | |
| # # gr.Markdown("The **Download** box will contain: \n1. OCR JSON (Step 1)\n2. Raw LayoutLMv3 Prediction JSON (Step 2)\n3. Final BIO JSON (Step 3)") | |
| # # with gr.Row(): | |
| # # with gr.Column(scale=1): | |
| # # file_input = gr.File( | |
| # # label="Upload PDFs or Images", | |
| # # file_types=[".pdf", ".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tiff"], | |
| # # file_count="multiple", | |
| # # type="filepath" | |
| # # ) | |
| # # model_path_input = gr.Textbox( | |
| # # label="Model Path", | |
| # # value=DEFAULT_LAYOUTLMV3_MODEL_PATH | |
| # # ) | |
| # # process_btn = gr.Button("π Run Pipeline", variant="primary") | |
| # # with gr.Column(scale=2): | |
| # # json_output = gr.Code(label="Final Structured Output", language="json", lines=20) | |
| # # # IMPORTANT: file_count="multiple" allows returning the list of all stage files | |
| # # download_output = gr.File(label="Download All Pipeline Stages (JSON)", file_count="multiple") | |
| # # process_btn.click( | |
| # # fn=process_file, | |
| # # inputs=[file_input, model_path_input], | |
| # # outputs=[json_output, download_output] | |
| # # ) | |
| # # if __name__ == "__main__": | |
| # # demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True) | |
| # # # ============================== | |
| # # # VISUAL DEBUG FUNCTION | |
| # # # ============================== | |
| # # def visualize_detections(uploaded_files): | |
| # # """Shows the first uploaded image with YOLO bounding boxes""" | |
| # # if not uploaded_files: | |
| # # return None | |
| # # try: | |
| # # # Get first file path | |
| # # file_path = uploaded_files[0] if isinstance(uploaded_files, list) else uploaded_files | |
| # # if isinstance(file_path, dict): | |
| # # file_path = file_path["path"] | |
| # # elif hasattr(file_path, 'path'): | |
| # # file_path = file_path.path | |
| # # import cv2 | |
| # # import numpy as np | |
| # # from ultralytics import YOLO | |
| # # import fitz | |
| # # # Handle PDF conversion to image | |
| # # if str(file_path).lower().endswith('.pdf'): | |
| # # doc = fitz.open(file_path) | |
| # # page_idx = int(page_num) - 1 | |
| # # page = doc.load_page(page_idx) | |
| # # pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) | |
| # # img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n) | |
| # # if pix.n == 3: | |
| # # img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) | |
| # # elif pix.n == 4: | |
| # # img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR) | |
| # # doc.close() | |
| # # else: | |
| # # img = cv2.imread(str(file_path)) | |
| # # if img is None: | |
| # # return None | |
| # # # Run YOLO detection | |
| # # model = YOLO(WEIGHTS_PATH) | |
| # # results = model.predict(source=img, conf=0.2, imgsz=640, verbose=False) | |
| # # # Draw bounding boxes | |
| # # detection_count = {'figure': 0, 'equation': 0} | |
| # # for box in results[0].boxes: | |
| # # class_id = int(box.cls[0]) | |
| # # class_name = model.names[class_id] | |
| # # if class_name in ['figure', 'equation']: | |
| # # x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy()) | |
| # # conf = float(box.conf[0]) | |
| # # # Green for figures, Red for equations | |
| # # color = (0, 255, 0) if class_name == 'figure' else (0, 0, 255) | |
| # # cv2.rectangle(img, (x1, y1), (x2, y2), color, 3) | |
| # # # Add label with confidence | |
| # # label = f"{class_name.upper()} {conf:.2f}" | |
| # # (text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) | |
| # # cv2.rectangle(img, (x1, y1 - text_height - 10), (x1 + text_width, y1), color, -1) | |
| # # cv2.putText(img, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) | |
| # # detection_count[class_name] += 1 | |
| # # # Add summary text at top | |
| # # summary = f"Detected: {detection_count['figure']} Figures (GREEN), {detection_count['equation']} Equations (RED)" | |
| # # cv2.rectangle(img, (10, 10), (10 + len(summary) * 10, 40), (0, 0, 0), -1) | |
| # # cv2.putText(img, summary, (15, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) | |
| # # # Save to temp file | |
| # # temp_path = tempfile.NamedTemporaryFile(delete=False, suffix=".png").name | |
| # # cv2.imwrite(temp_path, img) | |
| # # return temp_path | |
| # # except Exception as e: | |
| # # print(f"Error in visualize_detections: {e}") | |
| # # import traceback | |
| # # traceback.print_exc() | |
| # # return None | |
| # # # ============================== | |
| # # # GRADIO INTERFACE | |
| # # # ============================== | |
| # # with gr.Blocks(title="Document Analysis Pipeline") as demo: | |
| # # gr.Markdown("# π Full Pipeline Analysis") | |
| # # gr.Markdown("### π Intermediate File Recovery Active") | |
| # # gr.Markdown("The **Download** box will contain: \n1. OCR JSON (Step 1)\n2. Raw LayoutLMv3 Prediction JSON (Step 2)\n3. Final BIO JSON (Step 3)") | |
| # # with gr.Row(): | |
| # # with gr.Column(scale=1): | |
| # # file_input = gr.File( | |
| # # label="Upload PDFs or Images", | |
| # # file_types=[".pdf", ".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tiff"], | |
| # # file_count="multiple", | |
| # # type="filepath" | |
| # # ) | |
| # # page_selector = gr.Slider( | |
| # # minimum=1, | |
| # # maximum=100, | |
| # # value=1, | |
| # # step=1, | |
| # # label="PDF Page Number (for preview)", | |
| # # visible=True | |
| # # ) | |
| # # model_path_input = gr.Textbox( | |
| # # label="Model Path", | |
| # # value=DEFAULT_LAYOUTLMV3_MODEL_PATH | |
| # # ) | |
| # # # Debug button for visual inspection | |
| # # debug_btn = gr.Button("π Show YOLO Detections (First Page)", variant="secondary") | |
| # # # Main processing button | |
| # # process_btn = gr.Button("π Run Full Pipeline", variant="primary") | |
| # # with gr.Column(scale=2): | |
| # # # Visual debug output | |
| # # detection_preview = gr.Image(label="YOLO Detection Preview (Green=Figure, Red=Equation)", type="filepath") | |
| # # # Final JSON output | |
| # # json_output = gr.Code(label="Final Structured Output", language="json", lines=20) | |
| # # # Download all intermediate files | |
| # # download_output = gr.File(label="Download All Pipeline Stages (JSON)", file_count="multiple") | |
| # # # Wire up the debug button | |
| # # debug_btn.click( | |
| # # fn=visualize_detections, | |
| # # inputs=[file_input, page_selector], | |
| # # outputs=[detection_preview] | |
| # # ) | |
| # # # Wire up the main processing button | |
| # # process_btn.click( | |
| # # fn=process_file, | |
| # # inputs=[file_input, model_path_input], | |
| # # outputs=[json_output, download_output] | |
| # # ) | |
| # # if __name__ == "__main__": | |
| # # demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True) | |
| # # ============================== | |
| # # VISUAL DEBUG FUNCTION | |
| # # ============================== | |
| # def visualize_detections(uploaded_files, page_num): | |
| # """Shows the selected PDF page or image with YOLO bounding boxes""" | |
| # if not uploaded_files: | |
| # return None | |
| # try: | |
| # import cv2 | |
| # import numpy as np | |
| # import tempfile | |
| # from ultralytics import YOLO | |
| # import fitz | |
| # # Get first file path | |
| # file_path = uploaded_files[0] if isinstance(uploaded_files, list) else uploaded_files | |
| # if isinstance(file_path, dict): | |
| # file_path = file_path["path"] | |
| # elif hasattr(file_path, 'path'): | |
| # file_path = file_path.path | |
| # # Handle PDF conversion to image | |
| # if str(file_path).lower().endswith('.pdf'): | |
| # doc = fitz.open(file_path) | |
| # # Ensure the selected page exists in the document | |
| # page_idx = min(max(int(page_num) - 1, 0), len(doc) - 1) | |
| # page = doc.load_page(page_idx) | |
| # pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0)) | |
| # img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n) | |
| # if pix.n == 3: | |
| # img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) | |
| # elif pix.n == 4: | |
| # img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR) | |
| # doc.close() | |
| # else: | |
| # img = cv2.imread(str(file_path)) | |
| # if img is None: | |
| # return None | |
| # # Run YOLO detection | |
| # model = YOLO(WEIGHTS_PATH) | |
| # results = model.predict(source=img, conf=0.2, imgsz=640, verbose=False) | |
| # # Draw bounding boxes | |
| # detection_count = {'figure': 0, 'equation': 0} | |
| # for box in results[0].boxes: | |
| # class_id = int(box.cls[0]) | |
| # class_name = model.names[class_id] | |
| # if class_name in ['figure', 'equation']: | |
| # x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy()) | |
| # conf = float(box.conf[0]) | |
| # # Green for figures, Red for equations | |
| # color = (0, 255, 0) if class_name == 'figure' else (0, 0, 255) | |
| # cv2.rectangle(img, (x1, y1), (x2, y2), color, 3) | |
| # # Add label with confidence | |
| # label = f"{class_name.upper()} {conf:.2f}" | |
| # (text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2) | |
| # cv2.rectangle(img, (x1, y1 - text_height - 10), (x1 + text_width, y1), color, -1) | |
| # cv2.putText(img, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) | |
| # detection_count[class_name] += 1 | |
| # # Add summary text at top | |
| # summary = f"Page {page_num} | Detected: {detection_count['figure']} Figures, {detection_count['equation']} Equations" | |
| # cv2.rectangle(img, (10, 10), (10 + len(summary) * 11, 40), (0, 0, 0), -1) | |
| # cv2.putText(img, summary, (15, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) | |
| # # Save to temp file | |
| # temp_path = tempfile.NamedTemporaryFile(delete=False, suffix=".png").name | |
| # cv2.imwrite(temp_path, img) | |
| # return temp_path | |
| # except Exception as e: | |
| # print(f"Error in visualize_detections: {e}") | |
| # import traceback | |
| # traceback.print_exc() | |
| # return None | |
| # # ============================== | |
| # # GRADIO INTERFACE | |
| # # ============================== | |
| # with gr.Blocks(title="Document Analysis Pipeline") as demo: | |
| # gr.Markdown("# π Full Pipeline Analysis") | |
| # gr.Markdown("### π Intermediate File Recovery Active") | |
| # gr.Markdown("The **Download** box will contain: \n1. OCR JSON (Step 1)\n2. Raw LayoutLMv3 Prediction JSON (Step 2)\n3. Final BIO JSON (Step 3)") | |
| # with gr.Row(): | |
| # with gr.Column(scale=1): | |
| # file_input = gr.File( | |
| # label="Upload PDFs or Images", | |
| # file_types=[".pdf", ".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tiff"], | |
| # file_count="multiple", | |
| # type="filepath" | |
| # ) | |
| # page_selector = gr.Slider( | |
| # minimum=1, | |
| # maximum=100, | |
| # value=1, | |
| # step=1, | |
| # label="PDF Page Number (for preview)", | |
| # visible=True | |
| # ) | |
| # model_path_input = gr.Textbox( | |
| # label="Model Path", | |
| # value=DEFAULT_LAYOUTLMV3_MODEL_PATH | |
| # ) | |
| # # Debug button for visual inspection | |
| # debug_btn = gr.Button("π Show YOLO Detections", variant="secondary") | |
| # # Main processing button | |
| # process_btn = gr.Button("π Run Full Pipeline", variant="primary") | |
| # with gr.Column(scale=2): | |
| # # Visual debug output | |
| # detection_preview = gr.Image(label="YOLO Detection Preview (Green=Figure, Red=Equation)", type="filepath") | |
| # # Final JSON output | |
| # json_output = gr.Code(label="Final Structured Output", language="json", lines=20) | |
| # # Download all intermediate files | |
| # download_output = gr.File(label="Download All Pipeline Stages (JSON)", file_count="multiple") | |
| # # Wire up the debug button | |
| # debug_btn.click( | |
| # fn=visualize_detections, | |
| # inputs=[file_input, page_selector], | |
| # outputs=[detection_preview] | |
| # ) | |
| # # Wire up the main processing button | |
| # process_btn.click( | |
| # fn=process_file, | |
| # inputs=[file_input, model_path_input], | |
| # outputs=[json_output, download_output] | |
| # ) | |
| # if __name__ == "__main__": | |
| # demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True) | |
| import gradio as gr | |
| import json | |
| import os | |
| import tempfile | |
| import img2pdf | |
| import glob | |
| import shutil | |
| from img2pdf import Rotation | |
| from pathlib import Path | |
# Startup diagnostics: log the working directory and its contents.
# Each probe is a callable so it is evaluated only after its banner is
# printed, exactly as in a straight sequence of print() calls.
for _banner, _probe in (
    ("--- DEBUG: Current Working Directory ---", os.getcwd),
    ("--- DEBUG: Files in Root ---", lambda: os.listdir('.')),
):
    print(_banner)
    print(_probe())
| # ============================== | |
| # PIPELINE IMPORT | |
| # ============================== | |
# Import the real pipeline; if anything at all goes wrong during the import,
# log the failure and install stand-ins so the UI can still start.
try:
    from working_yolo_pipeline import (
        run_document_pipeline,
        DEFAULT_LAYOUTLMV3_MODEL_PATH,
        WEIGHTS_PATH,
    )
except Exception as import_error:  # deliberately broad: any import-time failure
    print(f"Warning: Failed to import pipeline: {import_error}")
    import traceback

    # Print the full stack so the underlying cause is visible in the logs.
    traceback.print_exc()

    DEFAULT_LAYOUTLMV3_MODEL_PATH = "./models/layoutlmv3_model"
    WEIGHTS_PATH = "./weights/yolo_weights.pt"

    def run_document_pipeline(*args):
        """Fallback generator: yield a single error update and stop."""
        yield {"status": "error", "message": "Placeholder pipeline function called."}
| # ============================== | |
| # MAIN PROCESSING GENERATOR | |
| # ============================== | |
def _resolve_upload_paths(file_list):
    """Return a path for every upload entry that can be resolved.

    An entry may be a dict with a "path" key, an object with a ``path``
    attribute, or anything else, which is converted with ``str()``.
    Entries that raise while being resolved are logged and skipped.
    """
    resolved = []
    for f in file_list:
        try:
            if isinstance(f, dict) and "path" in f:
                resolved.append(f["path"])
            elif hasattr(f, 'path'):
                resolved.append(f.path)
            else:
                resolved.append(str(f))
        except Exception as e:
            print(f"Error resolving path for {f}: {e}")
    return resolved


def _find_intermediate_jsons(processing_path):
    """Return JSON files under /tmp whose names contain the input file's stem.

    Duplicates matched by more than one pattern are dropped; first-seen
    order is kept.
    """
    # Escape the stem so glob metacharacters in an uploaded file name
    # (e.g. "scan[1].pdf") are matched literally instead of as a pattern.
    base_name = glob.escape(Path(processing_path).stem)
    search_patterns = [
        f"/tmp/pipeline_run_{base_name}*/*.json",
        f"/tmp/*{base_name}*.json",
    ]
    found = []
    for pattern in search_patterns:
        found.extend(glob.glob(pattern))
    return list(dict.fromkeys(found))


def process_file(uploaded_files, layoutlmv3_model_path=None):
    """Run the document pipeline on the upload(s) and stream updates.

    This is a generator. Every item it yields is a
    ``(display_text, download_files)`` tuple, where ``download_files`` is a
    list of file paths on the final "complete" update and ``None`` otherwise.

    Args:
        uploaded_files: A single upload or a list of uploads. ``None`` and an
            empty list each produce one error update.
        layoutlmv3_model_path: Model location. When falsy,
            ``DEFAULT_LAYOUTLMV3_MODEL_PATH`` is used instead.

    Yields:
        An update for each pipeline item whose "status" is "estimating",
        "complete" or "error"; items with any other status are ignored.
        Any exception raised while processing is printed and reported as a
        single error update rather than propagated.
    """
    # NOTE(review): the leading glyphs in the status strings below look
    # mis-encoded (probably emoji) - confirm against the deployed file.
    if uploaded_files is None:
        yield "β Error: No files uploaded.", None
        return

    file_list = uploaded_files if isinstance(uploaded_files, list) else [uploaded_files]
    if not file_list:
        yield "β Error: Empty file list.", None
        return

    # 1. Resolve all file paths safely
    resolved_paths = _resolve_upload_paths(file_list)
    if not resolved_paths:
        yield "β Error: Could not resolve file paths.", None
        return

    try:
        # 2. Determine if we should merge into a single PDF
        # (done inside the try so a malformed path is reported, not raised)
        first_file = Path(resolved_paths[0])
        is_image = first_file.suffix.lower() in ('.jpg', '.jpeg', '.png', '.bmp', '.webp', '.tiff')

        if len(resolved_paths) > 1 or is_image:
            print(f"π¦ Converting {len(resolved_paths)} image(s) to a single PDF...")
            # Convert first so a failed conversion leaves no stray temp file,
            # then write through the temp file's own handle so it is closed.
            pdf_bytes = img2pdf.convert(resolved_paths, rotation=Rotation.ifvalid)
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_pdf:
                temp_pdf.write(pdf_bytes)
            processing_path = temp_pdf.name
        else:
            processing_path = resolved_paths[0]

        # 3. Standard Pipeline Checks
        final_model_path = layoutlmv3_model_path or DEFAULT_LAYOUTLMV3_MODEL_PATH
        if not os.path.exists(final_model_path):
            yield f"β Error: Model not found at {final_model_path}", None
            return

        # 4. Call the pipeline generator and relay each update
        print(f"π Starting pipeline for: {processing_path}")
        for pipeline_update in run_document_pipeline(processing_path, final_model_path):
            status = pipeline_update.get("status")

            # --- Handle Estimation Yield ---
            if status == "estimating":
                display_text = "β±οΈ ESTIMATING PROCESSING TIME...\n\n" + json.dumps(pipeline_update, indent=2)
                yield display_text, None

            # --- Handle Final Complete Yield ---
            elif status == "complete":
                final_result = pipeline_update.get("result")
                all_intermediate_jsons = _find_intermediate_jsons(processing_path)

                if final_result is None or (isinstance(final_result, list) and len(final_result) == 0):
                    display_text = "β οΈ Pipeline failed at Step 3 (BIO Decoding).\nDownload the intermediate JSONs below to inspect OCR and Model Predictions."
                else:
                    display_text = json.dumps(final_result, indent=2, ensure_ascii=False)
                    # Save the result so it can be downloaded too. UTF-8 is
                    # explicit because ensure_ascii=False emits raw non-ASCII
                    # text that a locale-default encoding may not accept.
                    with tempfile.NamedTemporaryFile(
                        mode='w',
                        delete=False,
                        suffix='.json',
                        prefix='final_result_',
                        encoding='utf-8',
                    ) as temp_final:
                        json.dump(final_result, temp_final, indent=2, ensure_ascii=False)
                    all_intermediate_jsons.append(temp_final.name)

                yield display_text, all_intermediate_jsons

            # --- Handle Error Yield ---
            elif status == "error":
                yield f"β Error: {pipeline_update.get('message')}", None

    except Exception as e:
        import traceback
        traceback.print_exc()
        yield f"β Error: {str(e)}", None
| # ============================== | |
| # VISUAL DEBUG FUNCTION | |
| # ============================== | |
def visualize_detections(uploaded_files, page_num):
    """Render one page or image with YOLO boxes drawn and return a PNG path.

    Only the first uploaded file is used. A ``.pdf`` is rendered at 2x zoom
    for the requested page, clamped to the pages the document has; any other
    file is read as an image. Boxes are drawn only for detections whose
    class name is 'figure' or 'equation'.

    Args:
        uploaded_files: A single upload or a list of uploads; an entry may be
            a dict with a "path" key, an object with a ``path`` attribute, or
            a path-like value.
        page_num: 1-based page number to preview (only meaningful for PDFs).

    Returns:
        Path of a temporary PNG containing the annotated image, or ``None``
        when nothing was uploaded, the image could not be read or written,
        or any error occurred (the error is printed with its traceback).
    """
    if not uploaded_files:
        return None

    try:
        import cv2
        import numpy as np
        import tempfile
        from ultralytics import YOLO
        import fitz

        # Get first file path
        file_path = uploaded_files[0] if isinstance(uploaded_files, list) else uploaded_files
        if isinstance(file_path, dict):
            file_path = file_path["path"]
        elif hasattr(file_path, 'path'):
            file_path = file_path.path

        # Page number shown in the banner; for PDFs it is replaced below by
        # the page that was actually rendered.
        shown_page = page_num

        # Handle PDF conversion to image
        if str(file_path).lower().endswith('.pdf'):
            doc = fitz.open(file_path)
            try:
                # Ensure the selected page exists in the document
                page_idx = min(max(int(page_num) - 1, 0), len(doc) - 1)
                page = doc.load_page(page_idx)
                pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
                img = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
                # OpenCV drawing below uses BGR channel order.
                if pix.n == 3:
                    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
                elif pix.n == 4:
                    img = cv2.cvtColor(img, cv2.COLOR_RGBA2BGR)
            finally:
                # Close the document even if rendering the page failed.
                doc.close()
            shown_page = page_idx + 1
        else:
            img = cv2.imread(str(file_path))

        if img is None:
            return None

        # Run YOLO detection
        model = YOLO(WEIGHTS_PATH)
        results = model.predict(source=img, conf=0.2, imgsz=640, verbose=False)

        # Draw bounding boxes
        detection_count = {'figure': 0, 'equation': 0}
        for box in results[0].boxes:
            class_id = int(box.cls[0])
            class_name = model.names[class_id]
            if class_name not in detection_count:
                continue

            x1, y1, x2, y2 = map(int, box.xyxy[0].cpu().numpy())
            conf = float(box.conf[0])

            # Green for figures, Red for equations (BGR tuples)
            color = (0, 255, 0) if class_name == 'figure' else (0, 0, 255)
            cv2.rectangle(img, (x1, y1), (x2, y2), color, 3)

            # Add label with confidence on a filled background strip
            label = f"{class_name.upper()} {conf:.2f}"
            (text_width, text_height), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
            cv2.rectangle(img, (x1, y1 - text_height - 10), (x1 + text_width, y1), color, -1)
            cv2.putText(img, label, (x1, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
            detection_count[class_name] += 1

        # Add summary text at top
        summary = f"Page {shown_page} | Detected: {detection_count['figure']} Figures, {detection_count['equation']} Equations"
        cv2.rectangle(img, (10, 10), (10 + len(summary) * 11, 40), (0, 0, 0), -1)
        cv2.putText(img, summary, (15, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        # Reserve a temp file name, closing the handle before OpenCV writes to it
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp_png:
            temp_path = tmp_png.name
        # imwrite reports failure through its return value, not an exception
        if not cv2.imwrite(temp_path, img):
            print(f"Error in visualize_detections: could not write {temp_path}")
            return None
        return temp_path

    except Exception as e:
        print(f"Error in visualize_detections: {e}")
        import traceback
        traceback.print_exc()
        return None
| # ============================== | |
| # GRADIO INTERFACE | |
| # ============================== | |
# Two-column layout: inputs and action buttons on the left, the detection
# preview, pipeline text output and downloadable files on the right.
with gr.Blocks(title="Document Analysis Pipeline") as demo:
    # Header text, rendered top to bottom in this order.
    for _header_md in (
        "# π Full Pipeline Analysis",
        "### π Intermediate File Recovery Active",
        "The **Download** box will contain: \n1. OCR JSON (Step 1)\n2. Raw LayoutLMv3 Prediction JSON (Step 2)\n3. Final BIO JSON (Step 3)",
    ):
        gr.Markdown(_header_md)

    with gr.Row():
        with gr.Column(scale=1):
            file_input = gr.File(
                label="Upload PDFs or Images",
                file_types=[".pdf", ".jpg", ".jpeg", ".png", ".bmp", ".webp", ".tiff"],
                file_count="multiple",
                type="filepath",
            )
            page_selector = gr.Slider(
                minimum=1, maximum=100, value=1, step=1,
                label="PDF Page Number (for preview)",
                visible=True,
            )
            model_path_input = gr.Textbox(label="Model Path", value=DEFAULT_LAYOUTLMV3_MODEL_PATH)

            # Secondary action: preview detections for visual inspection.
            debug_btn = gr.Button("π Show YOLO Detections", variant="secondary")
            # Primary action: run the whole pipeline.
            process_btn = gr.Button("π Run Full Pipeline", variant="primary")

        with gr.Column(scale=2):
            # Annotated image produced by the preview button.
            detection_preview = gr.Image(
                label="YOLO Detection Preview (Green=Figure, Red=Equation)",
                type="filepath",
            )
            # Text pane fed by process_file: interim updates first, then the final result.
            json_output = gr.Code(label="Pipeline Output", language="json", lines=20)
            # Files offered for download once the pipeline completes.
            download_output = gr.File(
                label="Download All Pipeline Stages (JSON)",
                file_count="multiple",
            )

    # Preview button -> visualize_detections(files, page) -> image.
    debug_btn.click(
        fn=visualize_detections,
        inputs=[file_input, page_selector],
        outputs=[detection_preview],
    )

    # Run button -> process_file(files, model path) -> (text, files).
    process_btn.click(
        fn=process_file,
        inputs=[file_input, model_path_input],
        outputs=[json_output, download_output],
        api_name="process",  # This enables the streaming endpoint /api/process
    )
if __name__ == "__main__":
    # The queue must be enabled for generator (streaming) handlers to work,
    # so it is switched on before the server is launched.
    queued_demo = demo.queue()
    queued_demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)