authentik.root.asgi_middleware

ASGI middleware

 1"""ASGI middleware"""
 2
 3from channels.auth import UserLazyObject
 4from channels.db import database_sync_to_async
 5from channels.middleware import BaseMiddleware
 6from channels.sessions import CookieMiddleware
 7from channels.sessions import InstanceSessionWrapper as UpstreamInstanceSessionWrapper
 8from channels.sessions import SessionMiddleware as UpstreamSessionMiddleware
 9from django.contrib.auth.models import AnonymousUser
10
11from authentik.root.middleware import SessionMiddleware as HTTPSessionMiddleware
12
13
14class InstanceSessionWrapper(UpstreamInstanceSessionWrapper):
15    """InstanceSessionWrapper which calls the django middleware to decode
16    the session key"""
17
18    async def resolve_session(self):
19        raw_session = self.scope["cookies"].get(self.cookie_name)
20        session_key = HTTPSessionMiddleware.decode_session_key(raw_session)
21        self.scope["session"]._wrapped = await database_sync_to_async(self.session_store)(
22            session_key
23        )
24
25
26class SessionMiddleware(UpstreamSessionMiddleware):
27    """ASGI SessionMiddleware which uses the modified InstanceSessionWrapper
28    wrapper to decode the session key"""
29
30    async def __call__(self, scope, receive, send):
31        """
32        Instantiate a session wrapper for this scope, resolve the session and
33        call the inner application.
34        """
35        wrapper = InstanceSessionWrapper(scope, send)
36
37        await wrapper.resolve_session()
38
39        return await self.inner(wrapper.scope, receive, wrapper.send)
40
41
42@database_sync_to_async
43def get_user(scope):
44    """
45    Return the user model instance associated with the given scope.
46    If no user is retrieved, return an instance of `AnonymousUser`.
47    """
48    if "session" not in scope:
49        raise ValueError(
50            "Cannot find session in scope. You should wrap your consumer in SessionMiddleware."
51        )
52    user = None
53    if (authenticated_session := scope["session"].get("authenticatedsession", None)) is not None:
54        user = authenticated_session.user
55    return user or AnonymousUser()
56
57
58class AuthMiddleware(BaseMiddleware):
59    def populate_scope(self, scope):
60        # Make sure we have a session
61        if "session" not in scope:
62            raise ValueError(
63                "AuthMiddleware cannot find session in scope. SessionMiddleware must be above it."
64            )
65        # Add it to the scope if it's not there already
66        if "user" not in scope:
67            scope["user"] = UserLazyObject()
68
69    async def resolve_scope(self, scope):
70        scope["user"]._wrapped = await get_user(scope)
71
72    async def __call__(self, scope, receive, send):
73        scope = dict(scope)
74        # Scope injection/mutation per this middleware's needs.
75        self.populate_scope(scope)
76        # Grab the finalized/resolved scope
77        await self.resolve_scope(scope)
78
79        return await super().__call__(scope, receive, send)
80
81
82# Handy shortcut for applying all three layers at once
83def AuthMiddlewareStack(inner):
84    return CookieMiddleware(SessionMiddleware(AuthMiddleware(inner)))
class InstanceSessionWrapper(channels.sessions.InstanceSessionWrapper):
15class InstanceSessionWrapper(UpstreamInstanceSessionWrapper):
16    """InstanceSessionWrapper which calls the django middleware to decode
17    the session key"""
18
19    async def resolve_session(self):
20        raw_session = self.scope["cookies"].get(self.cookie_name)
21        session_key = HTTPSessionMiddleware.decode_session_key(raw_session)
22        self.scope["session"]._wrapped = await database_sync_to_async(self.session_store)(
23            session_key
24        )

InstanceSessionWrapper which calls the django middleware to decode the session key

async def resolve_session(self):
19    async def resolve_session(self):
20        raw_session = self.scope["cookies"].get(self.cookie_name)
21        session_key = HTTPSessionMiddleware.decode_session_key(raw_session)
22        self.scope["session"]._wrapped = await database_sync_to_async(self.session_store)(
23            session_key
24        )
class SessionMiddleware(channels.sessions.SessionMiddleware):
27class SessionMiddleware(UpstreamSessionMiddleware):
28    """ASGI SessionMiddleware which uses the modified InstanceSessionWrapper
29    wrapper to decode the session key"""
30
31    async def __call__(self, scope, receive, send):
32        """
33        Instantiate a session wrapper for this scope, resolve the session and
34        call the inner application.
35        """
36        wrapper = InstanceSessionWrapper(scope, send)
37
38        await wrapper.resolve_session()
39
40        return await self.inner(wrapper.scope, receive, wrapper.send)

ASGI SessionMiddleware which uses the modified InstanceSessionWrapper wrapper to decode the session key

@database_sync_to_async
def get_user(scope):
43@database_sync_to_async
44def get_user(scope):
45    """
46    Return the user model instance associated with the given scope.
47    If no user is retrieved, return an instance of `AnonymousUser`.
48    """
49    if "session" not in scope:
50        raise ValueError(
51            "Cannot find session in scope. You should wrap your consumer in SessionMiddleware."
52        )
53    user = None
54    if (authenticated_session := scope["session"].get("authenticatedsession", None)) is not None:
55        user = authenticated_session.user
56    return user or AnonymousUser()

Return the user model instance associated with the given scope. If no user is retrieved, return an instance of AnonymousUser.

class AuthMiddleware(channels.middleware.BaseMiddleware):
59class AuthMiddleware(BaseMiddleware):
60    def populate_scope(self, scope):
61        # Make sure we have a session
62        if "session" not in scope:
63            raise ValueError(
64                "AuthMiddleware cannot find session in scope. SessionMiddleware must be above it."
65            )
66        # Add it to the scope if it's not there already
67        if "user" not in scope:
68            scope["user"] = UserLazyObject()
69
70    async def resolve_scope(self, scope):
71        scope["user"]._wrapped = await get_user(scope)
72
73    async def __call__(self, scope, receive, send):
74        scope = dict(scope)
75        # Scope injection/mutation per this middleware's needs.
76        self.populate_scope(scope)
77        # Grab the finalized/resolved scope
78        await self.resolve_scope(scope)
79
80        return await super().__call__(scope, receive, send)

Base class for implementing ASGI middleware.

Note that subclasses of this are not self-safe; don't store state on the instance, as it serves multiple application instances. Instead, use scope.

def populate_scope(self, scope):
60    def populate_scope(self, scope):
61        # Make sure we have a session
62        if "session" not in scope:
63            raise ValueError(
64                "AuthMiddleware cannot find session in scope. SessionMiddleware must be above it."
65            )
66        # Add it to the scope if it's not there already
67        if "user" not in scope:
68            scope["user"] = UserLazyObject()
async def resolve_scope(self, scope):
70    async def resolve_scope(self, scope):
71        scope["user"]._wrapped = await get_user(scope)
def AuthMiddlewareStack(inner):
84def AuthMiddlewareStack(inner):
85    return CookieMiddleware(SessionMiddleware(AuthMiddleware(inner)))