authentik.core.middleware

authentik core middleware: session-based authentication, user impersonation, and per-request IDs

  1"""authentik admin Middleware to impersonate users"""
  2
  3from collections.abc import Callable
  4from contextvars import ContextVar
  5from functools import partial
  6from uuid import uuid4
  7
  8from django.contrib.auth import logout
  9from django.contrib.auth.models import AnonymousUser
 10from django.core.exceptions import ImproperlyConfigured
 11from django.http import HttpRequest, HttpResponse, HttpResponseBadRequest
 12from django.utils.deprecation import MiddlewareMixin
 13from django.utils.functional import SimpleLazyObject
 14from django.utils.translation import override
 15from sentry_sdk.api import set_tag
 16from structlog.contextvars import STRUCTLOG_KEY_PREFIX
 17
 18SESSION_KEY_IMPERSONATE_USER = "authentik/impersonate/user"
 19SESSION_KEY_IMPERSONATE_ORIGINAL_USER = "authentik/impersonate/original_user"
 20RESPONSE_HEADER_ID = "X-authentik-id"
 21KEY_AUTH_VIA = "auth_via"
 22KEY_USER = "user"
 23
 24CTX_REQUEST_ID = ContextVar[str | None](STRUCTLOG_KEY_PREFIX + "request_id", default=None)
 25CTX_HOST = ContextVar[str | None](STRUCTLOG_KEY_PREFIX + "host", default=None)
 26CTX_AUTH_VIA = ContextVar[str | None](STRUCTLOG_KEY_PREFIX + KEY_AUTH_VIA, default=None)
 27
 28
 29def get_user(request):
 30    if not hasattr(request, "_cached_user"):
 31        user = None
 32        if (authenticated_session := request.session.get("authenticatedsession", None)) is not None:
 33            user = authenticated_session.user
 34        request._cached_user = user or AnonymousUser()
 35    return request._cached_user
 36
 37
 38async def aget_user(request):
 39    if not hasattr(request, "_cached_user"):
 40        user = None
 41        if (
 42            authenticated_session := await request.session.aget("authenticatedsession", None)
 43        ) is not None:
 44            user = authenticated_session.user
 45        request._cached_user = user or AnonymousUser()
 46    return request._cached_user
 47
 48
 49class AuthenticationMiddleware(MiddlewareMixin):
 50    def process_request(self, request: HttpRequest) -> HttpResponseBadRequest | None:
 51        if not hasattr(request, "session"):
 52            raise ImproperlyConfigured(
 53                "The Django authentication middleware requires session "
 54                "middleware to be installed. Edit your MIDDLEWARE setting to "
 55                "insert "
 56                "'authentik.root.middleware.SessionMiddleware' before "
 57                "'authentik.core.middleware.AuthenticationMiddleware'."
 58            )
 59        request.user = SimpleLazyObject(lambda: get_user(request))
 60        request.auser = partial(aget_user, request)
 61
 62        user = request.user
 63        if user and user.is_authenticated and not user.is_active:
 64            logout(request)
 65            return HttpResponseBadRequest()
 66        return None
 67
 68
 69class ImpersonateMiddleware:
 70    """Middleware to impersonate users"""
 71
 72    get_response: Callable[[HttpRequest], HttpResponse]
 73
 74    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
 75        self.get_response = get_response
 76
 77    def __call__(self, request: HttpRequest) -> HttpResponse:
 78        # No permission checks are done here, they need to be checked before
 79        # SESSION_KEY_IMPERSONATE_USER is set.
 80        locale_to_set = None
 81        if request.user.is_authenticated:
 82            locale = request.user.locale(request)
 83            if locale != "":
 84                locale_to_set = locale
 85
 86        if SESSION_KEY_IMPERSONATE_USER in request.session:
 87            request.user = request.session[SESSION_KEY_IMPERSONATE_USER]
 88            # Ensure that the user is active, otherwise nothing will work
 89            request.user.is_active = True
 90
 91        if locale_to_set:
 92            with override(locale_to_set):
 93                return self.get_response(request)
 94        return self.get_response(request)
 95
 96
 97class RequestIDMiddleware:
 98    """Add a unique ID to every request"""
 99
