# sidphbot's picture
# spaces init
# a8d4e3d
import os
import re
import sys
import glob
import shlex
from functools import partial
from multiprocessing import Pool
from subprocess import check_call, CalledProcessError, TimeoutExpired, PIPE
from arxiv_public_data.config import LOGGER
from arxiv_public_data import fixunicode, pdfstamp
# Module logger: the 'fulltext' child of the package-wide LOGGER.
log = LOGGER.getChild('fulltext')
# Default time limit handed to the extraction subprocesses as their timeout
# (2 * 60 = 120; the `fulltext` docstring states the unit is seconds).
TIMELIMIT = 2*60
# NOTE(review): no active code in the visible part of this file reads this
# value; presumably it was meant for the pdfstamp stamp-removal step -- confirm.
STAMP_SEARCH_LIMIT = 1000
# Names of the two command-line extraction tools that are invoked.
PDF2TXT = 'pdf2txt.py'
PDFTOTEXT = 'pdftotext'
# Matches "(cid:<digits>)" tokens and runs of five 'l', '.' or '*' characters.
# NOTE(review): no active code in the visible part of this file uses it.
RE_REPEATS = r'(\(cid:\d+\)|lllll|\.\.\.\.\.|\*\*\*\*\*)'
def reextension(filename: str, extension: str) -> str:
    """
    Swap the extension of `filename` for `extension`.

    Parameters
    ----------
    filename : str
        Path whose last extension (as split off by ``os.path.splitext``) is
        dropped; a name without an extension is used whole
    extension : str
        New extension, given without the leading dot

    Returns
    -------
    renamed : str
        The extension-less path followed by ``.`` and `extension`
    """
    stem = os.path.splitext(filename)[0]
    return '{stem}.{ext}'.format(stem=stem, ext=extension)
def average_word_length(txt):
    """
    Estimate the average word length of a text.

    The estimate is the total number of characters (whitespace included)
    divided by one more than the number of whitespace-separated words, so an
    empty text yields 0.0 instead of a division by zero.

    Parameters
    ----------
    txt : str
        Text to measure

    Returns
    -------
    word_length : float
        Approximate average word length of `txt`
    """
    # NOTE: stripping repeated filler tokens beforehand is currently disabled:
    # txt = re.subn(RE_REPEATS, '', txt)[0]
    word_count = len(txt.split())
    return len(txt) / (word_count + 1)
def process_timeout(cmd, timeout):
    """
    Run `cmd` to completion with a time limit, discarding its output.

    Parameters
    ----------
    cmd : list of str
        Command and arguments to execute
    timeout : int
        Seconds to wait before giving up on the process

    Returns
    -------
    returncode : int
        Always 0; a non-zero exit status raises instead

    Raises
    ------
    CalledProcessError
        If the command exits with a non-zero status
    TimeoutExpired
        If the command does not finish within `timeout` seconds
    """
    # DEVNULL is imported here because the module-level subprocess import
    # does not include it.
    from subprocess import DEVNULL

    # The output is never read, so it must not go to a pipe: with PIPE a
    # child that prints more than the OS pipe buffer holds blocks forever
    # and the call ends in a spurious timeout.
    return check_call(cmd, timeout=timeout, stdout=DEVNULL, stderr=DEVNULL)
# ============================================================================
# functions for calling the text extraction services
# ============================================================================
def run_pdf2txt(pdffile: str, timelimit: int=TIMELIMIT, options: str='') -> str:
    """
    Run pdf2txt to extract full text

    The tool writes its result next to the PDF, in a file with the same name
    but the extension ``.pdf2txt``; that file is read back and left on disk.

    Parameters
    ----------
    pdffile : str
        Path to PDF file
    timelimit : int
        Amount of time to wait for the process to complete
    options : str
        Extra command-line options for pdf2txt, written as they would be on
        a shell command line (e.g. ``'-A'``)

    Returns
    -------
    output : str
        Full plain text output

    Raises
    ------
    CalledProcessError, TimeoutExpired
        Propagated from :func:`process_timeout` when the tool fails or
        exceeds `timelimit`
    """
    log.debug('Running {} on {}'.format(PDF2TXT, pdffile))
    tmpfile = reextension(pdffile, 'pdf2txt')

    # Build the argument list directly. Only `options` is shell-split, so
    # paths containing quotes or spaces reach the tool unchanged instead of
    # breaking the re-parsing of a formatted command string.
    cmd = [PDF2TXT] + shlex.split(options) + ['-o', tmpfile, pdffile]
    process_timeout(cmd, timeout=timelimit)

    # pdf2txt.py's default output codec is UTF-8; decode explicitly rather
    # than relying on the locale's preferred encoding.
    with open(tmpfile, encoding='utf-8') as f:
        return f.read()
