authentik.stages.identification.stage

Identification stage logic

  1"""Identification stage logic"""
  2
  3from dataclasses import asdict
  4from typing import Any
  5
  6from django.contrib.auth.hashers import make_password
  7from django.core.exceptions import PermissionDenied
  8from django.db.models import Q
  9from django.http import HttpRequest, HttpResponse
 10from django.utils.timezone import now
 11from django.utils.translation import gettext as _
 12from drf_spectacular.utils import PolymorphicProxySerializer, extend_schema_field
 13from rest_framework.fields import BooleanField, CharField, ChoiceField, DictField, ListField
 14from rest_framework.serializers import ValidationError
 15from sentry_sdk import start_span
 16
 17from authentik.core.api.utils import JSONDictField, PassiveSerializer
 18from authentik.core.models import Application, Source, User
 19from authentik.endpoints.connectors.agent.stage import PLAN_CONTEXT_DEVICE_AUTH_TOKEN
 20from authentik.endpoints.models import Device
 21from authentik.events.middleware import audit_ignore
 22from authentik.events.utils import sanitize_item
 23from authentik.flows.challenge import (
 24    Challenge,
 25    ChallengeResponse,
 26    RedirectChallenge,
 27)
 28from authentik.flows.models import FlowDesignation
 29from authentik.flows.planner import (
 30    PLAN_CONTEXT_APPLICATION,
 31    PLAN_CONTEXT_DEVICE,
 32    PLAN_CONTEXT_PENDING_USER,
 33)
 34from authentik.flows.stage import PLAN_CONTEXT_PENDING_USER_IDENTIFIER, ChallengeStageView
 35from authentik.flows.views.executor import SESSION_KEY_GET
 36from authentik.lib.avatars import DEFAULT_AVATAR
 37from authentik.lib.utils.reflection import all_subclasses, class_to_path
 38from authentik.lib.utils.urls import reverse_with_qs
 39from authentik.root.middleware import ClientIPMiddleware
 40from authentik.stages.authenticator_validate.challenge import (
 41    get_webauthn_challenge_without_user,
 42    validate_challenge_webauthn,
 43)
 44from authentik.stages.authenticator_webauthn.models import WebAuthnDevice
 45from authentik.stages.captcha.stage import (
 46    PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY,
 47    CaptchaChallenge,
 48    verify_captcha_token,
 49)
 50from authentik.stages.identification.models import IdentificationStage
 51from authentik.stages.identification.signals import identification_failed
 52from authentik.stages.password.stage import (
 53    PLAN_CONTEXT_METHOD,
 54    PLAN_CONTEXT_METHOD_ARGS,
 55    authenticate,
 56)
 57
 58
 59class LoginChallengeMixin:
 60    """Base login challenge for Identification stage"""
 61
 62
 63def get_login_serializers():
 64    mapping = {
 65        RedirectChallenge().fields["component"].default: RedirectChallenge,
 66    }
 67    for cls in all_subclasses(LoginChallengeMixin):
 68        mapping[cls().fields["component"].default] = cls
 69    return mapping
 70
 71
 72@extend_schema_field(
 73    PolymorphicProxySerializer(
 74        component_name="LoginChallengeTypes",
 75        serializers=get_login_serializers,
 76        resource_type_field_name="component",
 77    )
 78)
 79class ChallengeDictWrapper(DictField):
 80    """Wrapper around DictField that annotates itself as challenge proxy"""
 81
 82
 83class LoginSourceSerializer(PassiveSerializer):
 84    """Serializer for Login buttons of sources"""
 85
 86    name = CharField()
 87    icon_url = CharField(required=False, allow_null=True)
 88    promoted = BooleanField(default=False)
 89
 90    challenge = ChallengeDictWrapper()
 91
 92
 93class IdentificationChallenge(Challenge):
 94    """Identification challenges with all UI elements"""
 95
 96    user_fields = ListField(child=CharField(), allow_empty=True, allow_null=True)
 97    pending_user_identifier = CharField(required=False, allow_null=True)
 98
 99    password_fields = BooleanField()
