Spaces:
Runtime error
Runtime error
import logging | |
import os | |
import re | |
from datetime import datetime, timedelta | |
from argon2 import PasswordHasher | |
from dotenv import load_dotenv | |
from flask import Flask, request, jsonify, render_template | |
from flask_sqlalchemy import SQLAlchemy | |
from sqlalchemy.exc import IntegrityError | |
# load environment vars from .env | |
load_dotenv(".env") | |
logger = logging.getLogger(__name__) | |
app = Flask(__name__, template_folder="assets", static_folder="assets") | |
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///users.db" | |
db = SQLAlchemy(app) | |
hasher = PasswordHasher() | |
HOST = os.environ.get("HOST", "0.0.0.0") | |
PORT = int(os.environ.get("PORT", 5000)) | |
DEBUG = True if str(os.environ.get("DEBUG", "true")).lower() == "true" else False | |
MAX_LOGIN_ATTEMPTS = 10 | |
LOCKOUT_DURATION = timedelta(seconds=60) | |
COMMON_SWEAR_WORDS = [ | |
"fuck", "shit", "bitch", "asshole", "bastard", "cunt", "dick", "cock", "pussy", | |
"motherfucker", "wanker", "twat", "bollocks", "arsehole", "crap", "damn", "bugger", | |
"bloody", "sod", "git", "idiot", "moron", "prick", "slut", "whore", "nigger", "retard" | |
] | |
CHARACTER_SUBSTITUTIONS = { | |
"0": "o", | |
"1": "i", | |
"3": "e", | |
"4": "a", | |
"5": "s", | |
"6": "g", | |
"7": "t", | |
"8": "b", | |
"9": "g", | |
"@": "a", | |
"$": "s", | |
"!": "i" | |
} | |
def load_passwords_from_file(file_path: str) -> set: | |
"""Loads passwords from a file specified""" | |
try: | |
with open(file_path, 'r') as file: | |
passwords = set(file.read().splitlines()) | |
return passwords | |
except Exception as e: | |
logger.exception(f"Error occurred while retrieving passwords. {str(e)}") | |
return set() | |
WEAK_PASSWORDS = load_passwords_from_file('assets/weakpasswords.txt') | |
BREACHED_PASSWORDS = load_passwords_from_file('assets/breachedpasswords.txt') | |
BREACHED_PASSWORDS_LOWER = [p.lower() for p in BREACHED_PASSWORDS] | |
# create a sqlite model for | |
# storing user data | |
class User(db.Model): | |
id = db.Column(db.Integer, primary_key=True) | |
username = db.Column(db.String(80), unique=True, nullable=False) | |
hashed_password = db.Column(db.String(256), nullable=False) | |
salt = db.Column(db.String(16), nullable=False) | |
login_attempts = db.Column(db.Integer, default=0) | |
last_login_time = db.Column(db.DateTime, default=datetime.utcnow) | |
# create database tables | |
with app.app_context(): | |
db.create_all() | |
# custom exception definitions | |
class CredentialValidationError(Exception): | |
def __init__(self, message): | |
self.message = message | |
super().__init__(message) | |
def apply_character_substitutions(word): | |
"""Converts a char or number substituted | |
word into its original representation""" | |
for original, replacement in CHARACTER_SUBSTITUTIONS.items(): | |
word = word.replace(original, replacement) | |
return word | |
def validate_username(username: str) -> None: | |
"""Validates the username by checking | |
it against a pre-defined ruleset""" | |
# 1. raise a validation exception if username | |
# contains non-alphanumeric chars other than | |
# the underscore | |
if not re.match(r'^[a-zA-Z0-9_]+$', username): | |
raise CredentialValidationError("Invalid username") | |
# 2. raise a validation exception when the | |
# username contains a commonly used swear word | |
if any(word in username.lower() for word in COMMON_SWEAR_WORDS): | |
raise CredentialValidationError("Username cannot contain swear words") | |
# 3. raises when users attempt to bypass the | |
# above rule using symbol and number substitutions | |
if any(word in apply_character_substitutions(username.lower()) for word in COMMON_SWEAR_WORDS): | |
raise CredentialValidationError( | |
"Username contains a symbol or number substituted swear word" | |
) | |
def validate_password(username: str, password: str) -> None: | |
"""Validates a user password according | |
to the NISP password guidelines""" | |
# 1. Length: At least 8 characters | |
if len(password) < 8: | |
raise CredentialValidationError( | |
"The password must be at least 8 characters long" | |
) | |
# 2. Complexity: Overly complex rules | |
# will not be enforced | |
# 3. Composition: Disallowing consequent | |
# characters if consequent char count > 3 | |
if bool(re.search(r'(.)\1\1+', password)): | |
raise CredentialValidationError( | |
"The password cannot contain 3 or more " | |
"consequent repeated characters" | |
) | |
# 4. Expiration: Password expiration is | |
# not checked since it's not recommended | |
# to frequently expire passwords | |
# 5. Similarity to username: If exactly or | |
# partially similar to the username, an | |
# exception will be raised | |
if username.lower() in password.lower(): | |
raise CredentialValidationError( | |
"The password cannot be similar to the username" | |
) | |
if apply_character_substitutions(username.lower()) == apply_character_substitutions(password.lower()): | |
raise CredentialValidationError( | |
"The password cannot be similar to the username, " | |
"even with character and number substitutions" | |
) | |
# 6. Data Breaches and Weak Passwords: any weak | |
# password or breached passwords are disallowed | |
if password in WEAK_PASSWORDS: | |
raise CredentialValidationError( | |
"Password is too weak. Please try again with a strong password" | |
) | |
if password in BREACHED_PASSWORDS: | |
raise CredentialValidationError( | |
"Found the password in an already breached password dictionary, " | |
"thus not secure. Please try again with a strong password" | |
) | |
if password.lower() in BREACHED_PASSWORDS_LOWER: | |
raise CredentialValidationError( | |
"Password is very similar to a password in an already breached " | |
"password dictionary. Please try again with a strong password" | |
) | |
def hash_password(password) -> tuple[str, ...]: | |
try: | |
salt = os.urandom(16) | |
hashed_password = hasher.hash(password + salt.hex()) | |
return hashed_password, salt.hex() | |
except Exception as e: | |
logger.exception("Couldn't hash the password") | |
raise e | |
def verify_password(hashed_password, password, salt) -> bool: | |
try: | |
hasher.verify(hashed_password, password + salt) | |
return True | |
except Exception as e: | |
logger.exception(f"Couldn't verify the password. {str(e)}") | |
return False | |
def homepage(): | |
return render_template("index.html") | |
def enroll_page(): | |
return render_template("enroll.html") | |
def enroll(): | |
"""Enrolling new users""" | |
try: | |
data = request.get_json() | |
username = data.get("username") | |
password = data.get("password") | |
validate_username(username=username) | |
validate_password(username=username, password=password) | |
# securely hashing the password | |
# and storing it in the sqlite db | |
hashed_password, salt = hash_password(password) | |
user = User(username=username.lower(), hashed_password=hashed_password, salt=salt) | |
db.session.add(user) | |
db.session.commit() | |
return jsonify({"message": "User enrolled successfully"}), 200 | |
except CredentialValidationError as e: | |
logger.exception(str(e)) | |
return jsonify({"error": str(e)}), 400 | |
except IntegrityError as e: | |
logger.exception(str(e)) | |
return jsonify({"error": "Username is already taken"}), 409 | |
except Exception as e: | |
logger.exception(str(e)) | |
return jsonify({"error": "Internal Server Error", "details": str(e)}), 500 | |
def authenticate(): | |
"""Authenticates a user based | |
on user credentials""" | |
try: | |
data = request.get_json() | |
username = data.get("username") | |
password = data.get("password") | |
# 2FA/MFA: NIST entertains 2FA or MFA, but here it | |
# is not imposed due to its implementation complexity | |
user = User.query.filter_by(username=username.lower()).first() | |
if user is None: | |
return jsonify({"error": "User not found"}), 404 | |
# Retry attempts: Users are given 10 consequent | |
# login attempts until they're locked out | |
if user.login_attempts >= MAX_LOGIN_ATTEMPTS: | |
lockout_time = user.last_login_time + LOCKOUT_DURATION | |
remaining_duration = lockout_time - datetime.utcnow() | |
if remaining_duration.total_seconds() > 0: | |
remaining_seconds = int(remaining_duration.total_seconds()) | |
minutes, seconds = divmod(remaining_seconds, 60) | |
if minutes > 0: | |
remaining_time_str = f"{minutes} minute(s) and {seconds} second(s)" | |
else: | |
remaining_time_str = f"{seconds} second(s)" | |
return jsonify( | |
{ | |
"error": f"Account locked out. Try again in {remaining_time_str}" | |
} | |
), 401 | |
else: | |
user.login_attempts = 0 | |
user.last_login_time = datetime.utcnow() | |
db.session.commit() | |
# Verify the password | |
if not verify_password(user.hashed_password, password, user.salt): | |
user.login_attempts += 1 | |
user.last_login_time = datetime.utcnow() | |
db.session.commit() | |
if user.login_attempts >= MAX_LOGIN_ATTEMPTS: | |
remaining_time = int(LOCKOUT_DURATION.total_seconds()) | |
minutes, seconds = divmod(remaining_time, 60) | |
if minutes > 0: | |
remaining_time_str = f"{minutes} minute(s) and {seconds} second(s)" | |
else: | |
remaining_time_str = f"{seconds} second(s)" | |
return jsonify({"error": f"Account locked out. Try again in {remaining_time_str}"}), 401 | |
return jsonify({"error": "Invalid password"}), 401 | |
# Reset if a valid login attempt | |
user.login_attempts = 0 | |
user.last_login_time = datetime.utcnow() | |
db.session.commit() | |
return jsonify({"message": f"Authentication successful. Welcome @{username.lower()}!"}), 200 | |
except Exception as e: | |
logger.exception(str(e)) | |
return jsonify({"error": "Internal Server Error", "details": str(e)}), 500 | |
def get_users(): | |
"""Retrieves the list of all users""" | |
try: | |
users = User.query.all() | |
user_list = [{"id": user.id, "username": user.username} for user in users] | |
return jsonify(user_list), 200 | |
except Exception as e: | |
logger.exception(str(e)) | |
return jsonify({"error": "Internal Server Error", "details": str(e)}), 500 | |
def delete_user(username): | |
"""Deletes a user by username""" | |
try: | |
user = User.query.filter_by(username=username.lower()).first() | |
if user is None: | |
return jsonify({"error": "User not found"}), 404 | |
db.session.delete(user) | |
db.session.commit() | |
return jsonify({"message": "User deleted successfully"}), 200 | |
except Exception as e: | |
logger.exception(str(e)) | |
return jsonify({"error": "Internal Server Error", "details": str(e)}), 500 | |
if __name__ == "__main__": | |
app.run(host=HOST, port=PORT, debug=DEBUG) | |