File size: 7,102 Bytes
bf8498e
 
 
 
7db791c
bf8498e
 
 
 
 
df26250
 
2fd0e2b
bf8498e
 
2a61b37
bf8498e
 
 
 
7db791c
65a0de7
bf8498e
 
 
 
 
 
 
 
65a0de7
2a61b37
ffbac09
bf8498e
 
 
 
2a61b37
 
bf8498e
 
2a61b37
 
 
 
 
ffbac09
bf8498e
 
 
 
ffbac09
 
bf8498e
ffbac09
 
 
 
 
 
 
 
 
 
 
 
 
7db791c
bf8498e
 
 
 
 
 
 
 
 
 
7db791c
bf8498e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7db791c
bf8498e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7db791c
bf8498e
 
 
 
 
 
 
2a61b37
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bf8498e
822257a
bf8498e
 
ffbac09
 
7db791c
bf8498e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# extract_text_from_pdf.py

import os
import torch
#import spaces
from PyPDF2 import PdfReader
from accelerate import Accelerator
from transformers import AutoModelForCausalLM, AutoTokenizer
from tqdm import tqdm
import warnings
#import spaces
from groq import Groq

warnings.filterwarnings('ignore')

#@spaces.GPU
class PDFTextExtractor:
    """
    A class to handle PDF text extraction and preprocessing for podcast preparation.
    """
    #@spaces.GPU
    def __init__(self, pdf_path, output_path):
        """
        Initialize the PDFTextExtractor with paths and model details.
        
        Args:
            pdf_path (str): Path to the PDF file.
            output_path (str): Path to save the cleaned text file.
            model_name (str): Name of the model to use for text processing.
        """

        #model_name="meta-llama/Llama-3.2-1B-Instruct"

        self.pdf_path = pdf_path
        self.output_path = output_path
        self.max_chars = 100000
        self.chunk_size = 1000
        
        #self.device = "cuda" if torch.cuda.is_available() else "cpu"
        
        # Initialize model and tokenizer
        # self.accelerator = Accelerator()
        # self.model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.bfloat16,use_safetensors=True,device_map=self.device)
        # self.tokenizer = AutoTokenizer.from_pretrained(model_name, use_safetensors=True)
        # self.model, self.tokenizer = self.accelerator.prepare(self.model, self.tokenizer)
        self.model_name="llama3-8b-8192"

        
        # System prompt for text processing
        self.system_prompt = """
        You are a world class text pre-processor, here is the raw data from a PDF, please parse and return it in a way that is crispy and usable to send to a podcast writer.

        The raw data is messed up with new lines, Latex math and you will see fluff that we can remove completely. Basically take away any details that you think might be useless in a podcast author's transcript.
        
        Remember, the podcast could be on any topic whatsoever so the issues listed above are not exhaustive
        
        Please be smart with what you remove and be creative ok?
        
        Remember DO NOT START SUMMARIZING THIS, YOU ARE ONLY CLEANING UP THE TEXT AND RE-WRITING WHEN NEEDED
        
        Be very smart and aggressive with removing details, you will get a running portion of the text and keep returning the processed text.
        
        PLEASE DO NOT ADD MARKDOWN FORMATTING, STOP ADDING SPECIAL CHARACTERS THAT MARKDOWN CAPATILISATION ETC LIKES
        
        ALWAYS start your response directly with processed text and NO ACKNOWLEDGEMENTS about my questions ok?
        Here is the text:"""
    
    #@spaces.GPU
    def validate_pdf(self):
        """Check if the file exists and is a valid PDF."""
        if not os.path.exists(self.pdf_path):
            print(f"Error: File not found at path: {self.pdf_path}")
            return False
        if not self.pdf_path.lower().endswith('.pdf'):
            print("Error: File is not a PDF")
            return False
        return True

    #@spaces.GPU
    def extract_text(self):
        """Extract text from the PDF, limited by max_chars."""
        if not self.validate_pdf():
            return None
        
        with open(self.pdf_path, 'rb') as file:
            pdf_reader = PdfReader(file)
            num_pages = len(pdf_reader.pages)
            print(f"Processing PDF with {num_pages} pages...")
            
            extracted_text = []
            total_chars = 0
            
            for page_num in range(num_pages):
                page = pdf_reader.pages[page_num]
                text = page.extract_text() or ""
                
                if total_chars + len(text) > self.max_chars:
                    remaining_chars = self.max_chars - total_chars
                    extracted_text.append(text[:remaining_chars])
                    print(f"Reached {self.max_chars} character limit at page {page_num + 1}")
                    break
                
                extracted_text.append(text)
                total_chars += len(text)
                print(f"Processed page {page_num + 1}/{num_pages}")
            
            final_text = '\n'.join(extracted_text)
            print(f"Extraction complete! Total characters: {len(final_text)}")
            return final_text
    #@spaces.GPU
    def create_word_bounded_chunks(self, text):
        """Split text into chunks around the target size."""
        words = text.split()
        chunks = []
        current_chunk = []
        current_length = 0
        
        for word in words:
            word_length = len(word) + 1  # +1 for the space
            if current_length + word_length > self.chunk_size and current_chunk:
                chunks.append(' '.join(current_chunk))
                current_chunk = [word]
                current_length = word_length
            else:
                current_chunk.append(word)
                current_length += word_length
        
        if current_chunk:
            chunks.append(' '.join(current_chunk))
        
        return chunks

    #@spaces.GPU(duration=120)
    def process_chunk(self, text_chunk):
        """Process a text chunk with the model and return the cleaned text."""
        conversation = [
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": text_chunk}
        ]
        
        # prompt = self.tokenizer.apply_chat_template(conversation, tokenize=False)
        # inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
        
        # with torch.no_grad():
        #     output = self.model.generate(**inputs, temperature=0.7, top_p=0.9, max_new_tokens=512)
        
        # processed_text = self.tokenizer.decode(output[0], skip_special_tokens=True)[len(prompt):].strip()
        client = Groq(
            api_key=os.environ.get("GROQ_API_KEY"),
        )

        chat_completion = client.chat.completions.create(
            messages=conversation,
            model=self.model_name,
        )
        
        processed_text = chat_completion.choices[0].message.content
        
        return processed_text

        
    #@spaces.GPU
    def clean_and_save_text(self):
        """Extract, clean, and save processed text to a file."""
        extracted_text = self.extract_text()
        if not extracted_text:
            return None
        
        chunks = self.create_word_bounded_chunks(extracted_text)
        processed_text = ""
        
        with open(self.output_path, 'w', encoding='utf-8') as out_file:
            for chunk_num, chunk in enumerate(tqdm(chunks, desc="Processing chunks")):
                processed_chunk = self.process_chunk(chunk)
                processed_text += processed_chunk + "\n"
                out_file.write(processed_chunk + "\n")
                out_file.flush()
        
        print(f"\nExtracted and cleaned text has been saved to {self.output_path}")
        return self.output_path