# hf_sync.py — Report-Generator
# Codebase and data management with Hugging Face Hub integration.
# (originally committed as 7d4bbcc by root:
#  "feat: implement codebase and data management system with HF integration")
import hashlib
import json
import os
import shutil
import sqlite3
import sys
import time
from contextlib import closing
from datetime import datetime

from huggingface_hub import HfApi, snapshot_download
# Configuration
# Dataset repo that receives the synced data; override via DATASET_REPO_ID.
REPO_ID = os.environ.get("DATASET_REPO_ID", "Jaimodiji/Report-Generator-Data")
# Hugging Face access token; upload() is a no-op when this is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")
# When true, uploads are only logged ("Simulated upload ..."), not performed.
DEBUG_MODE = os.environ.get("RPT_DEBUG", "false").lower() in ("true", "1", "t")
# Local working copy of the dataset repo.
DATA_DIR = "data_repo"
# Active SQLite database inside the working copy.
DB_FILE = os.path.join(DATA_DIR, "database.db")
# JSON file tracking what has already been uploaded (see get_state/save_state).
STATE_FILE = os.path.join(DATA_DIR, "sync_state.json")
# A lock file younger than 10 minutes marks a sync already in progress.
LOCK_FILE = "/tmp/hf_sync.lock"
api = HfApi(token=HF_TOKEN)
def get_state():
    """Load the persisted sync state from STATE_FILE.

    Returns:
        The state dict read from disk, or a fresh default state
        (empty upload map, no DB hash, version 0) when the file is
        missing, unreadable, or contains invalid JSON.
    """
    if os.path.exists(STATE_FILE):
        try:
            with open(STATE_FILE, 'r') as f:
                return json.load(f)
        except (OSError, ValueError):
            # Corrupt or unreadable state: fall through to a clean slate
            # instead of crashing. json.JSONDecodeError is a subclass of
            # ValueError, so both parse and I/O failures are covered.
            # (Original used a bare `except:`, which also swallowed
            # KeyboardInterrupt/SystemExit.)
            pass
    return {"uploaded_files": {}, "last_db_hash": None, "version": 0}
def save_state(state):
    """Stamp *state* with the current time and persist it to STATE_FILE.

    Mutates *state* in place by setting its "last_update" key, then
    writes the whole dict as pretty-printed JSON.
    """
    state["last_update"] = datetime.now().isoformat()
    serialized = json.dumps(state, indent=2)
    with open(STATE_FILE, 'w') as f:
        f.write(serialized)
def get_file_hash(path):
    """Return the hex MD5 digest of the file at *path*, or None if absent.

    Reads in 4 KiB chunks so large files are not loaded into memory.
    MD5 is used here as a fast change-detection fingerprint, not for
    security.
    """
    if not os.path.exists(path):
        return None
    digest = hashlib.md5()
    with open(path, 'rb') as fh:
        while True:
            chunk = fh.read(4096)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()
def safe_db_backup():
    """Create a consistent snapshot of the SQLite DB via the backup API.

    Using sqlite3's online backup gives a coherent copy even if another
    connection is writing, unlike a plain file copy.

    Returns:
        The path of the ".bak" snapshot, or None when the database file
        does not exist or the backup fails (the error is printed).
    """
    if not os.path.exists(DB_FILE):
        return None
    backup_db = DB_FILE + ".bak"
    try:
        # closing() guarantees both connections are released even when
        # connect()/backup() raises; the original leaked them on error.
        with closing(sqlite3.connect(DB_FILE)) as source_conn, \
             closing(sqlite3.connect(backup_db)) as dest_conn:
            # `with dest_conn` commits the transaction on success.
            with dest_conn:
                source_conn.backup(dest_conn)
        return backup_db
    except Exception as e:
        print(f"Database backup failed: {e}")
        return None
def _sync_database(state):
    """Upload a fresh DB snapshot if its content changed.

    Updates state["last_db_hash"] on change. Returns True when a sync
    (real or simulated) happened. The temporary .bak file is always
    removed, even if the upload raises (the original leaked it then).
    """
    backup_path = safe_db_backup()
    if not backup_path:
        return False
    changed = False
    try:
        db_hash = get_file_hash(backup_path)
        if db_hash != state.get("last_db_hash"):
            print(f"{'[DEBUG] ' if DEBUG_MODE else ''}Syncing Database...")
            if not DEBUG_MODE:
                # Upload the snapshot directly; the active DB is untouched.
                api.upload_file(
                    path_or_fileobj=backup_path,
                    path_in_repo="database.db",
                    repo_id=REPO_ID,
                    repo_type="dataset",
                )
            else:
                print(f"[DEBUG] Simulated upload of {backup_path} to {REPO_ID}/database.db")
            state["last_db_hash"] = db_hash
            changed = True
    finally:
        if os.path.exists(backup_path):
            os.remove(backup_path)
    return changed


