# AuthSystem / app.py
# thisisishara's picture
# init commit
# 05d7116
import logging
import os
import re
from datetime import datetime, timedelta
from argon2 import PasswordHasher
from dotenv import load_dotenv
from flask import Flask, request, jsonify, render_template
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import IntegrityError
# Pull settings from a local .env file into the process environment
# before any of them are read below.
load_dotenv(".env")

logger = logging.getLogger(__name__)

# Templates and static files are both looked up in the "assets" folder.
app = Flask(__name__, template_folder="assets", static_folder="assets")
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///users.db"
db = SQLAlchemy(app)
hasher = PasswordHasher()

# Server settings, each overridable through an environment variable.
# NOTE(review): DEBUG defaults to true while HOST defaults to all
# interfaces -- confirm this is intended outside local development.
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", "5000"))
DEBUG = os.getenv("DEBUG", "true").lower() == "true"

# Lockout policy: after MAX_LOGIN_ATTEMPTS failed logins the account is
# refused for LOCKOUT_DURATION.
MAX_LOGIN_ATTEMPTS = 10
LOCKOUT_DURATION = timedelta(seconds=60)
# Lower-case words that must not appear anywhere inside a username.
COMMON_SWEAR_WORDS = [
    "fuck", "shit", "bitch", "asshole", "bastard", "cunt",
    "dick", "cock", "pussy", "motherfucker", "wanker", "twat",
    "bollocks", "arsehole", "crap", "damn", "bugger", "bloody",
    "sod", "git", "idiot", "moron", "prick", "slut",
    "whore", "nigger", "retard",
]

# Look-alike characters mapped to the letter they commonly stand in for.
CHARACTER_SUBSTITUTIONS = {
    # digits
    "0": "o", "1": "i", "3": "e", "4": "a", "5": "s",
    "6": "g", "7": "t", "8": "b", "9": "g",
    # symbols
    "@": "a", "$": "s", "!": "i",
}
def load_passwords_from_file(file_path: str) -> set:
    """Load a newline-separated password list from ``file_path``.

    The file is always decoded as UTF-8, independent of the platform's
    default encoding, and undecodable bytes are replaced instead of raising,
    so one malformed entry cannot discard the whole list.

    Args:
        file_path: Path of the text file holding one password per line.

    Returns:
        The set of lines found in the file, or an empty set when the file
        cannot be read (the failure is logged, not raised).
    """
    try:
        with open(file_path, encoding="utf-8", errors="replace") as file:
            return set(file.read().splitlines())
    except Exception as e:
        # Deliberately best-effort: a missing or unreadable list must not
        # stop the module from loading, so the error is only logged.
        logger.exception("Error occurred while retrieving passwords. %s", e)
        return set()
# Known-bad password lists, loaded once at import time. When a file cannot
# be read the loader returns an empty set, so that check matches nothing.
WEAK_PASSWORDS = load_passwords_from_file('assets/weakpasswords.txt')
BREACHED_PASSWORDS = load_passwords_from_file('assets/breachedpasswords.txt')
# Lower-cased copy used for case-insensitive lookups. Kept as a set so each
# membership test is a hash lookup rather than a scan of the whole list.
BREACHED_PASSWORDS_LOWER = {p.lower() for p in BREACHED_PASSWORDS}
# SQLAlchemy model for the table that stores user data.
class User(db.Model):
    """A registered account together with its login-throttling state."""

    id = db.Column(db.Integer, primary_key=True)
    # The enroll endpoint stores the username lower-cased.
    username = db.Column(db.String(80), unique=True, nullable=False)
    hashed_password = db.Column(db.String(256), nullable=False)
    # hash_password() stores the hex form of 16 random bytes, which is
    # 32 characters long, so the column must hold at least that many.
    salt = db.Column(db.String(32), nullable=False)
    # Failed logins counted by the authenticate endpoint; reset to 0 on
    # a successful login or once a lockout has elapsed.
    login_attempts = db.Column(db.Integer, default=0)
    # Written on every login attempt (failed or successful); the lockout
    # window is measured from this timestamp.
    last_login_time = db.Column(db.DateTime, default=datetime.utcnow)
