import os
import re
import time
import logging
import zipfile
import requests
import bibtexparser
from tqdm import tqdm
from urllib.parse import quote, urlencode
import gradio as gr
from bs4 import BeautifulSoup
import io
import asyncio
import aiohttp
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s: %(message)s')
logger = logging.getLogger(__name__)
class PaperDownloader:
def __init__(self, output_dir='papers'):
self.output_dir = output_dir
os.makedirs(output_dir, exist_ok=True)
# Updated download sources
self.download_sources = [
'https://sci-hub.ee/',
'https://sci-hub.st/',
'https://sci-hub.ru/',
'https://sci-hub.ren/',
'https://sci-hub.mksa.top/',
'https://sci-hub.se/',
'https://libgen.rs/scimag/'
]
# Request headers
self.headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.9',
}
def clean_doi(self, doi):
"""Clean and encode DOI for URL"""
if not isinstance(doi, str):
return None
return quote(doi.strip()) if doi else None
async def fetch_with_headers(self, session, url, timeout=10):
"""Utility method to fetch an URL with headers and timeout"""
try:
async with session.get(url, headers=self.headers, timeout=timeout, allow_redirects=True) as response:
response.raise_for_status()
return await response.text(), response.headers
except Exception as e:
logger.debug(f"Error fetching {url}: {e}")
return None, None
async def download_paper_direct_doi_async(self, session, doi):
"""Attempt to download the pdf from the landing page of the doi"""
if not doi:
return None
try:
doi_url = f"https://doi.org/{self.clean_doi(doi)}"
text, headers = await self.fetch_with_headers(session, doi_url, timeout=15)
if not text:
return None
pdf_patterns = [
r'(https?://[^\s<>"]+?\.pdf)',
r'(https?://[^\s<>"]+?download/[^\s<>"]+)',
r'(https?://[^\s<>"]+?\/pdf\/[^\s<>"]+)',
]
pdf_urls = []
for pattern in pdf_patterns:
pdf_urls.extend(re.findall(pattern, text))
for pdf_url in pdf_urls:
try:
pdf_response = await session.get(pdf_url, headers=self.headers, timeout=10)
if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
logger.debug(f"Found PDF from: {pdf_url}")
return await pdf_response.read()
except Exception as e:
logger.debug(f"Error downloading PDF from {pdf_url}: {e}")
except Exception as e:
logger.debug(f"Error trying to get the PDF from {doi}: {e}")
return None
async def download_paper_scihub_async(self, session, doi):
"""Improved method to download paper from Sci-Hub using async requests"""
if not doi:
logger.warning("DOI not provided")
return None
for base_url in self.download_sources:
try:
scihub_url = f"{base_url}{self.clean_doi(doi)}"
text, headers = await self.fetch_with_headers(session, scihub_url, timeout=15)
if not text:
continue
# Search for multiple PDF URL patterns
pdf_patterns = [
r'(https?://[^\s<>"]+?\.pdf)',
r'(https?://[^\s<>"]+?download/[^\s<>"]+)',
r'(https?://[^\s<>"]+?\/pdf\/[^\s<>"]+)',
]
pdf_urls = []
for pattern in pdf_patterns:
pdf_urls.extend(re.findall(pattern, text))
# Try downloading from found URLs
for pdf_url in pdf_urls:
try:
pdf_response = await session.get(pdf_url, headers=self.headers, timeout=10)
# Verify if it's a PDF
if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
logger.debug(f"Found PDF from: {pdf_url}")
return await pdf_response.read()
except Exception as e:
logger.debug(f"Error downloading PDF from {pdf_url}: {e}")
except Exception as e:
logger.debug(f"Error trying to download {doi} from {base_url}: {e}")
return None
async def download_paper_libgen_async(self, session, doi):
"""Download from Libgen, handles the query and the redirection"""
if not doi:
return None
base_url = 'https://libgen.rs/scimag/'
try:
search_url = f"{base_url}?q={self.clean_doi(doi)}"
text, headers = await self.fetch_with_headers(session, search_url, timeout=10)
if not text or "No results" in text:
logger.debug(f"No results for DOI: {doi} on libgen")
return None
soup = BeautifulSoup(text, 'html.parser')
links = soup.select('table.c > tbody > tr:nth-child(2) > td:nth-child(1) > a')
if links:
link = links[0]
pdf_url = link['href']
pdf_response = await session.get(pdf_url, headers=self.headers, allow_redirects=True, timeout=10)
if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
logger.debug(f"Found PDF from: {pdf_url}")
return await pdf_response.read()
except Exception as e:
logger.debug(f"Error trying to download {doi} from libgen: {e}")
return None
async def download_paper_google_scholar_async(self, session, doi):
"""Search google scholar to find an article with the given doi, try to get the pdf"""
if not doi:
return None
try:
query = f'doi:"{doi}"'
params = {'q': query}
url = f'https://scholar.google.com/scholar?{urlencode(params)}'
text, headers = await self.fetch_with_headers(session, url, timeout=10)
if not text:
return None
soup = BeautifulSoup(text, 'html.parser')
# Find any links with [PDF]
links = soup.find_all('a', string=re.compile(r'\[PDF\]', re.IGNORECASE))
if links:
pdf_url = links[0]['href']
pdf_response = await session.get(pdf_url, headers=self.headers, timeout=10)
if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
logger.debug(f"Found PDF from: {pdf_url}")
return await pdf_response.read()
except Exception as e:
logger.debug(f"Google Scholar error for {doi}: {e}")
return None
async def download_paper_crossref_async(self, session, doi):
"""Alternative search method using Crossref"""
if not doi:
return None
try:
# Search for open access link
url = f"https://api.crossref.org/works/{doi}"
response = await session.get(url, headers=self.headers, timeout=10)
if response.status == 200:
data = await response.json()
work = data.get('message', {})
# Search for open access links
links = work.get('link', [])
for link in links:
if link.get('content-type') == 'application/pdf':
pdf_url = link.get('URL')
if pdf_url:
pdf_response = await session.get(pdf_url, headers=self.headers)
if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
logger.debug(f"Found PDF from: {pdf_url}")
return await pdf_response.read()
except Exception as e:
logger.debug(f"Crossref error for {doi}: {e}")
return None
async def download_with_retry_async(self, doi, max_retries=3, initial_delay=2):
"""Downloads a paper using multiple strategies with exponential backoff and async requests"""
pdf_content = None
retries = 0
delay = initial_delay
async with aiohttp.ClientSession() as session:
while retries < max_retries and not pdf_content:
try:
pdf_content = (
await self.download_paper_direct_doi_async(session, doi) or
await self.download_paper_scihub_async(session, doi) or
await self.download_paper_libgen_async(session, doi) or
await self.download_paper_google_scholar_async(session, doi) or
await self.download_paper_crossref_async(session, doi)
)
if pdf_content:
return pdf_content
except Exception as e:
logger.error(f"Error in download attempt {retries + 1} for DOI {doi}: {e}")
if not pdf_content:
retries += 1
logger.warning(f"Retry attempt {retries} for DOI: {doi} after {delay} seconds")
await asyncio.sleep(delay)
delay *= 2 # Exponential backoff
return None
def download_paper_scihub(self, doi):
"""Improved method to download paper from Sci-Hub"""
if not doi:
logger.warning("DOI not provided")
return None
for base_url in self.download_sources:
try:
scihub_url = f"{base_url}{self.clean_doi(doi)}"
# Request with more tolerance
response = requests.get(scihub_url,
headers=self.headers,
allow_redirects=True,
timeout=15)
# Search for multiple PDF URL patterns
pdf_patterns = [
r'(https?://[^\s<>"]+?\.pdf)',
r'(https?://[^\s<>"]+?download/[^\s<>"]+)',
r'(https?://[^\s<>"]+?\/pdf\/[^\s<>"]+)',
]
pdf_urls = []
for pattern in pdf_patterns:
pdf_urls.extend(re.findall(pattern, response.text))
# Try downloading from found URLs
for pdf_url in pdf_urls:
try:
pdf_response = requests.get(pdf_url,
headers=self.headers,
timeout=10)
# Verify if it's a PDF
if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
logger.debug(f"Found PDF from: {pdf_url}")
return pdf_response.content
except Exception as e:
logger.debug(f"Error downloading PDF from {pdf_url}: {e}")
except Exception as e:
logger.debug(f"Error trying to download {doi} from {base_url}: {e}")
return None
def download_paper_libgen(self, doi):
"""Download from Libgen, handles the query and the redirection"""
if not doi:
return None
base_url = 'https://libgen.rs/scimag/'
try:
search_url = f"{base_url}?q={self.clean_doi(doi)}"
response = requests.get(search_url, headers=self.headers, allow_redirects=True, timeout=10)
response.raise_for_status()
if "No results" in response.text:
logger.debug(f"No results for DOI: {doi} on libgen")
return None
soup = BeautifulSoup(response.text, 'html.parser')
# Find the link using a specific selector
links = soup.select('table.c > tbody > tr:nth-child(2) > td:nth-child(1) > a')
if links:
link = links[0]
pdf_url = link['href']
pdf_response = requests.get(pdf_url, headers=self.headers, allow_redirects=True, timeout=10)
if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
logger.debug(f"Found PDF from: {pdf_url}")
return pdf_response.content
except Exception as e:
logger.debug(f"Error trying to download {doi} from libgen: {e}")
return None
def download_paper_google_scholar(self, doi):
"""Search google scholar to find an article with the given doi, try to get the pdf"""
if not doi:
return None
try:
query = f'doi:"{doi}"'
params = {'q': query}
url = f'https://scholar.google.com/scholar?{urlencode(params)}'
response = requests.get(url, headers=self.headers, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# Find any links with [PDF]
links = soup.find_all('a', string=re.compile(r'\[PDF\]', re.IGNORECASE))
if links:
pdf_url = links[0]['href']
pdf_response = requests.get(pdf_url, headers=self.headers, timeout=10)
if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
logger.debug(f"Found PDF from: {pdf_url}")
return pdf_response.content
except Exception as e:
logger.debug(f"Google Scholar error for {doi}: {e}")
return None
def download_paper_crossref(self, doi):
"""Alternative search method using Crossref"""
if not doi:
return None
try:
# Search for open access link
url = f"https://api.crossref.org/works/{doi}"
response = requests.get(url, headers=self.headers, timeout=10)
if response.status_code == 200:
data = response.json()
work = data.get('message', {})
# Search for open access links
links = work.get('link', [])
for link in links:
if link.get('content-type') == 'application/pdf':
pdf_url = link.get('URL')
if pdf_url:
pdf_response = requests.get(pdf_url, headers=self.headers)
if 'application/pdf' in pdf_response.headers.get('Content-Type', ''):
logger.debug(f"Found PDF from: {pdf_url}")
return pdf_response.content
except Exception as e:
logger.debug(f"Crossref error for {doi}: {e}")
return None
def download_with_retry(self, doi, max_retries=3, initial_delay=2):
"""Downloads a paper using multiple strategies with exponential backoff"""
pdf_content = None
retries = 0
delay = initial_delay
while retries < max_retries and not pdf_content:
try:
pdf_content = (
self.download_paper_scihub(doi) or
self.download_paper_libgen(doi) or
self.download_paper_google_scholar(doi) or
self.download_paper_crossref(doi)
)
if pdf_content:
return pdf_content
except Exception as e:
logger.error(f"Error in download attempt {retries + 1} for DOI {doi}: {e}")
if not pdf_content:
retries += 1
logger.warning(f"Retry attempt {retries} for DOI: {doi} after {delay} seconds")
time.sleep(delay)
delay *= 2 # Exponential backoff
return None
def download_single_doi(self, doi):
"""Downloads a single paper using a DOI"""
if not doi:
return None, "Error: DOI not provided", "Error: DOI not provided"
try:
pdf_content = self.download_with_retry(doi)
if pdf_content:
if doi is None:
return None, "Error: DOI not provided", "Error: DOI not provided"
filename = f"{str(doi).replace('/', '_').replace('.', '_')}.pdf"
filepath = os.path.join(self.output_dir, filename)
with open(filepath, 'wb') as f:
f.write(pdf_content)
logger.info(f"Successfully downloaded: {filename}")
return filepath, f'{doi}', ""
else:
logger.warning(f"Could not download: {doi}")
return None, f"Could not download {doi}", f'{doi}'
except Exception as e:
logger.error(f"Error processing {doi}: {e}")
return None, f"Error processing {doi}: {e}", f"Error processing {doi}: {e}"
def download_multiple_dois(self, dois_text, progress_callback=None):
"""Downloads multiple papers from a list of DOIs with progress updates and single copy button"""
if not dois_text:
return None, "Error: No DOIs provided", "Error: No DOIs provided"
dois = [doi.strip() for doi in dois_text.split('\n') if doi.strip()]
if not dois:
return None, "Error: No valid DOIs provided", "Error: No valid DOIs provided"
downloaded_files = []
failed_dois = []
downloaded_links = []
total_dois = len(dois)
for i, doi in enumerate(dois):
filepath, success_message, fail_message = self.download_single_doi(doi)
if filepath:
# Unique filename for zip
filename = f"{str(doi).replace('/', '_').replace('.', '_')}_{i}.pdf"
filepath_unique = os.path.join(self.output_dir, filename)
os.rename(filepath, filepath_unique)
downloaded_files.append(filepath_unique)
downloaded_links.append(f'{doi}')
else:
failed_dois.append(f'{doi}')
if progress_callback:
progress = int(((i + 1) / total_dois) * 100)
progress_callback(progress)
if downloaded_files:
zip_filename = 'papers.zip'
with zipfile.ZipFile(zip_filename, 'w') as zipf:
for file_path in downloaded_files:
zipf.write(file_path, arcname=os.path.basename(file_path))
logger.info(f"ZIP file created: {zip_filename}")
# Combine all links into a single string
all_links_html = "
".join(downloaded_links)
copy_button_html = f'' if all_links_html else ""
return zip_filename if downloaded_files else None, f"{all_links_html} {copy_button_html}", "\n".join(failed_dois)
def process_bibtex(self, bib_file, progress_callback=None):
"""Process BibTeX file and download papers with multiple strategies"""
# Read BibTeX file content from the uploaded object
try:
with open(bib_file.name, 'r', encoding='utf-8') as f:
bib_content = f.read()
except Exception as e:
logger.error(f"Error reading uploaded file {bib_file.name}: {e}")
return None, f"Error reading uploaded file {bib_file.name}: {e}", f"Error reading uploaded file {bib_file.name}: {e}", None
# Parse BibTeX data
try:
bib_database = bibtexparser.loads(bib_content)
except Exception as e:
logger.error(f"Error parsing BibTeX data: {e}")
return None, f"Error parsing BibTeX data: {e}", f"Error parsing BibTeX data: {e}", None
# Extract DOIs
dois = [entry.get('doi') for entry in bib_database.entries if entry.get('doi')]
logger.info(f"Found {len(dois)} DOIs to download")
# Result lists
downloaded_files = []
failed_dois = []
downloaded_links = []
total_dois = len(dois)
# Download PDFs
for i, doi in enumerate(dois):
try:
# Try to download with multiple methods with retries
pdf_content = self.download_with_retry(doi)
# Save PDF
if pdf_content:
if doi is None:
return None, "Error: DOI not provided", "Error: DOI not provided", None
filename = f"{str(doi).replace('/', '_').replace('.', '_')}.pdf"
filepath = os.path.join(self.output_dir, filename)
with open(filepath, 'wb') as f:
f.write(pdf_content)
downloaded_files.append(filepath)
downloaded_links.append(f'{doi}')
logger.info(f"Successfully downloaded: {filename}")
else:
failed_dois.append(f'{doi}')
except Exception as e:
failed_dois.append(f'{doi}')
logger.error(f"Error processing {doi}: {e}")
if progress_callback:
progress = int(((i + 1) / total_dois) * 100)
progress_callback(progress)
# Create ZIP of downloaded papers
if downloaded_files:
zip_filename = 'papers.zip'
with zipfile.ZipFile(zip_filename, 'w') as zipf:
for file_path in downloaded_files:
zipf.write(file_path, arcname=os.path.basename(file_path))
logger.info(f"ZIP file created: {zip_filename}")
# Combine all links into a single string
all_links_html = "
".join(downloaded_links)
copy_button_html = f'' if all_links_html else ""
return zip_filename, f"{all_links_html} {copy_button_html}", "\n".join(failed_dois), None
async def process_bibtex_async(self, bib_file, progress_callback=None):
"""Process BibTeX file and download papers with multiple strategies"""
# Read BibTeX file content from the uploaded object
try:
with open(bib_file.name, 'r', encoding='utf-8') as f:
bib_content = f.read()
except Exception as e:
logger.error(f"Error reading uploaded file {bib_file.name}: {e}")
return None, f"Error reading uploaded file {bib_file.name}: {e}", f"Error reading uploaded file {bib_file.name}: {e}", None
# Parse BibTeX data
try:
bib_database = bibtexparser.loads(bib_content)
except Exception as e:
logger.error(f"Error parsing BibTeX data: {e}")
return None, f"Error parsing BibTeX data: {e}", f"Error parsing BibTeX data: {e}", None
# Extract DOIs
dois = [entry.get('doi') for entry in bib_database.entries if entry.get('doi')]
logger.info(f"Found {len(dois)} DOIs to download")
# Result lists
downloaded_files = []
failed_dois = []
downloaded_links = []
total_dois = len(dois)
# Download PDFs
for i, doi in enumerate(dois):
try:
# Try to download with multiple methods with retries
pdf_content = await self.download_with_retry_async(doi)
# Save PDF
if pdf_content:
if doi is None:
return None, "Error: DOI not provided", "Error: DOI not provided", None
filename = f"{str(doi).replace('/', '_').replace('.', '_')}.pdf"
filepath = os.path.join(self.output_dir, filename)
with open(filepath, 'wb') as f:
f.write(pdf_content)
downloaded_files.append(filepath)
downloaded_links.append(f'{doi}')
logger.info(f"Successfully downloaded: {filename}")
else:
failed_dois.append(f'{doi}')
except Exception as e:
failed_dois.append(f'{doi}')
logger.error(f"Error processing {doi}: {e}")
if progress_callback:
progress = int(((i + 1) / total_dois) * 100)
progress_callback(progress)
# Create ZIP of downloaded papers
if downloaded_files:
zip_filename = 'papers.zip'
with zipfile.ZipFile(zip_filename, 'w') as zipf:
for file_path in downloaded_files:
zipf.write(file_path, arcname=os.path.basename(file_path))
logger.info(f"ZIP file created: {zip_filename}")
# Combine all links into a single string
all_links_html = "
".join(downloaded_links)
copy_button_html = f'' if all_links_html else ""
return zip_filename, f"{all_links_html} {copy_button_html}", "\n".join(failed_dois), None
def create_gradio_interface():
"""Create Gradio interface for Paper Downloader"""
downloader = PaperDownloader()
async def download_papers(bib_file, doi_input, dois_input, progress=gr.Progress()):
progress_callback = lambda p: progress(p, desc="Downloading Papers")
if bib_file:
# Check file type
if not bib_file.name.lower().endswith('.bib'):
return None, "Error: Please upload a .bib file", "Error: Please upload a .bib file", None
zip_path, downloaded_dois, failed_dois, _ = await downloader.process_bibtex_async(bib_file, progress_callback)
return zip_path, downloaded_dois, failed_dois, None
elif doi_input:
filepath, message, failed_doi = downloader.download_single_doi(doi_input)
return None, message, failed_doi, filepath
elif dois_input:
zip_path, downloaded_dois, failed_dois = downloader.download_multiple_dois(dois_input, progress_callback)
return zip_path, downloaded_dois, failed_dois, None
else:
return None, "Please provide a .bib file, a single DOI, or a list of DOIs", "Please provide a .bib file, a single DOI, or a list of DOIs", None
# Gradio Interface
interface = gr.Interface(
fn=download_papers,
inputs=[
gr.File(file_types=['.bib'], label="Upload BibTeX File"),
gr.Textbox(label="Enter Single DOI", placeholder="10.xxxx/xxxx"),
gr.Textbox(label="Enter Multiple DOIs (one per line)", placeholder="10.xxxx/xxxx\n10.yyyy/yyyy\n...")
],
outputs=[
gr.File(label="Download Papers (ZIP) or Single PDF"),
gr.HTML(label="""