100    allow_show_password = BooleanField(default=False)
101    application_pre = CharField(required=False)
102    application_pre_launch = CharField(required=False)
103    flow_designation = ChoiceField(FlowDesignation.choices)
104    captcha_stage = CaptchaChallenge(required=False, allow_null=True)
105
106    enroll_url = CharField(required=False)
107    recovery_url = CharField(required=False)
108    passwordless_url = CharField(required=False)
109    primary_action = CharField()
110    sources = LoginSourceSerializer(many=True, required=False)
111    show_source_labels = BooleanField()
112    enable_remember_me = BooleanField(required=False, default=True)
113
114    passkey_challenge = JSONDictField(required=False, allow_null=True)
115
116    component = CharField(default="ak-stage-identification")
117
118
119class IdentificationChallengeResponse(ChallengeResponse):
120    """Identification challenge"""
121
122    uid_field = CharField(required=False, allow_blank=True, allow_null=True)
123    password = CharField(required=False, allow_blank=True, allow_null=True)
124    captcha_token = CharField(required=False, allow_blank=True, allow_null=True)
125    passkey = JSONDictField(required=False, allow_null=True)
126    component = CharField(default="ak-stage-identification")
127
128    pre_user: User | None = None
129    passkey_device: WebAuthnDevice | None = None
130
131    def _validate_passkey_response(self, passkey: dict) -> WebAuthnDevice:
132        """Validate passkey/WebAuthn response for passwordless authentication"""
133        # Get the webauthn_stage from the current IdentificationStage
134        current_stage: IdentificationStage = IdentificationStage.objects.get(
135            pk=self.stage.executor.current_stage.pk
136        )
137        return validate_challenge_webauthn(
138            passkey, self.stage, self.stage.get_pending_user(), current_stage.webauthn_stage
139        )
140
141    def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
142        """Validate that user exists, and optionally their password, captcha token, or passkey"""
143        current_stage: IdentificationStage = self.stage.executor.current_stage
144        client_ip = ClientIPMiddleware.get_client_ip(self.stage.request)
145
146        # Check if this is a passkey authentication
147        passkey = attrs.get("passkey")
148        if passkey:
149            device = self._validate_passkey_response(passkey)
150            self.passkey_device = device
151            self.pre_user = device.user
152            # Set backend so password stage policy knows user is already authenticated
153            self.pre_user.backend = class_to_path(IdentificationStageView)
154            return attrs
155
156        # Standard username/password flow
157        uid_field = attrs.get("uid_field")
158        if not uid_field:
159            raise ValidationError(_("No identification data provided."))
160
161        pre_user = self.stage.get_user(uid_field)
162        if not pre_user:
163            with start_span(
164                op="authentik.stages.identification.validate_invalid_wait",
165                name="Sleep random time on invalid user identifier",
166            ):
167                # hash a random password on invalid identifier, same as with a valid identifier
168                make_password(make_password(None))
169            # Log in a similar format to Event.new(), but we don't want to create an event here
170            # as this stage is mostly used by unauthenticated users with very high rate limits
171            self.stage.logger.info(
172                "invalid_login",
173                identifier=uid_field,
174                client_ip=client_ip,
175                action="invalid_identifier",
176                context={
177                    "stage": sanitize_item(self.stage),
178                },
179            )
180            identification_failed.send(sender=self, request=self.stage.request, uid_field=uid_field)
181            # We set the pending_user even on failure so it's part of the context, even
182            # when the input is invalid
183            # This is so its part of the current flow plan, and on flow restart can be kept, and
184            # policies can be applied.
185            self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = User(
186                username=uid_field,
187                email=uid_field,
188            )
189            self.pre_user = self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
190            if not current_stage.show_matched_user:
191                self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = uid_field
192            # when `pretend` is enabled, continue regardless
193            if current_stage.pretend_user_exists and not current_stage.password_stage:
194                return attrs
195            raise ValidationError(_("Failed to authenticate."))
196        self.pre_user = pre_user
197
198        # Captcha check
199        if captcha_stage := current_stage.captcha_stage:
200            captcha_token = attrs.get("captcha_token", None)
201            if not captcha_token:
202                self.stage.logger.warning("Token not set for captcha attempt")
203            try:
204                verify_captcha_token(
205                    captcha_stage,
206                    captcha_token,
207                    client_ip,
208                    key=self.stage.executor.plan.context.get(PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY),
209                )
210            except ValidationError:
211                raise ValidationError(_("Failed to authenticate.")) from None
212
213        # Password check
214        if not current_stage.password_stage:
215            # No password stage select, don't validate the password
216            return attrs
217
218        password = attrs.get("password", None)
219        if not password:
220            self.stage.logger.warning("Password not set for ident+auth attempt")
221        try:
222            with start_span(
223                op="authentik.stages.identification.authenticate",
224                name="User authenticate call (combo stage)",
225            ):
226                user = authenticate(
227                    self.stage.request,
228                    current_stage.password_stage.backends,
229                    current_stage,
230                    username=self.pre_user.username,
231                    password=password,
232                )
233            if not user:
234                raise ValidationError(_("Failed to authenticate."))
235            self.pre_user = user
236        except PermissionDenied as exc:
237            raise ValidationError(str(exc)) from exc
238        return attrs
239
240
241class IdentificationStageView(ChallengeStageView):
242    """Form to identify the user"""
243
244    response_class = IdentificationChallengeResponse
245
246    def get_user(self, uid_value: str) -> User | None:
247        """Find user instance. Returns None if no user was found."""
248        current_stage: IdentificationStage = self.executor.current_stage
249        query = Q()
250        for search_field in current_stage.user_fields:
251            model_field = {
252                "email": "email",
253                "username": "username",
254                "upn": "attributes__upn",
255            }[search_field]
256            if current_stage.case_insensitive_matching:
257                model_field += "__iexact"
258            else:
259                model_field += "__exact"
260            query |= Q(**{model_field: uid_value})
261        if not query:
262            self.logger.debug("Empty user query", query=query)
263            return None
264        user = User.objects.filter(query).first()
265        if user:
266            self.logger.debug("Found user", user=user.username, query=query)
267            return user
268        return None
269
270    def get_primary_action(self) -> str:
271        """Get the primary action label for this stage"""
272        if self.executor.flow.designation == FlowDesignation.AUTHENTICATION:
273            return _("Log in")
274        return _("Continue")
275
276    def get_passkey_challenge(self) -> dict | None:
277        """Generate a WebAuthn challenge for passkey/conditional UI authentication"""
278        # Refresh from DB to get the latest configuration
279        current_stage: IdentificationStage = IdentificationStage.objects.get(
280            pk=self.executor.current_stage.pk
281        )
282        if not current_stage.webauthn_stage:
283            self.logger.debug("No webauthn_stage configured")
284            return None
285        challenge = get_webauthn_challenge_without_user(self, current_stage.webauthn_stage)
286        self.logger.debug("Generated passkey challenge", challenge=challenge)
287        return challenge
288
289    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
290        """Check for existing pending user identifier and skip stage if possible"""
291        current_stage: IdentificationStage = self.executor.current_stage
292        pending_user_identifier = self.executor.plan.context.get(
293            PLAN_CONTEXT_PENDING_USER_IDENTIFIER
294        )
295
296        if not pending_user_identifier:
297            return super().get(request, *args, **kwargs)
298
299        # Only skip if this is a "simple" identification stage with no extra features
300        can_skip = (
301            not current_stage.password_stage
302            and not current_stage.captcha_stage
303            and not current_stage.webauthn_stage
304            and not self.executor.current_binding.policies.exists()
305        )
306
307        if can_skip:
308            # Use the normal validation flow (handles timing protection, logging, signals)
309            response = IdentificationChallengeResponse(
310                data={"uid_field": pending_user_identifier},
311                stage=self,
312            )
313            if response.is_valid():
314                return self.challenge_valid(response)
315            # Validation failed (user doesn't exist and pretend_user_exists is off)
316            # Don't pre-fill invalid username, fall through to show the challenge
317            self.executor.plan.context.pop(PLAN_CONTEXT_PENDING_USER_IDENTIFIER, None)
318
319        # Can't skip - just pre-fill the username field
320        return super().get(request, *args, **kwargs)
321
322    def get_challenge(self) -> Challenge:
323        current_stage: IdentificationStage = self.executor.current_stage
324        challenge = IdentificationChallenge(
325            data={
326                "component": "ak-stage-identification",
327                "primary_action": self.get_primary_action(),
328                "user_fields": current_stage.user_fields,
329                "password_fields": bool(current_stage.password_stage),
330                "captcha_stage": (
331                    {
332                        "js_url": current_stage.captcha_stage.js_url,
333                        "site_key": current_stage.captcha_stage.public_key,
334                        "interactive": current_stage.captcha_stage.interactive,
335                        "pending_user": "",
336                        "pending_user_avatar": DEFAULT_AVATAR,
337                    }
338                    if current_stage.captcha_stage
339                    else None
340                ),
341                "allow_show_password": bool(current_stage.password_stage)
342                and current_stage.password_stage.allow_show_password,
343                "show_source_labels": current_stage.show_source_labels,
344                "flow_designation": self.executor.flow.designation,
345                "enable_remember_me": current_stage.enable_remember_me,
346                "passkey_challenge": self.get_passkey_challenge(),
347            }
348        )
349        # If the user has been redirected to us whilst trying to access an
350        # application, PLAN_CONTEXT_APPLICATION is set in the flow plan
351        if PLAN_CONTEXT_APPLICATION in self.executor.plan.context:
352            app: Application = self.executor.plan.context.get(
353                PLAN_CONTEXT_APPLICATION, Application()
354            )
355            challenge.initial_data["application_pre"] = app.name
356            if launch_url := app.get_launch_url():
357                challenge.initial_data["application_pre_launch"] = launch_url
358        if (
359            PLAN_CONTEXT_DEVICE in self.executor.plan.context
360            and PLAN_CONTEXT_DEVICE_AUTH_TOKEN in self.executor.plan.context
361        ):
362            challenge.initial_data["application_pre"] = self.executor.plan.context.get(
363                PLAN_CONTEXT_DEVICE, Device()
364            ).name
365        get_qs = self.request.session.get(SESSION_KEY_GET, self.request.GET)
366        # Check for related enrollment and recovery flow, add URL to view
367        if current_stage.enrollment_flow:
368            challenge.initial_data["enroll_url"] = reverse_with_qs(
369                "authentik_core:if-flow",
370                query=get_qs,
371                kwargs={"flow_slug": current_stage.enrollment_flow.slug},
372            )
373        if current_stage.recovery_flow:
374            challenge.initial_data["recovery_url"] = reverse_with_qs(
375                "authentik_core:if-flow",
376                query=get_qs,
377                kwargs={"flow_slug": current_stage.recovery_flow.slug},
378            )
379        if current_stage.passwordless_flow:
380            challenge.initial_data["passwordless_url"] = reverse_with_qs(
381                "authentik_core:if-flow",
382                query=get_qs,
383                kwargs={"flow_slug": current_stage.passwordless_flow.slug},
384            )
385
386        # Check all enabled source, add them if they have a UI Login button.
387        ui_sources = []
388        sources: list[Source] = (
389            current_stage.sources.filter(enabled=True).order_by("name").select_subclasses()
390        )
391        for source in sources:
392            ui_login_button = source.ui_login_button(self.request)
393            if ui_login_button:
394                button = asdict(ui_login_button)
395                source_challenge = ui_login_button.challenge
396                source_challenge.is_valid()
397                button["challenge"] = source_challenge.data
398                ui_sources.append(button)
399        challenge.initial_data["sources"] = ui_sources
400
401        # Pre-fill username from login_hint unless user clicked "Not you?"
402        if prefill := self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER):
403            challenge.initial_data["pending_user_identifier"] = prefill
404
405        return challenge
406
407    def challenge_valid(self, response: IdentificationChallengeResponse) -> HttpResponse:
408        self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = response.pre_user
409        current_stage: IdentificationStage = self.executor.current_stage
410
411        # Handle passkey authentication
412        if response.passkey_device:
413            self.logger.debug("Passkey authentication successful", user=response.pre_user)
414            self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl"
415            self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
416            self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
417                {
418                    "device": response.passkey_device,
419                    "device_type": response.passkey_device.device_type,
420                }
421            )
422            # Update device last_used
423            with audit_ignore():
424                response.passkey_device.last_used = now()
425                response.passkey_device.save()
426            return self.executor.stage_ok()
427
428        if not current_stage.show_matched_user:
429            self.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = (
430                response.validated_data.get("uid_field")
431            )
432        return self.executor.stage_ok()
class LoginChallengeMixin:
60class LoginChallengeMixin:
61    """Base login challenge for Identification stage"""

