authentik.stages.user_login.stage
Login stage logic
1"""Login stage logic""" 2 3from datetime import datetime, timedelta 4from hashlib import sha256 5 6from django.conf import settings 7from django.contrib import messages 8from django.contrib.auth import login 9from django.http import HttpRequest, HttpResponse 10from django.utils.translation import gettext as _ 11from jwt import PyJWTError, decode, encode 12from rest_framework.fields import BooleanField, CharField 13 14from authentik.core.models import AuthenticatedSession, Session, User 15from authentik.events.middleware import audit_ignore 16from authentik.flows.challenge import ChallengeResponse, WithUserInfoChallenge 17from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER 18from authentik.flows.stage import ChallengeStageView 19from authentik.lib.utils.time import timedelta_from_string 20from authentik.root.middleware import ClientIPMiddleware 21from authentik.stages.password import BACKEND_INBUILT 22from authentik.stages.password.stage import ( 23 PLAN_CONTEXT_AUTHENTICATION_BACKEND, 24 PLAN_CONTEXT_METHOD_ARGS, 25) 26from authentik.stages.user_login.middleware import ( 27 SESSION_KEY_BINDING_GEO, 28 SESSION_KEY_BINDING_NET, 29) 30from authentik.stages.user_login.models import UserLoginStage 31from authentik.tenants.utils import get_unique_identifier 32 33COOKIE_NAME_KNOWN_DEVICE = "authentik_device" 34 35PLAN_CONTEXT_METHOD_ARGS_KNOWN_DEVICE = "known_device" 36 37 38class UserLoginChallenge(WithUserInfoChallenge): 39 """Empty challenge""" 40 41 component = CharField(default="ak-stage-user-login") 42 43 44class UserLoginChallengeResponse(ChallengeResponse): 45 """User login challenge""" 46 47 component = CharField(default="ak-stage-user-login") 48 49 remember_me = BooleanField(required=True) 50 51 52class UserLoginStageView(ChallengeStageView): 53 """Finalise Authentication flow by logging the user in""" 54 55 response_class = UserLoginChallengeResponse 56 57 def get_challenge(self, *args, **kwargs) -> UserLoginChallenge: 58 return UserLoginChallenge(data={}) 59 60 def dispatch(self, request: HttpRequest) -> HttpResponse: 61 """Check for remember_me, and do login""" 62 stage: UserLoginStage = self.executor.current_stage 63 if timedelta_from_string(stage.remember_me_offset).total_seconds() > 0: 64 return super().dispatch(request) 65 return self.do_login(request) 66 67 def challenge_valid(self, response: UserLoginChallengeResponse) -> HttpResponse: 68 return self.do_login(self.request, response.validated_data["remember_me"]) 69 70 def set_session_duration(self, remember: bool) -> timedelta: 71 """Update the sessions' expiry""" 72 delta = timedelta_from_string(self.executor.current_stage.session_duration) 73 if remember: 74 offset = timedelta_from_string(self.executor.current_stage.remember_me_offset) 75 delta = delta + offset 76 if delta.total_seconds() == 0: 77 self.request.session.set_expiry(0) 78 else: 79 self.request.session.set_expiry(delta) 80 return delta 81 82 def set_session_ip(self): 83 """Set the sessions' last IP and session bindings""" 84 stage: UserLoginStage = self.executor.current_stage 85 86 self.request.session[self.request.session.model.Keys.LAST_IP] = ( 87 ClientIPMiddleware.get_client_ip(self.request) 88 ) 89 self.request.session[SESSION_KEY_BINDING_NET] = stage.network_binding 90 self.request.session[SESSION_KEY_BINDING_GEO] = stage.geoip_binding 91 92 # FIXME: identical function in authenticator_validate 93 @property 94 def cookie_jwt_key(self) -> str: 95 """Signing key for Known-device Cookie for this stage""" 96 return sha256( 97 f"{get_unique_identifier()}:{self.executor.current_stage.pk.hex}".encode("ascii") 98 ).hexdigest() 99 100 def set_known_device_cookie(self, user: User): 101 """Set a cookie, valid longer than the session, which denotes that this user 102 has logged in on this device before.""" 103 delta = timedelta_from_string(self.executor.current_stage.remember_device) 104 response = self.executor.stage_ok() 105 if delta.total_seconds() < 1: 106 return response 107 expiry = datetime.now() + delta 108 cookie_payload = { 109 "sub": user.uid, 110 "exp": expiry.timestamp(), 111 } 112 cookie = encode(cookie_payload, self.cookie_jwt_key) 113 response.set_cookie( 114 COOKIE_NAME_KNOWN_DEVICE, 115 cookie, 116 expires=expiry, 117 path=settings.SESSION_COOKIE_PATH, 118 domain=settings.SESSION_COOKIE_DOMAIN, 119 samesite=settings.SESSION_COOKIE_SAMESITE, 120 ) 121 return response 122 123 def is_known_device(self, user: User): 124 """Returns `True` if the login happened on a "known" device, by the same user.""" 125 client_ip = ClientIPMiddleware.get_client_ip(self.request) 126 if AuthenticatedSession.objects.filter(session__last_ip=client_ip, user=user).exists(): 127 return True 128 if COOKIE_NAME_KNOWN_DEVICE not in self.request.COOKIES: 129 return False 130 try: 131 payload = decode( 132 self.request.COOKIES[COOKIE_NAME_KNOWN_DEVICE], self.cookie_jwt_key, ["HS256"] 133 ) 134 if payload["sub"] == user.uid: 135 return True 136 return False 137 except (PyJWTError, ValueError, TypeError) as exc: 138 self.logger.info("eh", exc=exc) 139 return False 140 141 def do_login(self, request: HttpRequest, remember: bool | None = None) -> HttpResponse: 142 """Attach the currently pending user to the current session. 143 `remember` Argument should be `None` if not configured, otherwise set to `True`/`False` 144 representative of the user's choice.""" 145 if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context: 146 message = _("No Pending user to login.") 147 messages.error(request, message) 148 self.logger.warning(message) 149 return self.executor.stage_invalid() 150 backend = self.executor.plan.context.get( 151 PLAN_CONTEXT_AUTHENTICATION_BACKEND, BACKEND_INBUILT 152 ) 153 user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 154 if not user.is_active: 155 self.logger.warning("User is not active, login will not work.") 156 return self.executor.stage_invalid() 157 delta = self.set_session_duration(bool(remember)) 158 self.set_session_ip() 159 # Check if the login request is coming from a known device 160 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 161 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].setdefault( 162 PLAN_CONTEXT_METHOD_ARGS_KNOWN_DEVICE, self.is_known_device(user) 163 ) 164 # the `user_logged_in` signal will update the user to write the `last_login` field 165 # which we don't want to log as we already have a dedicated login event 166 with audit_ignore(): 167 login( 168 self.request, 169 user, 170 backend=backend, 171 ) 172 self.logger.debug( 173 "Logged in", 174 backend=backend, 175 user=user.username, 176 flow_slug=self.executor.flow.slug, 177 session_duration=delta, 178 ) 179 if self.executor.current_stage.terminate_other_sessions: 180 Session.objects.filter( 181 authenticatedsession__user=user, 182 ).exclude(session_key=self.request.session.session_key).delete() 183 if remember is None: 184 return self.set_known_device_cookie(user) 185 return self.executor.stage_ok()
COOKIE_NAME_KNOWN_DEVICE =
'authentik_device'
PLAN_CONTEXT_METHOD_ARGS_KNOWN_DEVICE =
'known_device'
39class UserLoginChallenge(WithUserInfoChallenge): 40 """Empty challenge""" 41 42 component = CharField(default="ak-stage-user-login")
Empty challenge
45class UserLoginChallengeResponse(ChallengeResponse): 46 """User login challenge""" 47 48 component = CharField(default="ak-stage-user-login") 49 50 remember_me = BooleanField(required=True)
User login challenge
53class UserLoginStageView(ChallengeStageView): 54 """Finalise Authentication flow by logging the user in""" 55 56 response_class = UserLoginChallengeResponse 57 58 def get_challenge(self, *args, **kwargs) -> UserLoginChallenge: 59 return UserLoginChallenge(data={}) 60 61 def dispatch(self, request: HttpRequest) -> HttpResponse: 62 """Check for remember_me, and do login""" 63 stage: UserLoginStage = self.executor.current_stage 64 if timedelta_from_string(stage.remember_me_offset).total_seconds() > 0: 65 return super().dispatch(request) 66 return self.do_login(request) 67 68 def challenge_valid(self, response: UserLoginChallengeResponse) -> HttpResponse: 69 return self.do_login(self.request, response.validated_data["remember_me"]) 70 71 def set_session_duration(self, remember: bool) -> timedelta: 72 """Update the sessions' expiry""" 73 delta = timedelta_from_string(self.executor.current_stage.session_duration) 74 if remember: 75 offset = timedelta_from_string(self.executor.current_stage.remember_me_offset) 76 delta = delta + offset 77 if delta.total_seconds() == 0: 78 self.request.session.set_expiry(0) 79 else: 80 self.request.session.set_expiry(delta) 81 return delta 82 83 def set_session_ip(self): 84 """Set the sessions' last IP and session bindings""" 85 stage: UserLoginStage = self.executor.current_stage 86 87 self.request.session[self.request.session.model.Keys.LAST_IP] = ( 88 ClientIPMiddleware.get_client_ip(self.request) 89 ) 90 self.request.session[SESSION_KEY_BINDING_NET] = stage.network_binding 91 self.request.session[SESSION_KEY_BINDING_GEO] = stage.geoip_binding 92 93 # FIXME: identical function in authenticator_validate 94 @property 95 def cookie_jwt_key(self) -> str: 96 """Signing key for Known-device Cookie for this stage""" 97 return sha256( 98 f"{get_unique_identifier()}:{self.executor.current_stage.pk.hex}".encode("ascii") 99 ).hexdigest() 100 101 def set_known_device_cookie(self, user: User): 102 """Set a cookie, valid longer than the session, which denotes that this user 103 has logged in on this device before.""" 104 delta = timedelta_from_string(self.executor.current_stage.remember_device) 105 response = self.executor.stage_ok() 106 if delta.total_seconds() < 1: 107 return response 108 expiry = datetime.now() + delta 109 cookie_payload = { 110 "sub": user.uid, 111 "exp": expiry.timestamp(), 112 } 113 cookie = encode(cookie_payload, self.cookie_jwt_key) 114 response.set_cookie( 115 COOKIE_NAME_KNOWN_DEVICE, 116 cookie, 117 expires=expiry, 118 path=settings.SESSION_COOKIE_PATH, 119 domain=settings.SESSION_COOKIE_DOMAIN, 120 samesite=settings.SESSION_COOKIE_SAMESITE, 121 ) 122 return response 123 124 def is_known_device(self, user: User): 125 """Returns `True` if the login happened on a "known" device, by the same user.""" 126 client_ip = ClientIPMiddleware.get_client_ip(self.request) 127 if AuthenticatedSession.objects.filter(session__last_ip=client_ip, user=user).exists(): 128 return True 129 if COOKIE_NAME_KNOWN_DEVICE not in self.request.COOKIES: 130 return False 131 try: 132 payload = decode( 133 self.request.COOKIES[COOKIE_NAME_KNOWN_DEVICE], self.cookie_jwt_key, ["HS256"] 134 ) 135 if payload["sub"] == user.uid: 136 return True 137 return False 138 except (PyJWTError, ValueError, TypeError) as exc: 139 self.logger.info("eh", exc=exc) 140 return False 141 142 def do_login(self, request: HttpRequest, remember: bool | None = None) -> HttpResponse: 143 """Attach the currently pending user to the current session. 144 `remember` Argument should be `None` if not configured, otherwise set to `True`/`False` 145 representative of the user's choice.""" 146 if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context: 147 message = _("No Pending user to login.") 148 messages.error(request, message) 149 self.logger.warning(message) 150 return self.executor.stage_invalid() 151 backend = self.executor.plan.context.get( 152 PLAN_CONTEXT_AUTHENTICATION_BACKEND, BACKEND_INBUILT 153 ) 154 user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 155 if not user.is_active: 156 self.logger.warning("User is not active, login will not work.") 157 return self.executor.stage_invalid() 158 delta = self.set_session_duration(bool(remember)) 159 self.set_session_ip() 160 # Check if the login request is coming from a known device 161 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 162 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].setdefault( 163 PLAN_CONTEXT_METHOD_ARGS_KNOWN_DEVICE, self.is_known_device(user) 164 ) 165 # the `user_logged_in` signal will update the user to write the `last_login` field 166 # which we don't want to log as we already have a dedicated login event 167 with audit_ignore(): 168 login( 169 self.request, 170 user, 171 backend=backend, 172 ) 173 self.logger.debug( 174 "Logged in", 175 backend=backend, 176 user=user.username, 177 flow_slug=self.executor.flow.slug, 178 session_duration=delta, 179 ) 180 if self.executor.current_stage.terminate_other_sessions: 181 Session.objects.filter( 182 authenticatedsession__user=user, 183 ).exclude(session_key=self.request.session.session_key).delete() 184 if remember is None: 185 return self.set_known_device_cookie(user) 186 return self.executor.stage_ok()
Finalise Authentication flow by logging the user in
response_class =
<class 'UserLoginChallengeResponse'>
58 def get_challenge(self, *args, **kwargs) -> UserLoginChallenge: 59 return UserLoginChallenge(data={})
Return the challenge that the client should solve
def
dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
61 def dispatch(self, request: HttpRequest) -> HttpResponse: 62 """Check for remember_me, and do login""" 63 stage: UserLoginStage = self.executor.current_stage 64 if timedelta_from_string(stage.remember_me_offset).total_seconds() > 0: 65 return super().dispatch(request) 66 return self.do_login(request)
Check for remember_me, and do login
def
challenge_valid( self, response: UserLoginChallengeResponse) -> django.http.response.HttpResponse:
68 def challenge_valid(self, response: UserLoginChallengeResponse) -> HttpResponse: 69 return self.do_login(self.request, response.validated_data["remember_me"])
Callback when the challenge has the correct format
def
set_session_duration(self, remember: bool) -> datetime.timedelta:
71 def set_session_duration(self, remember: bool) -> timedelta: 72 """Update the sessions' expiry""" 73 delta = timedelta_from_string(self.executor.current_stage.session_duration) 74 if remember: 75 offset = timedelta_from_string(self.executor.current_stage.remember_me_offset) 76 delta = delta + offset 77 if delta.total_seconds() == 0: 78 self.request.session.set_expiry(0) 79 else: 80 self.request.session.set_expiry(delta) 81 return delta
Update the sessions' expiry
def
set_session_ip(self):
83 def set_session_ip(self): 84 """Set the sessions' last IP and session bindings""" 85 stage: UserLoginStage = self.executor.current_stage 86 87 self.request.session[self.request.session.model.Keys.LAST_IP] = ( 88 ClientIPMiddleware.get_client_ip(self.request) 89 ) 90 self.request.session[SESSION_KEY_BINDING_NET] = stage.network_binding 91 self.request.session[SESSION_KEY_BINDING_GEO] = stage.geoip_binding
Set the sessions' last IP and session bindings
124 def is_known_device(self, user: User): 125 """Returns `True` if the login happened on a "known" device, by the same user.""" 126 client_ip = ClientIPMiddleware.get_client_ip(self.request) 127 if AuthenticatedSession.objects.filter(session__last_ip=client_ip, user=user).exists(): 128 return True 129 if COOKIE_NAME_KNOWN_DEVICE not in self.request.COOKIES: 130 return False 131 try: 132 payload = decode( 133 self.request.COOKIES[COOKIE_NAME_KNOWN_DEVICE], self.cookie_jwt_key, ["HS256"] 134 ) 135 if payload["sub"] == user.uid: 136 return True 137 return False 138 except (PyJWTError, ValueError, TypeError) as exc: 139 self.logger.info("eh", exc=exc) 140 return False
Returns True if the login happened on a "known" device, by the same user.
def
do_login( self, request: django.http.request.HttpRequest, remember: bool | None = None) -> django.http.response.HttpResponse:
142 def do_login(self, request: HttpRequest, remember: bool | None = None) -> HttpResponse: 143 """Attach the currently pending user to the current session. 144 `remember` Argument should be `None` if not configured, otherwise set to `True`/`False` 145 representative of the user's choice.""" 146 if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context: 147 message = _("No Pending user to login.") 148 messages.error(request, message) 149 self.logger.warning(message) 150 return self.executor.stage_invalid() 151 backend = self.executor.plan.context.get( 152 PLAN_CONTEXT_AUTHENTICATION_BACKEND, BACKEND_INBUILT 153 ) 154 user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 155 if not user.is_active: 156 self.logger.warning("User is not active, login will not work.") 157 return self.executor.stage_invalid() 158 delta = self.set_session_duration(bool(remember)) 159 self.set_session_ip() 160 # Check if the login request is coming from a known device 161 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 162 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].setdefault( 163 PLAN_CONTEXT_METHOD_ARGS_KNOWN_DEVICE, self.is_known_device(user) 164 ) 165 # the `user_logged_in` signal will update the user to write the `last_login` field 166 # which we don't want to log as we already have a dedicated login event 167 with audit_ignore(): 168 login( 169 self.request, 170 user, 171 backend=backend, 172 ) 173 self.logger.debug( 174 "Logged in", 175 backend=backend, 176 user=user.username, 177 flow_slug=self.executor.flow.slug, 178 session_duration=delta, 179 ) 180 if self.executor.current_stage.terminate_other_sessions: 181 Session.objects.filter( 182 authenticatedsession__user=user, 183 ).exclude(session_key=self.request.session.session_key).delete() 184 if remember is None: 185 return self.set_known_device_cookie(user) 186 return self.executor.stage_ok()
Attach the currently pending user to the current session.
remember Argument should be None if not configured, otherwise set to True/False
representative of the user's choice.