authentik.rbac.middleware

InitialPermissions middleware

 1"""InitialPermissions middleware"""
 2
 3from collections.abc import Callable
 4from contextvars import ContextVar
 5from functools import partial
 6
 7from django.db.models import Model
 8from django.db.models.signals import post_save
 9from django.http import HttpRequest, HttpResponse
10
11from authentik.core.models import User
12from authentik.rbac.permissions import assign_initial_permissions
13
14_CTX_REQUEST = ContextVar[HttpRequest | None]("authentik_initial_permissions_request", default=None)
15
16
17class InitialPermissionsMiddleware:
18    """Register a handler for duration of request-response that assigns InitialPermissions"""
19
20    get_response: Callable[[HttpRequest], HttpResponse]
21
22    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
23        self.get_response = get_response
24
25    def get_uid(self, request_id: str) -> str:
26        return f"InitialPermissionMiddleware-{request_id}"
27
28    def connect(self, request: HttpRequest):
29        if not hasattr(request, "request_id"):
30            return
31        post_save.connect(
32            partial(self.post_save_handler, request=request),
33            dispatch_uid=self.get_uid(request.request_id),
34            weak=False,
35        )
36
37    def disconnect(self, request: HttpRequest):
38        if not hasattr(request, "request_id"):
39            return
40        post_save.disconnect(dispatch_uid=self.get_uid(request.request_id))
41
42    def __call__(self, request: HttpRequest) -> HttpResponse:
43        _CTX_REQUEST.set(request)
44        self.connect(request)
45
46        response = self.get_response(request)
47
48        self.disconnect(request)
49        _CTX_REQUEST.set(None)
50        return response
51
52    def process_exception(self, request: HttpRequest, exception: Exception):
53        self.disconnect(request)
54
55    def post_save_handler(
56        self,
57        request: HttpRequest,
58        instance: Model,
59        created: bool,
60        **_,
61    ):
62        if not created:
63            return
64        current_request = _CTX_REQUEST.get()
65        if current_request is None or request.request_id != current_request.request_id:
66            return
67        user: User = request.user
68        if not user or user.is_anonymous:
69            return
70        assign_initial_permissions(user, instance)
class InitialPermissionsMiddleware:
18class InitialPermissionsMiddleware:
19    """Register a handler for duration of request-response that assigns InitialPermissions"""
20
21    get_response: Callable[[HttpRequest], HttpResponse]
22
23    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
24        self.get_response = get_response
25
26    def get_uid(self, request_id: str) -> str:
27        return f"InitialPermissionMiddleware-{request_id}"
28
29    def connect(self, request: HttpRequest):
30        if not hasattr(request, "request_id"):
31            return
32        post_save.connect(
33            partial(self.post_save_handler, request=request),
34            dispatch_uid=self.get_uid(request.request_id),
35            weak=False,
36        )
37
38    def disconnect(self, request: HttpRequest):
39        if not hasattr(request, "request_id"):
40            return
41        post_save.disconnect(dispatch_uid=self.get_uid(request.request_id))
42
43    def __call__(self, request: HttpRequest) -> HttpResponse:
44        _CTX_REQUEST.set(request)
45        self.connect(request)
46
47        response = self.get_response(request)
48
49        self.disconnect(request)
50        _CTX_REQUEST.set(None)
51        return response
52
53    def process_exception(self, request: HttpRequest, exception: Exception):
54        self.disconnect(request)
55
56    def post_save_handler(
57        self,
58        request: HttpRequest,
59        instance: Model,
60        created: bool,
61        **_,
62    ):
63        if not created:
64            return
65        current_request = _CTX_REQUEST.get()
66        if current_request is None or request.request_id != current_request.request_id:
67            return
68        user: User = request.user
69        if not user or user.is_anonymous:
70            return
71        assign_initial_permissions(user, instance)

Register a handler for duration of request-response that assigns InitialPermissions

InitialPermissionsMiddleware( get_response: Callable[[django.http.request.HttpRequest], django.http.response.HttpResponse])
23    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
24        self.get_response = get_response
get_response: Callable[[django.http.request.HttpRequest], django.http.response.HttpResponse]
def get_uid(self, request_id: str) -> str:
26    def get_uid(self, request_id: str) -> str:
27        return f"InitialPermissionMiddleware-{request_id}"
def connect(self, request: django.http.request.HttpRequest):
29    def connect(self, request: HttpRequest):
30        if not hasattr(request, "request_id"):
31            return
32        post_save.connect(
33            partial(self.post_save_handler, request=request),
34            dispatch_uid=self.get_uid(request.request_id),
35            weak=False,
36        )
def disconnect(self, request: django.http.request.HttpRequest):
38    def disconnect(self, request: HttpRequest):
39        if not hasattr(request, "request_id"):
40            return
41        post_save.disconnect(dispatch_uid=self.get_uid(request.request_id))
def process_exception(self, request: django.http.request.HttpRequest, exception: Exception):
53    def process_exception(self, request: HttpRequest, exception: Exception):
54        self.disconnect(request)
def post_save_handler( self, request: django.http.request.HttpRequest, instance: django.db.models.base.Model, created: bool, **_):
56    def post_save_handler(
57        self,
58        request: HttpRequest,
59        instance: Model,
60        created: bool,
61        **_,
62    ):
63        if not created:
64            return
65        current_request = _CTX_REQUEST.get()
66        if current_request is None or request.request_id != current_request.request_id:
67            return
68        user: User = request.user
69        if not user or user.is_anonymous:
70            return
71        assign_initial_permissions(user, instance)