Spaces:
Running
Running
import logging | |
import uuid | |
from typing import Optional | |
from open_webui.internal.db import Base, get_db | |
from open_webui.models.users import UserModel, Users | |
from open_webui.env import SRC_LOG_LEVELS | |
from pydantic import BaseModel | |
from sqlalchemy import Boolean, Column, String, Text | |
from open_webui.utils.auth import verify_password | |
log = logging.getLogger(__name__) | |
log.setLevel(SRC_LOG_LEVELS["MODELS"]) | |
#################### | |
# DB MODEL | |
#################### | |
class Auth(Base): | |
__tablename__ = "auth" | |
id = Column(String, primary_key=True) | |
email = Column(String) | |
password = Column(Text) | |
active = Column(Boolean) | |
class AuthModel(BaseModel): | |
id: str | |
email: str | |
password: str | |
active: bool = True | |
#################### | |
# Forms | |
#################### | |
class Token(BaseModel): | |
token: str | |
token_type: str | |
class ApiKey(BaseModel): | |
api_key: Optional[str] = None | |
class UserResponse(BaseModel): | |
id: str | |
email: str | |
name: str | |
role: str | |
profile_image_url: str | |
class SigninResponse(Token, UserResponse): | |
pass | |
class SigninForm(BaseModel): | |
email: str | |
password: str | |
class LdapForm(BaseModel): | |
user: str | |
password: str | |
class ProfileImageUrlForm(BaseModel): | |
profile_image_url: str | |
class UpdateProfileForm(BaseModel): | |
profile_image_url: str | |
name: str | |
class UpdatePasswordForm(BaseModel): | |
password: str | |
new_password: str | |
class SignupForm(BaseModel): | |
name: str | |
email: str | |
password: str | |
profile_image_url: Optional[str] = "/user.png" | |
class AddUserForm(SignupForm): | |
role: Optional[str] = "pending" | |
class AuthsTable: | |
def insert_new_auth( | |
self, | |
email: str, | |
password: str, | |
name: str, | |
profile_image_url: str = "/user.png", | |
role: str = "pending", | |
oauth_sub: Optional[str] = None, | |
) -> Optional[UserModel]: | |
with get_db() as db: | |
log.info("insert_new_auth") | |
id = str(uuid.uuid4()) | |
auth = AuthModel( | |
**{"id": id, "email": email, "password": password, "active": True} | |
) | |
result = Auth(**auth.model_dump()) | |
db.add(result) | |
user = Users.insert_new_user( | |
id, name, email, profile_image_url, role, oauth_sub | |
) | |
db.commit() | |
db.refresh(result) | |
if result and user: | |
return user | |
else: | |
return None | |
def authenticate_user(self, email: str, password: str) -> Optional[UserModel]: | |
log.info(f"authenticate_user: {email}") | |
try: | |
with get_db() as db: | |
auth = db.query(Auth).filter_by(email=email, active=True).first() | |
if auth: | |
if verify_password(password, auth.password): | |
user = Users.get_user_by_id(auth.id) | |
return user | |
else: | |
return None | |
else: | |
return None | |
except Exception: | |
return None | |
def authenticate_user_by_api_key(self, api_key: str) -> Optional[UserModel]: | |
log.info(f"authenticate_user_by_api_key: {api_key}") | |
# if no api_key, return None | |
if not api_key: | |
return None | |
try: | |
user = Users.get_user_by_api_key(api_key) | |
return user if user else None | |
except Exception: | |
return False | |
def authenticate_user_by_trusted_header(self, email: str) -> Optional[UserModel]: | |
log.info(f"authenticate_user_by_trusted_header: {email}") | |
try: | |
with get_db() as db: | |
auth = db.query(Auth).filter_by(email=email, active=True).first() | |
if auth: | |
user = Users.get_user_by_id(auth.id) | |
return user | |
except Exception: | |
return None | |
def update_user_password_by_id(self, id: str, new_password: str) -> bool: | |
try: | |
with get_db() as db: | |
result = ( | |
db.query(Auth).filter_by(id=id).update({"password": new_password}) | |
) | |
db.commit() | |
return True if result == 1 else False | |
except Exception: | |
return False | |
def update_email_by_id(self, id: str, email: str) -> bool: | |
try: | |
with get_db() as db: | |
result = db.query(Auth).filter_by(id=id).update({"email": email}) | |
db.commit() | |
return True if result == 1 else False | |
except Exception: | |
return False | |
def delete_auth_by_id(self, id: str) -> bool: | |
try: | |
with get_db() as db: | |
# Delete User | |
result = Users.delete_user_by_id(id) | |
if result: | |
db.query(Auth).filter_by(id=id).delete() | |
db.commit() | |
return True | |
else: | |
return False | |
except Exception: | |
return False | |
Auths = AuthsTable() | |