Base login challenge for Identification stage

def get_login_serializers():
64def get_login_serializers():
65    mapping = {
66        RedirectChallenge().fields["component"].default: RedirectChallenge,
67    }
68    for cls in all_subclasses(LoginChallengeMixin):
69        mapping[cls().fields["component"].default] = cls
70    return mapping
@extend_schema_field(PolymorphicProxySerializer(component_name='LoginChallengeTypes', serializers=get_login_serializers, resource_type_field_name='component'))
class ChallengeDictWrapper(rest_framework.fields.DictField):
73@extend_schema_field(
74    PolymorphicProxySerializer(
75        component_name="LoginChallengeTypes",
76        serializers=get_login_serializers,
77        resource_type_field_name="component",
78    )
79)
80class ChallengeDictWrapper(DictField):
81    """Wrapper around DictField that annotates itself as challenge proxy"""

Wrapper around DictField that annotates itself as challenge proxy

class LoginSourceSerializer(authentik.core.api.utils.PassiveSerializer):
84class LoginSourceSerializer(PassiveSerializer):
85    """Serializer for Login buttons of sources"""
86
87    name = CharField()
88    icon_url = CharField(required=False, allow_null=True)
89    promoted = BooleanField(default=False)
90
91    challenge = ChallengeDictWrapper()

