Spaces:
Sleeping
Sleeping
import os | |
import pandas as pd | |
import cv2 | |
import numpy as np | |
import json | |
import requests | |
import traceback | |
import tempfile | |
from PIL import Image | |
def preprocess_image(image_path, max_file_size_mb=1, target_file_size_mb=0.5):
    """Enhance an image for OCR and shrink it until its JPEG fits a size cap.

    The image is read with OpenCV, passed through ``enhance_txt`` and written
    as a JPEG.  While the encoded file is larger than ``max_file_size_mb``
    the image is scaled down (aiming for ``target_file_size_mb``) and
    re-encoded.

    Args:
        image_path: Path of the image file to load.
        max_file_size_mb: Largest acceptable JPEG size, in megabytes.
        target_file_size_mb: Size, in megabytes, that each resize step aims
            for.

    Returns:
        The final JPEG decoded again with ``cv2.imread``, or ``None`` if any
        step raised (the error is printed, not propagated).
    """
    try:
        image = cv2.imread(image_path)
        enhanced = enhance_txt(image)

        # A private temporary directory gives a path nobody else can claim
        # and guarantees the scratch JPEG is removed when the block exits.
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_file_path = os.path.join(temp_dir, "enhanced.jpg")
            cv2.imwrite(temp_file_path, enhanced)
            file_size_mb = os.path.getsize(temp_file_path) / (1024 * 1024)

            while file_size_mb > max_file_size_mb:
                print(
                    f"File size ({file_size_mb} MB) exceeds the maximum allowed size ({max_file_size_mb} MB). Resizing the image.")
                # File size scales roughly with pixel count, so each side is
                # scaled by the square root of the size ratio.
                ratio = np.sqrt(target_file_size_mb / file_size_mb)

                # Scale from the CURRENT dimensions; using the original
                # image's shape could enlarge an already-resized image.
                cur_height, cur_width = enhanced.shape[:2]
                new_width = max(1, int(cur_width * ratio))
                new_height = max(1, int(cur_height * ratio))
                if new_width >= cur_width and new_height >= cur_height:
                    # The image cannot get any smaller; stop instead of
                    # looping forever.
                    break

                enhanced = cv2.resize(enhanced, (new_width, new_height))
                cv2.imwrite(temp_file_path, enhanced)
                file_size_mb = os.path.getsize(temp_file_path) / (1024 * 1024)
                print(f"New file size: ({file_size_mb} MB)")

            # Decode the JPEG that was actually written so the caller gets
            # exactly what the encoded file contains.
            return cv2.imread(temp_file_path)
    except Exception as e:
        print(f"An error occurred in preprocess_image: {str(e)}")
        return None
def enhance_txt(img, intensity_increase=20, bilateral_filter_diameter=9, bilateral_filter_sigma_color=75, bilateral_filter_sigma_space=75):
    """Return a black-and-white (thresholded) version of ``img``.

    The threshold is 88% of the mean pixel value of the image's central
    region (5% trimmed from every edge), computed on the input as given.
    The image is then converted from BGR to grayscale and every pixel is set
    to 255 if it is above the threshold, otherwise to 0.

    Args:
        img: BGR image array as loaded by ``cv2.imread``.
        intensity_increase: Unused; retained so existing callers keep working.
        bilateral_filter_diameter: Unused; retained for compatibility.
        bilateral_filter_sigma_color: Unused; retained for compatibility.
        bilateral_filter_sigma_space: Unused; retained for compatibility.

    Returns:
        Single-channel binary image produced by ``cv2.threshold``.
    """
    # Earlier revisions also ran contour detection, Canny edges, a brightness
    # boost and a bilateral filter here, but none of those results reached
    # the return value, so that work has been dropped.
    height, width = img.shape[:2]
    roi = img[int(height * 0.05):int(height * 0.95),
              int(width * 0.05):int(width * 0.95)]
    if roi.size == 0:
        # Very small images leave nothing after trimming the border; fall
        # back to the whole image so the threshold is not NaN.
        roi = img
    threshold = np.mean(roi) * 0.88  # fraction of the average brightness

    grayscale_img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # Kept from the original pipeline; a 1x1 kernel should leave the pixels
    # effectively unchanged.
    blurred = cv2.GaussianBlur(grayscale_img, (1, 1), 0)

    _, binary = cv2.threshold(blurred, threshold, 255, cv2.THRESH_BINARY)
    return binary
