# extract_text_from_pdf.py
import os
import warnings

from groq import Groq
from PyPDF2 import PdfReader
from tqdm import tqdm

# The imports and @spaces.GPU decorators below are only needed when running a
# local model (e.g. on Hugging Face ZeroGPU); inference now goes through the
# Groq API, so they are left commented out.
# import spaces
# import torch
# from accelerate import Accelerator
# from transformers import AutoModelForCausalLM, AutoTokenizer

warnings.filterwarnings('ignore')

class PDFTextExtractor:
    """
    A class to handle PDF text extraction and preprocessing for podcast preparation.
    """

    def __init__(self, pdf_path, output_path):
        """
        Initialize the PDFTextExtractor with paths and model details.

        Args:
            pdf_path (str): Path to the PDF file.
            output_path (str): Path to save the cleaned text file.
        """
        self.pdf_path = pdf_path
        self.output_path = output_path
        self.max_chars = 100000  # cap on total characters extracted from the PDF
        self.chunk_size = 1000   # target characters per chunk sent to the model

        # Local-model initialization, kept for reference (requires the
        # commented-out imports above):
        # model_name = "meta-llama/Llama-3.2-1B-Instruct"
        # self.device = "cuda" if torch.cuda.is_available() else "cpu"
        # self.accelerator = Accelerator()
        # self.model = AutoModelForCausalLM.from_pretrained(
        #     model_name, torch_dtype=torch.bfloat16,
        #     use_safetensors=True, device_map=self.device,
        # )
        # self.tokenizer = AutoTokenizer.from_pretrained(model_name, use_safetensors=True)
        # self.model, self.tokenizer = self.accelerator.prepare(self.model, self.tokenizer)

        # Groq-hosted model and a single shared client (reads GROQ_API_KEY
        # from the environment).
        self.model_name = "llama3-8b-8192"
        self.client = Groq(api_key=os.environ.get("GROQ_API_KEY"))

        # System prompt for text processing
        self.system_prompt = """
You are a world-class text pre-processor. Here is raw data from a PDF; please parse it and return it in a form that is clean and usable to send to a podcast writer.
The raw data is cluttered with stray new lines, LaTeX math, and fluff that can be removed completely. Take out any details that you think would be useless in a podcast author's transcript.
Remember, the podcast could be on any topic whatsoever, so the issues listed above are not exhaustive. Please be smart and creative about what you remove.
DO NOT SUMMARIZE THIS TEXT; you are only cleaning it up and re-writing where needed.
Be aggressive with removing details: you will get a running portion of the text and should keep returning the processed text.
DO NOT ADD MARKDOWN FORMATTING OR SPECIAL CHARACTERS such as markdown emphasis or capitalisation.
ALWAYS start your response directly with the processed text, with NO acknowledgements.
Here is the text:"""

    def validate_pdf(self):
        """Check if the file exists and is a valid PDF."""
        if not os.path.exists(self.pdf_path):
            print(f"Error: File not found at path: {self.pdf_path}")
            return False
        if not self.pdf_path.lower().endswith('.pdf'):
            print("Error: File is not a PDF")
            return False
        return True

    def extract_text(self):
        """Extract text from the PDF, truncated at max_chars."""
        if not self.validate_pdf():
            return None

        with open(self.pdf_path, 'rb') as file:
            pdf_reader = PdfReader(file)
            num_pages = len(pdf_reader.pages)
            print(f"Processing PDF with {num_pages} pages...")

            extracted_text = []
            total_chars = 0
            for page_num in range(num_pages):
                page = pdf_reader.pages[page_num]
                text = page.extract_text() or ""

                # Stop once the character budget is exhausted, keeping only
                # the remainder from the current page.
                if total_chars + len(text) > self.max_chars:
                    remaining_chars = self.max_chars - total_chars
                    extracted_text.append(text[:remaining_chars])
                    print(f"Reached {self.max_chars} character limit at page {page_num + 1}")
                    break

                extracted_text.append(text)
                total_chars += len(text)
                print(f"Processed page {page_num + 1}/{num_pages}")

        final_text = '\n'.join(extracted_text)
        print(f"Extraction complete! Total characters: {len(final_text)}")
        return final_text

    def create_word_bounded_chunks(self, text):
        """Split text into chunks around the target size."""
        words = text.split()
        chunks = []
        current_chunk = []
        current_length = 0

        for word in words:
            word_length = len(word) + 1  # +1 for the joining space
            if current_length + word_length > self.chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))
                current_chunk = [word]
                current_length = word_length
            else:
                current_chunk.append(word)
                current_length += word_length

        if current_chunk:
            chunks.append(' '.join(current_chunk))
        return chunks

    def process_chunk(self, text_chunk):
        """Process a text chunk with the model and return the cleaned text."""
        conversation = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": text_chunk}
        ]

        # Local-model generation, kept for reference:
        # prompt = self.tokenizer.apply_chat_template(conversation, tokenize=False)
        # inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
        # with torch.no_grad():
        #     output = self.model.generate(**inputs, temperature=0.7, top_p=0.9, max_new_tokens=512)
        # processed_text = self.tokenizer.decode(output[0], skip_special_tokens=True)[len(prompt):].strip()

        chat_completion = self.client.chat.completions.create(
            messages=conversation,
            model=self.model_name,
        )
        return chat_completion.choices[0].message.content

    def clean_and_save_text(self):
        """Extract, clean, and save the processed text to a file."""
        extracted_text = self.extract_text()
        if not extracted_text:
            return None

        chunks = self.create_word_bounded_chunks(extracted_text)
        with open(self.output_path, 'w', encoding='utf-8') as out_file:
            for chunk in tqdm(chunks, desc="Processing chunks"):
                processed_chunk = self.process_chunk(chunk)
                out_file.write(processed_chunk + "\n")
                out_file.flush()  # persist each chunk as soon as it is processed

        print(f"\nExtracted and cleaned text has been saved to {self.output_path}")
        return self.output_path
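
# A minimal usage sketch, assuming GROQ_API_KEY is set in the environment; the
# input and output paths below are hypothetical placeholders, not part of the
# original script.
if __name__ == "__main__":
    extractor = PDFTextExtractor(
        pdf_path="paper.pdf",          # hypothetical input PDF
        output_path="clean_text.txt",  # hypothetical output file
    )
    saved_path = extractor.clean_and_save_text()
    if saved_path:
        print(f"Podcast-ready text written to {saved_path}")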