Serializer for Login buttons of sources

name
icon_url
promoted
challenge
class IdentificationChallenge(authentik.flows.challenge.Challenge):
 94class IdentificationChallenge(Challenge):
 95    """Identification challenges with all UI elements"""
 96
 97    user_fields = ListField(child=CharField(), allow_empty=True, allow_null=True)
 98    pending_user_identifier = CharField(required=False, allow_null=True)
 99
100    password_fields = BooleanField()
101    allow_show_password = BooleanField(default=False)
102    application_pre = CharField(required=False)
103    application_pre_launch = CharField(required=False)
104    flow_designation = ChoiceField(FlowDesignation.choices)
105    captcha_stage = CaptchaChallenge(required=False, allow_null=True)
106
107    enroll_url = CharField(required=False)
108    recovery_url = CharField(required=False)
109    passwordless_url = CharField(required=False)
110    primary_action = CharField()
111    sources = LoginSourceSerializer(many=True, required=False)
112    show_source_labels = BooleanField()
113    enable_remember_me = BooleanField(required=False, default=True)
114
115    passkey_challenge = JSONDictField(required=False, allow_null=True)
116
117    component = CharField(default="ak-stage-identification")

Identification challenges with all UI elements

user_fields
pending_user_identifier
password_fields
allow_show_password
application_pre
application_pre_launch
flow_designation
captcha_stage
enroll_url
recovery_url
passwordless_url
primary_action
sources
show_source_labels
enable_remember_me
passkey_challenge
component
class IdentificationChallengeResponse(authentik.flows.challenge.ChallengeResponse):
120class IdentificationChallengeResponse(ChallengeResponse):
121    """Identification challenge"""
122
123    uid_field = CharField(required=False, allow_blank=True, allow_null=True)
124    password = CharField(required=False, allow_blank=True, allow_null=True)
125    captcha_token = CharField(required=False, allow_blank=True, allow_null=True)
126    passkey = JSONDictField(required=False, allow_null=True)
127    component = CharField(default="ak-stage-identification")
128
129    pre_user: User | None = None
130    passkey_device: WebAuthnDevice | None = None
131
132    def _validate_passkey_response(self, passkey: dict) -> WebAuthnDevice:
133        """Validate passkey/WebAuthn response for passwordless authentication"""
134        # Get the webauthn_stage from the current IdentificationStage
135        current_stage: IdentificationStage = IdentificationStage.objects.get(
136            pk=self.stage.executor.current_stage.pk
137        )
138        return validate_challenge_webauthn(
139            passkey, self.stage, self.stage.get_pending_user(), current_stage.webauthn_stage
140        )
141
142    def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
143        """Validate that user exists, and optionally their password, captcha token, or passkey"""
144        current_stage: IdentificationStage = self.stage.executor.current_stage
145        client_ip = ClientIPMiddleware.get_client_ip(self.stage.request)
146
147        # Check if this is a passkey authentication
148        passkey = attrs.get("passkey")
149        if passkey:
150            device = self._validate_passkey_response(passkey)
151            self.passkey_device = device
152            self.pre_user = device.user
153            # Set backend so password stage policy knows user is already authenticated
154            self.pre_user.backend = class_to_path(IdentificationStageView)
155            return attrs
156
157        # Standard username/password flow
158        uid_field = attrs.get("uid_field")
159        if not uid_field:
160            raise ValidationError(_("No identification data provided."))
161
162        pre_user = self.stage.get_user(uid_field)
163        if not pre_user:
164            with start_span(
165                op="authentik.stages.identification.validate_invalid_wait",
166                name="Sleep random time on invalid user identifier",
167            ):
168                # hash a random password on invalid identifier, same as with a valid identifier
169                make_password(make_password(None))
170            # Log in a similar format to Event.new(), but we don't want to create an event here
171            # as this stage is mostly used by unauthenticated users with very high rate limits
172            self.stage.logger.info(
173                "invalid_login",
174                identifier=uid_field,
175                client_ip=client_ip,
176                action="invalid_identifier",
177                context={
178                    "stage": sanitize_item(self.stage),
179                },
180            )
181            identification_failed.send(sender=self, request=self.stage.request, uid_field=uid_field)
182            # We set the pending_user even on failure so it's part of the context, even
183            # when the input is invalid
184            # This is so its part of the current flow plan, and on flow restart can be kept, and
185            # policies can be applied.
186            self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = User(
187                username=uid_field,
188                email=uid_field,
189            )
190            self.pre_user = self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
191            if not current_stage.show_matched_user:
192                self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = uid_field
193            # when `pretend` is enabled, continue regardless
194            if current_stage.pretend_user_exists and not current_stage.password_stage:
195                return attrs
196            raise ValidationError(_("Failed to authenticate."))
197        self.pre_user = pre_user
198
199        # Captcha check
200        if captcha_stage := current_stage.captcha_stage:
201            captcha_token = attrs.get("captcha_token", None)
202            if not captcha_token:
203                self.stage.logger.warning("Token not set for captcha attempt")
204            try:
205                verify_captcha_token(
206                    captcha_stage,
207                    captcha_token,
208                    client_ip,
209                    key=self.stage.executor.plan.context.get(PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY),
210                )
211            except ValidationError:
212                raise ValidationError(_("Failed to authenticate.")) from None
213
214        # Password check
215        if not current_stage.password_stage:
216            # No password stage select, don't validate the password
217            return attrs
218
219        password = attrs.get("password", None)
220        if not password:
221            self.stage.logger.warning("Password not set for ident+auth attempt")
222        try:
223            with start_span(
224                op="authentik.stages.identification.authenticate",
225                name="User authenticate call (combo stage)",
226            ):
227                user = authenticate(
228                    self.stage.request,
229                    current_stage.password_stage.backends,
230                    current_stage,
231                    username=self.pre_user.username,
232                    password=password,
233                )
234            if not user:
235                raise ValidationError(_("Failed to authenticate."))
236            self.pre_user = user
237        except PermissionDenied as exc:
238            raise ValidationError(str(exc)) from exc
239        return attrs

