authentik.stages.identification.stage
Identification stage logic
1"""Identification stage logic""" 2 3from dataclasses import asdict 4from typing import Any 5 6from django.contrib.auth.hashers import make_password 7from django.core.exceptions import PermissionDenied 8from django.db.models import Q 9from django.http import HttpRequest, HttpResponse 10from django.utils.timezone import now 11from django.utils.translation import gettext as _ 12from drf_spectacular.utils import PolymorphicProxySerializer, extend_schema_field 13from rest_framework.fields import BooleanField, CharField, ChoiceField, DictField, ListField 14from rest_framework.serializers import ValidationError 15from sentry_sdk import start_span 16 17from authentik.core.api.utils import JSONDictField, PassiveSerializer 18from authentik.core.models import Application, Source, User 19from authentik.endpoints.connectors.agent.stage import PLAN_CONTEXT_DEVICE_AUTH_TOKEN 20from authentik.endpoints.models import Device 21from authentik.events.middleware import audit_ignore 22from authentik.events.utils import sanitize_item 23from authentik.flows.challenge import ( 24 Challenge, 25 ChallengeResponse, 26 RedirectChallenge, 27) 28from authentik.flows.models import FlowDesignation 29from authentik.flows.planner import ( 30 PLAN_CONTEXT_APPLICATION, 31 PLAN_CONTEXT_DEVICE, 32 PLAN_CONTEXT_PENDING_USER, 33) 34from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, ChallengeStageView 35from authentik.flows.views.executor import SESSION_KEY_GET 36from authentik.lib.avatars import DEFAULT_AVATAR 37from authentik.lib.utils.reflection import all_subclasses, class_to_path 38from authentik.lib.utils.urls import reverse_with_qs 39from authentik.root.middleware import ClientIPMiddleware 40from authentik.stages.authenticator_validate.challenge import ( 41 get_webauthn_challenge_without_user, 42 validate_challenge_webauthn, 43) 44from authentik.stages.authenticator_webauthn.models import WebAuthnDevice 45from authentik.stages.captcha.stage import ( 46 PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY, 47 CaptchaChallenge, 48 verify_captcha_token, 49) 50from authentik.stages.identification.models import IdentificationStage 51from authentik.stages.identification.signals import identification_failed 52from authentik.stages.password.stage import ( 53 PLAN_CONTEXT_METHOD, 54 PLAN_CONTEXT_METHOD_ARGS, 55 authenticate, 56) 57 58 59class LoginChallengeMixin: 60 """Base login challenge for Identification stage""" 61 62 63def get_login_serializers(): 64 mapping = { 65 RedirectChallenge().fields["component"].default: RedirectChallenge, 66 } 67 for cls in all_subclasses(LoginChallengeMixin): 68 mapping[cls().fields["component"].default] = cls 69 return mapping 70 71 72@extend_schema_field( 73 PolymorphicProxySerializer( 74 component_name="LoginChallengeTypes", 75 serializers=get_login_serializers, 76 resource_type_field_name="component", 77 ) 78) 79class ChallengeDictWrapper(DictField): 80 """Wrapper around DictField that annotates itself as challenge proxy""" 81 82 83class LoginSourceSerializer(PassiveSerializer): 84 """Serializer for Login buttons of sources""" 85 86 name = CharField() 87 icon_url = CharField(required=False, allow_null=True) 88 promoted = BooleanField(default=False) 89 90 challenge = ChallengeDictWrapper() 91 92 93class IdentificationChallenge(Challenge): 94 """Identification challenges with all UI elements""" 95 96 user_fields = ListField(child=CharField(), allow_empty=True, allow_null=True) 97 pending_user_identifier = CharField(required=False, allow_null=True) 98 99 password_fields = BooleanField() 100 allow_show_password = BooleanField(default=False) 101 application_pre = CharField(required=False) 102 application_pre_launch = CharField(required=False) 103 flow_designation = ChoiceField(FlowDesignation.choices) 104 captcha_stage = CaptchaChallenge(required=False, allow_null=True) 105 106 enroll_url = CharField(required=False) 107 recovery_url = CharField(required=False) 108 passwordless_url = CharField(required=False) 109 primary_action = CharField() 110 sources = LoginSourceSerializer(many=True, required=False) 111 show_source_labels = BooleanField() 112 enable_remember_me = BooleanField(required=False, default=True) 113 114 passkey_challenge = JSONDictField(required=False, allow_null=True) 115 116 component = CharField(default="ak-stage-identification") 117 118 119class IdentificationChallengeResponse(ChallengeResponse): 120 """Identification challenge""" 121 122 uid_field = CharField(required=False, allow_blank=True, allow_null=True) 123 password = CharField(required=False, allow_blank=True, allow_null=True) 124 captcha_token = CharField(required=False, allow_blank=True, allow_null=True) 125 passkey = JSONDictField(required=False, allow_null=True) 126 component = CharField(default="ak-stage-identification") 127 128 pre_user: User | None = None 129 passkey_device: WebAuthnDevice | None = None 130 131 def _validate_passkey_response(self, passkey: dict) -> WebAuthnDevice: 132 """Validate passkey/WebAuthn response for passwordless authentication""" 133 # Get the webauthn_stage from the current IdentificationStage 134 current_stage: IdentificationStage = IdentificationStage.objects.get( 135 pk=self.stage.executor.current_stage.pk 136 ) 137 return validate_challenge_webauthn( 138 passkey, self.stage, self.stage.get_pending_user(), current_stage.webauthn_stage 139 ) 140 141 def validate(self, attrs: dict[str, Any]) -> dict[str, Any]: 142 """Validate that user exists, and optionally their password, captcha token, or passkey""" 143 current_stage: IdentificationStage = self.stage.executor.current_stage 144 client_ip = ClientIPMiddleware.get_client_ip(self.stage.request) 145 146 # Check if this is a passkey authentication 147 passkey = attrs.get("passkey") 148 if passkey: 149 device = self._validate_passkey_response(passkey) 150 self.passkey_device = device 151 self.pre_user = device.user 152 # Set backend so password stage policy knows user is already authenticated 153 self.pre_user.backend = class_to_path(IdentificationStageView) 154 return attrs 155 156 # Standard username/password flow 157 uid_field = attrs.get("uid_field") 158 if not uid_field: 159 raise ValidationError(_("No identification data provided.")) 160 161 pre_user = self.stage.get_user(uid_field) 162 if not pre_user: 163 with start_span( 164 op="authentik.stages.identification.validate_invalid_wait", 165 name="Sleep random time on invalid user identifier", 166 ): 167 # hash a random password on invalid identifier, same as with a valid identifier 168 make_password(make_password(None)) 169 # Log in a similar format to Event.new(), but we don't want to create an event here 170 # as this stage is mostly used by unauthenticated users with very high rate limits 171 self.stage.logger.info( 172 "invalid_login", 173 identifier=uid_field, 174 client_ip=client_ip, 175 action="invalid_identifier", 176 context={ 177 "stage": sanitize_item(self.stage), 178 }, 179 ) 180 identification_failed.send(sender=self, request=self.stage.request, uid_field=uid_field) 181 # We set the pending_user even on failure so it's part of the context, even 182 # when the input is invalid 183 # This is so its part of the current flow plan, and on flow restart can be kept, and 184 # policies can be applied. 185 self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = User( 186 username=uid_field, 187 email=uid_field, 188 ) 189 self.pre_user = self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 190 if not current_stage.show_matched_user: 191 self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = uid_field 192 # when `pretend` is enabled, continue regardless 193 if current_stage.pretend_user_exists and not current_stage.password_stage: 194 return attrs 195 raise ValidationError(_("Failed to authenticate.")) 196 self.pre_user = pre_user 197 198 # Captcha check 199 if captcha_stage := current_stage.captcha_stage: 200 captcha_token = attrs.get("captcha_token", None) 201 if not captcha_token: 202 self.stage.logger.warning("Token not set for captcha attempt") 203 try: 204 verify_captcha_token( 205 captcha_stage, 206 captcha_token, 207 client_ip, 208 key=self.stage.executor.plan.context.get(PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY), 209 ) 210 except ValidationError: 211 raise ValidationError(_("Failed to authenticate.")) from None 212 213 # Password check 214 if not current_stage.password_stage: 215 # No password stage select, don't validate the password 216 return attrs 217 218 password = attrs.get("password", None) 219 if not password: 220 self.stage.logger.warning("Password not set for ident+auth attempt") 221 try: 222 with start_span( 223 op="authentik.stages.identification.authenticate", 224 name="User authenticate call (combo stage)", 225 ): 226 user = authenticate( 227 self.stage.request, 228 current_stage.password_stage.backends, 229 current_stage, 230 username=self.pre_user.username, 231 password=password, 232 ) 233 if not user: 234 raise ValidationError(_("Failed to authenticate.")) 235 self.pre_user = user 236 except PermissionDenied as exc: 237 raise ValidationError(str(exc)) from exc 238 return attrs 239 240 241class IdentificationStageView(ChallengeStageView): 242 """Form to identify the user""" 243 244 response_class = IdentificationChallengeResponse 245 246 def get_user(self, uid_value: str) -> User | None: 247 """Find user instance. Returns None if no user was found.""" 248 current_stage: IdentificationStage = self.executor.current_stage 249 query = Q() 250 for search_field in current_stage.user_fields: 251 model_field = { 252 "email": "email", 253 "username": "username", 254 "upn": "attributes__upn", 255 }[search_field] 256 if current_stage.case_insensitive_matching: 257 model_field += "__iexact" 258 else: 259 model_field += "__exact" 260 query |= Q(**{model_field: uid_value}) 261 if not query: 262 self.logger.debug("Empty user query", query=query) 263 return None 264 user = User.objects.filter(query).first() 265 if user: 266 self.logger.debug("Found user", user=user.username, query=query) 267 return user 268 return None 269 270 def get_primary_action(self) -> str: 271 """Get the primary action label for this stage""" 272 if self.executor.flow.designation == FlowDesignation.AUTHENTICATION: 273 return _("Log in") 274 return _("Continue") 275 276 def get_passkey_challenge(self) -> dict | None: 277 """Generate a WebAuthn challenge for passkey/conditional UI authentication""" 278 # Refresh from DB to get the latest configuration 279 current_stage: IdentificationStage = IdentificationStage.objects.get( 280 pk=self.executor.current_stage.pk 281 ) 282 if not current_stage.webauthn_stage: 283 self.logger.debug("No webauthn_stage configured") 284 return None 285 challenge = get_webauthn_challenge_without_user(self, current_stage.webauthn_stage) 286 self.logger.debug("Generated passkey challenge", challenge=challenge) 287 return challenge 288 289 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 290 """Check for existing pending user identifier and skip stage if possible""" 291 current_stage: IdentificationStage = self.executor.current_stage 292 pending_user_identifier = self.executor.plan.context.get( 293 PLAN_CONTEXT_PENDING_USER_IDENTIFIER 294 ) 295 296 if not pending_user_identifier: 297 return super().get(request, *args, **kwargs) 298 299 # Only skip if this is a "simple" identification stage with no extra features 300 can_skip = ( 301 not current_stage.password_stage 302 and not current_stage.captcha_stage 303 and not current_stage.webauthn_stage 304 and not self.executor.current_binding.policies.exists() 305 ) 306 307 if can_skip: 308 # Use the normal validation flow (handles timing protection, logging, signals) 309 response = IdentificationChallengeResponse( 310 data={"uid_field": pending_user_identifier}, 311 stage=self, 312 ) 313 if response.is_valid(): 314 return self.challenge_valid(response) 315 # Validation failed (user doesn't exist and pretend_user_exists is off) 316 # Don't pre-fill invalid username, fall through to show the challenge 317 self.executor.plan.context.pop(PLAN_CONTEXT_PENDING_USER_IDENTIFIER, None) 318 319 # Can't skip - just pre-fill the username field 320 return super().get(request, *args, **kwargs) 321 322 def get_challenge(self) -> Challenge: 323 current_stage: IdentificationStage = self.executor.current_stage 324 challenge = IdentificationChallenge( 325 data={ 326 "component": "ak-stage-identification", 327 "primary_action": self.get_primary_action(), 328 "user_fields": current_stage.user_fields, 329 "password_fields": bool(current_stage.password_stage), 330 "captcha_stage": ( 331 { 332 "js_url": current_stage.captcha_stage.js_url, 333 "site_key": current_stage.captcha_stage.public_key, 334 "interactive": current_stage.captcha_stage.interactive, 335 "pending_user": "", 336 "pending_user_avatar": DEFAULT_AVATAR, 337 } 338 if current_stage.captcha_stage 339 else None 340 ), 341 "allow_show_password": bool(current_stage.password_stage) 342 and current_stage.password_stage.allow_show_password, 343 "show_source_labels": current_stage.show_source_labels, 344 "flow_designation": self.executor.flow.designation, 345 "enable_remember_me": current_stage.enable_remember_me, 346 "passkey_challenge": self.get_passkey_challenge(), 347 } 348 ) 349 # If the user has been redirected to us whilst trying to access an 350 # application, PLAN_CONTEXT_APPLICATION is set in the flow plan 351 if PLAN_CONTEXT_APPLICATION in self.executor.plan.context: 352 app: Application = self.executor.plan.context.get( 353 PLAN_CONTEXT_APPLICATION, Application() 354 ) 355 challenge.initial_data["application_pre"] = app.name 356 if launch_url := app.get_launch_url(): 357 challenge.initial_data["application_pre_launch"] = launch_url 358 if ( 359 PLAN_CONTEXT_DEVICE in self.executor.plan.context 360 and PLAN_CONTEXT_DEVICE_AUTH_TOKEN in self.executor.plan.context 361 ): 362 challenge.initial_data["application_pre"] = self.executor.plan.context.get( 363 PLAN_CONTEXT_DEVICE, Device() 364 ).name 365 get_qs = self.request.session.get(SESSION_KEY_GET, self.request.GET) 366 # Check for related enrollment and recovery flow, add URL to view 367 if current_stage.enrollment_flow: 368 challenge.initial_data["enroll_url"] = reverse_with_qs( 369 "authentik_core:if-flow", 370 query=get_qs, 371 kwargs={"flow_slug": current_stage.enrollment_flow.slug}, 372 ) 373 if current_stage.recovery_flow: 374 challenge.initial_data["recovery_url"] = reverse_with_qs( 375 "authentik_core:if-flow", 376 query=get_qs, 377 kwargs={"flow_slug": current_stage.recovery_flow.slug}, 378 ) 379 if current_stage.passwordless_flow: 380 challenge.initial_data["passwordless_url"] = reverse_with_qs( 381 "authentik_core:if-flow", 382 query=get_qs, 383 kwargs={"flow_slug": current_stage.passwordless_flow.slug}, 384 ) 385 386 # Check all enabled source, add them if they have a UI Login button. 387 ui_sources = [] 388 sources: list[Source] = ( 389 current_stage.sources.filter(enabled=True).order_by("name").select_subclasses() 390 ) 391 for source in sources: 392 ui_login_button = source.ui_login_button(self.request) 393 if ui_login_button: 394 button = asdict(ui_login_button) 395 source_challenge = ui_login_button.challenge 396 source_challenge.is_valid() 397 button["challenge"] = source_challenge.data 398 ui_sources.append(button) 399 challenge.initial_data["sources"] = ui_sources 400 401 # Pre-fill username from login_hint unless user clicked "Not you?" 402 if prefill := self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER): 403 challenge.initial_data["pending_user_identifier"] = prefill 404 405 return challenge 406 407 def challenge_valid(self, response: IdentificationChallengeResponse) -> HttpResponse: 408 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = response.pre_user 409 current_stage: IdentificationStage = self.executor.current_stage 410 411 # Handle passkey authentication 412 if response.passkey_device: 413 self.logger.debug("Passkey authentication successful", user=response.pre_user) 414 self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl" 415 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 416 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update( 417 { 418 "device": response.passkey_device, 419 "device_type": response.passkey_device.device_type, 420 } 421 ) 422 # Update device last_used 423 with audit_ignore(): 424 response.passkey_device.last_used = now() 425 response.passkey_device.save() 426 return self.executor.stage_ok() 427 428 if not current_stage.show_matched_user: 429 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = ( 430 response.validated_data.get("uid_field") 431 ) 432 return self.executor.stage_ok()
class
LoginChallengeMixin:
Base login challenge for Identification stage
def
get_login_serializers():
@extend_schema_field(PolymorphicProxySerializer(component_name='LoginChallengeTypes', serializers=get_login_serializers, resource_type_field_name='component'))
class
ChallengeDictWrapper73@extend_schema_field( 74 PolymorphicProxySerializer( 75 component_name="LoginChallengeTypes", 76 serializers=get_login_serializers, 77 resource_type_field_name="component", 78 ) 79) 80class ChallengeDictWrapper(DictField): 81 """Wrapper around DictField that annotates itself as challenge proxy"""
Wrapper around DictField that annotates itself as challenge proxy
84class LoginSourceSerializer(PassiveSerializer): 85 """Serializer for Login buttons of sources""" 86 87 name = CharField() 88 icon_url = CharField(required=False, allow_null=True) 89 promoted = BooleanField(default=False) 90 91 challenge = ChallengeDictWrapper()
Serializer for Login buttons of sources
Inherited Members
94class IdentificationChallenge(Challenge): 95 """Identification challenges with all UI elements""" 96 97 user_fields = ListField(child=CharField(), allow_empty=True, allow_null=True) 98 pending_user_identifier = CharField(required=False, allow_null=True) 99 100 password_fields = BooleanField() 101 allow_show_password = BooleanField(default=False) 102 application_pre = CharField(required=False) 103 application_pre_launch = CharField(required=False) 104 flow_designation = ChoiceField(FlowDesignation.choices) 105 captcha_stage = CaptchaChallenge(required=False, allow_null=True) 106 107 enroll_url = CharField(required=False) 108 recovery_url = CharField(required=False) 109 passwordless_url = CharField(required=False) 110 primary_action = CharField() 111 sources = LoginSourceSerializer(many=True, required=False) 112 show_source_labels = BooleanField() 113 enable_remember_me = BooleanField(required=False, default=True) 114 115 passkey_challenge = JSONDictField(required=False, allow_null=True) 116 117 component = CharField(default="ak-stage-identification")
Identification challenges with all UI elements
120class IdentificationChallengeResponse(ChallengeResponse): 121 """Identification challenge""" 122 123 uid_field = CharField(required=False, allow_blank=True, allow_null=True) 124 password = CharField(required=False, allow_blank=True, allow_null=True) 125 captcha_token = CharField(required=False, allow_blank=True, allow_null=True) 126 passkey = JSONDictField(required=False, allow_null=True) 127 component = CharField(default="ak-stage-identification") 128 129 pre_user: User | None = None 130 passkey_device: WebAuthnDevice | None = None 131 132 def _validate_passkey_response(self, passkey: dict) -> WebAuthnDevice: 133 """Validate passkey/WebAuthn response for passwordless authentication""" 134 # Get the webauthn_stage from the current IdentificationStage 135 current_stage: IdentificationStage = IdentificationStage.objects.get( 136 pk=self.stage.executor.current_stage.pk 137 ) 138 return validate_challenge_webauthn( 139 passkey, self.stage, self.stage.get_pending_user(), current_stage.webauthn_stage 140 ) 141 142 def validate(self, attrs: dict[str, Any]) -> dict[str, Any]: 143 """Validate that user exists, and optionally their password, captcha token, or passkey""" 144 current_stage: IdentificationStage = self.stage.executor.current_stage 145 client_ip = ClientIPMiddleware.get_client_ip(self.stage.request) 146 147 # Check if this is a passkey authentication 148 passkey = attrs.get("passkey") 149 if passkey: 150 device = self._validate_passkey_response(passkey) 151 self.passkey_device = device 152 self.pre_user = device.user 153 # Set backend so password stage policy knows user is already authenticated 154 self.pre_user.backend = class_to_path(IdentificationStageView) 155 return attrs 156 157 # Standard username/password flow 158 uid_field = attrs.get("uid_field") 159 if not uid_field: 160 raise ValidationError(_("No identification data provided.")) 161 162 pre_user = self.stage.get_user(uid_field) 163 if not pre_user: 164 with start_span( 165 op="authentik.stages.identification.validate_invalid_wait", 166 name="Sleep random time on invalid user identifier", 167 ): 168 # hash a random password on invalid identifier, same as with a valid identifier 169 make_password(make_password(None)) 170 # Log in a similar format to Event.new(), but we don't want to create an event here 171 # as this stage is mostly used by unauthenticated users with very high rate limits 172 self.stage.logger.info( 173 "invalid_login", 174 identifier=uid_field, 175 client_ip=client_ip, 176 action="invalid_identifier", 177 context={ 178 "stage": sanitize_item(self.stage), 179 }, 180 ) 181 identification_failed.send(sender=self, request=self.stage.request, uid_field=uid_field) 182 # We set the pending_user even on failure so it's part of the context, even 183 # when the input is invalid 184 # This is so its part of the current flow plan, and on flow restart can be kept, and 185 # policies can be applied. 186 self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = User( 187 username=uid_field, 188 email=uid_field, 189 ) 190 self.pre_user = self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 191 if not current_stage.show_matched_user: 192 self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = uid_field 193 # when `pretend` is enabled, continue regardless 194 if current_stage.pretend_user_exists and not current_stage.password_stage: 195 return attrs 196 raise ValidationError(_("Failed to authenticate.")) 197 self.pre_user = pre_user 198 199 # Captcha check 200 if captcha_stage := current_stage.captcha_stage: 201 captcha_token = attrs.get("captcha_token", None) 202 if not captcha_token: 203 self.stage.logger.warning("Token not set for captcha attempt") 204 try: 205 verify_captcha_token( 206 captcha_stage, 207 captcha_token, 208 client_ip, 209 key=self.stage.executor.plan.context.get(PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY), 210 ) 211 except ValidationError: 212 raise ValidationError(_("Failed to authenticate.")) from None 213 214 # Password check 215 if not current_stage.password_stage: 216 # No password stage select, don't validate the password 217 return attrs 218 219 password = attrs.get("password", None) 220 if not password: 221 self.stage.logger.warning("Password not set for ident+auth attempt") 222 try: 223 with start_span( 224 op="authentik.stages.identification.authenticate", 225 name="User authenticate call (combo stage)", 226 ): 227 user = authenticate( 228 self.stage.request, 229 current_stage.password_stage.backends, 230 current_stage, 231 username=self.pre_user.username, 232 password=password, 233 ) 234 if not user: 235 raise ValidationError(_("Failed to authenticate.")) 236 self.pre_user = user 237 except PermissionDenied as exc: 238 raise ValidationError(str(exc)) from exc 239 return attrs
Identification challenge
def
validate(self, attrs: dict[str, typing.Any]) -> dict[str, typing.Any]:
142 def validate(self, attrs: dict[str, Any]) -> dict[str, Any]: 143 """Validate that user exists, and optionally their password, captcha token, or passkey""" 144 current_stage: IdentificationStage = self.stage.executor.current_stage 145 client_ip = ClientIPMiddleware.get_client_ip(self.stage.request) 146 147 # Check if this is a passkey authentication 148 passkey = attrs.get("passkey") 149 if passkey: 150 device = self._validate_passkey_response(passkey) 151 self.passkey_device = device 152 self.pre_user = device.user 153 # Set backend so password stage policy knows user is already authenticated 154 self.pre_user.backend = class_to_path(IdentificationStageView) 155 return attrs 156 157 # Standard username/password flow 158 uid_field = attrs.get("uid_field") 159 if not uid_field: 160 raise ValidationError(_("No identification data provided.")) 161 162 pre_user = self.stage.get_user(uid_field) 163 if not pre_user: 164 with start_span( 165 op="authentik.stages.identification.validate_invalid_wait", 166 name="Sleep random time on invalid user identifier", 167 ): 168 # hash a random password on invalid identifier, same as with a valid identifier 169 make_password(make_password(None)) 170 # Log in a similar format to Event.new(), but we don't want to create an event here 171 # as this stage is mostly used by unauthenticated users with very high rate limits 172 self.stage.logger.info( 173 "invalid_login", 174 identifier=uid_field, 175 client_ip=client_ip, 176 action="invalid_identifier", 177 context={ 178 "stage": sanitize_item(self.stage), 179 }, 180 ) 181 identification_failed.send(sender=self, request=self.stage.request, uid_field=uid_field) 182 # We set the pending_user even on failure so it's part of the context, even 183 # when the input is invalid 184 # This is so its part of the current flow plan, and on flow restart can be kept, and 185 # policies can be applied. 186 self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = User( 187 username=uid_field, 188 email=uid_field, 189 ) 190 self.pre_user = self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER] 191 if not current_stage.show_matched_user: 192 self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = uid_field 193 # when `pretend` is enabled, continue regardless 194 if current_stage.pretend_user_exists and not current_stage.password_stage: 195 return attrs 196 raise ValidationError(_("Failed to authenticate.")) 197 self.pre_user = pre_user 198 199 # Captcha check 200 if captcha_stage := current_stage.captcha_stage: 201 captcha_token = attrs.get("captcha_token", None) 202 if not captcha_token: 203 self.stage.logger.warning("Token not set for captcha attempt") 204 try: 205 verify_captcha_token( 206 captcha_stage, 207 captcha_token, 208 client_ip, 209 key=self.stage.executor.plan.context.get(PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY), 210 ) 211 except ValidationError: 212 raise ValidationError(_("Failed to authenticate.")) from None 213 214 # Password check 215 if not current_stage.password_stage: 216 # No password stage select, don't validate the password 217 return attrs 218 219 password = attrs.get("password", None) 220 if not password: 221 self.stage.logger.warning("Password not set for ident+auth attempt") 222 try: 223 with start_span( 224 op="authentik.stages.identification.authenticate", 225 name="User authenticate call (combo stage)", 226 ): 227 user = authenticate( 228 self.stage.request, 229 current_stage.password_stage.backends, 230 current_stage, 231 username=self.pre_user.username, 232 password=password, 233 ) 234 if not user: 235 raise ValidationError(_("Failed to authenticate.")) 236 self.pre_user = user 237 except PermissionDenied as exc: 238 raise ValidationError(str(exc)) from exc 239 return attrs
Validate that user exists, and optionally their password, captcha token, or passkey
242class IdentificationStageView(ChallengeStageView): 243 """Form to identify the user""" 244 245 response_class = IdentificationChallengeResponse 246 247 def get_user(self, uid_value: str) -> User | None: 248 """Find user instance. Returns None if no user was found.""" 249 current_stage: IdentificationStage = self.executor.current_stage 250 query = Q() 251 for search_field in current_stage.user_fields: 252 model_field = { 253 "email": "email", 254 "username": "username", 255 "upn": "attributes__upn", 256 }[search_field] 257 if current_stage.case_insensitive_matching: 258 model_field += "__iexact" 259 else: 260 model_field += "__exact" 261 query |= Q(**{model_field: uid_value}) 262 if not query: 263 self.logger.debug("Empty user query", query=query) 264 return None 265 user = User.objects.filter(query).first() 266 if user: 267 self.logger.debug("Found user", user=user.username, query=query) 268 return user 269 return None 270 271 def get_primary_action(self) -> str: 272 """Get the primary action label for this stage""" 273 if self.executor.flow.designation == FlowDesignation.AUTHENTICATION: 274 return _("Log in") 275 return _("Continue") 276 277 def get_passkey_challenge(self) -> dict | None: 278 """Generate a WebAuthn challenge for passkey/conditional UI authentication""" 279 # Refresh from DB to get the latest configuration 280 current_stage: IdentificationStage = IdentificationStage.objects.get( 281 pk=self.executor.current_stage.pk 282 ) 283 if not current_stage.webauthn_stage: 284 self.logger.debug("No webauthn_stage configured") 285 return None 286 challenge = get_webauthn_challenge_without_user(self, current_stage.webauthn_stage) 287 self.logger.debug("Generated passkey challenge", challenge=challenge) 288 return challenge 289 290 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 291 """Check for existing pending user identifier and skip stage if possible""" 292 current_stage: IdentificationStage = self.executor.current_stage 293 pending_user_identifier = self.executor.plan.context.get( 294 PLAN_CONTEXT_PENDING_USER_IDENTIFIER 295 ) 296 297 if not pending_user_identifier: 298 return super().get(request, *args, **kwargs) 299 300 # Only skip if this is a "simple" identification stage with no extra features 301 can_skip = ( 302 not current_stage.password_stage 303 and not current_stage.captcha_stage 304 and not current_stage.webauthn_stage 305 and not self.executor.current_binding.policies.exists() 306 ) 307 308 if can_skip: 309 # Use the normal validation flow (handles timing protection, logging, signals) 310 response = IdentificationChallengeResponse( 311 data={"uid_field": pending_user_identifier}, 312 stage=self, 313 ) 314 if response.is_valid(): 315 return self.challenge_valid(response) 316 # Validation failed (user doesn't exist and pretend_user_exists is off) 317 # Don't pre-fill invalid username, fall through to show the challenge 318 self.executor.plan.context.pop(PLAN_CONTEXT_PENDING_USER_IDENTIFIER, None) 319 320 # Can't skip - just pre-fill the username field 321 return super().get(request, *args, **kwargs) 322 323 def get_challenge(self) -> Challenge: 324 current_stage: IdentificationStage = self.executor.current_stage 325 challenge = IdentificationChallenge( 326 data={ 327 "component": "ak-stage-identification", 328 "primary_action": self.get_primary_action(), 329 "user_fields": current_stage.user_fields, 330 "password_fields": bool(current_stage.password_stage), 331 "captcha_stage": ( 332 { 333 "js_url": current_stage.captcha_stage.js_url, 334 "site_key": current_stage.captcha_stage.public_key, 335 "interactive": current_stage.captcha_stage.interactive, 336 "pending_user": "", 337 "pending_user_avatar": DEFAULT_AVATAR, 338 } 339 if current_stage.captcha_stage 340 else None 341 ), 342 "allow_show_password": bool(current_stage.password_stage) 343 and current_stage.password_stage.allow_show_password, 344 "show_source_labels": current_stage.show_source_labels, 345 "flow_designation": self.executor.flow.designation, 346 "enable_remember_me": current_stage.enable_remember_me, 347 "passkey_challenge": self.get_passkey_challenge(), 348 } 349 ) 350 # If the user has been redirected to us whilst trying to access an 351 # application, PLAN_CONTEXT_APPLICATION is set in the flow plan 352 if PLAN_CONTEXT_APPLICATION in self.executor.plan.context: 353 app: Application = self.executor.plan.context.get( 354 PLAN_CONTEXT_APPLICATION, Application() 355 ) 356 challenge.initial_data["application_pre"] = app.name 357 if launch_url := app.get_launch_url(): 358 challenge.initial_data["application_pre_launch"] = launch_url 359 if ( 360 PLAN_CONTEXT_DEVICE in self.executor.plan.context 361 and PLAN_CONTEXT_DEVICE_AUTH_TOKEN in self.executor.plan.context 362 ): 363 challenge.initial_data["application_pre"] = self.executor.plan.context.get( 364 PLAN_CONTEXT_DEVICE, Device() 365 ).name 366 get_qs = self.request.session.get(SESSION_KEY_GET, self.request.GET) 367 # Check for related enrollment and recovery flow, add URL to view 368 if current_stage.enrollment_flow: 369 challenge.initial_data["enroll_url"] = reverse_with_qs( 370 "authentik_core:if-flow", 371 query=get_qs, 372 kwargs={"flow_slug": current_stage.enrollment_flow.slug}, 373 ) 374 if current_stage.recovery_flow: 375 challenge.initial_data["recovery_url"] = reverse_with_qs( 376 "authentik_core:if-flow", 377 query=get_qs, 378 kwargs={"flow_slug": current_stage.recovery_flow.slug}, 379 ) 380 if current_stage.passwordless_flow: 381 challenge.initial_data["passwordless_url"] = reverse_with_qs( 382 "authentik_core:if-flow", 383 query=get_qs, 384 kwargs={"flow_slug": current_stage.passwordless_flow.slug}, 385 ) 386 387 # Check all enabled source, add them if they have a UI Login button. 388 ui_sources = [] 389 sources: list[Source] = ( 390 current_stage.sources.filter(enabled=True).order_by("name").select_subclasses() 391 ) 392 for source in sources: 393 ui_login_button = source.ui_login_button(self.request) 394 if ui_login_button: 395 button = asdict(ui_login_button) 396 source_challenge = ui_login_button.challenge 397 source_challenge.is_valid() 398 button["challenge"] = source_challenge.data 399 ui_sources.append(button) 400 challenge.initial_data["sources"] = ui_sources 401 402 # Pre-fill username from login_hint unless user clicked "Not you?" 403 if prefill := self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER): 404 challenge.initial_data["pending_user_identifier"] = prefill 405 406 return challenge 407 408 def challenge_valid(self, response: IdentificationChallengeResponse) -> HttpResponse: 409 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = response.pre_user 410 current_stage: IdentificationStage = self.executor.current_stage 411 412 # Handle passkey authentication 413 if response.passkey_device: 414 self.logger.debug("Passkey authentication successful", user=response.pre_user) 415 self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl" 416 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 417 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update( 418 { 419 "device": response.passkey_device, 420 "device_type": response.passkey_device.device_type, 421 } 422 ) 423 # Update device last_used 424 with audit_ignore(): 425 response.passkey_device.last_used = now() 426 response.passkey_device.save() 427 return self.executor.stage_ok() 428 429 if not current_stage.show_matched_user: 430 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = ( 431 response.validated_data.get("uid_field") 432 ) 433 return self.executor.stage_ok()
Form to identify the user
response_class =
<class 'IdentificationChallengeResponse'>
247 def get_user(self, uid_value: str) -> User | None: 248 """Find user instance. Returns None if no user was found.""" 249 current_stage: IdentificationStage = self.executor.current_stage 250 query = Q() 251 for search_field in current_stage.user_fields: 252 model_field = { 253 "email": "email", 254 "username": "username", 255 "upn": "attributes__upn", 256 }[search_field] 257 if current_stage.case_insensitive_matching: 258 model_field += "__iexact" 259 else: 260 model_field += "__exact" 261 query |= Q(**{model_field: uid_value}) 262 if not query: 263 self.logger.debug("Empty user query", query=query) 264 return None 265 user = User.objects.filter(query).first() 266 if user: 267 self.logger.debug("Found user", user=user.username, query=query) 268 return user 269 return None
Find user instance. Returns None if no user was found.
def
get_primary_action(self) -> str:
271 def get_primary_action(self) -> str: 272 """Get the primary action label for this stage""" 273 if self.executor.flow.designation == FlowDesignation.AUTHENTICATION: 274 return _("Log in") 275 return _("Continue")
Get the primary action label for this stage
def
get_passkey_challenge(self) -> dict | None:
277 def get_passkey_challenge(self) -> dict | None: 278 """Generate a WebAuthn challenge for passkey/conditional UI authentication""" 279 # Refresh from DB to get the latest configuration 280 current_stage: IdentificationStage = IdentificationStage.objects.get( 281 pk=self.executor.current_stage.pk 282 ) 283 if not current_stage.webauthn_stage: 284 self.logger.debug("No webauthn_stage configured") 285 return None 286 challenge = get_webauthn_challenge_without_user(self, current_stage.webauthn_stage) 287 self.logger.debug("Generated passkey challenge", challenge=challenge) 288 return challenge
Generate a WebAuthn challenge for passkey/conditional UI authentication
def
get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
290 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 291 """Check for existing pending user identifier and skip stage if possible""" 292 current_stage: IdentificationStage = self.executor.current_stage 293 pending_user_identifier = self.executor.plan.context.get( 294 PLAN_CONTEXT_PENDING_USER_IDENTIFIER 295 ) 296 297 if not pending_user_identifier: 298 return super().get(request, *args, **kwargs) 299 300 # Only skip if this is a "simple" identification stage with no extra features 301 can_skip = ( 302 not current_stage.password_stage 303 and not current_stage.captcha_stage 304 and not current_stage.webauthn_stage 305 and not self.executor.current_binding.policies.exists() 306 ) 307 308 if can_skip: 309 # Use the normal validation flow (handles timing protection, logging, signals) 310 response = IdentificationChallengeResponse( 311 data={"uid_field": pending_user_identifier}, 312 stage=self, 313 ) 314 if response.is_valid(): 315 return self.challenge_valid(response) 316 # Validation failed (user doesn't exist and pretend_user_exists is off) 317 # Don't pre-fill invalid username, fall through to show the challenge 318 self.executor.plan.context.pop(PLAN_CONTEXT_PENDING_USER_IDENTIFIER, None) 319 320 # Can't skip - just pre-fill the username field 321 return super().get(request, *args, **kwargs)
Check for existing pending user identifier and skip stage if possible
323 def get_challenge(self) -> Challenge: 324 current_stage: IdentificationStage = self.executor.current_stage 325 challenge = IdentificationChallenge( 326 data={ 327 "component": "ak-stage-identification", 328 "primary_action": self.get_primary_action(), 329 "user_fields": current_stage.user_fields, 330 "password_fields": bool(current_stage.password_stage), 331 "captcha_stage": ( 332 { 333 "js_url": current_stage.captcha_stage.js_url, 334 "site_key": current_stage.captcha_stage.public_key, 335 "interactive": current_stage.captcha_stage.interactive, 336 "pending_user": "", 337 "pending_user_avatar": DEFAULT_AVATAR, 338 } 339 if current_stage.captcha_stage 340 else None 341 ), 342 "allow_show_password": bool(current_stage.password_stage) 343 and current_stage.password_stage.allow_show_password, 344 "show_source_labels": current_stage.show_source_labels, 345 "flow_designation": self.executor.flow.designation, 346 "enable_remember_me": current_stage.enable_remember_me, 347 "passkey_challenge": self.get_passkey_challenge(), 348 } 349 ) 350 # If the user has been redirected to us whilst trying to access an 351 # application, PLAN_CONTEXT_APPLICATION is set in the flow plan 352 if PLAN_CONTEXT_APPLICATION in self.executor.plan.context: 353 app: Application = self.executor.plan.context.get( 354 PLAN_CONTEXT_APPLICATION, Application() 355 ) 356 challenge.initial_data["application_pre"] = app.name 357 if launch_url := app.get_launch_url(): 358 challenge.initial_data["application_pre_launch"] = launch_url 359 if ( 360 PLAN_CONTEXT_DEVICE in self.executor.plan.context 361 and PLAN_CONTEXT_DEVICE_AUTH_TOKEN in self.executor.plan.context 362 ): 363 challenge.initial_data["application_pre"] = self.executor.plan.context.get( 364 PLAN_CONTEXT_DEVICE, Device() 365 ).name 366 get_qs = self.request.session.get(SESSION_KEY_GET, self.request.GET) 367 # Check for related enrollment and recovery flow, add URL to view 368 if current_stage.enrollment_flow: 369 challenge.initial_data["enroll_url"] = reverse_with_qs( 370 "authentik_core:if-flow", 371 query=get_qs, 372 kwargs={"flow_slug": current_stage.enrollment_flow.slug}, 373 ) 374 if current_stage.recovery_flow: 375 challenge.initial_data["recovery_url"] = reverse_with_qs( 376 "authentik_core:if-flow", 377 query=get_qs, 378 kwargs={"flow_slug": current_stage.recovery_flow.slug}, 379 ) 380 if current_stage.passwordless_flow: 381 challenge.initial_data["passwordless_url"] = reverse_with_qs( 382 "authentik_core:if-flow", 383 query=get_qs, 384 kwargs={"flow_slug": current_stage.passwordless_flow.slug}, 385 ) 386 387 # Check all enabled source, add them if they have a UI Login button. 388 ui_sources = [] 389 sources: list[Source] = ( 390 current_stage.sources.filter(enabled=True).order_by("name").select_subclasses() 391 ) 392 for source in sources: 393 ui_login_button = source.ui_login_button(self.request) 394 if ui_login_button: 395 button = asdict(ui_login_button) 396 source_challenge = ui_login_button.challenge 397 source_challenge.is_valid() 398 button["challenge"] = source_challenge.data 399 ui_sources.append(button) 400 challenge.initial_data["sources"] = ui_sources 401 402 # Pre-fill username from login_hint unless user clicked "Not you?" 403 if prefill := self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER): 404 challenge.initial_data["pending_user_identifier"] = prefill 405 406 return challenge
Return the challenge that the client should solve
def
challenge_valid( self, response: IdentificationChallengeResponse) -> django.http.response.HttpResponse:
408 def challenge_valid(self, response: IdentificationChallengeResponse) -> HttpResponse: 409 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = response.pre_user 410 current_stage: IdentificationStage = self.executor.current_stage 411 412 # Handle passkey authentication 413 if response.passkey_device: 414 self.logger.debug("Passkey authentication successful", user=response.pre_user) 415 self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl" 416 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 417 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update( 418 { 419 "device": response.passkey_device, 420 "device_type": response.passkey_device.device_type, 421 } 422 ) 423 # Update device last_used 424 with audit_ignore(): 425 response.passkey_device.last_used = now() 426 response.passkey_device.save() 427 return self.executor.stage_ok() 428 429 if not current_stage.show_matched_user: 430 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = ( 431 response.validated_data.get("uid_field") 432 ) 433 return self.executor.stage_ok()
Callback when the challenge has the correct format