Spaces:
Build error
Build error
import os | |
import re | |
import sys | |
import glob | |
import shlex | |
from functools import partial | |
from multiprocessing import Pool | |
from subprocess import check_call, CalledProcessError, TimeoutExpired, PIPE | |
from arxiv_public_data.config import LOGGER | |
from arxiv_public_data import fixunicode, pdfstamp | |
log = LOGGER.getChild('fulltext') | |
TIMELIMIT = 2*60 | |
STAMP_SEARCH_LIMIT = 1000 | |
PDF2TXT = 'pdf2txt.py' | |
PDFTOTEXT = 'pdftotext' | |
RE_REPEATS = r'(\(cid:\d+\)|lllll|\.\.\.\.\.|\*\*\*\*\*)' | |
def reextension(filename: str, extension: str) -> str:
    """
    Swap the extension of `filename` for `extension`.

    Parameters
    ----------
    filename : str
        Path whose last extension (as split off by `os.path.splitext`) is
        dropped
    extension : str
        Replacement extension, given without the leading dot

    Returns
    -------
    renamed : str
        The path without its old extension, followed by '.' and `extension`
    """
    stem = os.path.splitext(filename)[0]
    return '%s.%s' % (stem, extension)
def average_word_length(txt):
    """
    Estimate the average word length of a text.

    The value is the total number of characters (whitespace included)
    divided by the number of whitespace-separated words plus one; the extra
    one keeps the denominator non-zero for empty text. Repeated filler
    sequences (see RE_REPEATS) are not stripped before counting.

    Parameters
    ----------
    txt : str

    Returns
    -------
    word_length : float
        Characters per word in the text
    """
    words = txt.split()
    return len(txt) / (len(words) + 1)
def process_timeout(cmd, timeout):
    """
    Run `cmd` to completion, discarding everything it prints.

    Parameters
    ----------
    cmd : list of str
        Program and arguments, passed to `subprocess.check_call`
    timeout : int
        Seconds to wait before the child is killed

    Returns
    -------
    returncode : int
        Always 0; any other exit status raises instead

    Raises
    ------
    CalledProcessError
        If the command exits with a non-zero status
    TimeoutExpired
        If the command does not finish within `timeout` seconds
    """
    # Imported here because DEVNULL is not among the module-level
    # subprocess imports.
    from subprocess import DEVNULL

    # check_call never reads from its pipes, so stdout=PIPE / stderr=PIPE
    # lets a chatty child fill the OS pipe buffer and block until the
    # timeout fires. The output is not used, so send it to the null device.
    return check_call(cmd, timeout=timeout, stdout=DEVNULL, stderr=DEVNULL)
# ============================================================================ | |
# functions for calling the text extraction services | |
# ============================================================================ | |
def run_pdf2txt(pdffile: str, timelimit: int=TIMELIMIT, options: str=''):
    """
    Run pdf2txt to extract full text

    The text is written next to the PDF in a file with the extension
    `.pdf2txt`; that file is read back and left in place for the caller to
    remove.

    Parameters
    ----------
    pdffile : str
        Path to PDF file
    timelimit : int
        Amount of time to wait for the process to complete
    options : str
        Extra command line options for pdf2txt, split with shell-like rules

    Returns
    -------
    output : str
        Full plain text output

    Raises
    ------
    CalledProcessError, TimeoutExpired
        Propagated from :func:`process_timeout` when the tool fails or runs
        past `timelimit`
    """
    log.debug('Running {} on {}'.format(PDF2TXT, pdffile))
    tmpfile = reextension(pdffile, 'pdf2txt')

    # Build the argument vector directly. Formatting the paths into a quoted
    # string and re-splitting it mis-tokenises paths that contain double
    # quotes or backslashes.
    cmd = [PDF2TXT] + shlex.split(options) + ['-o', tmpfile, pdffile]
    process_timeout(cmd, timeout=timelimit)

    # NOTE(review): read with the platform default encoding -- confirm it
    # matches the encoding pdf2txt writes.
    with open(tmpfile) as f:
        return f.read()