Identification challenge

uid_field
password
captcha_token
passkey
component
pre_user: authentik.core.models.User | None = None
def validate(self, attrs: dict[str, typing.Any]) -> dict[str, typing.Any]:
142    def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
143        """Validate that user exists, and optionally their password, captcha token, or passkey"""
144        current_stage: IdentificationStage = self.stage.executor.current_stage
145        client_ip = ClientIPMiddleware.get_client_ip(self.stage.request)
146
147        # Check if this is a passkey authentication
148        passkey = attrs.get("passkey")
149        if passkey:
150            device = self._validate_passkey_response(passkey)
151            self.passkey_device = device
152            self.pre_user = device.user
153            # Set backend so password stage policy knows user is already authenticated
154            self.pre_user.backend = class_to_path(IdentificationStageView)
155            return attrs
156
157        # Standard username/password flow
158        uid_field = attrs.get("uid_field")
159        if not uid_field:
160            raise ValidationError(_("No identification data provided."))
161
162        pre_user = self.stage.get_user(uid_field)
163        if not pre_user:
164            with start_span(
165                op="authentik.stages.identification.validate_invalid_wait",
166                name="Sleep random time on invalid user identifier",
167            ):
168                # hash a random password on invalid identifier, same as with a valid identifier
169                make_password(make_password(None))
170            # Log in a similar format to Event.new(), but we don't want to create an event here
171            # as this stage is mostly used by unauthenticated users with very high rate limits
172            self.stage.logger.info(
173                "invalid_login",
174                identifier=uid_field,
175                client_ip=client_ip,
176                action="invalid_identifier",
177                context={
178                    "stage": sanitize_item(self.stage),
179                },
180            )
181            identification_failed.send(sender=self, request=self.stage.request, uid_field=uid_field)
182            # We set the pending_user even on failure so it's part of the context, even
183            # when the input is invalid
184            # This is so its part of the current flow plan, and on flow restart can be kept, and
185            # policies can be applied.
186            self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = User(
187                username=uid_field,
188                email=uid_field,
189            )
190            self.pre_user = self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
191            if not current_stage.show_matched_user:
192                self.stage.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = uid_field
193            # when `pretend` is enabled, continue regardless
194            if current_stage.pretend_user_exists and not current_stage.password_stage:
195                return attrs
196            raise ValidationError(_("Failed to authenticate."))
197        self.pre_user = pre_user
198
199        # Captcha check
200        if captcha_stage := current_stage.captcha_stage:
201            captcha_token = attrs.get("captcha_token", None)
202            if not captcha_token:
203                self.stage.logger.warning("Token not set for captcha attempt")
204            try:
205                verify_captcha_token(
206                    captcha_stage,
207                    captcha_token,
208                    client_ip,
209                    key=self.stage.executor.plan.context.get(PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY),
210                )
211            except ValidationError:
212                raise ValidationError(_("Failed to authenticate.")) from None
213
214        # Password check
215        if not current_stage.password_stage:
216            # No password stage select, don't validate the password
217            return attrs
218
219        password = attrs.get("password", None)
220        if not password:
221            self.stage.logger.warning("Password not set for ident+auth attempt")
222        try:
223            with start_span(
224                op="authentik.stages.identification.authenticate",
225                name="User authenticate call (combo stage)",
226            ):
227                user = authenticate(
228                    self.stage.request,
229                    current_stage.password_stage.backends,
230                    current_stage,
231                    username=self.pre_user.username,
232                    password=password,
233                )
234            if not user:
235                raise ValidationError(_("Failed to authenticate."))
236            self.pre_user = user
237        except PermissionDenied as exc:
238            raise ValidationError(str(exc)) from exc
239        return attrs

