# openhands-index/setup_data.py
"""
Setup script to fetch data from GitHub repository or use mock data as fallback.
This runs before the app starts to ensure data is available.
Supports periodic refresh with caching.
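
Typical usage from the app entrypoint (a sketch; the entrypoint itself is
not part of this module):

    from setup_data import setup_mock_data, start_background_refresh

    setup_mock_data()           # initial fetch: GitHub first, mock fallback
    start_background_refresh()  # re-fetch every CACHE_TTL_SECONDS thereafter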
"""
import json
import logging
import os
import shutil
import subprocess
import sys
import threading
import time
from pathlib import Path
from datetime import datetime, timedelta
from config import DATA_DIR, EXTRACTED_DATA_DIR, CONFIG_NAME
logger = logging.getLogger(__name__)
GITHUB_REPO = "https://github.com/OpenHands/openhands-index-results.git"
# Keep the full repo clone so we can use pydantic models from scripts/
REPO_CLONE_DIR = Path("/tmp/openhands-index-results")
# Cache management
_last_fetch_time = None
_fetch_lock = threading.Lock()
_refresh_callbacks = [] # Callbacks to call after data refresh
# Cache TTL can be configured via environment variable (default: 15 minutes = 900 seconds)
CACHE_TTL_SECONDS = int(os.environ.get("CACHE_TTL_SECONDS", 900))
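# Example override (shell; the entrypoint name is illustrative):
#   CACHE_TTL_SECONDS=300 python app.py   # refresh every 5 minutes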
def get_repo_clone_dir() -> Path:
"""Get the path to the cloned openhands-index-results repository."""
return REPO_CLONE_DIR
def fetch_data_from_github():
"""
Fetch data from the openhands-index-results GitHub repository.
Keeps the full repo clone for access to pydantic schema models.
Returns True if successful, False otherwise.
"""
clone_dir = REPO_CLONE_DIR
target_dir = Path(EXTRACTED_DATA_DIR) / CONFIG_NAME
try:
# Remove existing clone directory if it exists
if clone_dir.exists():
shutil.rmtree(clone_dir)
print(f"Attempting to clone data from {GITHUB_REPO}...")
# Set environment to prevent git from prompting for credentials
env = os.environ.copy()
env['GIT_TERMINAL_PROMPT'] = '0'
# Clone the repository (keep full repo for pydantic models)
result = subprocess.run(
["git", "clone", "--depth", "1", GITHUB_REPO, str(clone_dir)],
capture_output=True,
text=True,
timeout=30,
stdin=subprocess.DEVNULL,
env=env
)
if result.returncode != 0:
print(f"Git clone failed: {result.stderr}")
return False
# Look for data files in the cloned repository
results_source = clone_dir / "results"
if not results_source.exists():
print(f"Results directory not found in repository")
return False
# Check if there are any agent result directories
result_dirs = list(results_source.iterdir())
if not result_dirs:
print(f"No agent results found in {results_source}")
return False
print(f"Found {len(result_dirs)} agent result directories")
# Create target directory and copy the results structure
os.makedirs(target_dir.parent, exist_ok=True)
if target_dir.exists():
shutil.rmtree(target_dir)
# Copy the entire results directory
target_results = target_dir / "results"
shutil.copytree(results_source, target_results)
print(f"Successfully fetched data from GitHub. Files: {list(target_dir.glob('*'))}")
# Verify data integrity by checking a sample agent
sample_agents = list(target_results.glob("*/scores.json"))
if sample_agents:
            with open(sample_agents[0]) as f:
                sample_data = json.load(f)
print(f"Sample data from {sample_agents[0].parent.name}: {sample_data[0] if sample_data else 'EMPTY'}")
# Add the repo's scripts directory to Python path for pydantic models
scripts_dir = clone_dir / "scripts"
if scripts_dir.exists() and str(scripts_dir) not in sys.path:
sys.path.insert(0, str(scripts_dir))
print(f"Added {scripts_dir} to Python path for schema imports")
return True
except subprocess.TimeoutExpired:
print("Git clone timed out")
return False
except Exception as e:
print(f"Error fetching data from GitHub: {e}")
return False
def copy_mock_data():
"""Copy mock data to the expected extraction directory."""
# Try both relative and absolute paths
mock_source = Path("mock_results") / CONFIG_NAME
if not mock_source.exists():
# Try absolute path in case we're in a different working directory
mock_source = Path("/app/mock_results") / CONFIG_NAME
target_dir = Path(EXTRACTED_DATA_DIR) / CONFIG_NAME
print(f"Current working directory: {os.getcwd()}")
print(f"Looking for mock data at: {mock_source.absolute()}")
if not mock_source.exists():
print(f"ERROR: Mock data directory {mock_source} not found!")
print(f"Directory contents: {list(Path('.').glob('*'))}")
return False
# Create target directory
os.makedirs(target_dir.parent, exist_ok=True)
# Copy mock data
print(f"Using mock data from {mock_source}")
if target_dir.exists():
shutil.rmtree(target_dir)
shutil.copytree(mock_source, target_dir)
# Verify the copy
copied_files = list(target_dir.glob('*'))
print(f"Mock data copied successfully. Files: {copied_files}")
print(f"Target directory: {target_dir.absolute()}")
return True
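
# Expected mock-data layout, inferred from the checks in this module (the
# real contents of mock_results/ may differ):
#
#   mock_results/<CONFIG_NAME>/
#       results/<agent>/...   # and/or top-level *.jsonl files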
def register_refresh_callback(callback):
"""
Register a callback to be called after data is refreshed.
The callback should clear any caches that depend on the data.
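
    Example (a hypothetical lru_cache-backed loader in the app):

        from functools import lru_cache

        @lru_cache(maxsize=1)
        def load_leaderboard():
            ...

        register_refresh_callback(load_leaderboard.cache_clear)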
"""
if callback not in _refresh_callbacks:
_refresh_callbacks.append(callback)
def _notify_refresh_callbacks():
"""Notify all registered callbacks that data has been refreshed."""
for callback in _refresh_callbacks:
try:
callback()
except Exception as e:
logger.warning(f"Error in refresh callback: {e}")
def get_last_fetch_time():
"""Get the timestamp of the last successful data fetch."""
return _last_fetch_time
def is_cache_stale():
"""Check if the cache has expired (older than CACHE_TTL_SECONDS)."""
if _last_fetch_time is None:
return True
return datetime.now() - _last_fetch_time > timedelta(seconds=CACHE_TTL_SECONDS)
def refresh_data_if_needed(force: bool = False) -> bool:
"""
Refresh data from GitHub if the cache is stale or if forced.
Args:
force: If True, refresh regardless of cache age
Returns:
True if data was refreshed, False otherwise
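
    Example (call sites are hypothetical):
        refresh_data_if_needed()            # no-op while the cache is fresh
        refresh_data_if_needed(force=True)  # e.g. from an admin endpoint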
"""
    global _last_fetch_time
if not force and not is_cache_stale():
return False
# Use lock to prevent concurrent refreshes
if not _fetch_lock.acquire(blocking=False):
logger.info("Another refresh is in progress, skipping...")
return False
try:
logger.info("Refreshing data from GitHub...")
        # Don't delete the existing data up front: fetch_data_from_github()
        # only replaces it after a successful clone, so a failed fetch still
        # leaves the previous data available to serve.
# Fetch new data
if fetch_data_from_github():
_last_fetch_time = datetime.now()
logger.info(f"✓ Data refreshed successfully at {_last_fetch_time}")
_notify_refresh_callbacks()
return True
else:
# If GitHub fails, try mock data as fallback
logger.warning("GitHub fetch failed, trying mock data...")
if copy_mock_data():
_last_fetch_time = datetime.now()
logger.info(f"✓ Using mock data (refreshed at {_last_fetch_time})")
_notify_refresh_callbacks()
return True
logger.error("Failed to refresh data from any source")
return False
finally:
_fetch_lock.release()
def setup_mock_data():
"""
    Set up data for the leaderboard.
First tries to fetch from GitHub, falls back to mock data if unavailable.
"""
global _last_fetch_time
print("=" * 60)
print("STARTING DATA SETUP")
print("=" * 60)
target_dir = Path(EXTRACTED_DATA_DIR) / CONFIG_NAME
# Check if data already exists and cache is not stale
if target_dir.exists():
results_dir = target_dir / "results"
has_results = results_dir.exists() and any(results_dir.iterdir())
has_jsonl = any(target_dir.glob("*.jsonl"))
if has_results or has_jsonl:
if not is_cache_stale():
print(f"Data already exists at {target_dir} and cache is fresh")
return
print(f"Data exists but cache is stale, will refresh...")
# Try to fetch from GitHub first
print("\n--- Attempting to fetch from GitHub ---")
try:
if fetch_data_from_github():
_last_fetch_time = datetime.now()
print("✓ Successfully using data from GitHub repository")
return
except Exception as e:
print(f"GitHub fetch failed with error: {e}")
# Fall back to mock data
print("\n--- GitHub data not available, falling back to mock data ---")
try:
if copy_mock_data():
_last_fetch_time = datetime.now()
print("✓ Successfully using mock data")
return
except Exception as e:
print(f"Mock data copy failed with error: {e}")
print("\n" + "!" * 60)
print("ERROR: No data available! Neither GitHub nor mock data could be loaded.")
print("!" * 60)
# Background refresh scheduler
_scheduler_thread = None
_scheduler_stop_event = threading.Event()
def _background_refresh_loop():
"""Background thread that periodically refreshes data."""
logger.info(f"Background refresh scheduler started (interval: {CACHE_TTL_SECONDS}s)")
while not _scheduler_stop_event.is_set():
# Wait for the TTL interval (or until stopped)
if _scheduler_stop_event.wait(timeout=CACHE_TTL_SECONDS):
break # Stop event was set
# Try to refresh data
try:
logger.info("Background refresh triggered")
refresh_data_if_needed(force=True)
except Exception as e:
logger.error(f"Error in background refresh: {e}")
logger.info("Background refresh scheduler stopped")
def start_background_refresh():
"""Start the background refresh scheduler."""
    global _scheduler_thread
if _scheduler_thread is not None and _scheduler_thread.is_alive():
logger.info("Background refresh scheduler already running")
return
_scheduler_stop_event.clear()
_scheduler_thread = threading.Thread(target=_background_refresh_loop, daemon=True)
_scheduler_thread.start()
logger.info("Background refresh scheduler started")
def stop_background_refresh():
"""Stop the background refresh scheduler."""
    global _scheduler_thread
if _scheduler_thread is None:
return
_scheduler_stop_event.set()
_scheduler_thread.join(timeout=5)
_scheduler_thread = None
logger.info("Background refresh scheduler stopped")
if __name__ == "__main__":
setup_mock_data()