authentik.enterprise.middleware
Enterprise middleware
"""Enterprise middleware"""

from collections.abc import Callable

from django.http import HttpRequest, HttpResponse, JsonResponse
from django.urls import resolve
from structlog.stdlib import BoundLogger, get_logger

from authentik.core.api.users import UserViewSet
from authentik.enterprise.api import LicenseViewSet
from authentik.enterprise.license import LicenseKey
from authentik.enterprise.models import LicenseUsageStatus
from authentik.flows.views.executor import FlowExecutorView
from authentik.lib.utils.reflection import class_to_path


class EnterpriseMiddleware:
    """Enterprise middleware.

    Resolves the target view of every request and answers with a JSON
    HTTP 400 response instead of calling the wrapped handler when
    `is_request_allowed` rejects the request.
    """

    get_response: Callable[[HttpRequest], HttpResponse]
    logger: BoundLogger

    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
        self.get_response = get_response
        self.logger = get_logger().bind()

    def __call__(self, request: HttpRequest) -> HttpResponse:
        # Resolve the URL up front so the checks below can inspect the target view
        request.resolver_match = resolve(request.path_info)
        if not self.is_request_allowed(request):
            self.logger.warning("Refusing request due to expired/invalid license")
            return JsonResponse(
                {
                    "detail": "Request denied due to expired/invalid license.",
                    "code": "denied_license",
                },
                status=400,
            )
        return self.get_response(request)

    def is_request_allowed(self, request: HttpRequest) -> bool:
        """Check if a specific request is allowed.

        Returns True when the request is exempt from license enforcement,
        when no cached license summary is available, or when the cached
        status is anything other than read-only; returns False otherwise.
        """
        if self.is_request_always_allowed(request):
            return True
        cached_status = LicenseKey.cached_summary()
        # Without a cached summary there is nothing to enforce against
        if not cached_status:
            return True
        # Only the read-only status blocks requests
        return cached_status.status != LicenseUsageStatus.READ_ONLY

    def is_request_always_allowed(self, request: HttpRequest) -> bool:
        """Check if a request is always allowed, regardless of license status.

        Expects `request.resolver_match` to be populated already (done in
        `__call__`). Returns True for safe HTTP methods, for the exempt
        views listed below, and for anything outside the `authentik_api`
        app namespace; returns False otherwise.
        """
        # Always allow "safe" methods
        if request.method.lower() in ("get", "head", "options", "trace"):
            return True
        resolved = request.resolver_match
        # Views that stay writable: license management, the flow executor
        # (mounted as an API path but explicitly allowed), and user changes
        # (even in case the license has been exceeded)
        exempt_views = (LicenseViewSet, FlowExecutorView, UserViewSet)
        if any(resolved._func_path == class_to_path(view) for view in exempt_views):
            return True
        # Only apply these restrictions to the API
        return "authentik_api" not in resolved.app_names
class
EnterpriseMiddleware:
class EnterpriseMiddleware:
    """Enterprise middleware.

    Resolves the target view of every request and answers with a JSON
    HTTP 400 response instead of calling the wrapped handler when
    `is_request_allowed` rejects the request.
    """

    get_response: Callable[[HttpRequest], HttpResponse]
    logger: BoundLogger

    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
        self.get_response = get_response
        self.logger = get_logger().bind()

    def __call__(self, request: HttpRequest) -> HttpResponse:
        # Resolve the URL up front so the checks below can inspect the target view
        request.resolver_match = resolve(request.path_info)
        if not self.is_request_allowed(request):
            self.logger.warning("Refusing request due to expired/invalid license")
            return JsonResponse(
                {
                    "detail": "Request denied due to expired/invalid license.",
                    "code": "denied_license",
                },
                status=400,
            )
        return self.get_response(request)

    def is_request_allowed(self, request: HttpRequest) -> bool:
        """Check if a specific request is allowed.

        Returns True when the request is exempt from license enforcement,
        when no cached license summary is available, or when the cached
        status is anything other than read-only; returns False otherwise.
        """
        if self.is_request_always_allowed(request):
            return True
        cached_status = LicenseKey.cached_summary()
        # Without a cached summary there is nothing to enforce against
        if not cached_status:
            return True
        # Only the read-only status blocks requests
        return cached_status.status != LicenseUsageStatus.READ_ONLY

    def is_request_always_allowed(self, request: HttpRequest) -> bool:
        """Check if a request is always allowed, regardless of license status.

        Expects `request.resolver_match` to be populated already (done in
        `__call__`). Returns True for safe HTTP methods, for the exempt
        views listed below, and for anything outside the `authentik_api`
        app namespace; returns False otherwise.
        """
        # Always allow "safe" methods
        if request.method.lower() in ("get", "head", "options", "trace"):
            return True
        resolved = request.resolver_match
        # Views that stay writable: license management, the flow executor
        # (mounted as an API path but explicitly allowed), and user changes
        # (even in case the license has been exceeded)
        exempt_views = (LicenseViewSet, FlowExecutorView, UserViewSet)
        if any(resolved._func_path == class_to_path(view) for view in exempt_views):
            return True
        # Only apply these restrictions to the API
        return "authentik_api" not in resolved.app_names
Enterprise middleware
EnterpriseMiddleware( get_response: Callable[[django.http.request.HttpRequest], django.http.response.HttpResponse])
def
is_request_allowed(self, request: django.http.request.HttpRequest) -> bool:
def is_request_allowed(self, request: HttpRequest) -> bool:
    """Check if a specific request is allowed.

    Returns True when the request is exempt from license enforcement
    (see `is_request_always_allowed`), when no cached license summary is
    available, or when the cached status is anything other than
    read-only; returns False otherwise.
    """
    if self.is_request_always_allowed(request):
        return True
    cached_status = LicenseKey.cached_summary()
    # Without a cached summary there is nothing to enforce against
    if not cached_status:
        return True
    # Only the read-only status blocks requests
    return cached_status.status != LicenseUsageStatus.READ_ONLY
Check if a specific request is allowed
def
is_request_always_allowed(self, request: django.http.request.HttpRequest):
def is_request_always_allowed(self, request: HttpRequest) -> bool:
    """Check if a request is always allowed, regardless of license status.

    Expects `request.resolver_match` to be populated already. Returns
    True for safe HTTP methods, for the exempt views listed below, and
    for anything outside the `authentik_api` app namespace; returns
    False otherwise.
    """
    # Always allow "safe" methods
    if request.method.lower() in ("get", "head", "options", "trace"):
        return True
    resolved = request.resolver_match
    # Views that stay writable: license management, the flow executor
    # (mounted as an API path but explicitly allowed), and user changes
    # (even in case the license has been exceeded)
    exempt_views = (LicenseViewSet, FlowExecutorView, UserViewSet)
    if any(resolved._func_path == class_to_path(view) for view in exempt_views):
        return True
    # Only apply these restrictions to the API
    return "authentik_api" not in resolved.app_names
Check if a request is always allowed