Spaces:
Sleeping
Sleeping
from haystack.nodes.base import BaseComponent | |
from haystack.schema import Document | |
from haystack.nodes import ImageToTextConverter, PDFToTextConverter | |
from haystack.nodes import TextConverter, DocxToTextConverter, PreProcessor | |
from pdf2image import convert_from_path | |
from typing import Callable, Dict, List, Optional, Text, Tuple, Union | |
from typing_extensions import Literal | |
import pandas as pd | |
import logging | |
import re | |
import string | |
from haystack.pipelines import Pipeline | |
import streamlit as st | |
def useOCR(file_path: str)-> Text: | |
""" | |
Converts image pdfs into text, Using the Farm-haystack[OCR] | |
Params | |
---------- | |
file_path: file_path of uploade file, returned by add_upload function in | |
uploadAndExample.py | |
Returns the text file as string. | |
""" | |
# we need pdf file to be first converted into image file | |
# this will create each page as image file | |
images = convert_from_path(pdf_path = file_path) | |
list_ = [] | |
# save image file in cache and read them one by one to pass it to OCR | |
for i, pdf in enumerate(images): | |
# Save pages as images in the pdf | |
pdf.save(f'PDF\image_converted_{i+1}.png', 'PNG') | |
list_.append(f'PDF\image_converted_{i+1}.png') | |
converter = ImageToTextConverter(remove_numeric_tables=True, | |
valid_languages=["eng"]) | |
# placeholder to collect the text from each page | |
placeholder = [] | |
for file in list_: | |
document = converter.convert( | |
file_path=file, meta=None, | |
)[0] | |
text = document.content | |
placeholder.append(text) | |
# join the text from each page by page separator | |
text = '\x0c'.join(placeholder) | |
return text | |
class FileConverter(BaseComponent): | |
""" | |
Wrapper class to convert uploaded document into text by calling appropriate | |
Converter class, will use internally haystack PDFToTextOCR in case of image | |
pdf. Cannot use the FileClassifier from haystack as its doesnt has any | |
label/output class for image. | |
1. https://haystack.deepset.ai/pipeline_nodes/custom-nodes | |
2. https://docs.haystack.deepset.ai/docs/file_converters | |
3. https://github.com/deepset-ai/haystack/tree/main/haystack/nodes/file_converter | |
4. https://docs.haystack.deepset.ai/reference/file-converters-api | |
""" | |
outgoing_edges = 1 | |
def run(self, file_name: str , file_path: str, encoding: Optional[str]=None, | |
id_hash_keys: Optional[List[str]] = None, | |
) -> Tuple[dict,str]: | |
""" this is required method to invoke the component in | |
the pipeline implementation. | |
Params | |
---------- | |
file_name: name of file | |
file_path: file_path of uploade file, returned by add_upload function in | |
uploadAndExample.py | |
See the links provided in Class docstring/description to see other params | |
Return | |
--------- | |
output: dictionary, with key as identifier and value could be anything | |
we need to return. In this case its the List of Hasyatck Document | |
output_1: As there is only one outgoing edge, we pass 'output_1' string | |
""" | |
try: | |
if file_name.endswith('.pdf'): | |
converter = PDFToTextConverter(remove_numeric_tables=True) | |
if file_name.endswith('.txt'): | |
converter = TextConverter(remove_numeric_tables=True) | |
if file_name.endswith('.docx'): | |
converter = DocxToTextConverter() | |
except Exception as e: | |
logging.error(e) | |
return | |
documents = [] | |
document = converter.convert( | |
file_path=file_path, meta=None, | |
encoding=encoding, id_hash_keys=id_hash_keys | |
)[0] | |
text = document.content | |
# in case of scanned/images only PDF the content might contain only | |
# the page separator (\f or \x0c). We check if is so and use | |
# use the OCR to get the text. | |
filtered = re.sub(r'\x0c', '', text) | |
if filtered == "": | |
logging.info("Using OCR") | |
text = useOCR(file_path) | |
documents.append(Document(content=text, | |
meta={"name": file_name}, | |
id_hash_keys=id_hash_keys)) | |
logging.info('file conversion succesful') | |
output = {'documents': documents} | |
return output, 'output_1' | |
def run_batch(): | |
""" | |
we dont have requirement to process the multiple files in one go | |
therefore nothing here, however to use the custom node we need to have | |
this method for the class. | |
""" | |
return | |
def basic(s:str, remove_punc:bool = False): | |
""" | |
Performs basic cleaning of text. | |
Params | |
---------- | |
s: string to be processed | |
removePunc: to remove all Punctuation including ',' and '.' or not | |
Returns: processed string: see comments in the source code for more info | |
""" | |
# Remove URLs | |
s = re.sub(r'^https?:\/\/.*[\r\n]*', ' ', s, flags=re.MULTILINE) | |
s = re.sub(r"http\S+", " ", s) | |
# Remove new line characters | |
s = re.sub('\n', ' ', s) | |
# Remove punctuations | |
if remove_punc == True: | |
translator = str.maketrans(' ', ' ', string.punctuation) | |
s = s.translate(translator) | |
# Remove distracting single quotes and dotted pattern | |
s = re.sub("\'", " ", s) | |
s = s.replace("..","") | |
return s.strip() | |
def paraLengthCheck(paraList, max_len = 100): | |
""" | |
There are cases where preprocessor cannot respect word limit, when using | |
respect sentence boundary flag due to missing sentence boundaries. | |
Therefore we run one more round of split here for those paragraphs | |
Params | |
--------------- | |
paraList : list of paragraphs/text | |
max_len : max length to be respected by sentences which bypassed | |
preprocessor strategy | |
""" | |
new_para_list = [] | |
for passage in paraList: | |
# check if para exceeds words limit | |
if len(passage.content.split()) > max_len: | |
# we might need few iterations example if para = 512 tokens | |
# we need to iterate 5 times to reduce para to size limit of '100' | |
iterations = int(len(passage.content.split())/max_len) | |
for i in range(iterations): | |
temp = " ".join(passage.content.split()[max_len*i:max_len*(i+1)]) | |
new_para_list.append((temp,passage.meta['page'])) | |
temp = " ".join(passage.content.split()[max_len*(i+1):]) | |
new_para_list.append((temp,passage.meta['page'])) | |
else: | |
# paragraphs which dont need any splitting | |
new_para_list.append((passage.content, passage.meta['page'])) | |
logging.info("New paragraphs length {}".format(len(new_para_list))) | |
return new_para_list | |
class UdfPreProcessor(BaseComponent): | |
""" | |
class to preprocess the document returned by FileConverter. It will check | |
for splitting strategy and splits the document by word or sentences and then | |
synthetically create the paragraphs. | |
1. https://docs.haystack.deepset.ai/docs/preprocessor | |
2. https://docs.haystack.deepset.ai/reference/preprocessor-api | |
3. https://github.com/deepset-ai/haystack/tree/main/haystack/nodes/preprocessor | |
""" | |
outgoing_edges = 1 | |
def run(self, documents:List[Document], remove_punc:bool=False, apply_clean = True, | |
split_by: Literal["sentence", "word"] = 'sentence', | |
split_length:int = 2, split_respect_sentence_boundary:bool = False, | |
split_overlap:int = 0): | |
""" this is required method to invoke the component in | |
the pipeline implementation. | |
Params | |
---------- | |
documents: documents from the output dictionary returned by Fileconverter | |
remove_punc: to remove all Punctuation including ',' and '.' or not | |
split_by: document splitting strategy either as word or sentence | |
split_length: when synthetically creating the paragrpahs from document, | |
it defines the length of paragraph. | |
split_respect_sentence_boundary: Used when using 'word' strategy for | |
splititng of text. | |
split_overlap: Number of words or sentences that overlap when creating | |
the paragraphs. This is done as one sentence or 'some words' make sense | |
when read in together with others. Therefore the overlap is used. | |
Return | |
--------- | |
output: dictionary, with key as identifier and value could be anything | |
we need to return. In this case the output will contain 4 objects | |
the paragraphs text list as List, Haystack document, Dataframe and | |
one raw text file. | |
output_1: As there is only one outgoing edge, we pass 'output_1' string | |
""" | |
if split_by == 'sentence': | |
split_respect_sentence_boundary = False | |
else: | |
split_respect_sentence_boundary = split_respect_sentence_boundary | |
preprocessor = PreProcessor( | |
clean_empty_lines=True, | |
clean_whitespace=True, | |
clean_header_footer=True, | |
split_by=split_by, | |
split_length=split_length, | |
split_respect_sentence_boundary= split_respect_sentence_boundary, | |
split_overlap=split_overlap, | |
# will add page number only in case of PDF not for text/docx file. | |
add_page_number=True | |
) | |
for i in documents: | |
# # basic cleaning before passing it to preprocessor. | |
# i = basic(i) | |
docs_processed = preprocessor.process([i]) | |
if apply_clean: | |
for item in docs_processed: | |
item.content = basic(item.content, remove_punc= remove_punc) | |
else: | |
pass | |
df = pd.DataFrame(docs_processed) | |
all_text = " ".join(df.content.to_list()) | |
para_list = df.content.to_list() | |
logging.info('document split into {} paragraphs'.format(len(para_list))) | |
output = {'documents': docs_processed, | |
'dataframe': df, | |
'text': all_text, | |
'paraList': para_list | |
} | |
return output, "output_1" | |
def run_batch(): | |
""" | |
we dont have requirement to process the multiple files in one go | |
therefore nothing here, however to use the custom node we need to have | |
this method for the class. | |
""" | |
return | |
def processingpipeline(): | |
""" | |
Returns the preprocessing pipeline. Will use FileConverter and UdfPreProcesor | |
from utils.preprocessing | |
""" | |
preprocessing_pipeline = Pipeline() | |
file_converter = FileConverter() | |
custom_preprocessor = UdfPreProcessor() | |
preprocessing_pipeline.add_node(component=file_converter, | |
name="FileConverter", inputs=["File"]) | |
preprocessing_pipeline.add_node(component = custom_preprocessor, | |
name ='UdfPreProcessor', inputs=["FileConverter"]) | |
return preprocessing_pipeline |