Spaces:
Build error
Build error
import uuid | |
from flask import request | |
from flask_restful import Resource | |
from werkzeug.exceptions import NotFound, Unauthorized | |
from controllers.web import api | |
from controllers.web.error import WebSSOAuthRequiredError | |
from extensions.ext_database import db | |
from libs.passport import PassportService | |
from models.model import App, EndUser, Site | |
from services.enterprise.enterprise_service import EnterpriseService | |
from services.feature_service import FeatureService | |
class PassportResource(Resource):
    """Endpoint that hands out passport tokens to anonymous browser users."""

    def get(self):
        """Create an anonymous end user for a site and return a passport token.

        The site is looked up by the ``X-App-Code`` request header.

        Returns:
            A dict of the form ``{"access_token": <issued token>}``.

        Raises:
            Unauthorized: The ``X-App-Code`` header is absent.
            WebSSOAuthRequiredError: Web SSO is enforced by the system
                features and reported as enabled for this app code.
            NotFound: No site with status "normal" matches the code, or the
                site's app is missing, not "normal", or has its site disabled.
        """
        features = FeatureService.get_system_features()
        code = request.headers.get("X-App-Code")
        if code is None:
            raise Unauthorized("X-App-Code header is missing.")

        # Short-circuit: the enterprise lookup only runs when SSO is enforced.
        if (
            features.sso_enforced_for_web
            and EnterpriseService.get_app_web_sso_enabled(code).get("enabled", False)
        ):
            raise WebSSOAuthRequiredError()

        # The site must exist for this code and be in the "normal" state.
        site_query = db.session.query(Site).filter(Site.code == code, Site.status == "normal")
        site_row = site_query.first()
        if not site_row:
            raise NotFound()

        # The owning app must exist, be "normal", and have its site enabled.
        app_row = db.session.query(App).filter(App.id == site_row.app_id).first()
        app_available = app_row and app_row.status == "normal" and app_row.enable_site
        if not app_available:
            raise NotFound()

        # Persist a fresh anonymous browser user; the commit happens before
        # its id is read for the token payload below.
        visitor = EndUser(
            tenant_id=app_row.tenant_id,
            app_id=app_row.id,
            type="browser",
            is_anonymous=True,
            session_id=generate_session_id(),
        )
        db.session.add(visitor)
        db.session.commit()

        claims = {
            "iss": site_row.app_id,
            "sub": "Web API Passport",
            "app_id": site_row.app_id,
            "app_code": code,
            "end_user_id": visitor.id,
        }
        return {"access_token": PassportService().issue(claims)}
# Register PassportResource on `api` (imported from controllers.web) under
# the "/passport" URL.
api.add_resource(PassportResource, "/passport")
def generate_session_id():
    """
    Return a UUID4 string that no existing EndUser row uses as session_id.

    A new candidate is drawn until the count of EndUser rows with that
    session_id is zero. The check and any later insert are separate steps.
    """
    candidate = str(uuid.uuid4())
    while db.session.query(EndUser).filter(EndUser.session_id == candidate).count() != 0:
        # Already taken by an existing end user: draw another candidate.
        candidate = str(uuid.uuid4())
    return candidate