authentik.root.asgi_middleware
ASGI middleware
"""ASGI middleware"""

from channels.auth import UserLazyObject
from channels.db import database_sync_to_async
from channels.middleware import BaseMiddleware
from channels.sessions import CookieMiddleware
from channels.sessions import InstanceSessionWrapper as UpstreamInstanceSessionWrapper
from channels.sessions import SessionMiddleware as UpstreamSessionMiddleware
from django.contrib.auth.models import AnonymousUser

from authentik.root.middleware import SessionMiddleware as HTTPSessionMiddleware


class InstanceSessionWrapper(UpstreamInstanceSessionWrapper):
    """Session wrapper that decodes the session cookie before loading the session.

    The raw cookie value is passed through the HTTP session middleware's
    ``decode_session_key`` and the result is handed to the session store.
    """

    async def resolve_session(self):
        """Load the session for this scope into ``scope["session"]._wrapped``."""
        cookie_value = self.scope["cookies"].get(self.cookie_name)
        decoded_key = HTTPSessionMiddleware.decode_session_key(cookie_value)
        # The session store is a synchronous callable, so it is invoked
        # through database_sync_to_async and awaited.
        load_session = database_sync_to_async(self.session_store)
        self.scope["session"]._wrapped = await load_session(decoded_key)


class SessionMiddleware(UpstreamSessionMiddleware):
    """ASGI session middleware built on this module's InstanceSessionWrapper,
    so that the session key is decoded before the session is loaded."""

    async def __call__(self, scope, receive, send):
        """
        Wrap the scope in an InstanceSessionWrapper, resolve its session and
        hand the wrapper's scope and send callable to the inner application.
        """
        session_wrapper = InstanceSessionWrapper(scope, send)
        await session_wrapper.resolve_session()
        return await self.inner(session_wrapper.scope, receive, session_wrapper.send)


@database_sync_to_async
def get_user(scope):
    """
    Look up the user for the given scope.

    The user is read from the ``authenticatedsession`` entry of
    ``scope["session"]``. When that entry is missing or its user is falsy,
    an `AnonymousUser` instance is returned instead.

    Raises ValueError when the scope has no ``session`` key.
    """
    if "session" not in scope:
        raise ValueError(
            "Cannot find session in scope. You should wrap your consumer in SessionMiddleware."
        )
    authenticated_session = scope["session"].get("authenticatedsession", None)
    if authenticated_session is None:
        return AnonymousUser()
    return authenticated_session.user or AnonymousUser()


class AuthMiddleware(BaseMiddleware):
    """Middleware that places a lazily wrapped user in ``scope["user"]`` and
    resolves it through ``get_user`` before calling the inner application."""

    def populate_scope(self, scope):
        """Ensure ``scope`` has a ``user`` entry; requires ``session`` to be present.

        Raises ValueError when the scope has no ``session`` key.
        """
        # Make sure we have a session
        if "session" not in scope:
            raise ValueError(
                "AuthMiddleware cannot find session in scope. SessionMiddleware must be above it."
            )
        # An existing user entry is left untouched
        if "user" in scope:
            return
        scope["user"] = UserLazyObject()

    async def resolve_scope(self, scope):
        """Fill the lazy user object in ``scope["user"]`` with the resolved user."""
        resolved_user = await get_user(scope)
        scope["user"]._wrapped = resolved_user

    async def __call__(self, scope, receive, send):
        """Populate and resolve a copy of the scope, then delegate to the base class."""
        # Work on a copy so the caller's scope mapping is not mutated here.
        scope_copy = dict(scope)
        # Scope injection/mutation per this middleware's needs.
        self.populate_scope(scope_copy)
        # Grab the finalized/resolved scope
        await self.resolve_scope(scope_copy)

        return await super().__call__(scope_copy, receive, send)


# Handy shortcut for applying all three layers at once
def AuthMiddlewareStack(inner):
    """Wrap ``inner`` in AuthMiddleware, then SessionMiddleware, then CookieMiddleware."""
    with_auth = AuthMiddleware(inner)
    with_session = SessionMiddleware(with_auth)
    return CookieMiddleware(with_session)
