authentik.rbac.middleware
InitialPermissions middleware
1"""InitialPermissions middleware""" 2 3from collections.abc import Callable 4from contextvars import ContextVar 5from functools import partial 6 7from django.db.models import Model 8from django.db.models.signals import post_save 9from django.http import HttpRequest, HttpResponse 10 11from authentik.core.models import User 12from authentik.rbac.permissions import assign_initial_permissions 13 14_CTX_REQUEST = ContextVar[HttpRequest | None]("authentik_initial_permissions_request", default=None) 15 16 17class InitialPermissionsMiddleware: 18 """Register a handler for duration of request-response that assigns InitialPermissions""" 19 20 get_response: Callable[[HttpRequest], HttpResponse] 21 22 def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]): 23 self.get_response = get_response 24 25 def get_uid(self, request_id: str) -> str: 26 return f"InitialPermissionMiddleware-{request_id}" 27 28 def connect(self, request: HttpRequest): 29 if not hasattr(request, "request_id"): 30 return 31 post_save.connect( 32 partial(self.post_save_handler, request=request), 33 dispatch_uid=self.get_uid(request.request_id), 34 weak=False, 35 ) 36 37 def disconnect(self, request: HttpRequest): 38 if not hasattr(request, "request_id"): 39 return 40 post_save.disconnect(dispatch_uid=self.get_uid(request.request_id)) 41 42 def __call__(self, request: HttpRequest) -> HttpResponse: 43 _CTX_REQUEST.set(request) 44 self.connect(request) 45 46 response = self.get_response(request) 47 48 self.disconnect(request) 49 _CTX_REQUEST.set(None) 50 return response 51 52 def process_exception(self, request: HttpRequest, exception: Exception): 53 self.disconnect(request) 54 55 def post_save_handler( 56 self, 57 request: HttpRequest, 58 instance: Model, 59 created: bool, 60 **_, 61 ): 62 if not created: 63 return 64 current_request = _CTX_REQUEST.get() 65 if current_request is None or request.request_id != current_request.request_id: 66 return 67 user: User = request.user 68 if not user or user.is_anonymous: 69 return 70 assign_initial_permissions(user, instance)
class
InitialPermissionsMiddleware:
18class InitialPermissionsMiddleware: 19 """Register a handler for duration of request-response that assigns InitialPermissions""" 20 21 get_response: Callable[[HttpRequest], HttpResponse] 22 23 def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]): 24 self.get_response = get_response 25 26 def get_uid(self, request_id: str) -> str: 27 return f"InitialPermissionMiddleware-{request_id}" 28 29 def connect(self, request: HttpRequest): 30 if not hasattr(request, "request_id"): 31 return 32 post_save.connect( 33 partial(self.post_save_handler, request=request), 34 dispatch_uid=self.get_uid(request.request_id), 35 weak=False, 36 ) 37 38 def disconnect(self, request: HttpRequest): 39 if not hasattr(request, "request_id"): 40 return 41 post_save.disconnect(dispatch_uid=self.get_uid(request.request_id)) 42 43 def __call__(self, request: HttpRequest) -> HttpResponse: 44 _CTX_REQUEST.set(request) 45 self.connect(request) 46 47 response = self.get_response(request) 48 49 self.disconnect(request) 50 _CTX_REQUEST.set(None) 51 return response 52 53 def process_exception(self, request: HttpRequest, exception: Exception): 54 self.disconnect(request) 55 56 def post_save_handler( 57 self, 58 request: HttpRequest, 59 instance: Model, 60 created: bool, 61 **_, 62 ): 63 if not created: 64 return 65 current_request = _CTX_REQUEST.get() 66 if current_request is None or request.request_id != current_request.request_id: 67 return 68 user: User = request.user 69 if not user or user.is_anonymous: 70 return 71 assign_initial_permissions(user, instance)
Register a handler for duration of request-response that assigns InitialPermissions
InitialPermissionsMiddleware( get_response: Callable[[django.http.request.HttpRequest], django.http.response.HttpResponse])
def
post_save_handler( self, request: django.http.request.HttpRequest, instance: django.db.models.base.Model, created: bool, **_):
56 def post_save_handler( 57 self, 58 request: HttpRequest, 59 instance: Model, 60 created: bool, 61 **_, 62 ): 63 if not created: 64 return 65 current_request = _CTX_REQUEST.get() 66 if current_request is None or request.request_id != current_request.request_id: 67 return 68 user: User = request.user 69 if not user or user.is_anonymous: 70 return 71 assign_initial_permissions(user, instance)