100    get_response: Callable[[HttpRequest], HttpResponse]
101
102    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
103        self.get_response = get_response
104
105    def __call__(self, request: HttpRequest) -> HttpResponse:
106        if not hasattr(request, "request_id"):
107            request_id = uuid4().hex
108            request.request_id = request_id
109            CTX_REQUEST_ID.set(request_id)
110            CTX_HOST.set(request.get_host())
111            set_tag("authentik.request_id", request_id)
112        if hasattr(request, "user") and getattr(request.user, "is_authenticated", False):
113            CTX_AUTH_VIA.set("session")
114        else:
115            CTX_AUTH_VIA.set("unauthenticated")
116
117        response = self.get_response(request)
118
119        response[RESPONSE_HEADER_ID] = request.request_id
120        response.ak_context = {}
121        response.ak_context["request_id"] = CTX_REQUEST_ID.get()
122        response.ak_context["host"] = CTX_HOST.get()
123        response.ak_context[KEY_AUTH_VIA] = CTX_AUTH_VIA.get()
124        response.ak_context[KEY_USER] = request.user.username
125        return response
SESSION_KEY_IMPERSONATE_USER = 'authentik/impersonate/user'
SESSION_KEY_IMPERSONATE_ORIGINAL_USER = 'authentik/impersonate/original_user'
RESPONSE_HEADER_ID = 'X-authentik-id'
KEY_AUTH_VIA = 'auth_via'
KEY_USER = 'user'
CTX_REQUEST_ID = <ContextVar name='structlog_request_id' default=None>
CTX_HOST = <ContextVar name='structlog_host' default=None>
CTX_AUTH_VIA = <ContextVar name='structlog_auth_via' default=None>
def get_user(request):
30def get_user(request):
31    if not hasattr(request, "_cached_user"):
32        user = None
33        if (authenticated_session := request.session.get("authenticatedsession", None)) is not None:
34            user = authenticated_session.user
35        request._cached_user = user or AnonymousUser()
36    return request._cached_user
async def aget_user(request):
39async def aget_user(request):
40    if not hasattr(request, "_cached_user"):
41        user = None
42        if (
43            authenticated_session := await request.session.aget("authenticatedsession", None)
44        ) is not None:
45            user = authenticated_session.user
46        request._cached_user = user or AnonymousUser()
47    return request._cached_user
class AuthenticationMiddleware(django.utils.deprecation.MiddlewareMixin):
50class AuthenticationMiddleware(MiddlewareMixin):
51    def process_request(self, request: HttpRequest) -> HttpResponseBadRequest | None:
52        if not hasattr(request, "session"):
53            raise ImproperlyConfigured(
54                "The Django authentication middleware requires session "
55                "middleware to be installed. Edit your MIDDLEWARE setting to "
56                "insert "
57                "'authentik.root.middleware.SessionMiddleware' before "
58                "'authentik.core.middleware.AuthenticationMiddleware'."
59            )
60        request.user = SimpleLazyObject(lambda: get_user(request))
61        request.auser = partial(aget_user, request)
62
63        user = request.user
64        if user and user.is_authenticated and not user.is_active:
65            logout(request)
66            return HttpResponseBadRequest()
67        return None
def process_request( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponseBadRequest | None:
51    def process_request(self, request: HttpRequest) -> HttpResponseBadRequest | None:
52        if not hasattr(request, "session"):
53            raise ImproperlyConfigured(
54                "The Django authentication middleware requires session "
55                "middleware to be installed. Edit your MIDDLEWARE setting to "
56                "insert "
57                "'authentik.root.middleware.SessionMiddleware' before "
58                "'authentik.core.middleware.AuthenticationMiddleware'."
59            )
60        request.user = SimpleLazyObject(lambda: get_user(request))
61        request.auser = partial(aget_user, request)
62
63        user = request.user
64        if user and user.is_authenticated and not user.is_active:
65            logout(request)
66            return HttpResponseBadRequest()
67        return None
class ImpersonateMiddleware:
70class ImpersonateMiddleware:
71    """Middleware to impersonate users"""
72
73    get_response: Callable[[HttpRequest], HttpResponse]
74
75    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
76        self.get_response = get_response
77
78    def __call__(self, request: HttpRequest) -> HttpResponse:
79        # No permission checks are done here, they need to be checked before
80        # SESSION_KEY_IMPERSONATE_USER is set.
81        locale_to_set = None
82        if request.user.is_authenticated:
83            locale = request.user.locale(request)
84            if locale != "":
85                locale_to_set = locale
86
87        if SESSION_KEY_IMPERSONATE_USER in request.session:
88            request.user = request.session[SESSION_KEY_IMPERSONATE_USER]
89            # Ensure that the user is active, otherwise nothing will work
90            request.user.is_active = True
91
92        if locale_to_set:
93            with override(locale_to_set):
94                return self.get_response(request)
95        return self.get_response(request)

Middleware to impersonate users

ImpersonateMiddleware( get_response: Callable[[django.http.request.HttpRequest], django.http.response.HttpResponse])
75    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
76        self.get_response = get_response
get_response: Callable[[django.http.request.HttpRequest], django.http.response.HttpResponse]
class RequestIDMiddleware:
 98class RequestIDMiddleware:
 99    """Add a unique ID to every request"""
100
101    get_response: Callable[[HttpRequest], HttpResponse]
102
103    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
104        self.get_response = get_response
105
106    def __call__(self, request: HttpRequest) -> HttpResponse:
107        if not hasattr(request, "request_id"):
108            request_id = uuid4().hex
109            request.request_id = request_id
110            CTX_REQUEST_ID.set(request_id)
111            CTX_HOST.set(request.get_host())
112            set_tag("authentik.request_id", request_id)
113        if hasattr(request, "user") and getattr(request.user, "is_authenticated", False):
114            CTX_AUTH_VIA.set("session")
115        else:
116            CTX_AUTH_VIA.set("unauthenticated")
117
118        response = self.get_response(request)
119
120        response[RESPONSE_HEADER_ID] = request.request_id
121        response.ak_context = {}
122        response.ak_context["request_id"] = CTX_REQUEST_ID.get()
123        response.ak_context["host"] = CTX_HOST.get()
124        response.ak_context[KEY_AUTH_VIA] = CTX_AUTH_VIA.get()
125        response.ak_context[KEY_USER] = request.user.username
126        return response

Add a unique ID to every request

RequestIDMiddleware( get_response: Callable[[django.http.request.HttpRequest], django.http.response.HttpResponse])
103    def __init__(self, get_response: Callable[[HttpRequest], HttpResponse]):
104        self.get_response = get_response
get_response: Callable[[django.http.request.HttpRequest], django.http.response.HttpResponse]