import requests
from bs4 import BeautifulSoup
from typing import List, Dict
import re
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.llms import OpenAI
import urllib.parse
import time
from scholarly import scholarly
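
# Note: these imports assume the legacy (pre-0.1) LangChain package layout; on newer
# releases HuggingFaceEmbeddings, Chroma, and OpenAI live in langchain-community /
# langchain-openai instead. The scholarly, chromadb, sentence-transformers, and openai
# packages are also assumed to be installed.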

class ResearchAgent:
    def __init__(self, openai_api_key: str):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Connection': 'keep-alive',
        }
        self.openai_api_key = openai_api_key
        self.scientific_domains = [
            'sciencedirect.com',
            'springer.com',
            'nature.com',
            'ncbi.nlm.nih.gov',
            'wiley.com',
            'scielo.org',
            'frontiersin.org',
            'mdpi.com',
            'hindawi.com',
            'tandfonline.com'
        ]

    def extract_keywords(self, question: str) -> str:
        llm = OpenAI(openai_api_key=self.openai_api_key)
        prompt = f"Extract 3-4 most important scientific search terms from this question, provide them as a space-separated list: {question}"
        return llm.predict(prompt)

    def search_papers(self, keywords: str, num_results: int = 10) -> List[Dict]:
        # First try Google Scholar
        try:
            search_query = scholarly.search_pubs(keywords)
            papers = []
            for i in range(num_results):
                try:
                    paper = next(search_query)
                    if paper.get('pub_url'):
                        papers.append({
                            'title': paper.get('bib', {}).get('title', ''),
                            'url': paper.get('pub_url'),
                            'abstract': paper.get('bib', {}).get('abstract', '')
                        })
                except StopIteration:
                    break
        except Exception as e:
            print(f"Scholar search failed: {str(e)}")
            papers = []

        # Fallback to regular Google search if needed
        if len(papers) < num_results:
            remaining = num_results - len(papers)
search_query = f"{keywords} filetype:pdf site:({' OR site:'.join(self.scientific_domains)})" | |
encoded_query = urllib.parse.quote(search_query) | |
search_url = f"https://www.google.com/search?q={encoded_query}&num={remaining*2}" # Get more results as some might fail | |
            try:
                response = requests.get(search_url, headers=self.headers, timeout=10)
                soup = BeautifulSoup(response.text, 'html.parser')

                for result in soup.find_all('a'):
                    if len(papers) >= num_results:
                        break
                    link = result.get('href', '')
                    if any(domain in link for domain in self.scientific_domains):
                        clean_link = re.search(r'https?://[^&]+', link)
                        if clean_link:
                            papers.append({
                                'url': clean_link.group(0),
                                'title': '',  # Will be filled during scraping
                                'abstract': ''  # Will be filled during scraping
                            })
            except Exception as e:
                print(f"Google search failed: {str(e)}")

        return papers

    def scrape_paper(self, paper: Dict) -> Dict[str, str]:
        try:
            response = requests.get(paper['url'], headers=self.headers, timeout=15)
            soup = BeautifulSoup(response.text, 'html.parser')
            # Try different title selectors, most specific first so the generic h1
            # is only used as a last resort
            if not paper['title']:
                title_selectors = [
                    'meta[name="citation_title"]',
                    'h1.article-title',
                    'h1.title',
                    'div.article-title',
                    'h1'
                ]
                for selector in title_selectors:
                    title_elem = soup.select_one(selector)
                    if title_elem:
                        paper['title'] = title_elem.get('content', '') or title_elem.text.strip()
                        break
            # Try different abstract/content selectors
            if not paper['abstract']:
                content_selectors = [
                    'div.abstract',
                    'section.abstract',
                    'div#abstract',
                    'meta[name="description"]',
                    'div.paper-content',
                    'div.article-content'
                ]
                for selector in content_selectors:
                    content_elem = soup.select_one(selector)
                    if content_elem:
                        paper['abstract'] = content_elem.get('content', '') or content_elem.text.strip()
                        break

            if not paper['abstract']:
                # Try to get main content
                paragraphs = soup.find_all('p')
                content = ' '.join([p.text.strip() for p in paragraphs[:5]])  # Get first 5 paragraphs
                if content:
                    paper['abstract'] = content

            # Clean up text
            paper['title'] = paper['title'] or "Title not found"
            paper['abstract'] = paper['abstract'] or "Content not found"
            paper['abstract'] = re.sub(r'\s+', ' ', paper['abstract'])

            return paper
        except Exception as e:
            print(f"Error scraping {paper['url']}: {str(e)}")
            return paper

    def perform_research(self, question: str) -> str:
        # Extract keywords
        keywords = self.extract_keywords(question)
        print(f"Keywords extracted: {keywords}")

        # Search for papers
        papers = self.search_papers(keywords)
        print(f"Found {len(papers)} papers")

        # Scrape full content
        articles = []
        for paper in papers:
            article = self.scrape_paper(paper)
            if article and article['abstract'] != "Content not found":
                articles.append(article)
            time.sleep(1)  # Polite delay between requests

        if not articles:
            return "I apologize, but I couldn't find any relevant scientific papers to answer your question. Please try rephrasing your question or using different terms."

        # Prepare documents for RAG
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        texts = []
        for article in articles:
            chunks = text_splitter.split_text(article['abstract'])
            for chunk in chunks:
                texts.append(f"Title: {article['title']}\n\nContent: {chunk}\n\nSource: {article['url']}")

        # Create vector store
        embeddings = HuggingFaceEmbeddings()
        vectorstore = Chroma.from_texts(texts, embeddings)

        # Create QA chain with a more specific prompt
        qa_chain = RetrievalQA.from_chain_type(
            llm=OpenAI(openai_api_key=self.openai_api_key, temperature=0.3),
            chain_type="stuff",
            retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
            return_source_documents=True
        )
        # Get answer with references
        result = qa_chain({"query": f"""Based on the provided scientific papers, please answer this question: {question}
        If you can't find a direct answer, summarize the most relevant information from the papers.
        Include specific findings, data, and methodology when available."""})
        answer = result['result']

        # Format response with article summaries
        response = f"Answer: {answer}\n\nReferences:\n\n"
        for article in articles:
            response += f"Title: {article['title']}\nURL: {article['url']}\n\n"

        return response
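

# Minimal usage sketch (not part of the original class): assumes OPENAI_API_KEY is set
# in the environment and that the packages noted near the imports are installed; the
# example question is only an illustrative placeholder.
if __name__ == "__main__":
    import os

    agent = ResearchAgent(openai_api_key=os.environ["OPENAI_API_KEY"])
    report = agent.perform_research(
        "What are the effects of microplastics on marine invertebrates?"
    )
    print(report)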