def run_pdftotext(pdffile: str, timelimit: int = TIMELIMIT) -> str:
    """
    Run pdftotext on PDF file for extracted plain text

    The text is written next to the PDF in a file with the extension
    `.pdftotxt`; that file is read back and left in place for the caller to
    remove.

    Parameters
    ----------
    pdffile : str
        Path to PDF file
    timelimit : int
        Amount of time to wait for the process to complete

    Returns
    -------
    output : str
        Full plain text output

    Raises
    ------
    CalledProcessError, TimeoutExpired
        Propagated from :func:`process_timeout` when the tool fails or runs
        past `timelimit`
    """
    log.debug('Running {} on {}'.format(PDFTOTEXT, pdffile))
    tmpfile = reextension(pdffile, 'pdftotxt')

    # Build the argument vector directly. Formatting the paths into a quoted
    # string and re-splitting it mis-tokenises paths that contain double
    # quotes or backslashes.
    cmd = [PDFTOTEXT, pdffile, tmpfile]
    process_timeout(cmd, timeout=timelimit)

    # NOTE(review): read with the platform default encoding -- confirm it
    # matches the encoding pdftotext writes.
    with open(tmpfile) as f:
        return f.read()
def run_pdf2txt_A(pdffile: str, **kwargs) -> str:
    """
    Extract text with pdf2txt using its -A flag ('positional analysis on
    images'), which can give better results when plain pdf2txt runs many
    words together.

    Parameters
    ----------
    pdffile : str
        Path to PDF file
    kwargs : dict
        Forwarded unchanged to :func:`run_pdf2txt`

    Returns
    -------
    output : str
        Full plain text output
    """
    extracted = run_pdf2txt(pdffile, options='-A', **kwargs)
    return extracted
# ============================================================================ | |
# main function which extracts text | |
# ============================================================================ | |
def _remove_tempfiles(pdffile: str) -> None:
    """ Best-effort removal of the intermediate files the extractors leave """
    for extension in ('pdftotxt', 'pdf2txt'):
        try:
            os.remove(reextension(pdffile, extension))
        except OSError:
            # the file was never created or is already gone
            pass


def fulltext(pdffile: str, timelimit: int = TIMELIMIT):
    """
    Given a pdf file, extract the unicode text and run through very basic
    unicode normalization routines. Determine the best extracted text and
    return as a string.

    pdftotext is tried first, with pdf2txt as the fallback when it fails.
    If the result has an implausibly long average word length, pdf2txt is
    run again with the -A option. The intermediate `.pdftotxt` / `.pdf2txt`
    files are removed before returning, whether or not extraction succeeded.

    Parameters
    ----------
    pdffile : str
        Path to PDF file from which to extract text
    timelimit : int
        Time in seconds to allow the extraction routines to run

    Returns
    -------
    fulltext : str
        The full plain text of the PDF

    Raises
    ------
    FileNotFoundError
        If `pdffile` is not an existing file
    RuntimeError
        If `pdffile` is empty, or no extraction gave plausible text
    """
    if not os.path.isfile(pdffile):
        raise FileNotFoundError(pdffile)

    if os.stat(pdffile).st_size == 0:  # file is empty
        raise RuntimeError('"{}" is an empty file'.format(pdffile))

    # Average word lengths above this are treated as a failed extraction
    # (presumably words run together).
    max_word_length = 45

    try:
        try:
            output = run_pdftotext(pdffile, timelimit=timelimit)
        except (TimeoutExpired, CalledProcessError, RuntimeError) as e:
            log.debug('{} failed on {} ({}), falling back to {}'.format(
                PDFTOTEXT, pdffile, e, PDF2TXT
            ))
            output = run_pdf2txt(pdffile, timelimit=timelimit)

        output = fixunicode.fix_unicode(output)
        if average_word_length(output) <= max_word_length:
            return output

        # Second attempt: pdf2txt with positional analysis
        output = run_pdf2txt_A(pdffile, timelimit=timelimit)
        output = fixunicode.fix_unicode(output)
        if average_word_length(output) > max_word_length:
            raise RuntimeError(
                'No accurate text could be extracted from "{}"'.format(pdffile)
            )
        return output
    finally:
        # One cleanup for every exit path, covering both extractors' files
        _remove_tempfiles(pdffile)
