Spaces:
Sleeping
Sleeping
File size: 3,534 Bytes
b242d03 2ace788 b242d03 2ace788 b242d03 2ace788 b242d03 2ace788 b242d03 2ace788 b242d03 2ace788 b242d03 2ace788 b242d03 2ace788 b242d03 2ace788 b242d03 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
from flask import Flask, request, redirect, url_for, send_file, render_template, flash, Response
from flask_cors import CORS
from werkzeug.utils import secure_filename
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import urllib.parse
from functools import wraps
import os
import io
app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY')
username = urllib.parse.quote_plus(os.getenv('MONGO_USERNAME'))
password = urllib.parse.quote_plus(os.getenv('MONGO_PASSWORD'))
restUri = os.getenv('REST_URI')
uri = f'mongodb+srv://{username}:{password}{restUri}'
client = MongoClient(uri, server_api=ServerApi('1'))
db = client['file_storage']
files_collection = db['files']
try:
client.admin.command('ping')
print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
print(e)
# Get the password from the environment variable
APP_PASSWORD = os.getenv('APP_PASSWORD')
def check_auth(username, password):
"""Check if a username/password combination is valid."""
return username == 'admin' and password == APP_PASSWORD
def authenticate():
"""Send a 401 response that enables basic auth."""
return Response(
'Could not verify your access level for that URL.\n'
'You have to login with proper credentials', 401,
{'WWW-Authenticate': 'Basic realm="Login Required"'})
def requires_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
return f(*args, **kwargs)
return decorated
@app.route('/')
@requires_auth
def index():
return render_template('index.html')
@app.route('/upload', methods=['GET', 'POST'])
@requires_auth
def upload_file():
if request.method == 'POST':
if 'file' not in request.files:
flash('No file part')
return redirect(request.url)
files = request.files.getlist('file')
for file in files:
if file.filename == '':
flash('No selected file')
return redirect(request.url)
if file:
filename = secure_filename(file.filename)
file_data = {
'filename': filename,
'data': file.read()
}
files_collection.insert_one(file_data)
flash('Files successfully uploaded')
return redirect(url_for('list_files'))
return render_template('upload.html')
@app.route('/uploads/<filename>')
@requires_auth
def uploaded_file(filename):
file_data = files_collection.find_one({'filename': filename})
if file_data:
return send_file(
io.BytesIO(file_data['data']),
mimetype='application/octet-stream'
)
else:
return 'File not found'
@app.route('/files')
@requires_auth
def list_files():
files = [file_data['filename'] for file_data in files_collection.find({}, {'_id': 1, 'filename': 1})]
return render_template('files.html', files=files)
@app.route('/delete/<filename>', methods=['POST'])
@requires_auth
def delete_file(filename):
file_data = files_collection.find_one({'filename': filename})
if file_data:
files_collection.delete_one({'filename': filename})
flash('File successfully deleted')
else:
flash('File not found')
return redirect(url_for('list_files'))
if __name__ == '__main__':
app.run(debug=True) |