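"""InfiniteStorageFace: a Streamlit app that syncs local folders to a private
Hugging Face dataset repository, with change monitoring, offline queueing,
scheduled syncs, and persisted settings."""
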
import os
import streamlit as st
from huggingface_hub import (
    HfApi,
    upload_folder,
    upload_file,
    create_repo,
    login,
    hf_hub_download,
    list_repo_files,
)
import logging
import time
import json
import keyring  # Secure token storage
import socket  # Offline detection
import hashlib  # Data integrity
from pathlib import Path
from threading import Thread
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import schedule

# Set page configuration
st.set_page_config(page_title="InfiniteStorageFace", layout="wide")

# Initialize logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()],
)

# Initialize session state variables
session_defaults = {
    'logs': [],
    'uploading': False,
    'cancel': False,
    'observer': None,
    'selected_files': [],
    'sync_paused': False,
    'token': "",
    'settings_loaded': False,
    'remote_files': {},
    'queued_files': [],
    'scheduled_sync': False,
    'repo_id': "",
    'repo_exists': False,
    'folder_path': "",
    'private': True,  # Default to private repositories
    'ignore_patterns_selected': [],
    'process_individually': False,
    'subfolder': "",
    'total_files_synced': 0,
    'total_files': 0,
    'sync_option': 'Sync',  # Default option is Sync
}

for key, default_value in session_defaults.items():
    if key not in st.session_state:
        st.session_state[key] = default_value

# Centralized ignore patterns mapping
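# Values are fnmatch-style glob patterns accepted by upload_folder's ignore_patterns.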
IGNORE_PATTERNS_MAP = {
    "Ignore __pycache__": "**/__pycache__/**",
    "Ignore .git": ".git/**",
    "Ignore .venv": ".venv/**",
    "Ignore *.pyc": "*.pyc",
    "Ignore *.log": "*.log",
    "Ignore *.tmp": "*.tmp",
    "Ignore *.DS_Store": "*.DS_Store",
}

# Default values
DEFAULT_REPO = "your_username/your_private_vault"
DEFAULT_LOCAL_PATH = str(Path.home())

# Helper Functions
def is_connected():
    """Check for internet connectivity."""
    try:
        socket.create_connection(("1.1.1.1", 53), timeout=3)
        return True
    except OSError:
        return False

def log(message, level="INFO"):
    """Log messages with timestamp."""
    timestamp = time.strftime("[%Y-%m-%d %H:%M:%S]")
    full_message = f"{timestamp} {message}"
    st.session_state.logs.append(full_message)
    if level == "INFO":
        logging.info(message)
    elif level == "WARNING":
        logging.warning(message)
    elif level == "ERROR":
        logging.error(message)
    elif level == "DEBUG":
        logging.debug(message)

def authenticate(token):
    """Authenticate user with Hugging Face token."""
    if not token:
        log("❌ Hugging Face Token is required.", level="ERROR")
        return False
    try:
        login(token)
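        # Persist the token in the OS credential store (Keychain, Windows
        # Credential Locker, or Secret Service) rather than a plaintext file.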
        keyring.set_password("huggingface", "token", token)
        log("✅ Authenticated successfully!")
        return True
    except Exception as e:
        log(f"❌ Authentication failed: {e}", level="ERROR")
        return False

def create_repo_if_not_exists(repo_id, token, private):
    """Create a repository if it doesn't exist."""
    api = HfApi()
    try:
        api.list_repo_files(repo_id=repo_id, repo_type="dataset", token=token)
        log(f"✅ Repository '{repo_id}' exists.")
        st.session_state.repo_exists = True
        return True
    except Exception:
        log(f"⚠️ Repository '{repo_id}' does not exist. Creating it...", level="WARNING")
        try:
            create_repo(
                repo_id=repo_id,
                token=token,
                private=private,
                repo_type="dataset",
                exist_ok=True,
            )
            log(f"✅ Created new repository: '{repo_id}'.")
            st.session_state.repo_exists = True
            return True
        except Exception as create_err:
            log(f"❌ Failed to create repository '{repo_id}': {create_err}", level="ERROR")
            return False

def compute_checksum(file_path):
    """Compute the checksum of a file for data integrity."""
    sha256 = hashlib.sha256()
    try:
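        # Read in 4 KiB chunks so large files never need to fit in memory.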
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                sha256.update(chunk)
        return sha256.hexdigest()
    except Exception as e:
        log(f"❌ Failed to compute checksum for '{file_path}': {e}", level="ERROR")
        return None

def upload_folder_structure(folder_path, repo_id, token, target_path, ignore_patterns, retry=3):
    """Upload a folder structure with error handling and retries."""
    upload_params = {
        "folder_path": folder_path,
        "repo_id": repo_id,
        "repo_type": "dataset",
        "token": token,
        "path_in_repo": target_path,
        "ignore_patterns": ignore_patterns,
    }
    log(f"📤 Uploading folder '{folder_path}' to '{target_path}' in repository '{repo_id}'...")
    for attempt in range(1, retry + 1):
        try:
            upload_folder(**upload_params)
            log(f"✅ Upload completed for '{folder_path}'!")
            return True
        except Exception as upload_err:
            log(f"❌ Upload failed for '{folder_path}' on attempt {attempt}: {upload_err}", level="ERROR")
            if attempt < retry:
                log(f"🔄 Retrying upload ({attempt}/{retry})...", level="WARNING")
                time.sleep(2 ** attempt)  # Exponential backoff: 2, 4, 8... seconds
            else:
                log(f"❌ All retry attempts failed for '{folder_path}'.", level="ERROR")
                return False

# Function to handle uploads
def upload_files():
    st.session_state.uploading = True
    token = st.session_state.token
    repo_id = st.session_state.repo_id
    private = st.session_state.private
    folder_path = st.session_state.folder_path
    subfolder = st.session_state.subfolder
    process_individually = st.session_state.process_individually
    ignore_patterns_selected = st.session_state.ignore_patterns_selected
    ignore_patterns = [IGNORE_PATTERNS_MAP[pattern] for pattern in ignore_patterns_selected]
    target_path = subfolder.replace("\\", "/") if subfolder else ""

    if not is_connected():
        log("❌ No internet connection. Sync will resume when connection is restored.", level="ERROR")
        st.session_state.uploading = False
        return
    if not authenticate(token):
        st.session_state.uploading = False
        return
    if not create_repo_if_not_exists(repo_id, token, private):
        st.session_state.uploading = False
        return
    if not os.path.isdir(folder_path):
        log(f"❌ The folder path '{folder_path}' does not exist.", level="ERROR")
        st.session_state.uploading = False
        return

    # Count the total files in folders
    st.session_state.total_files = sum(len(files) for _, _, files in os.walk(folder_path))
    st.session_state.total_files_synced = 0

    # Upload folders recursively; loose files are uploaded only from the root
    for item in os.listdir(folder_path):
        item_path = os.path.join(folder_path, item)
        # Avoid a leading slash in the repo path when no subfolder is set
        remote_item = f"{target_path}/{item}" if target_path else item
        if os.path.isdir(item_path):
            # Upload each folder
            success = upload_folder_structure(item_path, repo_id, token, remote_item, ignore_patterns)
            if success:
                for root, _, files in os.walk(item_path):
                    for file in files:
                        local_file = os.path.join(root, file)
                        relative_path = os.path.relpath(local_file, folder_path).replace("\\", "/")
                        remote_file = f"{target_path}/{relative_path}" if target_path else relative_path
                        st.session_state.total_files_synced += 1
                        local_checksum = compute_checksum(local_file)
                        remote_checksum = get_remote_file_checksum(repo_id, token, remote_file)
                        if remote_checksum is None:
                            log(f"ℹ️ Integrity check skipped for '{relative_path}' (no remote checksum available).")
                        elif local_checksum == remote_checksum:
                            log(f"✅ Data integrity verified for '{relative_path}'.")
                        else:
                            log(f"❌ Data integrity verification failed for '{relative_path}'.", level="ERROR")
        elif os.path.isfile(item_path):
            # Files listed here sit in the root by construction; upload them one by one
            try:
                upload_file(
                    path_or_fileobj=item_path,
                    path_in_repo=remote_item,
                    repo_id=repo_id,
                    repo_type="dataset",
                    token=token,
                )
            except Exception as upload_err:
                log(f"❌ Upload failed for '{item}': {upload_err}", level="ERROR")
                continue
            st.session_state.total_files_synced += 1
            local_checksum = compute_checksum(item_path)
            remote_checksum = get_remote_file_checksum(repo_id, token, remote_item)
            if remote_checksum is None:
                log(f"ℹ️ Integrity check skipped for '{item}' (no remote checksum available).")
            elif local_checksum == remote_checksum:
                log(f"✅ Data integrity verified for '{item}'.")
            else:
                log(f"❌ Data integrity verification failed for '{item}'.", level="ERROR")

    st.session_state.uploading = False
    log("🎉 Upload process completed.")

def get_remote_file_checksum(repo_id, token, file_path):
    # Placeholder: Hugging Face Hub does not provide file checksums directly
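    # Note: for LFS-tracked files the Hub does report a sha256 via
    # HfApi().get_paths_info(repo_id, [file_path], repo_type="dataset"),
    # which could be wired in here to enable real end-to-end verification.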
    return None

# Monitor folder changes for real-time sync with offline queueing
class ChangeHandler(FileSystemEventHandler):
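    # Watchdog fires these callbacks on its own observer thread, so keep the
    # handler light and hand real work off to a separate upload thread.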
    def on_modified(self, event):
        if not st.session_state.uploading and not st.session_state.sync_paused:
            if is_connected():
                log("🔄 Changes detected. Starting upload...")
                upload_thread = Thread(target=upload_files)
                upload_thread.start()
            else:
                log("❌ No internet connection. Queuing changes for later upload.", level="WARNING")
                queue_changes(event.src_path)

# Queue changes when offline
def queue_changes(file_path):
    queued_files = st.session_state.get("queued_files", [])
    queued_files.append(file_path)
    st.session_state["queued_files"] = queued_files
    log(f"📋 Queued file for upload: {file_path}")

# Check and upload queued changes when back online
def check_queued_uploads():
    if is_connected() and st.session_state.get("queued_files"):
        log("🔄 Uploading queued files...")
        for file in st.session_state["queued_files"]:
            upload_files_specific(file)
        st.session_state["queued_files"] = []

# Upload a specific file (for queued uploads)
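# Note: single files must go through upload_file; upload_folder expects a directory.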
def upload_files_specific(file_path):
    token = st.session_state.token
    repo_id = st.session_state.repo_id
    private = st.session_state.private
    folder_path = st.session_state.folder_path
    subfolder = st.session_state.subfolder
    target_path = subfolder.replace("\\", "/") if subfolder else ""
    if not authenticate(token):
        return
    if not create_repo_if_not_exists(repo_id, token, private):
        return
    if not os.path.isfile(file_path):
        log(f"❌ The file path '{file_path}' does not exist.", level="ERROR")
        return
    relative_path = os.path.relpath(file_path, folder_path).replace("\\", "/")
    remote_path = f"{target_path}/{relative_path}" if target_path else relative_path
    try:
        upload_file(
            path_or_fileobj=file_path,
            path_in_repo=remote_path,
            repo_id=repo_id,
            repo_type="dataset",
            token=token,
        )
        log(f"✅ Uploaded queued file '{relative_path}'.")
    except Exception as e:
        log(f"❌ Failed to upload queued file '{relative_path}': {e}", level="ERROR")

# Function to get version history
def get_version_history():
    api = HfApi()
    token = st.session_state.token
    repo_id = st.session_state.repo_id
    try:
        commits = api.list_repo_commits(repo_id=repo_id, repo_type="dataset", token=token)
        history = []
        for commit in commits:
            date_str = commit.created_at.strftime('%Y-%m-%d %H:%M:%S')
            authors = ", ".join(commit.authors)
            history.append(f"Commit {commit.commit_id[:7]} by {authors} on {date_str}: {commit.title}")
        return "\n".join(history)
    except Exception as e:
        log(f"❌ Error fetching version history: {e}", level="ERROR")
        return "Error fetching version history."

# Function to download from remote
def download_from_remote():
    token = st.session_state.token
    repo_id = st.session_state.repo_id
    folder_path = st.session_state.folder_path
    api = HfApi()
    try:
        remote_files = api.list_repo_files(repo_id=repo_id, repo_type="dataset", token=token)
        for file in remote_files:
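            # local_dir_use_symlinks=False forces real copies rather than
            # symlinks into the HF cache; recent huggingface_hub versions
            # deprecate the flag and copy by default.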
            hf_hub_download(
                repo_id,
                file,
                repo_type="dataset",
                token=token,
                local_dir=folder_path,
                local_dir_use_symlinks=False,
            )
            log(f"✅ Downloaded '{file}' to '{os.path.join(folder_path, file)}'")
    except Exception as e:
        log(f"❌ Error downloading from remote: {e}", level="ERROR")

def pause_sync():
    st.session_state.sync_paused = True
    log("⏸️ Sync paused.", level="WARNING")

def resume_sync():
    st.session_state.sync_paused = False
    log("▶️ Sync resumed.", level="INFO")
    upload_thread = Thread(target=upload_files)
    upload_thread.start()

def save_settings():
    settings = {
        "repo_id": st.session_state.repo_id,
        "private": st.session_state.private,
        "folder_path": st.session_state.folder_path,
        "subfolder": st.session_state.subfolder,
        "process_individually": st.session_state.process_individually,
        "ignore_patterns_selected": st.session_state.ignore_patterns_selected,
        "selected_files": st.session_state.selected_files,
    }
    with open("settings.json", "w") as f:
        json.dump(settings, f)
    log("💾 Settings saved.", level="INFO")

def load_settings():
    if os.path.exists("settings.json"):
        with open("settings.json", "r") as f:
            settings = json.load(f)
        st.session_state.repo_id = settings.get("repo_id", DEFAULT_REPO)
        st.session_state.private = settings.get("private", True)
        st.session_state.folder_path = settings.get("folder_path", DEFAULT_LOCAL_PATH)
        st.session_state.subfolder = settings.get("subfolder", "")
        st.session_state.process_individually = settings.get("process_individually", False)
        st.session_state.ignore_patterns_selected = settings.get("ignore_patterns_selected", [])
        st.session_state.selected_files = settings.get("selected_files", [])
        log("📂 Settings loaded.", level="INFO")
    else:
        log("❌ No saved settings found.", level="ERROR")

def get_local_files(folder_path):
    files = []
    for root, _, filenames in os.walk(folder_path):
        for filename in filenames:
            relative_path = os.path.relpath(os.path.join(root, filename), folder_path)
            files.append(relative_path.replace("\\", "/"))
    return files

def schedule_sync():
    def scheduled_upload():
        if is_connected() and not st.session_state.uploading and not st.session_state.sync_paused:
            log("⏰ Scheduled sync triggered.", level="INFO")
            upload_files()

    schedule.every().day.at("02:00").do(scheduled_upload)
    st.session_state.scheduled_sync = True
    log("⏰ Scheduled daily sync at 02:00.", level="INFO")

def run_scheduler():
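    # `schedule` only registers jobs; run_pending() must be polled in a loop
    # (here from a daemon thread) for scheduled jobs to actually fire.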
    while True:
        schedule.run_pending()
        time.sleep(1)

# --------------------------- Main Interface ---------------------------
st.title("🚀 InfiniteStorageFace")
st.write("Effortlessly sync your local folders to your private Hugging Face repository!")

# Create tabs for navigation
tabs = st.tabs(["Home", "Vault", "Settings", "Logs", "Help"])

with tabs[0]:
    st.header("Welcome to InfiniteStorageFace")
    st.write("Use the tabs to navigate through the application.")

    st.subheader("Vault Overview")
    st.write(f"**Repository ID:** {st.session_state.repo_id or 'Not Set'}")
    st.write(f"**Private Repository:** {'Yes' if st.session_state.private else 'No'}")
    st.write(f"**Total Files Synced:** {st.session_state.total_files_synced}")
    st.write(f"**Total Files in Folder:** {st.session_state.total_files}")

    # Display repository contents
    st.subheader("Repository Contents")
    if st.session_state.repo_exists:
        try:
            repo_files = list_repo_files(
                repo_id=st.session_state.repo_id,
                repo_type="dataset",
                token=st.session_state.token,
            )
        except Exception as e:
            repo_files = []
            st.error(f"❌ Could not list repository files: {e}")
        if repo_files:
            for file in repo_files:
                st.write(f"📄 {file}")
        else:
            st.write("Repository is empty.")
    else:
        st.write("Repository not found or not authenticated.")

with tabs[1]:
    st.header("Vault Sync and Upload")

    # Select Sync or Upload
    st.session_state.sync_option = st.radio("Choose an option:", ["Sync", "Upload"], index=0)

    # Folder selection using file browser
    st.subheader("Select Folder to Sync/Upload")
    st.session_state.folder_path = st.text_input("Folder Path", value=st.session_state.folder_path or DEFAULT_LOCAL_PATH)
    # Alternatively, use a file browser component
    # st.session_state.folder_path = file_browser()
    if os.path.isdir(st.session_state.folder_path):
        # Display folders only
        folders = [f for f in os.listdir(st.session_state.folder_path) if os.path.isdir(os.path.join(st.session_state.folder_path, f))]
        st.session_state.selected_files = st.multiselect(
            "Select Folders to Sync/Upload (leave empty to include all):",
            folders,
            # Drop stale defaults that are no longer present in the folder
            default=[f for f in st.session_state.get('selected_files', []) if f in folders],
            help="Select specific folders to include.",
        )
    else:
        st.error("❌ Invalid folder path.")

    # Sync Controls
    col_start, col_stop = st.columns(2)
    with col_start:
        if st.session_state.sync_option == 'Sync':
            start_sync = st.button("Start Sync", key="start_sync")
        else:
            start_upload = st.button("Start Upload", key="start_upload")
    with col_stop:
        stop_sync = st.button("Stop", key="stop_sync")

    # Handle buttons
    if st.session_state.sync_option == 'Sync':
        if start_sync:
            st.session_state.cancel = False
            if not st.session_state.observer:
                event_handler = ChangeHandler()
                st.session_state.observer = Observer()
                st.session_state.observer.schedule(event_handler, st.session_state.folder_path, recursive=True)
                st.session_state.observer.start()
                log("👀 Started monitoring for changes.", level="INFO")
            log("🔄 Sync started.", level="INFO")
            upload_thread = Thread(target=upload_files)
            upload_thread.start()
        if stop_sync:
            st.session_state.cancel = True
            if st.session_state.observer:
                st.session_state.observer.stop()
                st.session_state.observer.join()
                st.session_state.observer = None
            log("🛑 Sync stopped.", level="INFO")
    else:
        if start_upload:
            st.session_state.cancel = False
            log("🚀 Upload started.", level="INFO")
            upload_thread = Thread(target=upload_files)
            upload_thread.start()
        if stop_sync:
            st.session_state.cancel = True
            log("🛑 Upload stopped.", level="INFO")

    # Display sync status and statistics
    st.subheader("Status")
    if st.session_state.uploading:
        st.info("🔄 Uploading...")
    elif st.session_state.sync_paused:
        st.warning("⏸️ Sync Paused.")
    else:
        st.success("✅ Idle.")
    st.write(f"**Total Files Synced:** {st.session_state.total_files_synced}")
    st.write(f"**Total Files in Folder:** {st.session_state.total_files}")

with tabs[2]:
    st.header("Settings")

    # Securely retrieve token
    if not st.session_state.token:
        stored_token = keyring.get_password("huggingface", "token")
        if stored_token:
            st.session_state.token = stored_token

    st.session_state.token = st.text_input(
        "Hugging Face Token",
        type="password",
        value=st.session_state.token,
        help="Enter your Hugging Face API token. It will be securely stored.",
    )
    st.session_state.repo_id = st.text_input(
        "Vault ID (Repository ID)",
        value=st.session_state.get('repo_id', DEFAULT_REPO),
        help="Format: username/repo-name",
    )
    st.session_state.private = st.checkbox(
        "Make Vault Private",
        value=st.session_state.get('private', True),
        help="Private vaults are not publicly accessible.",
    )
    st.session_state.subfolder = st.text_input(
        "Subfolder in Vault (Optional)",
        value=st.session_state.get('subfolder', ""),
        help="Specify a subdirectory within the vault.",
    )
    st.session_state.process_individually = st.checkbox(
        "Process First-Level Folders Individually",
        value=st.session_state.get('process_individually', False),
        help="Upload each first-level folder individually.",
    )
    st.session_state.ignore_patterns_selected = st.multiselect(
        "Select Patterns to Ignore",
        options=list(IGNORE_PATTERNS_MAP.keys()),
        default=st.session_state.get('ignore_patterns_selected', ["Ignore __pycache__", "Ignore .git", "Ignore *.pyc"]),
        help="Select file patterns to exclude.",
    )

    save_settings_button = st.button("Save Settings", key="save_settings")
    load_settings_button = st.button("Load Settings", key="load_settings")
    if save_settings_button:
        save_settings()
    if load_settings_button:
        load_settings()

with tabs[3]:
    st.header("Logs")
    # Integrated terminal-like logs (using hypothetical package)
    # st_terminal(st.session_state.logs)
    logs_text = "\n".join(st.session_state.logs[-100:])
    st.text_area("Logs", value=logs_text, height=300)

with tabs[4]:
    st.header("Help and Documentation")
    st.markdown("""
### InfiniteStorageFace Documentation

**Getting Started:**
- **Vault ID**: This is your repository ID in the format `username/repo-name`. Treat it as your personal storage vault.
- **Hugging Face Token**: Obtain your API token from your [Hugging Face account settings](https://huggingface.co/settings/tokens).
- **Folder Selection**: Use the file browser or enter the path to the folder you want to sync or upload.

**Sync vs Upload:**
- **Sync**: Continuously monitors the selected folder for changes and syncs them to your vault.
- **Upload**: Performs a one-time upload of the selected folder or files to your vault.

**Settings:**
- **Private Vault**: By default, your vault is private. Only you can access it.
- **Ignore Patterns**: Select file patterns that you want to exclude from syncing or uploading.

**Logs and Status:**
- View real-time logs in the **Logs** tab.
- Check the sync status and statistics in the **Vault** tab.

**Support:**
- For any issues or questions, please refer to the official documentation or contact support.
""")

# Check queued uploads
check_queued_uploads()

# Cleanup on exit
def cleanup():
    """Cleanup observers and threads on exit."""
    if st.session_state.observer is not None:
        st.session_state.observer.stop()
        st.session_state.observer.join()

# Run scheduled sync if enabled
if st.session_state.scheduled_sync:
    schedule_sync()
    if 'scheduler_thread' not in st.session_state:
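        # Daemon thread: it dies with the Streamlit process instead of
        # blocking interpreter shutdown.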
        scheduler_thread = Thread(target=run_scheduler, daemon=True)
        scheduler_thread.start()
        st.session_state['scheduler_thread'] = scheduler_thread
        log("⏰ Scheduler started.", level="INFO")

# Handle session end (Streamlit exposes no public session-end hook;
# cleanup() is left for manual wiring, e.g. via atexit)
# st.on_session_end(cleanup)