Validate that user exists, and optionally their password, captcha token, or passkey

class IdentificationStageView(authentik.flows.stage.ChallengeStageView):
242class IdentificationStageView(ChallengeStageView):
243    """Form to identify the user"""
244
245    response_class = IdentificationChallengeResponse
246
247    def get_user(self, uid_value: str) -> User | None:
248        """Find user instance. Returns None if no user was found."""
249        current_stage: IdentificationStage = self.executor.current_stage
250        query = Q()
251        for search_field in current_stage.user_fields:
252            model_field = {
253                "email": "email",
254                "username": "username",
255                "upn": "attributes__upn",
256            }[search_field]
257            if current_stage.case_insensitive_matching:
258                model_field += "__iexact"
259            else:
260                model_field += "__exact"
261            query |= Q(**{model_field: uid_value})
262        if not query:
263            self.logger.debug("Empty user query", query=query)
264            return None
265        user = User.objects.filter(query).first()
266        if user:
267            self.logger.debug("Found user", user=user.username, query=query)
268            return user
269        return None
270
271    def get_primary_action(self) -> str:
272        """Get the primary action label for this stage"""
273        if self.executor.flow.designation == FlowDesignation.AUTHENTICATION:
274            return _("Log in")
275        return _("Continue")
276
277    def get_passkey_challenge(self) -> dict | None:
278        """Generate a WebAuthn challenge for passkey/conditional UI authentication"""
279        # Refresh from DB to get the latest configuration
280        current_stage: IdentificationStage = IdentificationStage.objects.get(
281            pk=self.executor.current_stage.pk
282        )
283        if not current_stage.webauthn_stage:
284            self.logger.debug("No webauthn_stage configured")
285            return None
286        challenge = get_webauthn_challenge_without_user(self, current_stage.webauthn_stage)
287        self.logger.debug("Generated passkey challenge", challenge=challenge)
288        return challenge
289
290    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
291        """Check for existing pending user identifier and skip stage if possible"""
292        current_stage: IdentificationStage = self.executor.current_stage
293        pending_user_identifier = self.executor.plan.context.get(
294            PLAN_CONTEXT_PENDING_USER_IDENTIFIER
295        )
296
297        if not pending_user_identifier:
298            return super().get(request, *args, **kwargs)
299
300        # Only skip if this is a "simple" identification stage with no extra features
301        can_skip = (
302            not current_stage.password_stage
303            and not current_stage.captcha_stage
304            and not current_stage.webauthn_stage
305            and not self.executor.current_binding.policies.exists()
306        )
307
308        if can_skip:
309            # Use the normal validation flow (handles timing protection, logging, signals)
310            response = IdentificationChallengeResponse(
311                data={"uid_field": pending_user_identifier},
312                stage=self,
313            )
314            if response.is_valid():
315                return self.challenge_valid(response)
316            # Validation failed (user doesn't exist and pretend_user_exists is off)
317            # Don't pre-fill invalid username, fall through to show the challenge
318            self.executor.plan.context.pop(PLAN_CONTEXT_PENDING_USER_IDENTIFIER, None)
319
320        # Can't skip - just pre-fill the username field
321        return super().get(request, *args, **kwargs)
322
323    def get_challenge(self) -> Challenge:
324        current_stage: IdentificationStage = self.executor.current_stage
325        challenge = IdentificationChallenge(
326            data={
327                "component": "ak-stage-identification",
328                "primary_action": self.get_primary_action(),
329                "user_fields": current_stage.user_fields,
330                "password_fields": bool(current_stage.password_stage),
331                "captcha_stage": (
332                    {
333                        "js_url": current_stage.captcha_stage.js_url,
334                        "site_key": current_stage.captcha_stage.public_key,
335                        "interactive": current_stage.captcha_stage.interactive,
336                        "pending_user": "",
337                        "pending_user_avatar": DEFAULT_AVATAR,
338                    }
339                    if current_stage.captcha_stage
340                    else None
341                ),
342                "allow_show_password": bool(current_stage.password_stage)
343                and current_stage.password_stage.allow_show_password,
344                "show_source_labels": current_stage.show_source_labels,
345                "flow_designation": self.executor.flow.designation,
346                "enable_remember_me": current_stage.enable_remember_me,
347                "passkey_challenge": self.get_passkey_challenge(),
348            }
349        )
350        # If the user has been redirected to us whilst trying to access an
351        # application, PLAN_CONTEXT_APPLICATION is set in the flow plan
352        if PLAN_CONTEXT_APPLICATION in self.executor.plan.context:
353            app: Application = self.executor.plan.context.get(
354                PLAN_CONTEXT_APPLICATION, Application()
355            )
356            challenge.initial_data["application_pre"] = app.name
357            if launch_url := app.get_launch_url():
358                challenge.initial_data["application_pre_launch"] = launch_url
359        if (
360            PLAN_CONTEXT_DEVICE in self.executor.plan.context
361            and PLAN_CONTEXT_DEVICE_AUTH_TOKEN in self.executor.plan.context
362        ):
363            challenge.initial_data["application_pre"] = self.executor.plan.context.get(
364                PLAN_CONTEXT_DEVICE, Device()
365            ).name
366        get_qs = self.request.session.get(SESSION_KEY_GET, self.request.GET)
367        # Check for related enrollment and recovery flow, add URL to view
368        if current_stage.enrollment_flow:
369            challenge.initial_data["enroll_url"] = reverse_with_qs(
370                "authentik_core:if-flow",
371                query=get_qs,
372                kwargs={"flow_slug": current_stage.enrollment_flow.slug},
373            )
374        if current_stage.recovery_flow:
375            challenge.initial_data["recovery_url"] = reverse_with_qs(
376                "authentik_core:if-flow",
377                query=get_qs,
378                kwargs={"flow_slug": current_stage.recovery_flow.slug},
379            )
380        if current_stage.passwordless_flow:
381            challenge.initial_data["passwordless_url"] = reverse_with_qs(
382                "authentik_core:if-flow",
383                query=get_qs,
384                kwargs={"flow_slug": current_stage.passwordless_flow.slug},
385            )
386
387        # Check all enabled source, add them if they have a UI Login button.
388        ui_sources = []
389        sources: list[Source] = (
390            current_stage.sources.filter(enabled=True).order_by("name").select_subclasses()
391        )
392        for source in sources:
393            ui_login_button = source.ui_login_button(self.request)
394            if ui_login_button:
395                button = asdict(ui_login_button)
396                source_challenge = ui_login_button.challenge
397                source_challenge.is_valid()
398                button["challenge"] = source_challenge.data
399                ui_sources.append(button)
400        challenge.initial_data["sources"] = ui_sources
401
402        # Pre-fill username from login_hint unless user clicked "Not you?"
403        if prefill := self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER):
404            challenge.initial_data["pending_user_identifier"] = prefill
405
406        return challenge
407
408    def challenge_valid(self, response: IdentificationChallengeResponse) -> HttpResponse:
409        self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = response.pre_user
410        current_stage: IdentificationStage = self.executor.current_stage
411
412        # Handle passkey authentication
413        if response.passkey_device:
414            self.logger.debug("Passkey authentication successful", user=response.pre_user)
415            self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl"
416            self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
417            self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
418                {
419                    "device": response.passkey_device,
420                    "device_type": response.passkey_device.device_type,
421                }
422            )
423            # Update device last_used
424            with audit_ignore():
425                response.passkey_device.last_used = now()
426                response.passkey_device.save()
427            return self.executor.stage_ok()
428
429        if not current_stage.show_matched_user:
430            self.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = (
431                response.validated_data.get("uid_field")
432            )
433        return self.executor.stage_ok()

