Raven-1 / app.py
WarpWingHF's picture
fix
565441d unverified
raw
history blame
2.76 kB
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import FileResponse
from pathlib import Path
import psutil
import shutil
import os
# ASGI application object; the route decorators in this file register on it.
app = FastAPI()

# Directory where uploaded files are stored. The path is relative, so it is
# resolved against the process's working directory. It is created at import
# time (including parents) if it does not already exist.
UPLOAD_DIRECTORY = Path("uploads")
UPLOAD_DIRECTORY.mkdir(parents=True, exist_ok=True)

# Storage quota in GB, used as the denominator when /metrics reports uploads
# usage as a percentage. The original author's note: Hugging Face Spaces has
# a 50GB limit.
TOTAL_SPACE_GB = 50
# Helper function to calculate the size of a directory
def get_directory_size(directory: Path) -> int:
    """Return the combined size in bytes of every file under *directory*.

    The tree is walked recursively with ``os.walk``. Entries whose size
    cannot be read (for example a dangling symlink, or a file deleted between
    the directory listing and the size lookup) are skipped instead of
    aborting the whole computation.

    Args:
        directory: Root of the tree to measure.

    Returns:
        Total size in bytes; 0 when the directory is empty or does not exist.
    """
    total_size = 0
    for dirpath, _dirnames, filenames in os.walk(directory):
        for name in filenames:
            try:
                total_size += os.path.getsize(os.path.join(dirpath, name))
            except OSError:
                # Best effort: an unreadable or vanished entry must not turn
                # a size report into an error.
                continue
    return total_size
# Health check endpoint
@app.get("/health")
def health_check():
    """Liveness probe: always reports the service as healthy."""
    return {"status": "healthy"}
# System metrics endpoint: CPU, RAM, and disk usage for uploads folder (in GB and percentage of 50GB limit)
@app.get("/metrics")
def get_metrics():
    """Report CPU load, memory statistics and uploads-folder disk usage.

    Returns:
        A dict with three entries:
        ``cpu_percent`` - CPU utilisation rounded to a whole number;
        ``memory`` - total and available RAM in GB plus the used percentage;
        ``disk`` - size of the uploads folder in GB, that size as a
        percentage of ``TOTAL_SPACE_GB``, and ``TOTAL_SPACE_GB`` itself.
    """
    bytes_per_gb = 1024 ** 3

    # CPU is sampled over a one-second interval before anything else is read.
    cpu_load = round(psutil.cpu_percent(interval=1))

    ram = psutil.virtual_memory()
    memory_stats = {
        "total_gb": round(ram.total / bytes_per_gb, 2),
        "available_gb": round(ram.available / bytes_per_gb, 2),
        "percent": ram.percent,
    }

    folder_gb = get_directory_size(UPLOAD_DIRECTORY) / bytes_per_gb
    quota_share = (folder_gb / TOTAL_SPACE_GB) * 100
    disk_stats = {
        "uploads_folder_size_gb": round(folder_gb, 2),
        "uploads_usage_percent_of_50gb": round(quota_share, 2),
        "total_space_gb": TOTAL_SPACE_GB,
    }

    return {
        "cpu_percent": cpu_load,
        "memory": memory_stats,
        "disk": disk_stats,
    }
# File upload endpoint
@app.post("/uploadfile/")
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded file inside ``UPLOAD_DIRECTORY``.

    The filename comes from the client and is untrusted, so only its final
    path component is used. This keeps names such as ``../../etc/passwd`` or
    ``C:\\dir\\x.txt`` from writing outside the uploads directory.

    Args:
        file: The multipart upload supplied by the client.

    Returns:
        A dict whose ``info`` entry names the stored file and its location.

    Raises:
        HTTPException: 400 when the upload carries no usable filename.
    """
    raw_name = file.filename or ""
    # Treat backslashes as separators too, so Windows-style paths are reduced
    # to their last component regardless of the host platform.
    safe_name = os.path.basename(raw_name.replace("\\", "/"))
    if safe_name in {"", ".", ".."} or "\x00" in safe_name:
        raise HTTPException(status_code=400, detail="Invalid filename")

    file_location = UPLOAD_DIRECTORY / safe_name
    with file_location.open("wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    return {"info": f"file '{safe_name}' saved at '{file_location}'"}
# File download endpoint
@app.get("/downloadfile/{filename}")
def download_file(filename: str):
    """Send a previously uploaded file back to the client.

    Only plain file names that refer to a regular file directly inside
    ``UPLOAD_DIRECTORY`` are served. Names containing a path separator, the
    special names ``.`` and ``..``, and directories are all reported as
    missing, so the endpoint cannot be used to reach outside the uploads
    directory.

    Args:
        filename: Name of the file to download.

    Returns:
        A ``FileResponse`` for the requested file.

    Raises:
        HTTPException: 404 when no such file exists in the uploads directory.
    """
    is_plain_name = (
        filename not in {"", ".", ".."}
        and os.path.basename(filename.replace("\\", "/")) == filename
    )
    file_location = UPLOAD_DIRECTORY / filename
    # is_file() rather than exists(): a directory exists but cannot be served.
    if not is_plain_name or not file_location.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(path=file_location, filename=filename)
# Show current files endpoint
@app.get("/files/")
def list_files():
    """List the names of the regular files stored in the uploads directory.

    Returns:
        A dict whose ``files`` entry is the list of file names;
        sub-directories are left out.
    """
    upload_root = UPLOAD_DIRECTORY
    names = [
        entry
        for entry in os.listdir(upload_root)
        if os.path.isfile(os.path.join(upload_root, entry))
    ]
    return {"files": names}