def run_pdftotext(pdffile: str, timelimit: int = TIMELIMIT) -> str:
    """
    Run pdftotext on PDF file for extracted plain text

    The tool writes its result next to the PDF, in a file with the same name
    but the extension ``.pdftotxt``; that file is read back and left on disk.

    Parameters
    ----------
    pdffile : str
        Path to PDF file
    timelimit : int
        Amount of time to wait for the process to complete

    Returns
    -------
    output : str
        Full plain text output

    Raises
    ------
    CalledProcessError, TimeoutExpired
        Propagated from :func:`process_timeout` when the tool fails or
        exceeds `timelimit`
    """
    log.debug('Running {} on {}'.format(PDFTOTEXT, pdffile))
    tmpfile = reextension(pdffile, 'pdftotxt')

    # Pass the paths as separate argv entries so that quotes or spaces in a
    # file name cannot break the command (no string formatting + re-split).
    cmd = [PDFTOTEXT, pdffile, tmpfile]
    process_timeout(cmd, timeout=timelimit)

    # pdftotext emits UTF-8 unless told otherwise with -enc; decode
    # explicitly rather than relying on the locale's preferred encoding.
    with open(tmpfile, encoding='utf-8') as f:
        return f.read()
def run_pdf2txt_A(pdffile: str, **kwargs) -> str:
    """
    Extract text with pdf2txt using its ``-A`` switch.

    Per the tool's description this runs 'positional analysis on images',
    which can help when plain pdf2txt glues many words together.

    Parameters
    ----------
    pdffile : str
        Path to PDF file
    kwargs : dict
        Forwarded unchanged to :func:`run_pdf2txt` (``options`` is already
        supplied here)

    Returns
    -------
    output : str
        Full plain text output
    """
    text = run_pdf2txt(pdffile, options='-A', **kwargs)
    return text
# ============================================================================
# main function which extracts text
# ============================================================================
def _remove_tempfiles(pdffile: str):
    """ Best-effort removal of the intermediate files written next to a PDF """
    for extension in ('pdftotxt', 'pdf2txt'):
        try:
            os.remove(reextension(pdffile, extension))
        except OSError:
            # The file was never created (that extractor did not run) or
            # cannot be removed; neither should mask the extraction result.
            pass


def fulltext(pdffile: str, timelimit: int = TIMELIMIT) -> str:
    """
    Given a pdf file, extract the unicode text and run through very basic
    unicode normalization routines. Determine the best extracted text and
    return as a string.

    pdftotext is tried first, with pdf2txt as the fallback when it fails or
    times out. If the result has an implausibly high average word length
    (words run together), pdf2txt is run once more with the -A option. The
    intermediate ``.pdftotxt`` / ``.pdf2txt`` files are removed whether the
    extraction succeeds or not.

    Parameters
    ----------
    pdffile : str
        Path to PDF file from which to extract text
    timelimit : int
        Time in seconds to allow the extraction routines to run

    Returns
    -------
    fulltext : str
        The full plain text of the PDF

    Raises
    ------
    FileNotFoundError
        If `pdffile` is not an existing file
    RuntimeError
        If the file is empty or no plausible text could be extracted
    CalledProcessError, TimeoutExpired
        If a pdf2txt run (the fallback or the -A pass) fails or times out
    """
    if not os.path.isfile(pdffile):
        raise FileNotFoundError(pdffile)

    if os.stat(pdffile).st_size == 0:  # file is empty
        raise RuntimeError('"{}" is an empty file'.format(pdffile))

    # Above this average word length the text is treated as badly extracted.
    max_word_length = 45

    try:
        try:
            output = run_pdftotext(pdffile, timelimit=timelimit)
        except (TimeoutExpired, CalledProcessError, RuntimeError):
            output = run_pdf2txt(pdffile, timelimit=timelimit)

        output = fixunicode.fix_unicode(output)
        if average_word_length(output) <= max_word_length:
            return output

        # Words look merged together: retry with pdf2txt's -A option.
        output = run_pdf2txt_A(pdffile, timelimit=timelimit)
        output = fixunicode.fix_unicode(output)
        if average_word_length(output) > max_word_length:
            raise RuntimeError(
                'No accurate text could be extracted from "{}"'.format(pdffile)
            )
        return output
    finally:
        # Runs on every exit path, so neither extractor's temp file is left
        # behind, including when extraction fails.
        _remove_tempfiles(pdffile)
def sorted_files(globber: str):
    """
    Find the files matching a globbing expression and return them sorted by
    the numbers embedded in their paths, then by the path itself.

    Plain string sorting does not give numerical order,
    e.g.:
        9 -> 12 returned as 10 11 12 9 by string sort
    Here every run of digits in a path is compared as an integer, so those
    files come back as 9 10 11 12.

    Parameters
    ----------
    globber : str
        Expression on which to search for files (bash glob expression);
        ``**`` matches across sub directories

    Returns
    -------
    files : list of str
        Matching paths in numerical order
    """
    def numeric_key(filename):
        # Integers must be compared as integers ('10' < '9' as strings).
        # The filename sits in its own tuple slot so it is only ever
        # compared with another filename, never with a number.
        numbers = [int(n) for n in re.findall(r'\d+', filename)]
        return numbers, filename

    return sorted(glob.glob(globber, recursive=True), key=numeric_key)