# Create any tables that do not exist yet. create_all() is run inside an
# application context so the database extension can find the app.
with app.app_context():
    db.create_all()
# custom exception definitions
class CredentialValidationError(Exception):
    """Raised when a username or password breaks a validation rule.

    The human-readable reason is available both through ``str(error)`` and
    through the ``message`` attribute.
    """

    def __init__(self, message):
        super().__init__(message)
        self.message = message
def apply_character_substitutions(word):
    """Undo look-alike digit and symbol substitutions in ``word``.

    Each character that is a key of ``CHARACTER_SUBSTITUTIONS`` is replaced
    by the letter it maps to; every other character is left as it is.
    """
    # All keys are single characters and no replacement is itself a key,
    # so a single translate() pass gives the same result as replacing the
    # keys one after another.
    lookalike_table = str.maketrans(CHARACTER_SUBSTITUTIONS)
    return word.translate(lookalike_table)
def validate_username(username: str) -> None:
    """Validate a username against a pre-defined rule set.

    Args:
        username: The username supplied by the client.

    Raises:
        CredentialValidationError: If the username is not a string, contains
            anything other than ASCII letters, digits and underscores,
            contains a swear word, or contains a swear word disguised with
            digit/symbol substitutions.
    """
    # 1. only letters, digits and the underscore are allowed. fullmatch()
    # is used because "$" also matches just before a trailing newline,
    # which would let a username such as "name\n" through. Non-string
    # input (e.g. a missing JSON field) is rejected here as well instead
    # of surfacing as a TypeError.
    if not isinstance(username, str) or not re.fullmatch(r'[a-zA-Z0-9_]+', username):
        raise CredentialValidationError("Invalid username")

    # 2. reject usernames that contain a commonly used swear word
    lowered = username.lower()
    if any(word in lowered for word in COMMON_SWEAR_WORDS):
        raise CredentialValidationError("Username cannot contain swear words")

    # 3. reject attempts to bypass rule 2 with symbol and number
    # substitutions
    normalized = apply_character_substitutions(lowered)
    if any(word in normalized for word in COMMON_SWEAR_WORDS):
        raise CredentialValidationError(
            "Username contains a symbol or number substituted swear word"
        )
def validate_password(username: str, password: str) -> None:
    """Validate a user password following the NIST password guidelines.

    Args:
        username: The (already validated) username the password belongs to.
        password: The candidate password.

    Raises:
        CredentialValidationError: If the password is not a string, is too
            short, repeats a character three or more times in a row,
            contains the username (also after undoing digit/symbol
            substitutions), or appears in the weak or breached lists.
    """
    # A missing or non-string password is a validation failure, not a
    # server error.
    if not isinstance(password, str):
        raise CredentialValidationError("Invalid password")

    # 1. Length: At least 8 characters
    if len(password) < 8:
        raise CredentialValidationError(
            "The password must be at least 8 characters long"
        )

    # 2. Complexity: Overly complex rules
    # will not be enforced

    # 3. Composition: Disallow the same character
    # three or more times in a row
    if re.search(r'(.)\1\1+', password):
        raise CredentialValidationError(
            "The password cannot contain 3 or more "
            "consequent repeated characters"
        )

    # 4. Expiration: Password expiration is
    # not checked since it's not recommended
    # to frequently expire passwords

    # 5. Similarity to username: If exactly or
    # partially similar to the username, an
    # exception will be raised
    username_lower = username.lower()
    password_lower = password.lower()
    if username_lower in password_lower:
        raise CredentialValidationError(
            "The password cannot be similar to the username"
        )
    # Containment (not equality) is tested here too, so a disguised
    # username embedded in a longer password is also caught.
    if apply_character_substitutions(username_lower) in apply_character_substitutions(password_lower):
        raise CredentialValidationError(
            "The password cannot be similar to the username, "
            "even with character and number substitutions"
        )

    # 6. Data Breaches and Weak Passwords: any weak
    # password or breached passwords are disallowed
    if password in WEAK_PASSWORDS:
        raise CredentialValidationError(
            "Password is too weak. Please try again with a strong password"
        )
    if password in BREACHED_PASSWORDS:
        raise CredentialValidationError(
            "Found the password in an already breached password dictionary, "
            "thus not secure. Please try again with a strong password"
        )
    if password_lower in BREACHED_PASSWORDS_LOWER:
        raise CredentialValidationError(
            "Password is very similar to a password in an already breached "
            "password dictionary. Please try again with a strong password"
        )