class
InstanceSessionWrapper(channels.sessions.InstanceSessionWrapper):
class InstanceSessionWrapper(UpstreamInstanceSessionWrapper):
    """Session wrapper that decodes the session cookie before loading the session.

    The raw cookie value is passed through the HTTP session middleware's
    ``decode_session_key`` and the result is handed to the session store.
    """

    async def resolve_session(self):
        """Load the session for this scope into ``scope["session"]._wrapped``."""
        cookie_value = self.scope["cookies"].get(self.cookie_name)
        decoded_key = HTTPSessionMiddleware.decode_session_key(cookie_value)
        # The session store is a synchronous callable, so it is invoked
        # through database_sync_to_async and awaited.
        load_session = database_sync_to_async(self.session_store)
        self.scope["session"]._wrapped = await load_session(decoded_key)
InstanceSessionWrapper which calls the Django middleware to decode the session key.
class
SessionMiddleware(channels.sessions.SessionMiddleware):
class SessionMiddleware(UpstreamSessionMiddleware):
    """ASGI session middleware built on this module's InstanceSessionWrapper,
    so that the session key is decoded before the session is loaded."""

    async def __call__(self, scope, receive, send):
        """
        Wrap the scope in an InstanceSessionWrapper, resolve its session and
        hand the wrapper's scope and send callable to the inner application.
        """
        session_wrapper = InstanceSessionWrapper(scope, send)
        await session_wrapper.resolve_session()
        return await self.inner(session_wrapper.scope, receive, session_wrapper.send)
ASGI SessionMiddleware which uses the modified InstanceSessionWrapper to decode the session key.
@database_sync_to_async
def
get_user(scope):
@database_sync_to_async
def get_user(scope):
    """
    Look up the user for the given scope.

    The user is read from the ``authenticatedsession`` entry of
    ``scope["session"]``. When that entry is missing or its user is falsy,
    an `AnonymousUser` instance is returned instead.

    Raises ValueError when the scope has no ``session`` key.
    """
    if "session" not in scope:
        raise ValueError(
            "Cannot find session in scope. You should wrap your consumer in SessionMiddleware."
        )
    authenticated_session = scope["session"].get("authenticatedsession", None)
    if authenticated_session is None:
        return AnonymousUser()
    return authenticated_session.user or AnonymousUser()
Return the user model instance associated with the given scope.
If no user is retrieved, return an instance of AnonymousUser.
class
AuthMiddleware(channels.middleware.BaseMiddleware):
class AuthMiddleware(BaseMiddleware):
    """Middleware that places a lazily wrapped user in ``scope["user"]`` and
    resolves it through ``get_user`` before calling the inner application."""

    def populate_scope(self, scope):
        """Ensure ``scope`` has a ``user`` entry; requires ``session`` to be present.

        Raises ValueError when the scope has no ``session`` key.
        """
        # Make sure we have a session
        if "session" not in scope:
            raise ValueError(
                "AuthMiddleware cannot find session in scope. SessionMiddleware must be above it."
            )
        # An existing user entry is left untouched
        if "user" in scope:
            return
        scope["user"] = UserLazyObject()

    async def resolve_scope(self, scope):
        """Fill the lazy user object in ``scope["user"]`` with the resolved user."""
        resolved_user = await get_user(scope)
        scope["user"]._wrapped = resolved_user

    async def __call__(self, scope, receive, send):
        """Populate and resolve a copy of the scope, then delegate to the base class."""
        # Work on a copy so the caller's scope mapping is not mutated here.
        scope_copy = dict(scope)
        # Scope injection/mutation per this middleware's needs.
        self.populate_scope(scope_copy)
        # Grab the finalized/resolved scope
        await self.resolve_scope(scope_copy)

        return await super().__call__(scope_copy, receive, send)
Base class for implementing ASGI middleware.
Note that subclasses of this are not self-safe; don't store state on the instance, as it serves multiple application instances. Instead, use scope.
def
populate_scope(self, scope):
def populate_scope(self, scope):
    """Ensure ``scope`` has a ``user`` entry; requires ``session`` to be present.

    Raises ValueError when the scope has no ``session`` key. A ``user`` entry
    that already exists is left untouched; otherwise a fresh UserLazyObject
    is stored under ``user``.
    """
    # Make sure we have a session
    if "session" not in scope:
        raise ValueError(
            "AuthMiddleware cannot find session in scope. SessionMiddleware must be above it."
        )
    # An existing user entry is left untouched
    if "user" in scope:
        return
    scope["user"] = UserLazyObject()
def
AuthMiddlewareStack(inner):