authentik.enterprise.middleware

Enterprise middleware

 1"""Enterprise middleware"""
 2
 3from collections.abc import Callable
 4
 5from django.http import HttpRequest, HttpResponse, JsonResponse
 6from django.urls import resolve
 7from structlog.stdlib import BoundLogger, get_logger
 8
 9from authentik.core.api.users import UserViewSet
10from authentik.enterprise.api import LicenseViewSet
11from authentik.enterprise.license import LicenseKey
12from authentik.enterprise.models import LicenseUsageStatus
13from authentik.flows.views.executor import FlowExecutorView
14from authentik.lib.utils.reflection import class_to_path
15
16
class EnterpriseMiddleware:
    """Enterprise middleware

    Resolves the URL of every incoming request, stores the match on the request,
    and refuses requests that are not exempt while the cached license summary
    reports ``LicenseUsageStatus.READ_ONLY``.
    """

    get_response: Callable[[HttpRequest], HttpResponse]
    logger: BoundLogger

    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
        """Store the next handler in the chain and bind a logger."""
        self.get_response = get_response
        self.logger = get_logger().bind()

    def __call__(self, request: HttpRequest) -> HttpResponse:
        """Pass the request on, or answer with a JSON error if the license forbids it."""
        # The checks below read `request.resolver_match`, so resolve the path
        # here and attach the result before running them.
        request.resolver_match = resolve(request.path_info)
        if self.is_request_allowed(request):
            return self.get_response(request)
        self.logger.warning("Refusing request due to expired/invalid license")
        return JsonResponse(
            {
                "detail": "Request denied due to expired/invalid license.",
                "code": "denied_license",
            },
            status=400,
        )

    def is_request_allowed(self, request: HttpRequest) -> bool:
        """Check if a specific request is allowed

        Returns True when the request is exempt, when the cached license summary
        is falsy, or when its status is anything other than READ_ONLY.
        """
        if self.is_request_always_allowed(request):
            return True
        cached_status = LicenseKey.cached_summary()
        if not cached_status:
            return True
        return cached_status.status != LicenseUsageStatus.READ_ONLY

    def is_request_always_allowed(self, request: HttpRequest) -> bool:
        """Check if a request is always allowed

        Expects `request.resolver_match` to be set already. Returns True for
        "safe" HTTP methods, for the exempt views listed below, and for anything
        outside the `authentik_api` app namespace; False otherwise.
        """
        # Always allow "safe" methods
        if request.method.lower() in {"get", "head", "options", "trace"}:
            return True
        match = request.resolver_match
        # Views that are always allowed, checked in this order:
        #  - LicenseViewSet: requests to manage licenses
        #  - FlowExecutorView: mounted as an API path but explicitly allowed
        #  - UserViewSet: making changes to users, even in case the license
        #    has been exceeded
        exempt_views = (LicenseViewSet, FlowExecutorView, UserViewSet)
        if any(match._func_path == class_to_path(view) for view in exempt_views):
            return True
        # Only apply these restrictions to the API
        return "authentik_api" not in match.app_names
class EnterpriseMiddleware:
class EnterpriseMiddleware:
    """Enterprise middleware

    Resolves the URL of every incoming request, stores the match on the request,
    and refuses requests that are not exempt while the cached license summary
    reports ``LicenseUsageStatus.READ_ONLY``.
    """

    get_response: Callable[[HttpRequest], HttpResponse]
    logger: BoundLogger

    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
        """Store the next handler in the chain and bind a logger."""
        self.get_response = get_response
        self.logger = get_logger().bind()

    def __call__(self, request: HttpRequest) -> HttpResponse:
        """Pass the request on, or answer with a JSON error if the license forbids it."""
        # The checks below read `request.resolver_match`, so resolve the path
        # here and attach the result before running them.
        request.resolver_match = resolve(request.path_info)
        if self.is_request_allowed(request):
            return self.get_response(request)
        self.logger.warning("Refusing request due to expired/invalid license")
        return JsonResponse(
            {
                "detail": "Request denied due to expired/invalid license.",
                "code": "denied_license",
            },
            status=400,
        )

    def is_request_allowed(self, request: HttpRequest) -> bool:
        """Check if a specific request is allowed

        Returns True when the request is exempt, when the cached license summary
        is falsy, or when its status is anything other than READ_ONLY.
        """
        if self.is_request_always_allowed(request):
            return True
        cached_status = LicenseKey.cached_summary()
        if not cached_status:
            return True
        return cached_status.status != LicenseUsageStatus.READ_ONLY

    def is_request_always_allowed(self, request: HttpRequest) -> bool:
        """Check if a request is always allowed

        Expects `request.resolver_match` to be set already. Returns True for
        "safe" HTTP methods, for the exempt views listed below, and for anything
        outside the `authentik_api` app namespace; False otherwise.
        """
        # Always allow "safe" methods
        if request.method.lower() in {"get", "head", "options", "trace"}:
            return True
        match = request.resolver_match
        # Views that are always allowed, checked in this order:
        #  - LicenseViewSet: requests to manage licenses
        #  - FlowExecutorView: mounted as an API path but explicitly allowed
        #  - UserViewSet: making changes to users, even in case the license
        #    has been exceeded
        exempt_views = (LicenseViewSet, FlowExecutorView, UserViewSet)
        if any(match._func_path == class_to_path(view) for view in exempt_views):
            return True
        # Only apply these restrictions to the API
        return "authentik_api" not in match.app_names

Enterprise middleware

EnterpriseMiddleware(get_response: Callable[[django.http.request.HttpRequest], django.http.response.HttpResponse])
24    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
25        self.get_response = get_response
26        self.logger = get_logger().bind()
get_response: Callable[[django.http.request.HttpRequest], django.http.response.HttpResponse]
logger: structlog.stdlib.BoundLogger
def is_request_allowed(self, request: django.http.request.HttpRequest) -> bool:
42    def is_request_allowed(self, request: HttpRequest) -> bool:
43        """Check if a specific request is allowed"""
44        if self.is_request_always_allowed(request):
45            return True
46        cached_status = LicenseKey.cached_summary()
47        if not cached_status:
48            return True
49        if cached_status.status == LicenseUsageStatus.READ_ONLY:
50            return False
51        return True

Check if a specific request is allowed

def is_request_always_allowed(self, request: django.http.request.HttpRequest):
53    def is_request_always_allowed(self, request: HttpRequest):
54        """Check if a request is always allowed"""
55        # Always allow "safe" methods
56        if request.method.lower() in ["get", "head", "options", "trace"]:
57            return True
58        # Always allow requests to manage licenses
59        if request.resolver_match._func_path == class_to_path(LicenseViewSet):
60            return True
61        # Flow executor is mounted as an API path but explicitly allowed
62        if request.resolver_match._func_path == class_to_path(FlowExecutorView):
63            return True
64        # Always allow making changes to users, even in case the license has ben exceeded
65        if request.resolver_match._func_path == class_to_path(UserViewSet):
66            return True
67        # Only apply these restrictions to the API
68        if "authentik_api" not in request.resolver_match.app_names:
69            return True
70        return False

Check if a request is always allowed