Form to identify the user

response_class = <class 'IdentificationChallengeResponse'>
def get_user(self, uid_value: str) -> authentik.core.models.User | None:
247    def get_user(self, uid_value: str) -> User | None:
248        """Find user instance. Returns None if no user was found."""
249        current_stage: IdentificationStage = self.executor.current_stage
250        query = Q()
251        for search_field in current_stage.user_fields:
252            model_field = {
253                "email": "email",
254                "username": "username",
255                "upn": "attributes__upn",
256            }[search_field]
257            if current_stage.case_insensitive_matching:
258                model_field += "__iexact"
259            else:
260                model_field += "__exact"
261            query |= Q(**{model_field: uid_value})
262        if not query:
263            self.logger.debug("Empty user query", query=query)
264            return None
265        user = User.objects.filter(query).first()
266        if user:
267            self.logger.debug("Found user", user=user.username, query=query)
268            return user
269        return None

Find user instance. Returns None if no user was found.

def get_primary_action(self) -> str:
271    def get_primary_action(self) -> str:
272        """Get the primary action label for this stage"""
273        if self.executor.flow.designation == FlowDesignation.AUTHENTICATION:
274            return _("Log in")
275        return _("Continue")

Get the primary action label for this stage

def get_passkey_challenge(self) -> dict | None:
277    def get_passkey_challenge(self) -> dict | None:
278        """Generate a WebAuthn challenge for passkey/conditional UI authentication"""
279        # Refresh from DB to get the latest configuration
280        current_stage: IdentificationStage = IdentificationStage.objects.get(
281            pk=self.executor.current_stage.pk
282        )
283        if not current_stage.webauthn_stage:
284            self.logger.debug("No webauthn_stage configured")
285            return None
286        challenge = get_webauthn_challenge_without_user(self, current_stage.webauthn_stage)
287        self.logger.debug("Generated passkey challenge", challenge=challenge)
288        return challenge

Generate a WebAuthn challenge for passkey/conditional UI authentication

def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
290    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
291        """Check for existing pending user identifier and skip stage if possible"""
292        current_stage: IdentificationStage = self.executor.current_stage
293        pending_user_identifier = self.executor.plan.context.get(
294            PLAN_CONTEXT_PENDING_USER_IDENTIFIER
295        )
296
297        if not pending_user_identifier:
298            return super().get(request, *args, **kwargs)
299
300        # Only skip if this is a "simple" identification stage with no extra features
301        can_skip = (
302            not current_stage.password_stage
303            and not current_stage.captcha_stage
304            and not current_stage.webauthn_stage
305            and not self.executor.current_binding.policies.exists()
306        )
307
308        if can_skip:
309            # Use the normal validation flow (handles timing protection, logging, signals)
310            response = IdentificationChallengeResponse(
311                data={"uid_field": pending_user_identifier},
312                stage=self,
313            )
314            if response.is_valid():
315                return self.challenge_valid(response)
316            # Validation failed (user doesn't exist and pretend_user_exists is off)
317            # Don't pre-fill invalid username, fall through to show the challenge
318            self.executor.plan.context.pop(PLAN_CONTEXT_PENDING_USER_IDENTIFIER, None)
319
320        # Can't skip - just pre-fill the username field
321        return super().get(request, *args, **kwargs)