def hash_password(password) -> tuple[str, ...]:
    """Hash ``password`` together with a freshly generated random salt.

    Returns the pair ``(hashed_password, salt)``. ``salt`` is the hex
    encoding of 16 random bytes, and the hash is computed over the password
    with that hex string appended.

    Any failure is logged and then re-raised to the caller.
    """
    try:
        salt_hex = os.urandom(16).hex()
        digest = hasher.hash(password + salt_hex)
    except Exception:
        logger.exception("Couldn't hash the password")
        raise
    return digest, salt_hex
def verify_password(hashed_password, password, salt) -> bool:
    """Check ``password`` (with ``salt`` appended) against a stored hash.

    Args:
        hashed_password: The hash previously produced by ``hash_password``.
        password: The password supplied by the client.
        salt: The hex salt stored alongside the hash.

    Returns:
        True when the password matches, False on a mismatch or on any
        other verification failure.
    """
    # Imported here so the block is self-contained and needs no change to
    # the file-level imports.
    from argon2.exceptions import VerifyMismatchError

    try:
        hasher.verify(hashed_password, password + salt)
    except VerifyMismatchError:
        # An ordinary wrong password is an expected outcome, so it is
        # recorded without an error-level traceback.
        logger.info("Password verification failed: password does not match")
        return False
    except Exception as e:
        # Anything else (e.g. a malformed stored hash or a non-string
        # password) is unexpected and keeps the full traceback.
        logger.exception(f"Couldn't verify the password. {str(e)}")
        return False
    return True
@app.route("/")
def homepage():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template("index.html")
    return page
@app.route("/enroll")
def enroll_page():
    """Serve the enrollment page rendered from the ``enroll.html`` template."""
    page = render_template("enroll.html")
    return page
@app.route("/api/enroll", methods=["POST"])
def enroll():
    """Enroll a new user from a JSON body with ``username`` and ``password``.

    Responses:
        200 - the user was stored.
        400 - the body is not a JSON object, a credential is missing or not
              a string, or a validation rule was broken.
        409 - the username is already taken.
        500 - any other failure.
    """
    try:
        # silent=True yields None instead of raising for a missing or
        # malformed JSON body, so bad requests are answered with 400
        # rather than falling through to the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        username = data.get("username")
        password = data.get("password")
        if not isinstance(username, str) or not isinstance(password, str):
            return jsonify({"error": "Username and password are required"}), 400

        validate_username(username=username)
        validate_password(username=username, password=password)

        # securely hashing the password
        # and storing it in the sqlite db
        hashed_password, salt = hash_password(password)
        user = User(username=username.lower(), hashed_password=hashed_password, salt=salt)
        db.session.add(user)
        db.session.commit()
        return jsonify({"message": "User enrolled successfully"}), 200
    except CredentialValidationError as e:
        logger.exception(str(e))
        return jsonify({"error": str(e)}), 400
    except IntegrityError as e:
        # The failed commit leaves the session in an error state; roll it
        # back so it can be used again.
        db.session.rollback()
        logger.exception(str(e))
        return jsonify({"error": "Username is already taken"}), 409
    except Exception as e:
        db.session.rollback()
        logger.exception(str(e))
        # NOTE(review): "details" exposes the raw exception text to the
        # client -- confirm this is acceptable outside development.
        return jsonify({"error": "Internal Server Error", "details": str(e)}), 500
def _format_lockout_time(total_seconds: int) -> str:
    """Render a whole number of seconds as the text used in lockout errors."""
    minutes, seconds = divmod(total_seconds, 60)
    if minutes > 0:
        return f"{minutes} minute(s) and {seconds} second(s)"
    return f"{seconds} second(s)"


