Spaces:
Sleeping
Sleeping
from flask import Flask, request, redirect, url_for, send_file, render_template, flash, Response | |
from flask_cors import CORS | |
from werkzeug.utils import secure_filename | |
from pymongo.mongo_client import MongoClient | |
from pymongo.server_api import ServerApi | |
import urllib.parse | |
from functools import wraps | |
import os | |
import io | |
app = Flask(__name__) | |
app.secret_key = os.getenv('SECRET_KEY') | |
username = urllib.parse.quote_plus(os.getenv('MONGO_USERNAME')) | |
password = urllib.parse.quote_plus(os.getenv('MONGO_PASSWORD')) | |
restUri = os.getenv('REST_URI') | |
uri = f'mongodb+srv://{username}:{password}{restUri}' | |
client = MongoClient(uri, server_api=ServerApi('1')) | |
db = client['file_storage'] | |
files_collection = db['files'] | |
try: | |
client.admin.command('ping') | |
print("Pinged your deployment. You successfully connected to MongoDB!") | |
except Exception as e: | |
print(e) | |
# Get the password from the environment variable | |
APP_PASSWORD = os.getenv('APP_PASSWORD') | |
def check_auth(username, password): | |
"""Check if a username/password combination is valid.""" | |
return username == 'admin' and password == APP_PASSWORD | |
def authenticate(): | |
"""Send a 401 response that enables basic auth.""" | |
return Response( | |
'Could not verify your access level for that URL.\n' | |
'You have to login with proper credentials', 401, | |
{'WWW-Authenticate': 'Basic realm="Login Required"'}) | |
def requires_auth(f): | |
def decorated(*args, **kwargs): | |
auth = request.authorization | |
if not auth or not check_auth(auth.username, auth.password): | |
return authenticate() | |
return f(*args, **kwargs) | |
return decorated | |
def index(): | |
return render_template('index.html') | |
def upload_file(): | |
if request.method == 'POST': | |
if 'file' not in request.files: | |
flash('No file part') | |
return redirect(request.url) | |
files = request.files.getlist('file') | |
for file in files: | |
if file.filename == '': | |
flash('No selected file') | |
return redirect(request.url) | |
if file: | |
filename = secure_filename(file.filename) | |
file_data = { | |
'filename': filename, | |
'data': file.read() | |
} | |
files_collection.insert_one(file_data) | |
flash('Files successfully uploaded') | |
return redirect(url_for('list_files')) | |
return render_template('upload.html') | |
def uploaded_file(filename): | |
file_data = files_collection.find_one({'filename': filename}) | |
if file_data: | |
return send_file( | |
io.BytesIO(file_data['data']), | |
mimetype='application/octet-stream' | |
) | |
else: | |
return 'File not found' | |
def list_files(): | |
files = [file_data['filename'] for file_data in files_collection.find({}, {'_id': 1, 'filename': 1})] | |
return render_template('files.html', files=files) | |
def delete_file(filename): | |
file_data = files_collection.find_one({'filename': filename}) | |
if file_data: | |
files_collection.delete_one({'filename': filename}) | |
flash('File successfully deleted') | |
else: | |
flash('File not found') | |
return redirect(url_for('list_files')) | |
if __name__ == '__main__': | |
app.run(debug=True) |