def sorted_files(globber: str):
    """
    Find the files matching a globbing expression and return them in
    numerical order.

    A plain string sort does not give numerical order,
    e.g.:
        9 -> 12 returned as 10 11 12 9 by string sort

    Files are instead ordered by the numbers appearing in their path,
    compared as integers, then by the path itself.

    Parameters
    ----------
    globber : str
        Expression on which to search for files (bash glob expression);
        `**` matches recursively

    Returns
    -------
    files : list of str
        Matching paths, sorted
    """
    files = glob.glob(globber, recursive=True)

    def numeric_key(fn):
        # Keep the numbers as ints so that 9 sorts before 10. They sit in
        # their own list, apart from the filename, so paths with different
        # counts of numbers never compare an int against a str.
        numbers = [int(n) for n in re.findall(r'\d+', fn)]
        return numbers, fn

    return sorted(files, key=numeric_key)
def convert_directory(path: str, timelimit: int = TIMELIMIT):
    """
    Extract the text of every pdf found directly inside `path`. The text of
    each pdf is written to a file of the same name with the extension .txt;
    pdfs that already have such a file are skipped.

    Parameters
    ----------
    path : str
        Directory in which to search for pdfs and convert to text
    timelimit : int
        Time in seconds allowed for each extraction, passed to
        :func:`fulltext`

    Returns
    -------
    output : list of str
        The pdf files converted during this call
    """
    pattern = os.path.join(path, '*.pdf')
    candidates = sorted_files(pattern)
    log.info('Searching "{}"...'.format(pattern))
    log.info('Found: {} pdfs'.format(len(candidates)))

    converted = []
    for source in candidates:
        target = reextension(source, 'txt')
        if not os.path.exists(target):
            # a single bad pdf must not abort the whole run: log the
            # failure and carry on with the next file
            try:
                text = fulltext(source, timelimit)
                with open(target, 'w') as handle:
                    handle.write(text)
            except Exception as err:
                log.error("Conversion failed for '{}'".format(source))
                log.exception(err)
            else:
                converted.append(source)
    return converted
def convert_directory_parallel(path: str, processes: int, timelimit: int = TIMELIMIT):
    """
    Convert all pdfs in a given `path`, including its subdirectories, to
    full plain text using a pool of worker processes. For each pdf, a file
    of the same name but extension .txt will be created. If that file
    exists, it will be skipped.

    Parameters
    ----------
    path : str
        Directory in which to search for pdfs and convert to text
    processes : int
        Number of worker processes in the pool
    timelimit : int
        Time in seconds allowed for each extraction

    Returns
    -------
    None
        Failures are logged by :func:`convert_safe` and not raised
    """
    globber = os.path.join(path, '**/*.pdf')  # search expression for glob.glob
    pdffiles = sorted_files(globber)  # a list of paths
    log.info('Searching "{}"...'.format(globber))
    log.info('Found: {} pdfs'.format(len(pdffiles)))

    # The context manager terminates the workers if map() raises, so the
    # pool is not leaked; on the normal path it is closed and joined first.
    with Pool(processes=processes) as pool:
        pool.map(partial(convert_safe, timelimit=timelimit), pdffiles)
        pool.close()
        pool.join()
def convert_safe(pdffile: str, timelimit: int = TIMELIMIT):
    """
    Run :func:`convert` on `pdffile`, logging any exception it raises
    instead of propagating it. Always returns None.
    """
    try:
        convert(pdffile, timelimit=timelimit)
    except Exception as err:
        message = 'File conversion failed for {}: {}'.format(pdffile, err)
        log.error(message)
    return None
def convert(path: str, skipconverted=True, timelimit: int = TIMELIMIT) -> str:
    """
    Convert a single PDF to text.

    Parameters
    ----------
    path : str
        Location of a PDF file.
    skipconverted : boolean
        Skip conversion when there is a text file already
    timelimit : int
        Time in seconds allowed for the extraction

    Returns
    -------
    str
        Location of text file.

    Raises
    ------
    RuntimeError
        If `path` does not exist, or if extracting or writing the text fails
    """
    if not os.path.exists(path):
        raise RuntimeError('No such path: %s' % path)

    outpath = reextension(path, 'txt')

    # Honor the flag: existing output is reused only when the caller asked
    # for it (the default); skipconverted=False forces a fresh conversion.
    if skipconverted and os.path.exists(outpath):
        return outpath

    try:
        content = fulltext(path, timelimit)
        with open(outpath, 'w') as f:
            f.write(content)
    except Exception as e:
        msg = "Conversion failed for '%s': %s"
        log.error(msg, path, e)
        raise RuntimeError(msg % (path, e)) from e
    return outpath