def convert_directory(path: str, timelimit: int = TIMELIMIT):
    """
    Convert all pdfs in a given `path` to full plain text. For each pdf, a file
    of the same name but extension .txt will be created. If that file exists,
    it will be skipped.

    Only pdfs directly inside `path` are considered. A pdf whose conversion
    fails is logged and skipped so that one bad file does not stop the run.

    Parameters
    ----------
    path : str
        Directory in which to search for pdfs and convert to text
    timelimit : int
        Time in seconds allowed for each extraction run

    Returns
    -------
    output : list of str
        Paths of the pdfs that were converted during this call
    """
    outlist = []

    globber = os.path.join(path, '*.pdf')
    log.info('Searching "{}"...'.format(globber))
    pdffiles = sorted_files(globber)
    log.info('Found: {} pdfs'.format(len(pdffiles)))

    for pdffile in pdffiles:
        txtfile = reextension(pdffile, 'txt')

        if os.path.exists(txtfile):
            continue

        # we don't want this function to stop half way because of one failed
        # file so just charge onto the next one
        try:
            text = fulltext(pdffile, timelimit)
            # Extracted text is arbitrary unicode; write it as UTF-8 rather
            # than in the locale encoding, which may not be able to encode it.
            with open(txtfile, 'w', encoding='utf-8') as f:
                f.write(text)
        except Exception as e:
            log.error("Conversion failed for '{}'".format(pdffile))
            log.exception(e)
            continue

        outlist.append(pdffile)
    return outlist
def convert_directory_parallel(path: str, processes: int, timelimit: int = TIMELIMIT):
    """
    Convert all pdfs found under `path`, including its sub directories, to
    full plain text using a pool of worker processes. For each pdf, a file of
    the same name but extension .txt will be created. If that file exists,
    it will be skipped.

    Each pdf is handled by :func:`convert_safe`, so a failing file is logged
    instead of aborting the run.

    Parameters
    ----------
    path : str
        Directory in which to search for pdfs and convert to text
    processes : int
        Number of worker processes in the pool
    timelimit : int
        Time in seconds allowed for each extraction run

    Returns
    -------
    None
        Nothing is returned; the results are the .txt files on disk
    """
    globber = os.path.join(path, '**/*.pdf')  # search expression for glob.glob
    pdffiles = sorted_files(globber)  # a list of path

    log.info('Searching "{}"...'.format(globber))
    log.info('Found: {} pdfs'.format(len(pdffiles)))

    # The context manager releases the worker processes even when map() is
    # interrupted; map() itself only returns once every pdf was processed.
    with Pool(processes=processes) as pool:
        pool.map(partial(convert_safe, timelimit=timelimit), pdffiles)
def convert_safe(pdffile: str, timelimit: int = TIMELIMIT):
    """
    Convert one PDF via :func:`convert` without ever raising.

    Any exception from the conversion is logged as an error and swallowed,
    which makes this suitable as a worker function for batch runs.

    Parameters
    ----------
    pdffile : str
        Location of a PDF file.
    timelimit : int
        Time limit forwarded to :func:`convert`
    """
    try:
        convert(pdffile, timelimit=timelimit)
    except Exception as err:
        failure = 'File conversion failed for {}: {}'.format(pdffile, err)
        log.error(failure)
def convert(path: str, skipconverted=True, timelimit: int = TIMELIMIT) -> str:
    """
    Convert a single PDF to text.

    The text is written next to the PDF, in a file with the same name but
    the extension ``.txt``.

    Parameters
    ----------
    path : str
        Location of a PDF file.
    skipconverted : boolean
        Skip conversion when there is a text file already
    timelimit : int
        Time in seconds allowed for each extraction run

    Returns
    -------
    str
        Location of text file.

    Raises
    ------
    RuntimeError
        If `path` does not exist, or if extracting or writing the text
        fails (the original exception is chained as the cause)
    """
    if not os.path.exists(path):
        raise RuntimeError('No such path: %s' % path)
    outpath = reextension(path, 'txt')

    # Honour `skipconverted`: with False an existing text file is rebuilt
    # instead of being returned untouched.
    if skipconverted and os.path.exists(outpath):
        return outpath

    try:
        content = fulltext(path, timelimit)
        # Extracted text is arbitrary unicode; write it as UTF-8 rather than
        # in the locale encoding, which may not be able to encode it.
        with open(outpath, 'w', encoding='utf-8') as f:
            f.write(content)
    except Exception as e:
        msg = "Conversion failed for '%s': %s"
        log.error(msg, path, e)
        raise RuntimeError(msg % (path, e)) from e
    return outpath