authentik.core.middleware
authentik admin Middleware to impersonate users
1"""authentik admin Middleware to impersonate users""" 2 3from collections.abc import Callable 4from contextvars import ContextVar 5from functools import partial 6from uuid import uuid4 7 8from django.contrib.auth import logout 9from django.contrib.auth.models import AnonymousUser 10from django.core.exceptions import ImproperlyConfigured 11from django.http import HttpRequest, HttpResponse, HttpResponseBadRequest 12from django.utils.deprecation import MiddlewareMixin 13from django.utils.functional import SimpleLazyObject 14from django.utils.translation import override 15from sentry_sdk.api import set_tag 16from structlog.contextvars import STRUCTLOG_KEY_PREFIX 17 18SESSION_KEY_IMPERSONATE_USER = "authentik/impersonate/user" 19SESSION_KEY_IMPERSONATE_ORIGINAL_USER = "authentik/impersonate/original_user" 20RESPONSE_HEADER_ID = "X-authentik-id" 21KEY_AUTH_VIA = "auth_via" 22KEY_USER = "user" 23 24CTX_REQUEST_ID = ContextVar[str | None](STRUCTLOG_KEY_PREFIX + "request_id", default=None) 25CTX_HOST = ContextVar[str | None](STRUCTLOG_KEY_PREFIX + "host", default=None) 26CTX_AUTH_VIA = ContextVar[str | None](STRUCTLOG_KEY_PREFIX + KEY_AUTH_VIA, default=None) 27 28 29def get_user(request): 30 if not hasattr(request, "_cached_user"): 31 user = None 32 if (authenticated_session := request.session.get("authenticatedsession", None)) is not None: 33 user = authenticated_session.user 34 request._cached_user = user or AnonymousUser() 35 return request._cached_user 36 37 38async def aget_user(request): 39 if not hasattr(request, "_cached_user"): 40 user = None 41 if ( 42 authenticated_session := await request.session.aget("authenticatedsession", None) 43 ) is not None: 44 user = authenticated_session.user 45 request._cached_user = user or AnonymousUser() 46 return request._cached_user 47 48 49class AuthenticationMiddleware(MiddlewareMixin): 50 def process_request(self, request: HttpRequest) -> HttpResponseBadRequest | None: 51 if not hasattr(request, "session"): 52 raise ImproperlyConfigured( 53 "The Django authentication middleware requires session " 54 "middleware to be installed. Edit your MIDDLEWARE setting to " 55 "insert " 56 "'authentik.root.middleware.SessionMiddleware' before " 57 "'authentik.core.middleware.AuthenticationMiddleware'." 58 ) 59 request.user = SimpleLazyObject(lambda: get_user(request)) 60 request.auser = partial(aget_user, request) 61 62 user = request.user 63 if user and user.is_authenticated and not user.is_active: 64 logout(request) 65 return HttpResponseBadRequest() 66 return None 67 68 69class ImpersonateMiddleware: 70 """Middleware to impersonate users""" 71 72 get_response: Callable[[HttpRequest], HttpResponse] 73 74 def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]): 75 self.get_response = get_response 76 77 def __call__(self, request: HttpRequest) -> HttpResponse: 78 # No permission checks are done here, they need to be checked before 79 # SESSION_KEY_IMPERSONATE_USER is set. 80 locale_to_set = None 81 if request.user.is_authenticated: 82 locale = request.user.locale(request) 83 if locale != "": 84 locale_to_set = locale 85 86 if SESSION_KEY_IMPERSONATE_USER in request.session: 87 request.user = request.session[SESSION_KEY_IMPERSONATE_USER] 88 # Ensure that the user is active, otherwise nothing will work 89 request.user.is_active = True 90 91 if locale_to_set: 92 with override(locale_to_set): 93 return self.get_response(request) 94 return self.get_response(request) 95 96 97class RequestIDMiddleware: 98 """Add a unique ID to every request""" 99 100 get_response: Callable[[HttpRequest], HttpResponse] 101 102 def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]): 103 self.get_response = get_response 104 105 def __call__(self, request: HttpRequest) -> HttpResponse: 106 if not hasattr(request, "request_id"): 107 request_id = uuid4().hex 108 request.request_id = request_id 109 CTX_REQUEST_ID.set(request_id) 110 CTX_HOST.set(request.get_host()) 111 set_tag("authentik.request_id", request_id) 112 if hasattr(request, "user") and getattr(request.user, "is_authenticated", False): 113 CTX_AUTH_VIA.set("session") 114 else: 115 CTX_AUTH_VIA.set("unauthenticated") 116 117 response = self.get_response(request) 118 119 response[RESPONSE_HEADER_ID] = request.request_id 120 response.ak_context = {} 121 response.ak_context["request_id"] = CTX_REQUEST_ID.get() 122 response.ak_context["host"] = CTX_HOST.get() 123 response.ak_context[KEY_AUTH_VIA] = CTX_AUTH_VIA.get() 124 response.ak_context[KEY_USER] = request.user.username 125 return response
SESSION_KEY_IMPERSONATE_USER =
'authentik/impersonate/user'
SESSION_KEY_IMPERSONATE_ORIGINAL_USER =
'authentik/impersonate/original_user'
RESPONSE_HEADER_ID =
'X-authentik-id'
KEY_AUTH_VIA =
'auth_via'
KEY_USER =
'user'
CTX_REQUEST_ID =
<ContextVar name='structlog_request_id' default=None>
CTX_HOST =
<ContextVar name='structlog_host' default=None>
CTX_AUTH_VIA =
<ContextVar name='structlog_auth_via' default=None>
def
get_user(request):
async def
aget_user(request):
39async def aget_user(request): 40 if not hasattr(request, "_cached_user"): 41 user = None 42 if ( 43 authenticated_session := await request.session.aget("authenticatedsession", None) 44 ) is not None: 45 user = authenticated_session.user 46 request._cached_user = user or AnonymousUser() 47 return request._cached_user
class
AuthenticationMiddleware(django.utils.deprecation.MiddlewareMixin):
50class AuthenticationMiddleware(MiddlewareMixin): 51 def process_request(self, request: HttpRequest) -> HttpResponseBadRequest | None: 52 if not hasattr(request, "session"): 53 raise ImproperlyConfigured( 54 "The Django authentication middleware requires session " 55 "middleware to be installed. Edit your MIDDLEWARE setting to " 56 "insert " 57 "'authentik.root.middleware.SessionMiddleware' before " 58 "'authentik.core.middleware.AuthenticationMiddleware'." 59 ) 60 request.user = SimpleLazyObject(lambda: get_user(request)) 61 request.auser = partial(aget_user, request) 62 63 user = request.user 64 if user and user.is_authenticated and not user.is_active: 65 logout(request) 66 return HttpResponseBadRequest() 67 return None
def
process_request( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponseBadRequest | None:
51 def process_request(self, request: HttpRequest) -> HttpResponseBadRequest | None: 52 if not hasattr(request, "session"): 53 raise ImproperlyConfigured( 54 "The Django authentication middleware requires session " 55 "middleware to be installed. Edit your MIDDLEWARE setting to " 56 "insert " 57 "'authentik.root.middleware.SessionMiddleware' before " 58 "'authentik.core.middleware.AuthenticationMiddleware'." 59 ) 60 request.user = SimpleLazyObject(lambda: get_user(request)) 61 request.auser = partial(aget_user, request) 62 63 user = request.user 64 if user and user.is_authenticated and not user.is_active: 65 logout(request) 66 return HttpResponseBadRequest() 67 return None
class
ImpersonateMiddleware:
70class ImpersonateMiddleware: 71 """Middleware to impersonate users""" 72 73 get_response: Callable[[HttpRequest], HttpResponse] 74 75 def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]): 76 self.get_response = get_response 77 78 def __call__(self, request: HttpRequest) -> HttpResponse: 79 # No permission checks are done here, they need to be checked before 80 # SESSION_KEY_IMPERSONATE_USER is set. 81 locale_to_set = None 82 if request.user.is_authenticated: 83 locale = request.user.locale(request) 84 if locale != "": 85 locale_to_set = locale 86 87 if SESSION_KEY_IMPERSONATE_USER in request.session: 88 request.user = request.session[SESSION_KEY_IMPERSONATE_USER] 89 # Ensure that the user is active, otherwise nothing will work 90 request.user.is_active = True 91 92 if locale_to_set: 93 with override(locale_to_set): 94 return self.get_response(request) 95 return self.get_response(request)
Middleware to impersonate users
class
RequestIDMiddleware:
98class RequestIDMiddleware: 99 """Add a unique ID to every request""" 100 101 get_response: Callable[[HttpRequest], HttpResponse] 102 103 def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]): 104 self.get_response = get_response 105 106 def __call__(self, request: HttpRequest) -> HttpResponse: 107 if not hasattr(request, "request_id"): 108 request_id = uuid4().hex 109 request.request_id = request_id 110 CTX_REQUEST_ID.set(request_id) 111 CTX_HOST.set(request.get_host()) 112 set_tag("authentik.request_id", request_id) 113 if hasattr(request, "user") and getattr(request.user, "is_authenticated", False): 114 CTX_AUTH_VIA.set("session") 115 else: 116 CTX_AUTH_VIA.set("unauthenticated") 117 118 response = self.get_response(request) 119 120 response[RESPONSE_HEADER_ID] = request.request_id 121 response.ak_context = {} 122 response.ak_context["request_id"] = CTX_REQUEST_ID.get() 123 response.ak_context["host"] = CTX_HOST.get() 124 response.ak_context[KEY_AUTH_VIA] = CTX_AUTH_VIA.get() 125 response.ak_context[KEY_USER] = request.user.username 126 return response
Add a unique ID to every request