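from rest_framework_simplejwt.tokens import AccessToken
from rest_framework_simplejwt.exceptions import TokenError, InvalidToken
from django.conf import settings
from functools import wraps
from rest_framework.response import Response
from rest_framework import status
from rest_framework_simplejwt.authentication import JWTAuthentication
from .models import Bhagat
from django.http import JsonResponse


def get_token_from_request(request):
    """Return the bearer token from the Authorization header, or None.

    Only the exact ``Bearer <token>`` form is accepted. A header with the
    scheme but no credentials yields None rather than an empty string, so
    callers see one "no token" value.
    """
    prefix = 'Bearer '
    auth_header = request.META.get('HTTP_AUTHORIZATION', '')
    if not auth_header.startswith(prefix):
        return None
    # Keep only the first space-delimited field after the scheme.
    token = auth_header[len(prefix):].split(' ')[0]
    return token or None


def validate_jwt_token(token):
    """Validate an access token and return the matching Bhagat, or None.

    Returns None when the token is empty, fails validation (TokenError /
    InvalidToken), carries no ``user_id`` claim, or names a user that no
    longer exists.
    """
    if not token:
        return None

    try:
        # Constructing AccessToken validates the raw token; failures raise.
        valid_token = AccessToken(token)
    except (TokenError, InvalidToken):
        return None

    user_id = valid_token.payload.get('user_id')
    if not user_id:
        return None

    # NOTE(review): a token issued before an account was disabled is still
    # accepted here as long as the row exists; confirm whether Bhagat has an
    # active/blocked flag that should be checked as well.
    return Bhagat.objects.filter(id=user_id).first()


def _auth_error(message, http_status):
    """Build the JSON error body used by jwt_required with a real HTTP status."""
    response = JsonResponse({"error": message, "status": "error"}, status=http_status)
    if http_status == status.HTTP_401_UNAUTHORIZED:
        # A 401 response should tell the client which scheme is expected.
        response["WWW-Authenticate"] = "Bearer"
    return response


def jwt_required(allowed_user_types=None):
    """Decorator for views that require JWT authentication.

    Args:
        allowed_user_types: optional collection of ``user_type`` values that
            may reach the view. A single string is treated as one allowed
            type. When omitted or empty, any authenticated user is allowed.

    The wrapped view answers 401 when the token is missing or invalid and
    403 when the user's type is not allowed. The JSON body is unchanged from
    the earlier behaviour, which sent these errors with HTTP 200; that made
    rejected requests look successful to clients, proxies and log-based
    monitoring.
    """
    if isinstance(allowed_user_types, str):
        # `x in "admin"` is a substring test: it would admit "min" and any
        # empty user_type. Wrap a bare string so membership is exact.
        allowed_user_types = (allowed_user_types,)

    def decorator(view_func):
        @wraps(view_func)
        def wrapped_view(request, *args, **kwargs):
            token = get_token_from_request(request)
            if not token:
                return _auth_error(
                    "No authentication token provided",
                    status.HTTP_401_UNAUTHORIZED,
                )

            user = validate_jwt_token(token)
            if not user:
                return _auth_error(
                    "Invalid or expired token",
                    status.HTTP_401_UNAUTHORIZED,
                )

            # Authenticated, but not permitted for this view.
            if allowed_user_types and user.user_type not in allowed_user_types:
                return _auth_error(
                    "Unauthorized access",
                    status.HTTP_403_FORBIDDEN,
                )

            # Expose the authenticated Bhagat to the view.
            request.user = user
            return view_func(request, *args, **kwargs)

        return wrapped_view

    return decorator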