import os
import streamlit as st
from huggingface_hub import (
    HfApi,
    upload_folder,
    upload_file,
    create_repo,
    login,
    hf_hub_download,
    list_repo_files,
)
import logging
import time
import json
import keyring  # Secure token storage
import socket  # Offline detection
import hashlib  # Data integrity
from pathlib import Path
from threading import Thread
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import schedule
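# Third-party dependencies imported above; a requirements.txt for this app
# would list: streamlit, huggingface_hub, watchdog, schedule, keyring.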
# Set page configuration
st.set_page_config(page_title="InfiniteStorageFace", layout="wide")
# Initialize logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler()]
)
# Initialize session state variables
session_defaults = {
'logs': [],
'uploading': False,
'cancel': False,
'observer': None,
'selected_files': [],
'sync_paused': False,
'token': "",
'settings_loaded': False,
'remote_files': {},
'queued_files': [],
'scheduled_sync': False,
'repo_id': "",
'repo_exists': False,
'folder_path': "",
'private': True, # Default to private repositories
'ignore_patterns_selected': [],
'process_individually': False,
'subfolder': "",
'total_files_synced': 0,
'total_files': 0,
'sync_option': 'Sync', # Default option is Sync
}
for key, default_value in session_defaults.items():
if key not in st.session_state:
st.session_state[key] = default_value
# Centralized ignore patterns mapping
IGNORE_PATTERNS_MAP = {
"Ignore __pycache__": "**/__pycache__/**",
"Ignore .git": ".git/**",
"Ignore .venv": "venv/**",
"Ignore *.pyc": "*.pyc",
"Ignore *.log": "*.log",
"Ignore *.tmp": "*.tmp",
"Ignore *.DS_Store": "*.DS_Store"
}
# Default values
DEFAULT_REPO = "your_username/your_private_vault"
DEFAULT_LOCAL_PATH = str(Path.home())
# Helper Functions
def is_connected():
    """Check for internet connectivity."""
    try:
        # A TCP handshake with Cloudflare's public DNS resolver (1.1.1.1:53)
        # is a cheap, dependency-free reachability probe.
        socket.create_connection(("1.1.1.1", 53), timeout=3)
        return True
    except OSError:
        return False
def log(message, level="INFO"):
    """Append a timestamped message to the UI log buffer and the Python logger."""
    timestamp = time.strftime("[%Y-%m-%d %H:%M:%S]")
    st.session_state.logs.append(f"{timestamp} {message}")
    logging.log(getattr(logging, level, logging.INFO), message)
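# Streamlit only lets threads that carry a ScriptRunContext touch
# st.session_state. This helper is a minimal sketch, assuming Streamlit >= 1.12
# (which exposes add_script_run_ctx); the sync/upload buttons below start
# their worker threads through it.
from streamlit.runtime.scriptrunner import add_script_run_ctx

def start_background(target):
    """Start `target` in a daemon thread that can safely use st.session_state."""
    thread = Thread(target=target, daemon=True)
    add_script_run_ctx(thread)  # attach the current script run's context
    thread.start()
    return thread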
def authenticate(token):
"""Authenticate user with Hugging Face token."""
if not token:
log("❌ Hugging Face Token is required.", level="ERROR")
return False
try:
login(token)
keyring.set_password("huggingface", "token", token)
log("βœ… Authenticated successfully!")
return True
except Exception as e:
log(f"❌ Authentication failed: {e}", level="ERROR")
return False
def create_repo_if_not_exists(repo_id, token, private):
"""Create a repository if it doesn't exist."""
api = HfApi()
try:
api.list_repo_files(repo_id=repo_id, repo_type="dataset", token=token)
log(f"βœ… Repository '{repo_id}' exists.")
st.session_state.repo_exists = True
return True
except Exception:
log(f"⚠️ Repository '{repo_id}' does not exist. Creating it...", level="WARNING")
try:
create_repo(
repo_id=repo_id,
token=token,
private=private,
repo_type="dataset",
exist_ok=True,
)
log(f"βœ… Created new repository: '{repo_id}'.")
st.session_state.repo_exists = True
return True
except Exception as create_err:
log(f"❌ Failed to create repository '{repo_id}': {create_err}", level="ERROR")
return False
def compute_checksum(file_path):
"""Compute the checksum of a file for data integrity."""
sha256 = hashlib.sha256()
try:
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256.update(chunk)
return sha256.hexdigest()
except Exception as e:
log(f"❌ Failed to compute checksum for '{file_path}': {e}")
return None
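# Example: compute_checksum("settings.json") yields the file's SHA-256 as a
# 64-character hex string, matching the digest printed by `sha256sum settings.json`.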
def upload_folder_structure(folder_path, repo_id, token, target_path, ignore_patterns, retry=3):
"""Upload a folder structure with error handling and retries."""
upload_params = {
"folder_path": folder_path,
"repo_id": repo_id,
"repo_type": "dataset",
"token": token,
"path_in_repo": target_path,
"ignore_patterns": ignore_patterns,
}
log(f"πŸš€ Uploading folder '{folder_path}' to '{target_path}' in repository '{repo_id}'...")
for attempt in range(1, retry + 1):
try:
upload_folder(**upload_params)
log(f"βœ… Upload completed for '{folder_path}'!")
return True
except Exception as upload_err:
log(f"❌ Upload failed for '{folder_path}' on attempt {attempt}: {upload_err}", level="ERROR")
if attempt < retry:
log(f"πŸ”„ Retrying upload ({attempt}/{retry})...", level="WARNING")
time.sleep(2 ** attempt) # Exponential backoff
else:
log(f"❌ All retry attempts failed for '{folder_path}'.", level="ERROR")
return False
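# Example call (hypothetical values): push ~/photos into the "backups/photos"
# folder of the user/vault dataset, skipping temp files, with up to 3 attempts:
#   upload_folder_structure(str(Path.home() / "photos"), "user/vault",
#                           token, "backups/photos", ["*.tmp"])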
# Function to handle uploads
def upload_files():
st.session_state.uploading = True
token = st.session_state.token
repo_id = st.session_state.repo_id
private = st.session_state.private
folder_path = st.session_state.folder_path
subfolder = st.session_state.subfolder
process_individually = st.session_state.process_individually
ignore_patterns_selected = st.session_state.ignore_patterns_selected
ignore_patterns = [IGNORE_PATTERNS_MAP[pattern] for pattern in ignore_patterns_selected]
target_path = subfolder.replace("\\", "/") if subfolder else ""
if not is_connected():
log("❌ No internet connection. Sync will resume when connection is restored.", level="ERROR")
st.session_state.uploading = False
return
if not authenticate(token):
st.session_state.uploading = False
return
if not create_repo_if_not_exists(repo_id, token, private):
st.session_state.uploading = False
return
if not os.path.isdir(folder_path):
log(f"❌ The folder path '{folder_path}' does not exist.", level="ERROR")
st.session_state.uploading = False
return
    # Count every file under the selected folder (the total is not adjusted
    # for ignore patterns)
    st.session_state.total_files = sum(len(files) for _, _, files in os.walk(folder_path))
st.session_state.total_files_synced = 0
    # Upload first-level folders as units; individual files are uploaded only
    # from the root of the selected folder
    for item in os.listdir(folder_path):
        item_path = os.path.join(folder_path, item)
        # Avoid a leading "/" in path_in_repo when no subfolder is set
        remote_item = f"{target_path}/{item}" if target_path else item
        if os.path.isdir(item_path):
            success = upload_folder_structure(item_path, repo_id, token, remote_item, ignore_patterns)
            if success:
                for root, _, files in os.walk(item_path):
                    for file in files:
                        local_file = os.path.join(root, file)
                        relative_path = os.path.relpath(local_file, folder_path).replace("\\", "/")
                        remote_file = f"{target_path}/{relative_path}" if target_path else relative_path
                        st.session_state.total_files_synced += 1
                        local_checksum = compute_checksum(local_file)
                        remote_checksum = get_remote_file_checksum(repo_id, token, remote_file)
                        if remote_checksum is None:
                            log(f"ℹ️ No remote checksum for '{relative_path}'; integrity check skipped.")
                        elif local_checksum == remote_checksum:
                            log(f"✅ Data integrity verified for '{relative_path}'.")
                        else:
                            log(f"❌ Data integrity verification failed for '{relative_path}'.", level="ERROR")
        elif os.path.isfile(item_path):
            # os.listdir only yields root-level entries, so files here are always
            # in the root; upload them with upload_file (upload_folder expects a
            # directory path)
            try:
                upload_file(
                    path_or_fileobj=item_path,
                    path_in_repo=remote_item,
                    repo_id=repo_id,
                    repo_type="dataset",
                    token=token,
                )
                st.session_state.total_files_synced += 1
                local_checksum = compute_checksum(item_path)
                remote_checksum = get_remote_file_checksum(repo_id, token, remote_item)
                if remote_checksum is None:
                    log(f"ℹ️ No remote checksum for '{item}'; integrity check skipped.")
                elif local_checksum == remote_checksum:
                    log(f"✅ Data integrity verified for '{item}'.")
                else:
                    log(f"❌ Data integrity verification failed for '{item}'.", level="ERROR")
            except Exception as e:
                log(f"❌ Failed to upload '{item}': {e}", level="ERROR")
st.session_state.uploading = False
log("πŸš€ Upload process completed.")
def get_remote_file_checksum(repo_id, token, file_path):
    # The Hub exposes a full-file SHA-256 only for LFS-tracked files; there is
    # no checksum to compare for regular git blobs, so callers treat None as
    # "verification skipped" rather than a failure.
    return None
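# A minimal alternative sketch: for LFS-tracked files the Hub does expose a
# SHA-256 via HfApi.get_paths_info. Assumes a huggingface_hub version whose
# RepoFile objects carry an `lfs` entry with a `sha256` field; swap it in for
# the stub above if that holds in your environment.
def get_remote_file_checksum_lfs(repo_id, token, file_path):
    """Return the SHA-256 of an LFS-tracked remote file, or None if unavailable."""
    api = HfApi()
    try:
        info = api.get_paths_info(repo_id=repo_id, paths=[file_path], repo_type="dataset", token=token)
        if info and getattr(info[0], "lfs", None):
            return info[0].lfs.sha256
    except Exception as e:
        log(f"⚠️ Could not fetch remote checksum for '{file_path}': {e}", level="WARNING")
    return None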
# Function to monitor folder changes with real-time sync and offline queueing
class ChangeHandler(FileSystemEventHandler):
def on_modified(self, event):
if not st.session_state.uploading and not st.session_state.sync_paused:
if is_connected():
log("πŸ”„ Changes detected. Starting upload...")
upload_thread = Thread(target=upload_files)
upload_thread.start()
else:
log("❌ No internet connection. Queuing changes for later upload.", level="WARNING")
queue_changes(event.src_path)
# Queue changes when offline
def queue_changes(file_path):
queued_files = st.session_state.get("queued_files", [])
queued_files.append(file_path)
st.session_state["queued_files"] = queued_files
log(f"πŸ•’ Queued file for upload: {file_path}")
# Check and upload queued changes when back online
def check_queued_uploads():
if is_connected() and st.session_state.get("queued_files"):
log("πŸ”„ Uploading queued files...")
for file in st.session_state["queued_files"]:
upload_files_specific(file)
st.session_state["queued_files"] = []
# Upload a specific file (for queued uploads)
def upload_files_specific(file_path):
token = st.session_state.token
repo_id = st.session_state.repo_id
private = st.session_state.private
folder_path = st.session_state.folder_path
subfolder = st.session_state.subfolder
    target_path = subfolder.replace("\\", "/") if subfolder else ""
if not authenticate(token):
return
if not create_repo_if_not_exists(repo_id, token, private):
return
if not os.path.isfile(file_path):
log(f"❌ The file path '{file_path}' does not exist.", level="ERROR")
return
    relative_path = os.path.relpath(file_path, folder_path).replace("\\", "/")
    remote_path = f"{target_path}/{relative_path}" if target_path else relative_path
    try:
        # Single files go through upload_file; upload_folder expects a directory
        upload_file(
            path_or_fileobj=file_path,
            path_in_repo=remote_path,
            repo_id=repo_id,
            repo_type="dataset",
            token=token,
        )
        log(f"✅ Uploaded queued file '{relative_path}'.")
    except Exception as e:
        log(f"❌ Failed to upload queued file '{relative_path}': {e}", level="ERROR")
# Function to get version history
def get_version_history():
api = HfApi()
token = st.session_state.token
repo_id = st.session_state.repo_id
try:
commits = api.list_repo_commits(repo_id=repo_id, repo_type="dataset", token=token)
history = []
        for commit in commits:
            date_str = commit.created_at.strftime('%Y-%m-%d %H:%M:%S')
            authors = ", ".join(commit.authors) if commit.authors else "unknown"
            history.append(f"Commit {commit.commit_id[:7]} by {authors} on {date_str}: {commit.title}")
return "\n".join(history)
except Exception as e:
log(f"❌ Error fetching version history: {e}", level="ERROR")
return "Error fetching version history."
# Function to download from remote
def download_from_remote():
token = st.session_state.token
repo_id = st.session_state.repo_id
folder_path = st.session_state.folder_path
    api = HfApi()
    try:
        remote_files = api.list_repo_files(repo_id=repo_id, repo_type="dataset", token=token)
        for file in remote_files:
            # hf_hub_download recreates the repo-relative path under local_dir,
            # so pass the sync root once rather than each file's parent directory
            hf_hub_download(
                repo_id=repo_id,
                filename=file,
                repo_type="dataset",
                token=token,
                local_dir=folder_path,
            )
            log(f"✅ Downloaded '{file}' to '{os.path.join(folder_path, file)}'")
    except Exception as e:
        log(f"❌ Error downloading from remote: {e}", level="ERROR")
def pause_sync():
st.session_state.sync_paused = True
log("⏸️ Sync paused.", level="WARNING")
def resume_sync():
st.session_state.sync_paused = False
log("▢️ Sync resumed.", level="INFO")
upload_thread = Thread(target=upload_files)
upload_thread.start()
def save_settings():
settings = {
"repo_id": st.session_state.repo_id,
"private": st.session_state.private,
"folder_path": st.session_state.folder_path,
"subfolder": st.session_state.subfolder,
"process_individually": st.session_state.process_individually,
"ignore_patterns_selected": st.session_state.ignore_patterns_selected,
"selected_files": st.session_state.selected_files
}
with open("settings.json", "w") as f:
json.dump(settings, f)
log("πŸ’Ύ Settings saved.", level="INFO")
def load_settings():
if os.path.exists("settings.json"):
with open("settings.json", "r") as f:
settings = json.load(f)
st.session_state.repo_id = settings.get("repo_id", DEFAULT_REPO)
st.session_state.private = settings.get("private", True)
st.session_state.folder_path = settings.get("folder_path", DEFAULT_LOCAL_PATH)
st.session_state.subfolder = settings.get("subfolder", "")
st.session_state.process_individually = settings.get("process_individually", False)
st.session_state.ignore_patterns_selected = settings.get("ignore_patterns_selected", [])
st.session_state.selected_files = settings.get("selected_files", [])
log("πŸ”„ Settings loaded.", level="INFO")
else:
log("❌ No saved settings found.", level="ERROR")
def get_local_files(folder_path):
files = []
for root, dirs, filenames in os.walk(folder_path):
for filename in filenames:
relative_path = os.path.relpath(os.path.join(root, filename), folder_path)
files.append(relative_path.replace("\\", "/"))
return files
def schedule_sync():
def scheduled_upload():
if is_connected() and not st.session_state.uploading and not st.session_state.sync_paused:
log("⏰ Scheduled sync triggered.", level="INFO")
upload_files()
schedule.every().day.at("02:00").do(scheduled_upload)
st.session_state.scheduled_sync = True
log("⏰ Scheduled daily sync at 02:00.", level="INFO")
def run_scheduler():
while True:
schedule.run_pending()
time.sleep(1)
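# Note: `schedule` jobs fire only while this process is alive; run_scheduler is
# started as a daemon thread near the end of the script so it exits together
# with the Streamlit server.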
# --------------------------- Main Interface ---------------------------
st.title("πŸš€ InfiniteStorageFace")
st.write("Effortlessly sync your local folders to your private Hugging Face repository!")
# Create tabs for navigation
tabs = st.tabs(["Home", "Vault", "Settings", "Logs", "Help"])
with tabs[0]:
st.header("Welcome to InfiniteStorageFace")
st.write("Use the tabs to navigate through the application.")
st.subheader("Vault Overview")
st.write(f"**Repository ID:** {st.session_state.repo_id or 'Not Set'}")
st.write(f"**Private Repository:** {'Yes' if st.session_state.private else 'No'}")
st.write(f"**Total Files Synced:** {st.session_state.total_files_synced}")
st.write(f"**Total Files in Folder:** {st.session_state.total_files}")
# Display repository contents
st.subheader("Repository Contents")
if st.session_state.repo_exists:
        try:
            repo_files = list_repo_files(repo_id=st.session_state.repo_id, repo_type="dataset", token=st.session_state.token)
        except Exception as e:
            repo_files = []
            st.error(f"❌ Could not list repository files: {e}")
if repo_files:
for file in repo_files:
st.write(f"πŸ“„ {file}")
else:
st.write("Repository is empty.")
else:
st.write("Repository not found or not authenticated.")
with tabs[1]:
st.header("Vault Sync and Upload")
# Select Sync or Upload
st.session_state.sync_option = st.radio("Choose an option:", ["Sync", "Upload"], index=0)
# Folder selection using file browser
st.subheader("Select Folder to Sync/Upload")
st.session_state.folder_path = st.text_input("Folder Path", value=st.session_state.folder_path or DEFAULT_LOCAL_PATH)
# Alternatively, use a file browser component
# st.session_state.folder_path = file_browser()
if os.path.isdir(st.session_state.folder_path):
# Display folders only
folders = [f for f in os.listdir(st.session_state.folder_path) if os.path.isdir(os.path.join(st.session_state.folder_path, f))]
st.session_state.selected_files = st.multiselect(
"Select Folders to Sync/Upload (leave empty to include all):",
folders,
default=st.session_state.get('selected_files', []),
help="Select specific folders to include."
)
else:
st.error("❌ Invalid folder path.")
# Sync Controls
col_start, col_stop = st.columns(2)
with col_start:
if st.session_state.sync_option == 'Sync':
start_sync = st.button("Start Sync", key="start_sync")
else:
start_upload = st.button("Start Upload", key="start_upload")
with col_stop:
stop_sync = st.button("Stop", key="stop_sync")
# Handle buttons
if st.session_state.sync_option == 'Sync':
if start_sync:
st.session_state.cancel = False
if not st.session_state.observer:
event_handler = ChangeHandler()
st.session_state.observer = Observer()
st.session_state.observer.schedule(event_handler, st.session_state.folder_path, recursive=True)
st.session_state.observer.start()
log("πŸ‘€ Started monitoring for changes.", level="INFO")
log("πŸ”„ Sync started.", level="INFO")
upload_thread = Thread(target=upload_files)
upload_thread.start()
if stop_sync:
st.session_state.cancel = True
if st.session_state.observer:
st.session_state.observer.stop()
st.session_state.observer.join()
st.session_state.observer = None
log("πŸ›‘ Sync stopped.", level="INFO")
else:
if start_upload:
st.session_state.cancel = False
log("πŸ”„ Upload started.", level="INFO")
upload_thread = Thread(target=upload_files)
upload_thread.start()
if stop_sync:
st.session_state.cancel = True
log("πŸ›‘ Upload stopped.", level="INFO")
# Display sync status and statistics
st.subheader("Status")
if st.session_state.uploading:
st.info("πŸš€ Uploading...")
elif st.session_state.sync_paused:
st.warning("⏸️ Sync Paused.")
else:
st.success("βœ… Idle.")
st.write(f"**Total Files Synced:** {st.session_state.total_files_synced}")
st.write(f"**Total Files in Folder:** {st.session_state.total_files}")
with tabs[2]:
st.header("Settings")
# Securely retrieve token
if not st.session_state.token:
stored_token = keyring.get_password("huggingface", "token")
if stored_token:
st.session_state.token = stored_token
st.session_state.token = st.text_input(
"Hugging Face Token",
type="password",
value=st.session_state.token,
help="Enter your Hugging Face API token. It will be securely stored."
)
st.session_state.repo_id = st.text_input(
"Vault ID (Repository ID)",
value=st.session_state.get('repo_id', DEFAULT_REPO),
help="Format: username/repo-name"
)
st.session_state.private = st.checkbox(
"Make Vault Private",
value=st.session_state.get('private', True),
help="Private vaults are not publicly accessible."
)
st.session_state.subfolder = st.text_input(
"Subfolder in Vault (Optional)",
value=st.session_state.get('subfolder', ""),
help="Specify a subdirectory within the vault."
)
st.session_state.process_individually = st.checkbox(
"Process First-Level Folders Individually",
value=st.session_state.get('process_individually', False),
help="Upload each first-level folder individually."
)
st.session_state.ignore_patterns_selected = st.multiselect(
"Select Patterns to Ignore",
options=list(IGNORE_PATTERNS_MAP.keys()),
default=st.session_state.get('ignore_patterns_selected', ["Ignore __pycache__", "Ignore .git", "Ignore *.pyc"]),
help="Select file patterns to exclude."
)
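    # Expose the scheduler toggle; without it st.session_state.scheduled_sync
    # is never set to True and the 02:00 job in schedule_sync() stays dormant.
    st.session_state.scheduled_sync = st.checkbox(
        "Enable Daily Scheduled Sync (02:00)",
        value=st.session_state.get('scheduled_sync', False),
        help="Registers a daily sync job that runs while the app is open."
    )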
save_settings_button = st.button("Save Settings", key="save_settings")
load_settings_button = st.button("Load Settings", key="load_settings")
if save_settings_button:
save_settings()
if load_settings_button:
load_settings()
with tabs[3]:
st.header("Logs")
# Integrated terminal-like logs (using hypothetical package)
# st_terminal(st.session_state.logs)
logs_text = "\n".join(st.session_state.logs[-100:])
st.text_area("Logs", value=logs_text, height=300)
with tabs[4]:
st.header("Help and Documentation")
st.markdown("""
### InfiniteStorageFace Documentation
**Getting Started:**
- **Vault ID**: This is your repository ID in the format `username/repo-name`. Treat this as your personal storage vault.
- **Hugging Face Token**: Obtain your API token from your [Hugging Face account settings](https://huggingface.co/settings/tokens).
- **Folder Selection**: Use the file browser or enter the path to the folder you want to sync or upload.
**Sync vs Upload:**
- **Sync**: Continuously monitors the selected folder for changes and syncs them to your vault.
- **Upload**: Performs a one-time upload of the selected folder or files to your vault.
**Settings:**
- **Private Vault**: By default, your vault is private. Only you can access it.
- **Ignore Patterns**: Select file patterns that you want to exclude from syncing or uploading.
**Logs and Status:**
- View real-time logs in the **Logs** tab.
- Check the sync status and statistics in the **Vault** tab.
**Support:**
- For any issues or questions, please refer to the official documentation or contact support.
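**Programmatic Access:**
- A synced vault is an ordinary Hugging Face dataset repository, so you can fetch files from any machine. A minimal sketch (the repo ID and file path are placeholders):

```python
from huggingface_hub import hf_hub_download

local_path = hf_hub_download(
    repo_id="username/repo-name",  # your Vault ID
    filename="docs/notes.txt",     # a path inside the vault
    repo_type="dataset",
    token="hf_...",                # a token with read access
)
print(local_path)                  # cached local copy of the file
```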
""")
# Check queued uploads
check_queued_uploads()
# Cleanup on exit
def cleanup():
"""Cleanup observers and threads on exit."""
if st.session_state.observer is not None:
st.session_state.observer.stop()
st.session_state.observer.join()
# Run the scheduled sync if enabled (register the job and scheduler thread only
# once; Streamlit reruns this script from the top on every interaction)
if st.session_state.scheduled_sync and 'scheduler_thread' not in st.session_state:
    schedule_sync()
    scheduler_thread = Thread(target=run_scheduler, daemon=True)
    scheduler_thread.start()
    st.session_state['scheduler_thread'] = scheduler_thread
    log("🕒 Scheduler started.", level="INFO")
# Handle session end
# st.on_session_end(cleanup)