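"""Document-fetching helpers for a summarization Space.

Each extractor downloads a file (PDF, Excel, CSV, .docx, .pptx), pulls out its
text, and trims it to the model's context budget; get_data() resolves a page
URL to a concrete file link and routes it to the right extractor before
handing the text to the local `sumarize.summarize` chain.
"""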
import io
import json
import re
from io import BytesIO, StringIO

import chardet
import pandas as pd
import requests
from bs4 import BeautifulSoup
from docx import Document
from pdfminer.converter import TextConverter
from pdfminer.layout import LAParams
from pdfminer.pdfinterp import PDFPageInterpreter, PDFResourceManager
from pdfminer.pdfpage import PDFPage
from pptx import Presentation

from sumarize import summarize  # local module; spelling kept as in the repo
def trim_input_words(input_str, max_new_tokens=512, max_total_tokens=32768):
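    """Truncate input_str so that it fits the model's context window,
    treating one whitespace-separated word as roughly one token."""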
    words = input_str.split()
    # Reserve the generation budget plus a 100-word safety margin.
    max_input_words = max_total_tokens - max_new_tokens - 100
    if len(words) > max_input_words:
        words = words[:max_input_words]
    return ' '.join(words)
def select_words_until_char_limit(s, char_limit):
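    """Strip punctuation from s, then keep whole words until char_limit
    characters are reached; the result is also run through trim_input_words
    as a final word-count guard."""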
    s_no_punct = re.sub(r'[^\w\s]', '', s)  # remove punctuation, keep whitespace
    words = s_no_punct.split()
    selected_words = []
    total_chars = 0
    for word in words:
        if total_chars + len(word) + 1 <= char_limit:
            selected_words.append(word)
            total_chars += len(word) + 1  # +1 for the following space
        else:
            break
    return trim_input_words(' '.join(selected_words))
def downl(url):
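    """Return the href of the last link in the last <ul class="dropdown-menu">
    on the page at url, or "" on any failure. The selector is tied to the
    markup of the site this Space scrapes."""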
    try:
        rq = requests.get(url)
        if rq.status_code != 200:
            return ""
        bs = BeautifulSoup(rq.text, features='lxml')
        lis = bs.find_all('ul', class_='dropdown-menu')[-1].find_all('li')
        link = lis[-1].find('a').get('href')
        print(link)
        return link
    except Exception:
        return ""
def pdf(url):
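    """Download a PDF from url and return its text, extracted with pdfminer
    and capped at 30,000 characters."""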
    # Download the PDF content
    response = requests.get(url)
    response.raise_for_status()  # fail early rather than parsing an error page
    pdf_content = response.content
    # Wrap the bytes in a file-like object for pdfminer
    pdf_file = BytesIO(pdf_content)
    # Extract text from the downloaded PDF content
    resource_manager = PDFResourceManager()
    fake_file_handle = StringIO()
    converter = TextConverter(resource_manager, fake_file_handle, laparams=LAParams())
    page_interpreter = PDFPageInterpreter(resource_manager, converter)
    for page in PDFPage.get_pages(pdf_file):
        page_interpreter.process_page(page)
    text = fake_file_handle.getvalue()
    result = select_words_until_char_limit(text, 30000)
    converter.close()
    fake_file_handle.close()
    return result
def excel(link: str) -> str:
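    """Download an Excel file, sample at most 50 rows, and return them as a
    JSON-style string trimmed to the context budget."""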
    try:
        response = requests.get(link)
        if response.status_code == 200:
            file_content = response.content
            df = pd.read_excel(BytesIO(file_content))
            # Sample at most 50 rows so the serialized output stays small
            if df.shape[0] > 50:
                sample_df = df.sample(n=50, random_state=42)
            else:
                sample_df = df
            json_data = sample_df.to_json(orient='records')
            js = json.loads(json_data)
            return select_words_until_char_limit(f"{js}", 32000)
        else:
            print("Failed to download file")
            return "No data available"
    except Exception as e:
        print(e)
        return "No data available"
def csv(link: str) -> str:
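    """Download a semicolon-separated CSV, detect its encoding with chardet,
    sample at most 50 rows, and return them as a JSON-style string."""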
    try:
        response = requests.get(link)
        if response.status_code != 200:
            return 'No data available'
        file_content = response.content
        detected_encoding = chardet.detect(file_content)['encoding']
        # sep=';' matches the semicolon-delimited files this pipeline expects
        df = pd.read_csv(io.BytesIO(file_content), encoding=detected_encoding, sep=';')
        if df.empty:
            print("The DataFrame is empty.")
            return 'The data frame is empty'
        if df.shape[0] > 50:
            sample_df = df.sample(n=50, random_state=42)
        else:
            sample_df = df
        json_data = sample_df.to_json(orient='records')
        js = json.loads(json_data)
        return select_words_until_char_limit(f"{js}", 32000)
    except Exception:
        return 'No data available'
def docx(url: str) -> str:
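    """Download a .docx file and return its paragraph text, trimmed to the
    context budget."""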
    try:
        response = requests.get(url)
        response.raise_for_status()  # ensure we notice bad responses
        # Read the .docx file from memory
        file_stream = io.BytesIO(response.content)
        doc = Document(file_stream)
        # Extract the text of every paragraph
        full_text = [para.text for para in doc.paragraphs]
        text = "\n".join(full_text)
        return select_words_until_char_limit(text, 32000)
    except Exception as e:
        print(f"An error occurred: {e}")
        return 'No data available'
def pptx(url: str) -> str:
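    """Download a .pptx file and return the text of every shape on every
    slide, trimmed to the context budget."""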
    try:
        response = requests.get(url)
        response.raise_for_status()
        # Read the .pptx file from memory
        file_stream = io.BytesIO(response.content)
        presentation = Presentation(file_stream)
        # Collect the text of every shape that carries any
        full_text = []
        for slide in presentation.slides:
            for shape in slide.shapes:
                if hasattr(shape, "text"):
                    full_text.append(shape.text)
        text = "\n".join(full_text)
        return select_words_until_char_limit(text, 32000)
    except Exception as e:
        print(f"An error occurred: {e}")
        return 'No data available'
def get_data(url):
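    """Clean the agent-supplied URL, resolve the actual file link with downl(),
    dispatch on the file extension, and summarize the extracted text."""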
    # Strip trailing '"\nObservation' noise; the longer pattern must be
    # removed first, otherwise it can never match.
    ki = url.replace('"\nObservation', '').replace('\nObservation', '')
    jo = downl(ki)
    if not jo:
        return "No data returned"
    ext = jo.split(".")[-1].lower()
    if ext in ('xlsx', 'xls', 'xlsm'):
        rs = excel(jo)
        return summarize.invoke({"input": rs})
    elif ext == 'pdf':
        rs = pdf(jo)
        return summarize.invoke({"input": rs})
    elif ext == 'docx':
        rs = docx(jo)
        return summarize.invoke({"input": rs})
    elif ext == 'csv':
        rs = csv(jo)
        return summarize.invoke({"input": rs})
    elif ext in ('pptx', 'ppt'):
        # legacy .ppt files fail inside pptx() and fall back to its error string
        rs = pptx(jo)
        return summarize.invoke({"input": rs})
    elif ext == 'doc':
        return "The .doc extension is not supported."
    return "No data returned"