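# NOTE: "Spaces: Sleeping / Sleeping" was Hugging Face Spaces page-status text
# captured together with this file; it is not part of the program.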
import os # Included in Python's standard library | |
from openai import OpenAI # Official OpenAI Python package | |
from IPython.display import Audio  # Third-party IPython package (not part of the standard library)
## supporting functions | |
import base64, textwrap, time, io | |
from PIL import Image # Pillow image library | |
import tempfile | |
from pdf2image import convert_from_path # Convert PDFs to images | |
import gradio as gr # For building a UI | |
from gradio_pdf import PDF # Gradio PDF component | |
from dotenv import load_dotenv # To load environment variables from .env file | |
# Load environment variables from a .env file and build the OpenAI client.
# A single load_dotenv() call is enough; the second call was redundant.
load_dotenv()
# The variable name must be spelled OPENAI_API_KEY. The previous lookup used
# 'OPENAI_API_KE', which returns None unless a variable with that misspelled
# name happens to exist.
openai_api_key = os.getenv('OPENAI_API_KEY')
client = OpenAI(api_key=openai_api_key)
# Resize the image if it exceeds the max dimension
def resize_image(image, max_dimension):
    """Return *image* scaled down so that neither side exceeds *max_dimension*.

    Palette-mode ("P") images are first converted to "RGBA" when their
    ``info`` dict has a "transparency" entry, otherwise to "RGB".

    When either side is larger than *max_dimension*, the longer side is set
    to *max_dimension* and the other side is scaled by the same ratio
    (truncated to an integer, but never below 1 pixel), resampling with
    ``Image.LANCZOS``. Otherwise the (possibly converted) image is returned
    without resizing.

    Args:
        image: Object exposing ``size``, ``mode``, ``info``, ``convert`` and
            ``resize`` (a PIL image in this file).
        max_dimension: Largest allowed width/height in pixels.

    Returns:
        The converted and/or resized image, or *image* itself when nothing
        had to change.
    """
    width, height = image.size

    # Convert to RGB or RGBA if necessary (for images with transparency)
    if image.mode == "P":
        target_mode = "RGBA" if "transparency" in image.info else "RGB"
        image = image.convert(target_mode)

    if width > max_dimension or height > max_dimension:
        if width > height:
            new_width = max_dimension
            new_height = int(height * (max_dimension / width))
        else:
            new_height = max_dimension
            new_width = int(width * (max_dimension / height))
        # int() truncation can produce 0 for extreme aspect ratios
        # (e.g. 5000x2 scaled to 1024 wide); a zero-sized target is not a
        # valid image size, so keep at least one pixel on each side.
        new_width = max(1, new_width)
        new_height = max(1, new_height)
        image = image.resize((new_width, new_height), Image.LANCZOS)

    return image
# Serialize an image as PNG and hand back the raw bytes
def convert_to_png(image):
    """Save *image* in PNG format to an in-memory buffer and return the bytes.

    Args:
        image: Object with a ``save(fp, format=...)`` method (a PIL image in
            this file).

    Returns:
        The full contents written to the buffer, as ``bytes``.
    """
    buffer = io.BytesIO()
    try:
        image.save(buffer, format="PNG")
        png_bytes = buffer.getvalue()
    finally:
        # Release the in-memory buffer whether or not saving succeeded.
        buffer.close()
    return png_bytes
# Load an image file and return it base64-encoded (resized/converted if needed)
def process_image(path, max_size):
    """Return ``(base64_png, longest_side)`` for the image stored at *path*.

    A file that is already a PNG and fits within *max_size* on both sides is
    read from disk and encoded as-is. Any other image goes through
    ``resize_image`` and ``convert_to_png`` before being encoded.

    Args:
        path: Filesystem path of the image.
        max_size: Maximum allowed width/height in pixels.

    Returns:
        A tuple of the base64 text (UTF-8 decoded) and the larger of the
        width/height of the image that was encoded.
    """
    with Image.open(path) as source:
        src_width, src_height = source.size
        is_png = Image.MIME.get(source.format) == "image/png"
        fits = src_width <= max_size and src_height <= max_size

        if not (is_png and fits):
            # Needs re-encoding: shrink if necessary, then serialize as PNG.
            scaled = resize_image(source, max_size)
            png_bytes = convert_to_png(scaled)
            payload = base64.b64encode(png_bytes).decode('utf-8')
            return (payload, max(scaled.size))

        # Already a small enough PNG: ship the file bytes untouched.
        with open(path, "rb") as raw_file:
            payload = base64.b64encode(raw_file.read()).decode('utf-8')
        return (payload, max(src_width, src_height))
# Create the image content metadata
def create_image_content(image, maxdim, detail_threshold, mime_type="image/png"):
    """Build an ``image_url`` content part carrying a base64 data URL.

    The data URL previously declared ``image/jpeg``, but ``process_image`` in
    this file only ever produces PNG bytes (either an untouched PNG file or
    the output of ``convert_to_png``), so the declared type now defaults to
    ``image/png``.

    Args:
        image: Base64-encoded image data (text).
        maxdim: Longest side of the encoded image, in pixels.
        detail_threshold: Images whose *maxdim* is below this value are
            tagged ``"low"`` detail, all others ``"high"``.
        mime_type: MIME type written into the data URL.

    Returns:
        A dict with ``"type": "image_url"`` and an ``"image_url"`` dict
        holding the data URL and the chosen detail level.
    """
    detail = "low" if maxdim < detail_threshold else "high"
    return {
        "type": "image_url",
        "image_url": {"url": f"data:{mime_type};base64,{image}", "detail": detail},
    }
# Wrap the system prompt as a chat message list
def set_system_message(sysmsg):
    """Return a one-element message list with *sysmsg* as the system content.

    Args:
        sysmsg: Text to place in the system message.

    Returns:
        ``[{"role": "system", "content": sysmsg}]``
    """
    system_entry = dict(role="system", content=sysmsg)
    return [system_entry]
# Set user message along with attached images
def set_user_message(user_msg_str, file_path_list=None, max_size_px=1024,
                     file_names_list=None, tiled=False, detail_threshold=700):
    """Build the user chat message, optionally with attached images.

    Each path in *file_path_list* is run through ``process_image`` and
    attached to the message. A list of the image names is appended to the
    message text under an "Uploaded images:" heading.

    Args:
        user_msg_str: The user's text.
        file_path_list: List of image paths. Anything that is not a list
            (including the default ``None``) is treated as "no files".
        max_size_px: Maximum image side length passed to ``process_image``.
        file_names_list: Display names for the images; used only when its
            length matches *file_path_list*, otherwise the path basenames
            are used.
        tiled: When true (and files are present) images are attached as
            ``image_url`` parts built by ``create_image_content``; otherwise
            they are attached as ``{"image": <base64>}`` entries after a
            plain text string.
        detail_threshold: Forwarded to ``create_image_content``.

    Returns:
        A one-element list holding the user message dict.
    """
    # The default used to be a shared mutable list ([]); None avoids that
    # pitfall and lands in the same "no files" path below.
    if not isinstance(file_path_list, list):
        file_path_list = []
    if not file_path_list:  # No files means no tiling
        tiled = False

    # Prefer caller-supplied names when they line up with the paths.
    if file_names_list and len(file_names_list) == len(file_path_list):
        file_names = file_names_list
    else:
        file_names = [os.path.basename(path) for path in file_path_list]

    # Each entry is a (base64_text, longest_side) tuple.
    base64_images = [process_image(path, max_size_px) for path in file_path_list]

    uploaded_images_text = ""
    if file_names:
        uploaded_images_text = "\n\n---\n\nUploaded images:\n" + '\n'.join(file_names)

    message_text = user_msg_str + uploaded_images_text

    if tiled:
        content = [{"type": "text", "text": message_text}]
        content += [
            create_image_content(image, maxdim, detail_threshold)
            for image, maxdim in base64_images
        ]
        return [{"role": "user", "content": content}]

    return [{
        "role": "user",
        "content": [message_text] + [{"image": image} for image, _ in base64_images],
    }]
# Define the path to Poppler (for PDF to image conversion)
poppler_path = '/usr/bin'  # Adjust this path if needed
# Make the Poppler directory reachable through PATH. Reading PATH with a
# default avoids a KeyError when the variable is unset, and the membership
# check avoids appending a duplicate entry when the directory is already listed.
_current_path = os.environ.get('PATH', '')
if poppler_path not in _current_path.split(os.pathsep):
    os.environ['PATH'] = (
        _current_path + os.pathsep + poppler_path if _current_path else poppler_path
    )
# Render every page of a PDF to an image file
def pdf_to_images(pdf_path, dpi=300, output_format='JPEG'):
    """Render each page of *pdf_path* to an image file and return the paths.

    Pages are written as ``page<index>.<format>`` (format lower-cased) into a
    directory created with ``tempfile.mkdtemp()``. This function does not
    remove that directory, so the caller owns the cleanup.

    Args:
        pdf_path: Path of the PDF to convert.
        dpi: Resolution passed to ``convert_from_path``.
        output_format: Image format name used for saving and for the file
            extension.

    Returns:
        List of the written image paths, in page order.
    """
    workdir = tempfile.mkdtemp()
    rendered_pages = convert_from_path(pdf_path, dpi)

    saved_paths = []
    for index, rendered in enumerate(rendered_pages):
        target = os.path.join(workdir, f'page{index}.{output_format.lower()}')
        rendered.save(target, output_format)
        saved_paths.append(target)
    return saved_paths
# System prompt for the assistant: which fields to extract from a French
# syndic "appel de fonds", how to validate them, and the output format.
# The stray " | |" page-capture residue that trailed every line has been
# removed so it is no longer sent to the model; the wording is unchanged.
system_msg = """
You are an intelligent assistant tasked with extracting and validating information from French real estate syndic documents (*appel de fonds*). These documents contain financial details, property information, and owner details. Your job is to extract and ensure the correctness of the following information:
### Task Overview:
You need to extract and validate the following fields:
1. **Total à payer**: The total amount the owner must pay for the period.
2. **Fond travaux alur**: The amount allocated to the ALUR works fund.
3. **Total Part charges prévisionnelles**: The forecasted portion of charges the owner must pay for general building maintenance, collective services, etc.
4. **Part autres travaux**: Any additional expenses related to specific works or repairs.
5. **le solde précédent**: The previous balance from past transactions (can be positive or negative).
6. **Propriétaire**: The name of the property owner.
7. **Adresse du propriétaire**: The postal address of the owner.
8. **Adresse du bien**: The location of the property (address of the unit or building).
9. **Référence**: The reference number of the document or account related to the property.
10. **Date du document**: The date when the document was issued.
11. **Date limite du paiement**: The deadline by which the payment must be made.
12. **Montant total solde en notre faveur**: The total balance in favor of the syndic (if applicable).
### Validation Rule:
The following validation rules must be respected:
- **Total à payer** = **Fond travaux alur** + **Total Part charges prévisionnelles** + **Part autres travaux**.
- The amounts should be taken from the "débit" column, not the "crédit" column, to ensure accuracy. Verify that the **Total à payer** is from the correct column (débit).
- Additionally, both **Total à payer** and **Montant total solde en notre faveur** should be extracted for a cross-check to ensure that the final amounts are accurate and reflect the correct financial state.
### Format for Output:
Return the extracted information in JSON format. If there is a discrepancy (such as a mismatch between amounts or amounts found in the wrong column), return an error message in JSON format explaining the issue.
""".strip()
# Define process function to convert PDF and extract information
def _remove_temp_images(paths):
    """Best-effort removal of rendered page images and their emptied folders."""
    folders = set()
    for path in paths:
        folders.add(os.path.dirname(path))
        try:
            os.remove(path)
        except OSError:
            pass  # Already gone or not removable; cleanup must never fail the request.
    for folder in folders:
        try:
            os.rmdir(folder)  # Only succeeds when the folder is empty.
        except OSError:
            pass


def process(pdf):
    """Convert an uploaded PDF to page images and ask the model to extract data.

    Args:
        pdf: Path of the uploaded PDF as handed over by the UI component.
            A falsy value (nothing uploaded) short-circuits to ``None``.

    Returns:
        The text content of the model's first choice, or ``None`` when no
        PDF was given or the API call failed (the error is printed).
    """
    if not pdf:
        # Nothing was uploaded; there is nothing to convert or send.
        return None

    image_paths = pdf_to_images(pdf)
    try:
        system = set_system_message(system_msg)
        # NOTE(review): with the default tiled=False, set_user_message attaches
        # images as {"image": <base64>} entries rather than "image_url" parts;
        # confirm the target model accepts that shape.
        user = set_user_message("", file_path_list=image_paths, max_size_px=1024)
    finally:
        # The images are fully base64-encoded into the message at this point,
        # so the temporary page files are no longer needed.
        _remove_temp_images(image_paths)

    params = {
        "model": "gpt-4o",
        "temperature": 0.01,
        "max_tokens": 500,
        "stream": False,
        "messages": system + user,
    }

    start = time.perf_counter()
    try:
        response = client.chat.completions.create(**params)
        # The openai>=1.0 client returns a response object, not a dict:
        # subscripting it (response['choices']...) raises TypeError, which the
        # handler below used to swallow, so every call returned None.
        reply = response.choices[0].message.content
    except Exception as e:
        print(f"Error during API call: {e}")
        return None
    print(f"\n[elapsed: {time.perf_counter() - start:.2f} seconds]")
    return reply
# Gradio UI: a PDF upload wired to process(), with the reply shown in a textbox
iface = gr.Interface(
    title="Immobilier",
    description="Upload a PDF and extract the required information.",
    fn=process,
    inputs=PDF(label="Upload a PDF", interactive=True),
    outputs=gr.Textbox(label="Extracted Information"),
)

# Start the web app at module load, with Gradio's debug mode switched on.
iface.launch(debug=True)