@app.route("/api/authenticate", methods=["POST"])
def authenticate():
    """Authenticate a user from a JSON body with ``username`` and ``password``.

    Responses:
        200 - the credentials are valid; the failed-attempt counter is reset.
        400 - the body is not a JSON object or a credential is not a string.
        401 - wrong password, or the account is currently locked out.
        404 - no user with that username exists.
        500 - any other failure.
    """
    try:
        # silent=True yields None instead of raising for a missing or
        # malformed JSON body, so bad requests are answered with 400
        # rather than falling through to the generic 500 handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Request body must be a JSON object"}), 400

        username = data.get("username")
        password = data.get("password")
        if not isinstance(username, str) or not isinstance(password, str):
            return jsonify({"error": "Username and password are required"}), 400

        # 2FA/MFA: NIST entertains 2FA or MFA, but here it
        # is not imposed due to its implementation complexity
        user = User.query.filter_by(username=username.lower()).first()
        if user is None:
            # NOTE(review): answering 404 here and 401 for a wrong password
            # lets a client tell existing usernames from unknown ones.
            return jsonify({"error": "User not found"}), 404

        # Retry attempts: Users are given 10 consequent
        # login attempts until they're locked out
        if user.login_attempts >= MAX_LOGIN_ATTEMPTS:
            lockout_time = user.last_login_time + LOCKOUT_DURATION
            remaining_duration = lockout_time - datetime.utcnow()
            if remaining_duration.total_seconds() > 0:
                remaining_time_str = _format_lockout_time(
                    int(remaining_duration.total_seconds())
                )
                return jsonify(
                    {
                        "error": f"Account locked out. Try again in {remaining_time_str}"
                    }
                ), 401
            # The lockout window has elapsed: start counting afresh.
            user.login_attempts = 0
            user.last_login_time = datetime.utcnow()
            db.session.commit()

        # Verify the password
        if not verify_password(user.hashed_password, password, user.salt):
            user.login_attempts += 1
            user.last_login_time = datetime.utcnow()
            db.session.commit()
            if user.login_attempts >= MAX_LOGIN_ATTEMPTS:
                # This failure used up the last attempt, so the full
                # lockout duration starts now.
                remaining_time_str = _format_lockout_time(
                    int(LOCKOUT_DURATION.total_seconds())
                )
                return jsonify({"error": f"Account locked out. Try again in {remaining_time_str}"}), 401
            return jsonify({"error": "Invalid password"}), 401

        # Reset if a valid login attempt
        user.login_attempts = 0
        user.last_login_time = datetime.utcnow()
        db.session.commit()
        return jsonify({"message": f"Authentication successful. Welcome @{username.lower()}!"}), 200
    except Exception as e:
        # Leave the session usable if a commit failed part-way through.
        db.session.rollback()
        logger.exception(str(e))
        return jsonify({"error": "Internal Server Error", "details": str(e)}), 500
@app.route("/api/users", methods=["GET"])
def get_users():
    """Return the id and username of every stored user as a JSON array.

    Responds with 500 and the error text if the lookup fails.
    """
    try:
        payload = [
            {"id": record.id, "username": record.username}
            for record in User.query.all()
        ]
        return jsonify(payload), 200
    except Exception as e:
        logger.exception(str(e))
        return jsonify({"error": "Internal Server Error", "details": str(e)}), 500
@app.route("/api/users/<string:username>", methods=["DELETE"])
def delete_user(username):
    """Delete the user whose username matches the URL segment.

    The lookup uses the lower-cased username. Responds with 200 when the
    user was removed, 404 when no such user exists and 500 on any other
    failure.
    """
    try:
        target = User.query.filter_by(username=username.lower()).first()
        if target is not None:
            db.session.delete(target)
            db.session.commit()
            return jsonify({"message": "User deleted successfully"}), 200
        return jsonify({"error": "User not found"}), 404
    except Exception as e:
        logger.exception(str(e))
        return jsonify({"error": "Internal Server Error", "details": str(e)}), 500
# Start Flask's built-in server only when this file is executed directly,
# using the HOST, PORT and DEBUG settings defined at the top of the module.
if __name__ == "__main__":
    app.run(debug=DEBUG, host=HOST, port=PORT)