Check for existing pending user identifier and skip stage if possible

def get_challenge(self) -> authentik.flows.challenge.Challenge:
323    def get_challenge(self) -> Challenge:
324        current_stage: IdentificationStage = self.executor.current_stage
325        challenge = IdentificationChallenge(
326            data={
327                "component": "ak-stage-identification",
328                "primary_action": self.get_primary_action(),
329                "user_fields": current_stage.user_fields,
330                "password_fields": bool(current_stage.password_stage),
331                "captcha_stage": (
332                    {
333                        "js_url": current_stage.captcha_stage.js_url,
334                        "site_key": current_stage.captcha_stage.public_key,
335                        "interactive": current_stage.captcha_stage.interactive,
336                        "pending_user": "",
337                        "pending_user_avatar": DEFAULT_AVATAR,
338                    }
339                    if current_stage.captcha_stage
340                    else None
341                ),
342                "allow_show_password": bool(current_stage.password_stage)
343                and current_stage.password_stage.allow_show_password,
344                "show_source_labels": current_stage.show_source_labels,
345                "flow_designation": self.executor.flow.designation,
346                "enable_remember_me": current_stage.enable_remember_me,
347                "passkey_challenge": self.get_passkey_challenge(),
348            }
349        )
350        # If the user has been redirected to us whilst trying to access an
351        # application, PLAN_CONTEXT_APPLICATION is set in the flow plan
352        if PLAN_CONTEXT_APPLICATION in self.executor.plan.context:
353            app: Application = self.executor.plan.context.get(
354                PLAN_CONTEXT_APPLICATION, Application()
355            )
356            challenge.initial_data["application_pre"] = app.name
357            if launch_url := app.get_launch_url():
358                challenge.initial_data["application_pre_launch"] = launch_url
359        if (
360            PLAN_CONTEXT_DEVICE in self.executor.plan.context
361            and PLAN_CONTEXT_DEVICE_AUTH_TOKEN in self.executor.plan.context
362        ):
363            challenge.initial_data["application_pre"] = self.executor.plan.context.get(
364                PLAN_CONTEXT_DEVICE, Device()
365            ).name
366        get_qs = self.request.session.get(SESSION_KEY_GET, self.request.GET)
367        # Check for related enrollment and recovery flow, add URL to view
368        if current_stage.enrollment_flow:
369            challenge.initial_data["enroll_url"] = reverse_with_qs(
370                "authentik_core:if-flow",
371                query=get_qs,
372                kwargs={"flow_slug": current_stage.enrollment_flow.slug},
373            )
374        if current_stage.recovery_flow:
375            challenge.initial_data["recovery_url"] = reverse_with_qs(
376                "authentik_core:if-flow",
377                query=get_qs,
378                kwargs={"flow_slug": current_stage.recovery_flow.slug},
379            )
380        if current_stage.passwordless_flow:
381            challenge.initial_data["passwordless_url"] = reverse_with_qs(
382                "authentik_core:if-flow",
383                query=get_qs,
384                kwargs={"flow_slug": current_stage.passwordless_flow.slug},
385            )
386
387        # Check all enabled source, add them if they have a UI Login button.
388        ui_sources = []
389        sources: list[Source] = (
390            current_stage.sources.filter(enabled=True).order_by("name").select_subclasses()
391        )
392        for source in sources:
393            ui_login_button = source.ui_login_button(self.request)
394            if ui_login_button:
395                button = asdict(ui_login_button)
396                source_challenge = ui_login_button.challenge
397                source_challenge.is_valid()
398                button["challenge"] = source_challenge.data
399                ui_sources.append(button)
400        challenge.initial_data["sources"] = ui_sources
401
402        # Pre-fill username from login_hint unless user clicked "Not you?"
403        if prefill := self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER_IDENTIFIER):
404            challenge.initial_data["pending_user_identifier"] = prefill
405
406        return challenge

Return the challenge that the client should solve

def challenge_valid( self, response: IdentificationChallengeResponse) -> django.http.response.HttpResponse:
408    def challenge_valid(self, response: IdentificationChallengeResponse) -> HttpResponse:
409        self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = response.pre_user
410        current_stage: IdentificationStage = self.executor.current_stage
411
412        # Handle passkey authentication
413        if response.passkey_device:
414            self.logger.debug("Passkey authentication successful", user=response.pre_user)
415            self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl"
416            self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
417            self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
418                {
419                    "device": response.passkey_device,
420                    "device_type": response.passkey_device.device_type,
421                }
422            )
423            # Update device last_used
424            with audit_ignore():
425                response.passkey_device.last_used = now()
426                response.passkey_device.save()
427            return self.executor.stage_ok()
428
429        if not current_stage.show_matched_user:
430            self.executor.plan.context[PLAN_CONTEXT_PENDING_USER_IDENTIFIER] = (
431                response.validated_data.get("uid_field")
432            )
433        return self.executor.stage_ok()

Callback when the challenge has the correct format