| """ | |
| Setup script to fetch data from GitHub repository or use mock data as fallback. | |
| This runs before the app starts to ensure data is available. | |
| Supports periodic refresh with caching. | |
| """ | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| import logging | |
| from pathlib import Path | |
| from datetime import datetime, timedelta | |
| from config import DATA_DIR, EXTRACTED_DATA_DIR, CONFIG_NAME | |
| logger = logging.getLogger(__name__) | |
| GITHUB_REPO = "https://github.com/OpenHands/openhands-index-results.git" | |
| # Keep the full repo clone so we can use pydantic models from scripts/ | |
| REPO_CLONE_DIR = Path("/tmp/openhands-index-results") | |
| # Cache management | |
| _last_fetch_time = None | |
| _fetch_lock = threading.Lock() | |
| _refresh_callbacks = [] # Callbacks to call after data refresh | |
| # Cache TTL can be configured via environment variable (default: 15 minutes = 900 seconds) | |
| CACHE_TTL_SECONDS = int(os.environ.get("CACHE_TTL_SECONDS", 900)) | |
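
# Example: a deployment can shorten the refresh window with
# `CACHE_TTL_SECONDS=300` (five minutes) in the environment. Note that the
# int() cast above is unguarded: a non-integer value makes this module fail
# at import time with a ValueError.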


def get_repo_clone_dir() -> Path:
    """Get the path to the cloned openhands-index-results repository."""
    return REPO_CLONE_DIR


def fetch_data_from_github():
    """
    Fetch data from the openhands-index-results GitHub repository.
    Keeps the full repo clone for access to pydantic schema models.
    Returns True if successful, False otherwise.
    """
    clone_dir = REPO_CLONE_DIR
    target_dir = Path(EXTRACTED_DATA_DIR) / CONFIG_NAME
    try:
        # Remove the existing clone directory if it exists
        if clone_dir.exists():
            shutil.rmtree(clone_dir)

        print(f"Attempting to clone data from {GITHUB_REPO}...")

        # Set environment to prevent git from prompting for credentials
        env = os.environ.copy()
        env['GIT_TERMINAL_PROMPT'] = '0'

        # Shallow-clone the repository (keep the full repo for pydantic models)
        result = subprocess.run(
            ["git", "clone", "--depth", "1", GITHUB_REPO, str(clone_dir)],
            capture_output=True,
            text=True,
            timeout=30,
            stdin=subprocess.DEVNULL,
            env=env,
        )
        if result.returncode != 0:
            print(f"Git clone failed: {result.stderr}")
            return False

        # Look for data files in the cloned repository
        results_source = clone_dir / "results"
        if not results_source.exists():
            print("Results directory not found in repository")
            return False

        # Check that there is at least one agent result directory
        result_dirs = list(results_source.iterdir())
        if not result_dirs:
            print(f"No agent results found in {results_source}")
            return False
        print(f"Found {len(result_dirs)} agent result directories")

        # Create the target directory and copy the results structure
        os.makedirs(target_dir.parent, exist_ok=True)
        if target_dir.exists():
            shutil.rmtree(target_dir)

        # Copy the entire results directory
        target_results = target_dir / "results"
        shutil.copytree(results_source, target_results)
        print(f"Successfully fetched data from GitHub. Files: {list(target_dir.glob('*'))}")

        # Verify data integrity by checking a sample agent
        sample_agents = list(target_results.glob("*/scores.json"))
        if sample_agents:
            import json
            with open(sample_agents[0]) as f:
                sample_data = json.load(f)
            print(f"Sample data from {sample_agents[0].parent.name}: "
                  f"{sample_data[0] if sample_data else 'EMPTY'}")

        # Add the repo's scripts directory to the Python path for pydantic models
        scripts_dir = clone_dir / "scripts"
        if scripts_dir.exists() and str(scripts_dir) not in sys.path:
            sys.path.insert(0, str(scripts_dir))
            print(f"Added {scripts_dir} to Python path for schema imports")

        return True
    except subprocess.TimeoutExpired:
        print("Git clone timed out")
        return False
    except Exception as e:
        print(f"Error fetching data from GitHub: {e}")
        return False
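
# After a successful fetch, the on-disk layout (inferred from the copy logic
# above) looks like:
#
#   {EXTRACTED_DATA_DIR}/{CONFIG_NAME}/results/<agent-name>/scores.json
#
# with one subdirectory per agent, mirroring the results/ tree of the
# upstream repository.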


def copy_mock_data():
    """Copy mock data to the expected extraction directory."""
    # Try a relative path first, then an absolute one
    mock_source = Path("mock_results") / CONFIG_NAME
    if not mock_source.exists():
        # Try the absolute path in case we're in a different working directory
        mock_source = Path("/app/mock_results") / CONFIG_NAME
    target_dir = Path(EXTRACTED_DATA_DIR) / CONFIG_NAME

    print(f"Current working directory: {os.getcwd()}")
    print(f"Looking for mock data at: {mock_source.absolute()}")

    if not mock_source.exists():
        print(f"ERROR: Mock data directory {mock_source} not found!")
        print(f"Directory contents: {list(Path('.').glob('*'))}")
        return False

    # Create the target directory
    os.makedirs(target_dir.parent, exist_ok=True)

    # Copy mock data
    print(f"Using mock data from {mock_source}")
    if target_dir.exists():
        shutil.rmtree(target_dir)
    shutil.copytree(mock_source, target_dir)

    # Verify the copy
    copied_files = list(target_dir.glob('*'))
    print(f"Mock data copied successfully. Files: {copied_files}")
    print(f"Target directory: {target_dir.absolute()}")
    return True
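
# The mock tree is expected to mirror the fetched layout: mock_results/
# {CONFIG_NAME}/ should contain either a results/ directory or *.jsonl files,
# since setup_mock_data() below checks for both when deciding whether data
# already exists.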


def register_refresh_callback(callback):
    """
    Register a callback to be called after data is refreshed.
    The callback should clear any caches that depend on the data.
    """
    if callback not in _refresh_callbacks:
        _refresh_callbacks.append(callback)
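
# A minimal usage sketch (hypothetical caller): an app module that memoizes
# the parsed leaderboard can register its cache_clear so stale rows are
# dropped whenever new data lands. `load_leaderboard` is illustrative, not
# part of this codebase.
#
#   from functools import lru_cache
#
#   @lru_cache(maxsize=1)
#   def load_leaderboard():
#       ...  # parse files under EXTRACTED_DATA_DIR
#
#   register_refresh_callback(load_leaderboard.cache_clear)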


def _notify_refresh_callbacks():
    """Notify all registered callbacks that data has been refreshed."""
    for callback in _refresh_callbacks:
        try:
            callback()
        except Exception as e:
            logger.warning(f"Error in refresh callback: {e}")


def get_last_fetch_time():
    """Get the timestamp of the last successful data fetch."""
    return _last_fetch_time


def is_cache_stale():
    """Check whether the cache has expired (older than CACHE_TTL_SECONDS)."""
    if _last_fetch_time is None:
        return True
    return datetime.now() - _last_fetch_time > timedelta(seconds=CACHE_TTL_SECONDS)


def refresh_data_if_needed(force: bool = False) -> bool:
    """
    Refresh data from GitHub if the cache is stale or if forced.

    Args:
        force: If True, refresh regardless of cache age

    Returns:
        True if data was refreshed, False otherwise
    """
    global _last_fetch_time

    if not force and not is_cache_stale():
        return False

    # Use a non-blocking lock so concurrent refreshes are skipped, not queued
    if not _fetch_lock.acquire(blocking=False):
        logger.info("Another refresh is in progress, skipping...")
        return False

    try:
        logger.info("Refreshing data from GitHub...")

        # Remove old data to force a re-fetch
        target_dir = Path(EXTRACTED_DATA_DIR) / CONFIG_NAME
        if target_dir.exists():
            shutil.rmtree(target_dir)

        # Fetch new data
        if fetch_data_from_github():
            _last_fetch_time = datetime.now()
            logger.info(f"✓ Data refreshed successfully at {_last_fetch_time}")
            _notify_refresh_callbacks()
            return True
        else:
            # If GitHub fails, try mock data as a fallback
            logger.warning("GitHub fetch failed, trying mock data...")
            if copy_mock_data():
                _last_fetch_time = datetime.now()
                logger.info(f"✓ Using mock data (refreshed at {_last_fetch_time})")
                _notify_refresh_callbacks()
                return True

        logger.error("Failed to refresh data from any source")
        return False
    finally:
        _fetch_lock.release()
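
# Design note: the lock is acquired non-blocking, so a second caller during a
# refresh returns False immediately instead of queuing behind the first. Note
# the trade-off above as well: the old data directory is deleted before the
# fetch, so if both GitHub and the mock fallback fail, the app is left with
# no data until a later refresh succeeds.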


def setup_mock_data():
    """
    Set up data for the leaderboard.
    First tries to fetch from GitHub; falls back to mock data if unavailable.
    """
    global _last_fetch_time

    print("=" * 60)
    print("STARTING DATA SETUP")
    print("=" * 60)

    target_dir = Path(EXTRACTED_DATA_DIR) / CONFIG_NAME

    # Check whether data already exists and the cache is still fresh
    if target_dir.exists():
        results_dir = target_dir / "results"
        has_results = results_dir.exists() and any(results_dir.iterdir())
        has_jsonl = any(target_dir.glob("*.jsonl"))
        if has_results or has_jsonl:
            if not is_cache_stale():
                print(f"Data already exists at {target_dir} and cache is fresh")
                return
            print("Data exists but cache is stale, will refresh...")

    # Try to fetch from GitHub first
    print("\n--- Attempting to fetch from GitHub ---")
    try:
        if fetch_data_from_github():
            _last_fetch_time = datetime.now()
            print("✓ Successfully using data from GitHub repository")
            return
    except Exception as e:
        print(f"GitHub fetch failed with error: {e}")

    # Fall back to mock data
    print("\n--- GitHub data not available, falling back to mock data ---")
    try:
        if copy_mock_data():
            _last_fetch_time = datetime.now()
            print("✓ Successfully using mock data")
            return
    except Exception as e:
        print(f"Mock data copy failed with error: {e}")

    print("\n" + "!" * 60)
    print("ERROR: No data available! Neither GitHub nor mock data could be loaded.")
    print("!" * 60)


# Background refresh scheduler
_scheduler_thread = None
_scheduler_stop_event = threading.Event()


def _background_refresh_loop():
    """Background thread that periodically refreshes data."""
    logger.info(f"Background refresh scheduler started (interval: {CACHE_TTL_SECONDS}s)")
    while not _scheduler_stop_event.is_set():
        # Wait for the TTL interval (or until stopped)
        if _scheduler_stop_event.wait(timeout=CACHE_TTL_SECONDS):
            break  # Stop event was set
        # Try to refresh data
        try:
            logger.info("Background refresh triggered")
            refresh_data_if_needed(force=True)
        except Exception as e:
            logger.error(f"Error in background refresh: {e}")
    logger.info("Background refresh scheduler stopped")


def start_background_refresh():
    """Start the background refresh scheduler."""
    global _scheduler_thread

    if _scheduler_thread is not None and _scheduler_thread.is_alive():
        logger.info("Background refresh scheduler already running")
        return

    _scheduler_stop_event.clear()
    _scheduler_thread = threading.Thread(target=_background_refresh_loop, daemon=True)
    _scheduler_thread.start()
    logger.info("Background refresh scheduler started")


def stop_background_refresh():
    """Stop the background refresh scheduler."""
    global _scheduler_thread

    if _scheduler_thread is None:
        return

    _scheduler_stop_event.set()
    _scheduler_thread.join(timeout=5)
    _scheduler_thread = None
    logger.info("Background refresh scheduler stopped")


if __name__ == "__main__":
    setup_mock_data()