def _sync_data_files(state):
    """Upload new/modified files under uploads/, processed/ and output/.

    Files are fingerprinted by path+size+mtime (cheap — avoids hashing
    thousands of files); state["uploaded_files"] maps relative path to
    the last-synced fingerprint. Returns True if anything was synced.
    """
    changed = False
    for sub_dir in ('uploads', 'processed', 'output'):
        dir_path = os.path.join(DATA_DIR, sub_dir)
        if not os.path.exists(dir_path):
            continue
        for root, _, files in os.walk(dir_path):
            for name in files:
                full_path = os.path.join(root, name)
                rel_path = os.path.relpath(full_path, DATA_DIR)
                size = os.path.getsize(full_path)
                mtime = os.path.getmtime(full_path)
                file_id = f"{rel_path}_{size}_{mtime}"
                if state["uploaded_files"].get(rel_path) == file_id:
                    continue
                print(f"{'[DEBUG] ' if DEBUG_MODE else ''}Syncing new file: {rel_path}")
                # Per-file try/except so one failure doesn't abort the rest.
                try:
                    if not DEBUG_MODE:
                        api.upload_file(
                            path_or_fileobj=full_path,
                            path_in_repo=rel_path,
                            repo_id=REPO_ID,
                            repo_type="dataset",
                        )
                    else:
                        print(f"[DEBUG] Simulated upload of {rel_path} to {REPO_ID}/{rel_path}")
                    state["uploaded_files"][rel_path] = file_id
                    changed = True
                except Exception as e:
                    print(f"Failed to upload {rel_path}: {e}")
    return changed


def upload():
    """Push local changes (database + data files + state) to the HF dataset.

    No-op when REPO_ID/HF_TOKEN are unset or another sync holds a fresh
    lock. In DEBUG_MODE uploads are only logged.

    NOTE(review): the exists/open lock sequence is not atomic (TOCTOU);
    two processes starting simultaneously can both pass the check.
    NOTE(review): in DEBUG_MODE the state file is still written with the
    new hashes, so a later real run will skip those files — presumably
    intentional for simulation, but verify.
    """
    if not REPO_ID or not HF_TOKEN:
        return
    # Skip if another sync is in progress; a lock older than 10 minutes
    # is considered stale and overwritten.
    if os.path.exists(LOCK_FILE) and time.time() - os.path.getmtime(LOCK_FILE) < 600:
        return
    try:
        with open(LOCK_FILE, 'w') as f:
            f.write(str(os.getpid()))
        state = get_state()
        # 1. Sync database (granular), 2. sync files iteratively
        # (immune to folder-upload timeouts).
        changes_made = _sync_database(state)
        changes_made = _sync_data_files(state) or changes_made
        if changes_made:
            state["version"] += 1
            save_state(state)
            # Sync the state file too, so remote and local agree.
            if not DEBUG_MODE:
                api.upload_file(
                    path_or_fileobj=STATE_FILE,
                    path_in_repo="sync_state.json",
                    repo_id=REPO_ID,
                    repo_type="dataset",
                )
            else:
                print(f"[DEBUG] Simulated upload of {STATE_FILE} to {REPO_ID}/sync_state.json")
            print(f"{'[DEBUG] ' if DEBUG_MODE else ''}Sync complete. Version {state['version']} saved.")
        else:
            print(f"{'[DEBUG] ' if DEBUG_MODE else ''}Everything up to date.")
    except Exception as e:
        print(f"Upload process failed: {e}")
    finally:
        if os.path.exists(LOCK_FILE):
            os.remove(LOCK_FILE)
def download(revision=None):
    """Pull a snapshot of the dataset repo into DATA_DIR.

    revision: optional git revision (branch, tag, or commit hash) to
    fetch; None pulls the default branch. Failures are printed, not
    raised.
    """
    if not REPO_ID:
        return
    suffix = f" (revision: {revision})" if revision else ""
    print(f"Downloading data from {REPO_ID}{suffix}...")
    try:
        snapshot_download(
            repo_id=REPO_ID,
            repo_type="dataset",
            local_dir=DATA_DIR,
            token=HF_TOKEN,
            revision=revision,
            max_workers=8,
        )
        print("Download successful.")
    except Exception as e:
        print(f"Download failed: {e}")
def init_local(base_dir=None):
    """Create the local data sub-directories if they don't exist.

    Args:
        base_dir: root directory to initialise; defaults to DATA_DIR.
            (Backward-compatible generalisation — existing callers pass
            nothing and keep the old behaviour.)
    """
    base = DATA_DIR if base_dir is None else base_dir
    for sub in ('output', 'processed', 'uploads'):
        # os.path.join instead of f-string concat for portable paths.
        os.makedirs(os.path.join(base, sub), exist_ok=True)
if __name__ == "__main__":
    # Minimal CLI: first argument selects the action, optional second
    # argument is a git revision for `download`.
    args = sys.argv[1:]
    command = args[0] if args else "help"
    if command == "upload":
        upload()
    elif command == "init":
        init_local()
    elif command == "download":
        rev = args[1] if len(args) > 1 else None
        download(rev)
    else:
        print("Usage: python hf_sync.py [download [revision]|upload|init]")