def run_tesseract_on_preprocessed_image(preprocessed_image, image_path):
    """Send a preprocessed image to the OCR.space API and save the JSON reply.

    Despite the name, the OCR is performed by an HTTP request to
    ``https://api.ocr.space/parse/image``.  The preprocessed image is written
    to ``static/temp/<name>_preprocessed.jpg``, uploaded, and the parsed JSON
    response is stored in ``static/temp/<name>_ocr.json``, where ``<name>``
    is the part of the file name before its first dot.

    Args:
        preprocessed_image: Image array to upload (written with
            ``cv2.imwrite``).
        image_path: Path of the original image; only its file name is used,
            to derive the names of the output files.

    Returns:
        Path of the JSON file holding the OCR response.

    Raises:
        RuntimeError: If the ``ocr_space`` environment variable is not set.
        Exception: If the API answers with a status code other than 200.
            Errors from ``requests`` (including timeouts) propagate as-is.
    """
    # partition() keeps the whole name when it contains no dot, whereas
    # slicing with find() == -1 would silently drop the last character.
    image_name = os.path.basename(image_path).partition('.')[0]

    temp_folder = "static/temp"
    os.makedirs(temp_folder, exist_ok=True)

    url = "https://api.ocr.space/parse/image"
    # The API key is read from the environment; never hard-code it here.
    api_key = os.getenv("ocr_space")
    if not api_key:
        raise RuntimeError(
            "OCR API key is missing: set the 'ocr_space' environment variable.")
    language = "eng"

    preprocessed_path = os.path.join(
        temp_folder, f"{image_name}_preprocessed.jpg")
    json_output_path = os.path.join(temp_folder, f"{image_name}_ocr.json")

    cv2.imwrite(preprocessed_path, preprocessed_image)

    payload = {
        "apikey": api_key,
        "language": language,
        "isOverlayRequired": True,
        "OCREngine": 2,
    }
    with open(preprocessed_path, "rb") as image_file:
        # A timeout keeps a stalled connection from hanging the caller
        # indefinitely.
        response = requests.post(
            url, data=payload, files={"file": image_file}, timeout=60)

    if response.status_code != 200:
        raise Exception("An error occurred: " + response.text)

    result = response.json()
    with open(json_output_path, 'w') as json_file:
        json.dump(result, json_file)
    print("---JSON file saved")
    return json_output_path
def clean_tesseract_output(json_output_path):
    """Extract words and their bounding boxes from a saved OCR JSON file.

    The file is expected to contain
    ``ParsedResults[0].TextOverlay.Lines[*].Words[*]`` entries with
    ``WordText``, ``Left``, ``Top``, ``Width`` and ``Height`` fields.

    Args:
        json_output_path: Path of the JSON file to read.

    Returns:
        A list of dicts, one per word, each with ``word_text`` and
        ``word_box`` (``[left, top, left + width, top + height]``).
        ``None`` if the file is missing, is not valid JSON, or does not have
        the expected structure (a message is printed in that case).
    """
    try:
        with open(json_output_path, 'r') as json_file:
            data = json.load(json_file)

        lines = data['ParsedResults'][0]['TextOverlay']['Lines']
        return [
            {
                'word_text': word_info['WordText'],
                'word_box': [
                    word_info['Left'],
                    word_info['Top'],
                    word_info['Left'] + word_info['Width'],
                    word_info['Top'] + word_info['Height'],
                ],
            }
            for line in lines
            for word_info in line['Words']
        ]
    # TypeError covers responses where a level of the structure is null
    # (e.g. "ParsedResults": null), which would otherwise escape as a crash.
    except (KeyError, IndexError, TypeError, FileNotFoundError,
            json.JSONDecodeError) as e:
        print("Check your Internet Connection.")
        print(f"Error cleaning Tesseract output: {str(e)}")
        return None
def prepare_batch_for_inference(image_paths):
    """Preprocess and OCR each image, then collect words and boxes per image.

    Each path goes through ``preprocess_image``,
    ``run_tesseract_on_preprocessed_image`` and ``clean_tesseract_output``.
    An image is skipped (with a printed message) when preprocessing or
    cleaning returns ``None``; skipped images are left out of all three
    result lists so the lists stay index-aligned.

    Args:
        image_paths: Sequence of image file paths to process.

    Returns:
        A dict with three parallel lists: ``image_path`` (paths that were
        processed successfully), ``bboxes`` (one list of word boxes per
        image) and ``words`` (one list of word strings per image).

    Raises:
        Exception: Whatever ``run_tesseract_on_preprocessed_image`` raises;
            the error is printed and then re-raised unchanged.
    """
    print(f"Number of images to process: {len(image_paths)}")
    print("1. Preparing for Inference")
    ocr_results = []  # (image_path, json_output_path) for each OCR'd image
    print("2. Starting Preprocessing")

    for image_path in image_paths:
        print(f"Processing the image: {image_path}")
        print("3. Preprocessing the Receipt")
        preprocessed_image = preprocess_image(image_path)
        if preprocessed_image is None:
            # preprocess_image has already printed the underlying error.
            print(f"Skipping {image_path}: preprocessing failed")
            continue

        print("4. Preprocessing done. Running OCR")
        try:
            json_output_path = run_tesseract_on_preprocessed_image(
                preprocessed_image, image_path)
        except Exception as e:
            print(f"An error has occured: {str(e)}")
            # Bare raise keeps the original traceback intact.
            raise
        print("5. OCR Complete")

        if json_output_path:
            ocr_results.append((image_path, json_output_path))
    print("6. Preprocessing and OCR Done")

    cleaned = []  # (image_path, list of word dicts)
    for image_path, json_output_path in ocr_results:
        clean_output = clean_tesseract_output(json_output_path)
        if clean_output is None:
            print(f"Skipping {image_path}: OCR output could not be parsed")
            continue
        cleaned.append((image_path, clean_output))
    print("7. Cleaned OCR output")

    word_lists = [[word['word_text'] for word in clean_output]
                  for _, clean_output in cleaned]
    print("8. Word List Created")

    boxes_lists = [[word['word_box'] for word in clean_output]
                   for _, clean_output in cleaned]
    print("9. Box List Created")

    inference_batch = {
        "image_path": [image_path for image_path, _ in cleaned],
        "bboxes": boxes_lists,
        "words": word_lists,
    }
    print("10. Prepared for Inference Batch")
    return inference_batch