authentik.stages.authenticator_validate.stage

Authenticator Validation

  1"""Authenticator Validation"""
  2
  3from datetime import datetime
  4from hashlib import sha256
  5
  6from django.conf import settings
  7from django.http import HttpRequest, HttpResponse
  8from django.utils.timezone import now
  9from django.utils.translation import gettext_lazy as _
 10from jwt import PyJWTError, decode, encode
 11from rest_framework.fields import CharField, IntegerField, ListField, UUIDField
 12from rest_framework.serializers import ValidationError
 13
 14from authentik.core.api.utils import JSONDictField, PassiveSerializer
 15from authentik.core.models import User
 16from authentik.events.middleware import audit_ignore
 17from authentik.events.models import Event, EventAction
 18from authentik.flows.challenge import ChallengeResponse, WithUserInfoChallenge
 19from authentik.flows.exceptions import FlowSkipStageException, StageInvalidException
 20from authentik.flows.models import FlowDesignation, NotConfiguredAction, Stage
 21from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
 22from authentik.flows.stage import ChallengeStageView
 23from authentik.lib.utils.time import timedelta_from_string
 24from authentik.policies.reputation.signals import update_score
 25from authentik.stages.authenticator import devices_for_user
 26from authentik.stages.authenticator.models import Device
 27from authentik.stages.authenticator_email.models import EmailDevice
 28from authentik.stages.authenticator_sms.models import SMSDevice
 29from authentik.stages.authenticator_validate.challenge import (
 30    DeviceChallenge,
 31    get_challenge_for_device,
 32    get_webauthn_challenge_without_user,
 33    select_challenge,
 34    validate_challenge_code,
 35    validate_challenge_duo,
 36    validate_challenge_webauthn,
 37)
 38from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
 39from authentik.stages.authenticator_webauthn.models import WebAuthnDevice
 40from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
 41from authentik.tenants.utils import get_unique_identifier
 42
 43COOKIE_NAME_MFA = "authentik_mfa"
 44
 45PLAN_CONTEXT_STAGES = "goauthentik.io/stages/authenticator_validate/stages"
 46PLAN_CONTEXT_SELECTED_STAGE = "goauthentik.io/stages/authenticator_validate/selected_stage"
 47PLAN_CONTEXT_DEVICE_CHALLENGES = "goauthentik.io/stages/authenticator_validate/device_challenges"
 48
 49
 50class SelectableStageSerializer(PassiveSerializer):
 51    """Serializer for stages which can be selected by users"""
 52
 53    pk = UUIDField()
 54    name = CharField()
 55    verbose_name = CharField()
 56    meta_model_name = CharField()
 57
 58
 59class AuthenticatorValidationChallenge(WithUserInfoChallenge):
 60    """Authenticator challenge"""
 61
 62    device_challenges = ListField(child=DeviceChallenge())
 63    component = CharField(default="ak-stage-authenticator-validate")
 64    configuration_stages = ListField(child=SelectableStageSerializer())
 65
 66
 67class AuthenticatorValidationChallengeResponse(ChallengeResponse):
 68    """Challenge used for Code-based and WebAuthn authenticators"""
 69
 70    device: Device | None
 71
 72    selected_challenge = DeviceChallenge(required=False)
 73    selected_stage = CharField(required=False)
 74
 75    code = CharField(required=False)
 76    webauthn = JSONDictField(required=False)
 77    duo = IntegerField(required=False)
 78    component = CharField(default="ak-stage-authenticator-validate")
 79
 80    def _challenge_allowed(self, classes: list):
 81        device_challenges: list[dict] = self.stage.executor.plan.context.get(
 82            PLAN_CONTEXT_DEVICE_CHALLENGES, []
 83        )
 84        if not any(x["device_class"] in classes for x in device_challenges):
 85            raise ValidationError("No compatible device class allowed")
 86
 87    def validate_code(self, code: str) -> str:
 88        """Validate code-based response, raise error if code isn't allowed"""
 89        self._challenge_allowed(
 90            [DeviceClasses.TOTP, DeviceClasses.STATIC, DeviceClasses.SMS, DeviceClasses.EMAIL]
 91        )
 92        self.device = validate_challenge_code(code, self.stage, self.stage.get_pending_user())
 93        return code
 94
 95    def validate_webauthn(self, webauthn: dict) -> dict:
 96        """Validate webauthn response, raise error if webauthn wasn't allowed
 97        or response is invalid"""
 98        self._challenge_allowed([DeviceClasses.WEBAUTHN])
 99        self.device = validate_challenge_webauthn(
100            webauthn, self.stage, self.stage.get_pending_user()
101        )
102        return webauthn
103
104    def validate_duo(self, duo: int) -> int:
105        """Initiate Duo authentication"""
106        self._challenge_allowed([DeviceClasses.DUO])
107        self.device = validate_challenge_duo(duo, self.stage, self.stage.get_pending_user())
108        return duo
109
110    def validate_selected_challenge(self, challenge: dict) -> dict:
111        """Check which challenge the user has selected. Actual logic only used for SMS stage."""
112        # First check if the challenge is valid
113        allowed = False
114        for device_challenge in self.stage.executor.plan.context.get(
115            PLAN_CONTEXT_DEVICE_CHALLENGES, []
116        ):
117            if device_challenge.get("device_class", "") == challenge.get(
118                "device_class", ""
119            ) and device_challenge.get("device_uid", "") == challenge.get("device_uid", ""):
120                allowed = True
121        if not allowed:
122            raise ValidationError("invalid challenge selected")
123
124        device_class = challenge.get("device_class", "")
125        if device_class == "sms":
126            devices = SMSDevice.objects.filter(pk=int(challenge.get("device_uid", "0")))
127            if not devices.exists():
128                raise ValidationError("invalid challenge selected")
129            select_challenge(self.stage.request, devices.first())
130        elif device_class == "email":
131            devices = EmailDevice.objects.filter(pk=int(challenge.get("device_uid", "0")))
132            if not devices.exists():
133                raise ValidationError("invalid challenge selected")
134            select_challenge(self.stage.request, devices.first())
135        return challenge
136
137    def validate_selected_stage(self, stage_pk: str) -> str:
138        """Check that the selected stage is valid"""
139        stages = self.stage.executor.plan.context.get(PLAN_CONTEXT_STAGES, [])
140        if not any(str(stage.pk) == stage_pk for stage in stages):
141            raise ValidationError("Selected stage is invalid")
142        self.stage.logger.debug("Setting selected stage to ", stage=stage_pk)
143        self.stage.executor.plan.context[PLAN_CONTEXT_SELECTED_STAGE] = stage_pk
144        return stage_pk
145
146    def validate(self, attrs: dict):
147        # Checking if the given data is from a valid device class is done above
148        # Here we only check if the any data was sent at all
149        if "code" not in attrs and "webauthn" not in attrs and "duo" not in attrs:
150            raise ValidationError("Empty response")
151        self.stage.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "auth_mfa")
152        self.stage.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
153        self.stage.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].setdefault("mfa_devices", [])
154        self.stage.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS]["mfa_devices"].append(
155            self.device
156        )
157        with audit_ignore():
158            self.device.last_used = now()
159            self.device.save()
160        return attrs
161
162
163class AuthenticatorValidateStageView(ChallengeStageView):
164    """Authenticator Validation"""
165
166    response_class = AuthenticatorValidationChallengeResponse
167
168    def get_device_challenges(self) -> list[dict]:
169        """Get a list of all device challenges applicable for the current stage"""
170        challenges = []
171        pending_user = self.get_pending_user()
172        if pending_user.is_anonymous:
173            # We shouldn't get here without any kind of authentication data
174            raise StageInvalidException()
175        # When `pretend_user_exists` is enabled in the identification stage,
176        # `pending_user` will be a user model that isn't save to the DB
177        # hence it doesn't have a PK. In that case we just return an empty list of
178        # authenticators
179        if not pending_user.pk:
180            return []
181        # Convert to a list to have usable log output instead of just <generator ...>
182        user_devices = list(devices_for_user(self.get_pending_user()))
183        self.logger.debug("Got devices for user", devices=user_devices)
184
185        # static and totp are only shown once
186        # since their challenges are device-independent
187        seen_classes = []
188
189        stage: AuthenticatorValidateStage = self.executor.current_stage
190
191        threshold = timedelta_from_string(stage.last_auth_threshold)
192        allowed_devices = []
193
194        has_webauthn_filters_set = stage.webauthn_allowed_device_types.exists()
195
196        for device in user_devices:
197            device_class = device.__class__.__name__.lower().replace("device", "")
198            if device_class not in stage.device_classes:
199                self.logger.debug("device class not allowed", device_class=device_class)
200                continue
201            if isinstance(device, SMSDevice) and device.is_hashed:
202                self.logger.debug("Hashed SMS device, skipping", device=device)
203                continue
204            allowed_devices.append(device)
205            # Ignore WebAuthn devices which are not in the allowed types
206            if (
207                isinstance(device, WebAuthnDevice)
208                and device.device_type
209                and has_webauthn_filters_set
210            ):
211                if not stage.webauthn_allowed_device_types.filter(
212                    pk=device.device_type.pk
213                ).exists():
214                    self.logger.debug(
215                        "WebAuthn device type not allowed", device=device, type=device.device_type
216                    )
217                    continue
218            # Ensure only one challenge per device class
219            # WebAuthn does another device loop to find all WebAuthn devices
220            if device_class in seen_classes:
221                continue
222            if device_class not in seen_classes:
223                seen_classes.append(device_class)
224            challenge = DeviceChallenge(
225                data={
226                    "device_class": device_class,
227                    "device_uid": device.pk,
228                    "challenge": get_challenge_for_device(self, stage, device),
229                    "last_used": device.last_used,
230                }
231            )
232            challenge.is_valid()
233            challenges.append(challenge.data)
234            self.logger.debug("adding challenge for device", challenge=challenge)
235        # check if we have an MFA cookie and if it's valid
236        if threshold.total_seconds() > 0:
237            self.check_mfa_cookie(allowed_devices)
238        return challenges
239
240    def get_webauthn_challenge_without_user(self) -> list[dict]:
241        """Get a WebAuthn challenge when no pending user is set."""
242        challenge = DeviceChallenge(
243            data={
244                "device_class": DeviceClasses.WEBAUTHN,
245                "device_uid": -1,
246                "challenge": get_webauthn_challenge_without_user(
247                    self,
248                    self.executor.current_stage,
249                ),
250                "last_used": None,
251            }
252        )
253        challenge.is_valid()
254        return [challenge.data]
255
256    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:  # noqa: PLR0911
257        """Check if a user is set, and check if the user has any devices
258        if not, we can skip this entire stage"""
259        user = self.get_pending_user()
260        stage: AuthenticatorValidateStage = self.executor.current_stage
261        if user and not user.is_anonymous:
262            try:
263                challenges = self.get_device_challenges()
264            except FlowSkipStageException:
265                return self.executor.stage_ok()
266        else:
267            if self.executor.flow.designation != FlowDesignation.AUTHENTICATION:
268                self.logger.debug("Refusing passwordless flow in non-authentication flow")
269                return self.executor.stage_ok()
270            # Passwordless auth, with just webauthn
271            if DeviceClasses.WEBAUTHN in stage.device_classes:
272                self.logger.debug("Flow without user, getting generic webauthn challenge")
273                challenges = self.get_webauthn_challenge_without_user()
274            else:
275                self.logger.debug("No pending user, continuing")
276                return self.executor.stage_ok()
277        self.executor.plan.context[PLAN_CONTEXT_DEVICE_CHALLENGES] = challenges
278
279        # No allowed devices
280        if len(challenges) < 1:
281            if stage.not_configured_action == NotConfiguredAction.SKIP:
282                self.logger.debug("Authenticator not configured, skipping stage")
283                return self.executor.stage_ok()
284            if stage.not_configured_action == NotConfiguredAction.DENY:
285                self.logger.debug("Authenticator not configured, denying")
286                return self.executor.stage_invalid(_("No (allowed) MFA authenticator configured."))
287            if stage.not_configured_action == NotConfiguredAction.CONFIGURE:
288                self.logger.debug("Authenticator not configured, forcing configure")
289                return self.prepare_stages(user)
290        return super().get(request, *args, **kwargs)
291
292    def prepare_stages(self, user: User, *args, **kwargs) -> HttpResponse:
293        """Check how the user can configure themselves. If no stages are set, return an error.
294        If a single stage is set, insert that stage directly. If multiple are selected, include
295        them in the challenge."""
296        stage: AuthenticatorValidateStage = self.executor.current_stage
297        if not stage.configuration_stages.exists():
298            Event.new(
299                EventAction.CONFIGURATION_ERROR,
300                message=(
301                    "Authenticator validation stage is set to configure user "
302                    "but no configuration flow is set."
303                ),
304                stage=self,
305            ).from_http(self.request).set_user(user).save()
306            return self.executor.stage_invalid()
307        if stage.configuration_stages.count() == 1:
308            next_stage = Stage.objects.get_subclass(pk=stage.configuration_stages.first().pk)
309            self.logger.debug("Single stage configured, auto-selecting", stage=next_stage)
310            self.executor.plan.context[PLAN_CONTEXT_SELECTED_STAGE] = next_stage
311            # Because that normal execution only happens on post, we directly inject it here and
312            # return it
313            self.executor.plan.insert_stage(next_stage)
314            return self.executor.stage_ok()
315        stages = Stage.objects.filter(pk__in=stage.configuration_stages.all()).select_subclasses()
316        self.executor.plan.context[PLAN_CONTEXT_STAGES] = stages
317        return super().get(self.request, *args, **kwargs)
318
319    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
320        res = super().post(request, *args, **kwargs)
321        if (
322            PLAN_CONTEXT_SELECTED_STAGE in self.executor.plan.context
323            and self.executor.current_stage.not_configured_action == NotConfiguredAction.CONFIGURE
324        ):
325            self.logger.debug("Got selected stage in context, running that")
326            stage_pk = self.executor.plan.context.get(PLAN_CONTEXT_SELECTED_STAGE)
327            # Because the foreign key to stage.configuration_stage points to
328            # a base stage class, we need to do another lookup
329            stage = Stage.objects.get_subclass(pk=stage_pk)
330            # plan.insert inserts at 1 index, so when stage_ok pops 0,
331            # the configuration stage is next
332            self.executor.plan.insert_stage(stage)
333            return self.executor.stage_ok()
334        return res
335
336    def get_challenge(self) -> AuthenticatorValidationChallenge:
337        challenges = self.executor.plan.context.get(PLAN_CONTEXT_DEVICE_CHALLENGES, [])
338        stages = self.executor.plan.context.get(PLAN_CONTEXT_STAGES, [])
339        stage_challenges = []
340        for stage in stages:
341            serializer = SelectableStageSerializer(
342                data={
343                    "pk": stage.pk,
344                    "name": getattr(stage, "friendly_name", stage.name) or stage.name,
345                    "verbose_name": str(stage._meta.verbose_name)
346                    .replace("Setup Stage", "")
347                    .strip(),
348                    "meta_model_name": f"{stage._meta.app_label}.{stage._meta.model_name}",
349                }
350            )
351            serializer.is_valid()
352            stage_challenges.append(serializer.data)
353        return AuthenticatorValidationChallenge(
354            data={
355                "component": "ak-stage-authenticator-validate",
356                "device_challenges": challenges,
357                "configuration_stages": stage_challenges,
358            }
359        )
360
361    @property
362    def cookie_jwt_key(self) -> str:
363        """Signing key for MFA Cookie for this stage"""
364        return sha256(
365            f"{get_unique_identifier()}:{self.executor.current_stage.pk.hex}".encode("ascii")
366        ).hexdigest()
367
368    def check_mfa_cookie(self, allowed_devices: list[Device]):
369        """Check if an MFA cookie has been set, whether it's valid and applies
370        to the current stage and device.
371
372        The list of devices passed to this function must only contain devices for the
373        correct user and with an allowed class"""
374        if COOKIE_NAME_MFA not in self.request.COOKIES:
375            return
376        stage: AuthenticatorValidateStage = self.executor.current_stage
377        threshold = timedelta_from_string(stage.last_auth_threshold)
378        latest_allowed = datetime.now() + threshold
379        try:
380            payload = decode(self.request.COOKIES[COOKIE_NAME_MFA], self.cookie_jwt_key, ["HS256"])
381            if payload["stage"] != stage.pk.hex:
382                self.logger.warning("Invalid stage PK")
383                return
384            if datetime.fromtimestamp(payload["exp"]) > latest_allowed:
385                self.logger.warning("Expired MFA cookie")
386                return
387            if not any(device.pk == payload["device"] for device in allowed_devices):
388                self.logger.warning("Invalid device PK")
389                return
390            self.logger.info("MFA has been used within threshold")
391            raise FlowSkipStageException()
392        except (PyJWTError, ValueError, TypeError) as exc:
393            self.logger.info("Invalid mfa cookie for device", exc=exc)
394
395    def set_valid_mfa_cookie(self, device: Device) -> HttpResponse:
396        """Set an MFA cookie to allow users to skip MFA validation in this context (browser)
397
398        The cookie is JWT which is signed with a hash of the secret key and the UID of the stage"""
399        stage: AuthenticatorValidateStage = self.executor.current_stage
400        delta = timedelta_from_string(stage.last_auth_threshold)
401        if delta.total_seconds() < 1:
402            self.logger.info("Not setting MFA cookie since threshold is not set.")
403            return self.executor.stage_ok()
404        expiry = datetime.now() + delta
405        cookie_payload = {
406            "device": device.pk,
407            "stage": stage.pk.hex,
408            "exp": expiry.timestamp(),
409        }
410        response = self.executor.stage_ok()
411        cookie = encode(cookie_payload, self.cookie_jwt_key)
412        response.set_cookie(
413            COOKIE_NAME_MFA,
414            cookie,
415            expires=expiry,
416            path=settings.SESSION_COOKIE_PATH,
417            domain=settings.SESSION_COOKIE_DOMAIN,
418            samesite=settings.SESSION_COOKIE_SAMESITE,
419        )
420        return response
421
422    def challenge_invalid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse:
423        update_score(self.request, self.get_pending_user().username, -1)
424        return super().challenge_invalid(response)
425
426    def challenge_valid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse:
427        # All validation is done by the serializer
428        user = self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER)
429        if not user and "webauthn" in response.data:
430            webauthn_device: WebAuthnDevice = response.device
431            self.logger.debug("Set user from user-less flow", user=webauthn_device.user)
432            self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = webauthn_device.user
433            # We already set a default method in the validator above
434            # so this needs to have higher priority
435            self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl"
436            self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
437            self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
438                {
439                    "device": webauthn_device,
440                    "device_type": webauthn_device.device_type,
441                }
442            )
443        return self.set_valid_mfa_cookie(response.device)
PLAN_CONTEXT_STAGES = 'goauthentik.io/stages/authenticator_validate/stages'
PLAN_CONTEXT_SELECTED_STAGE = 'goauthentik.io/stages/authenticator_validate/selected_stage'
PLAN_CONTEXT_DEVICE_CHALLENGES = 'goauthentik.io/stages/authenticator_validate/device_challenges'
class SelectableStageSerializer(authentik.core.api.utils.PassiveSerializer):
51class SelectableStageSerializer(PassiveSerializer):
52    """Serializer for stages which can be selected by users"""
53
54    pk = UUIDField()
55    name = CharField()
56    verbose_name = CharField()
57    meta_model_name = CharField()

Serializer for stages which can be selected by users

pk
name
verbose_name
meta_model_name
class AuthenticatorValidationChallenge(authentik.flows.challenge.WithUserInfoChallenge):
60class AuthenticatorValidationChallenge(WithUserInfoChallenge):
61    """Authenticator challenge"""
62
63    device_challenges = ListField(child=DeviceChallenge())
64    component = CharField(default="ak-stage-authenticator-validate")
65    configuration_stages = ListField(child=SelectableStageSerializer())

Authenticator challenge

device_challenges
component
configuration_stages
class AuthenticatorValidationChallengeResponse(authentik.flows.challenge.ChallengeResponse):
 68class AuthenticatorValidationChallengeResponse(ChallengeResponse):
 69    """Challenge used for Code-based and WebAuthn authenticators"""
 70
 71    device: Device | None
 72
 73    selected_challenge = DeviceChallenge(required=False)
 74    selected_stage = CharField(required=False)
 75
 76    code = CharField(required=False)
 77    webauthn = JSONDictField(required=False)
 78    duo = IntegerField(required=False)
 79    component = CharField(default="ak-stage-authenticator-validate")
 80
 81    def _challenge_allowed(self, classes: list):
 82        device_challenges: list[dict] = self.stage.executor.plan.context.get(
 83            PLAN_CONTEXT_DEVICE_CHALLENGES, []
 84        )
 85        if not any(x["device_class"] in classes for x in device_challenges):
 86            raise ValidationError("No compatible device class allowed")
 87
 88    def validate_code(self, code: str) -> str:
 89        """Validate code-based response, raise error if code isn't allowed"""
 90        self._challenge_allowed(
 91            [DeviceClasses.TOTP, DeviceClasses.STATIC, DeviceClasses.SMS, DeviceClasses.EMAIL]
 92        )
 93        self.device = validate_challenge_code(code, self.stage, self.stage.get_pending_user())
 94        return code
 95
 96    def validate_webauthn(self, webauthn: dict) -> dict:
 97        """Validate webauthn response, raise error if webauthn wasn't allowed
 98        or response is invalid"""
 99        self._challenge_allowed([DeviceClasses.WEBAUTHN])
100        self.device = validate_challenge_webauthn(
101            webauthn, self.stage, self.stage.get_pending_user()
102        )
103        return webauthn
104
105    def validate_duo(self, duo: int) -> int:
106        """Initiate Duo authentication"""
107        self._challenge_allowed([DeviceClasses.DUO])
108        self.device = validate_challenge_duo(duo, self.stage, self.stage.get_pending_user())
109        return duo
110
111    def validate_selected_challenge(self, challenge: dict) -> dict:
112        """Check which challenge the user has selected. Actual logic only used for SMS stage."""
113        # First check if the challenge is valid
114        allowed = False
115        for device_challenge in self.stage.executor.plan.context.get(
116            PLAN_CONTEXT_DEVICE_CHALLENGES, []
117        ):
118            if device_challenge.get("device_class", "") == challenge.get(
119                "device_class", ""
120            ) and device_challenge.get("device_uid", "") == challenge.get("device_uid", ""):
121                allowed = True
122        if not allowed:
123            raise ValidationError("invalid challenge selected")
124
125        device_class = challenge.get("device_class", "")
126        if device_class == "sms":
127            devices = SMSDevice.objects.filter(pk=int(challenge.get("device_uid", "0")))
128            if not devices.exists():
129                raise ValidationError("invalid challenge selected")
130            select_challenge(self.stage.request, devices.first())
131        elif device_class == "email":
132            devices = EmailDevice.objects.filter(pk=int(challenge.get("device_uid", "0")))
133            if not devices.exists():
134                raise ValidationError("invalid challenge selected")
135            select_challenge(self.stage.request, devices.first())
136        return challenge
137
138    def validate_selected_stage(self, stage_pk: str) -> str:
139        """Check that the selected stage is valid"""
140        stages = self.stage.executor.plan.context.get(PLAN_CONTEXT_STAGES, [])
141        if not any(str(stage.pk) == stage_pk for stage in stages):
142            raise ValidationError("Selected stage is invalid")
143        self.stage.logger.debug("Setting selected stage to ", stage=stage_pk)
144        self.stage.executor.plan.context[PLAN_CONTEXT_SELECTED_STAGE] = stage_pk
145        return stage_pk
146
147    def validate(self, attrs: dict):
148        # Checking if the given data is from a valid device class is done above
149        # Here we only check if the any data was sent at all
150        if "code" not in attrs and "webauthn" not in attrs and "duo" not in attrs:
151            raise ValidationError("Empty response")
152        self.stage.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "auth_mfa")
153        self.stage.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
154        self.stage.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].setdefault("mfa_devices", [])
155        self.stage.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS]["mfa_devices"].append(
156            self.device
157        )
158        with audit_ignore():
159            self.device.last_used = now()
160            self.device.save()
161        return attrs

Challenge used for Code-based and WebAuthn authenticators

selected_challenge
selected_stage
code
webauthn
duo
component
def validate_code(self, code: str) -> str:
88    def validate_code(self, code: str) -> str:
89        """Validate code-based response, raise error if code isn't allowed"""
90        self._challenge_allowed(
91            [DeviceClasses.TOTP, DeviceClasses.STATIC, DeviceClasses.SMS, DeviceClasses.EMAIL]
92        )
93        self.device = validate_challenge_code(code, self.stage, self.stage.get_pending_user())
94        return code

Validate code-based response, raise error if code isn't allowed

def validate_webauthn(self, webauthn: dict) -> dict:
 96    def validate_webauthn(self, webauthn: dict) -> dict:
 97        """Validate webauthn response, raise error if webauthn wasn't allowed
 98        or response is invalid"""
 99        self._challenge_allowed([DeviceClasses.WEBAUTHN])
100        self.device = validate_challenge_webauthn(
101            webauthn, self.stage, self.stage.get_pending_user()
102        )
103        return webauthn

Validate webauthn response, raise error if webauthn wasn't allowed or response is invalid

def validate_duo(self, duo: int) -> int:
105    def validate_duo(self, duo: int) -> int:
106        """Initiate Duo authentication"""
107        self._challenge_allowed([DeviceClasses.DUO])
108        self.device = validate_challenge_duo(duo, self.stage, self.stage.get_pending_user())
109        return duo

Initiate Duo authentication

def validate_selected_challenge(self, challenge: dict) -> dict:
111    def validate_selected_challenge(self, challenge: dict) -> dict:
112        """Check which challenge the user has selected. Actual logic only used for SMS stage."""
113        # First check if the challenge is valid
114        allowed = False
115        for device_challenge in self.stage.executor.plan.context.get(
116            PLAN_CONTEXT_DEVICE_CHALLENGES, []
117        ):
118            if device_challenge.get("device_class", "") == challenge.get(
119                "device_class", ""
120            ) and device_challenge.get("device_uid", "") == challenge.get("device_uid", ""):
121                allowed = True
122        if not allowed:
123            raise ValidationError("invalid challenge selected")
124
125        device_class = challenge.get("device_class", "")
126        if device_class == "sms":
127            devices = SMSDevice.objects.filter(pk=int(challenge.get("device_uid", "0")))
128            if not devices.exists():
129                raise ValidationError("invalid challenge selected")
130            select_challenge(self.stage.request, devices.first())
131        elif device_class == "email":
132            devices = EmailDevice.objects.filter(pk=int(challenge.get("device_uid", "0")))
133            if not devices.exists():
134                raise ValidationError("invalid challenge selected")
135            select_challenge(self.stage.request, devices.first())
136        return challenge

Check which challenge the user has selected. Actual logic only used for SMS stage.

def validate_selected_stage(self, stage_pk: str) -> str:
138    def validate_selected_stage(self, stage_pk: str) -> str:
139        """Check that the selected stage is valid"""
140        stages = self.stage.executor.plan.context.get(PLAN_CONTEXT_STAGES, [])
141        if not any(str(stage.pk) == stage_pk for stage in stages):
142            raise ValidationError("Selected stage is invalid")
143        self.stage.logger.debug("Setting selected stage to ", stage=stage_pk)
144        self.stage.executor.plan.context[PLAN_CONTEXT_SELECTED_STAGE] = stage_pk
145        return stage_pk

Check that the selected stage is valid

def validate(self, attrs: dict):
147    def validate(self, attrs: dict):
148        # Checking if the given data is from a valid device class is done above
149        # Here we only check if the any data was sent at all
150        if "code" not in attrs and "webauthn" not in attrs and "duo" not in attrs:
151            raise ValidationError("Empty response")
152        self.stage.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "auth_mfa")
153        self.stage.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
154        self.stage.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].setdefault("mfa_devices", [])
155        self.stage.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS]["mfa_devices"].append(
156            self.device
157        )
158        with audit_ignore():
159            self.device.last_used = now()
160            self.device.save()
161        return attrs
class AuthenticatorValidateStageView(authentik.flows.stage.ChallengeStageView):
164class AuthenticatorValidateStageView(ChallengeStageView):
165    """Authenticator Validation"""
166
167    response_class = AuthenticatorValidationChallengeResponse
168
169    def get_device_challenges(self) -> list[dict]:
170        """Get a list of all device challenges applicable for the current stage"""
171        challenges = []
172        pending_user = self.get_pending_user()
173        if pending_user.is_anonymous:
174            # We shouldn't get here without any kind of authentication data
175            raise StageInvalidException()
176        # When `pretend_user_exists` is enabled in the identification stage,
177        # `pending_user` will be a user model that isn't save to the DB
178        # hence it doesn't have a PK. In that case we just return an empty list of
179        # authenticators
180        if not pending_user.pk:
181            return []
182        # Convert to a list to have usable log output instead of just <generator ...>
183        user_devices = list(devices_for_user(self.get_pending_user()))
184        self.logger.debug("Got devices for user", devices=user_devices)
185
186        # static and totp are only shown once
187        # since their challenges are device-independent
188        seen_classes = []
189
190        stage: AuthenticatorValidateStage = self.executor.current_stage
191
192        threshold = timedelta_from_string(stage.last_auth_threshold)
193        allowed_devices = []
194
195        has_webauthn_filters_set = stage.webauthn_allowed_device_types.exists()
196
197        for device in user_devices:
198            device_class = device.__class__.__name__.lower().replace("device", "")
199            if device_class not in stage.device_classes:
200                self.logger.debug("device class not allowed", device_class=device_class)
201                continue
202            if isinstance(device, SMSDevice) and device.is_hashed:
203                self.logger.debug("Hashed SMS device, skipping", device=device)
204                continue
205            allowed_devices.append(device)
206            # Ignore WebAuthn devices which are not in the allowed types
207            if (
208                isinstance(device, WebAuthnDevice)
209                and device.device_type
210                and has_webauthn_filters_set
211            ):
212                if not stage.webauthn_allowed_device_types.filter(
213                    pk=device.device_type.pk
214                ).exists():
215                    self.logger.debug(
216                        "WebAuthn device type not allowed", device=device, type=device.device_type
217                    )
218                    continue
219            # Ensure only one challenge per device class
220            # WebAuthn does another device loop to find all WebAuthn devices
221            if device_class in seen_classes:
222                continue
223            if device_class not in seen_classes:
224                seen_classes.append(device_class)
225            challenge = DeviceChallenge(
226                data={
227                    "device_class": device_class,
228                    "device_uid": device.pk,
229                    "challenge": get_challenge_for_device(self, stage, device),
230                    "last_used": device.last_used,
231                }
232            )
233            challenge.is_valid()
234            challenges.append(challenge.data)
235            self.logger.debug("adding challenge for device", challenge=challenge)
236        # check if we have an MFA cookie and if it's valid
237        if threshold.total_seconds() > 0:
238            self.check_mfa_cookie(allowed_devices)
239        return challenges
240
241    def get_webauthn_challenge_without_user(self) -> list[dict]:
242        """Get a WebAuthn challenge when no pending user is set."""
243        challenge = DeviceChallenge(
244            data={
245                "device_class": DeviceClasses.WEBAUTHN,
246                "device_uid": -1,
247                "challenge": get_webauthn_challenge_without_user(
248                    self,
249                    self.executor.current_stage,
250                ),
251                "last_used": None,
252            }
253        )
254        challenge.is_valid()
255        return [challenge.data]
256
257    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:  # noqa: PLR0911
258        """Check if a user is set, and check if the user has any devices
259        if not, we can skip this entire stage"""
260        user = self.get_pending_user()
261        stage: AuthenticatorValidateStage = self.executor.current_stage
262        if user and not user.is_anonymous:
263            try:
264                challenges = self.get_device_challenges()
265            except FlowSkipStageException:
266                return self.executor.stage_ok()
267        else:
268            if self.executor.flow.designation != FlowDesignation.AUTHENTICATION:
269                self.logger.debug("Refusing passwordless flow in non-authentication flow")
270                return self.executor.stage_ok()
271            # Passwordless auth, with just webauthn
272            if DeviceClasses.WEBAUTHN in stage.device_classes:
273                self.logger.debug("Flow without user, getting generic webauthn challenge")
274                challenges = self.get_webauthn_challenge_without_user()
275            else:
276                self.logger.debug("No pending user, continuing")
277                return self.executor.stage_ok()
278        self.executor.plan.context[PLAN_CONTEXT_DEVICE_CHALLENGES] = challenges
279
280        # No allowed devices
281        if len(challenges) < 1:
282            if stage.not_configured_action == NotConfiguredAction.SKIP:
283                self.logger.debug("Authenticator not configured, skipping stage")
284                return self.executor.stage_ok()
285            if stage.not_configured_action == NotConfiguredAction.DENY:
286                self.logger.debug("Authenticator not configured, denying")
287                return self.executor.stage_invalid(_("No (allowed) MFA authenticator configured."))
288            if stage.not_configured_action == NotConfiguredAction.CONFIGURE:
289                self.logger.debug("Authenticator not configured, forcing configure")
290                return self.prepare_stages(user)
291        return super().get(request, *args, **kwargs)
292
293    def prepare_stages(self, user: User, *args, **kwargs) -> HttpResponse:
294        """Check how the user can configure themselves. If no stages are set, return an error.
295        If a single stage is set, insert that stage directly. If multiple are selected, include
296        them in the challenge."""
297        stage: AuthenticatorValidateStage = self.executor.current_stage
298        if not stage.configuration_stages.exists():
299            Event.new(
300                EventAction.CONFIGURATION_ERROR,
301                message=(
302                    "Authenticator validation stage is set to configure user "
303                    "but no configuration flow is set."
304                ),
305                stage=self,
306            ).from_http(self.request).set_user(user).save()
307            return self.executor.stage_invalid()
308        if stage.configuration_stages.count() == 1:
309            next_stage = Stage.objects.get_subclass(pk=stage.configuration_stages.first().pk)
310            self.logger.debug("Single stage configured, auto-selecting", stage=next_stage)
311            self.executor.plan.context[PLAN_CONTEXT_SELECTED_STAGE] = next_stage
312            # Because that normal execution only happens on post, we directly inject it here and
313            # return it
314            self.executor.plan.insert_stage(next_stage)
315            return self.executor.stage_ok()
316        stages = Stage.objects.filter(pk__in=stage.configuration_stages.all()).select_subclasses()
317        self.executor.plan.context[PLAN_CONTEXT_STAGES] = stages
318        return super().get(self.request, *args, **kwargs)
319
320    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
321        res = super().post(request, *args, **kwargs)
322        if (
323            PLAN_CONTEXT_SELECTED_STAGE in self.executor.plan.context
324            and self.executor.current_stage.not_configured_action == NotConfiguredAction.CONFIGURE
325        ):
326            self.logger.debug("Got selected stage in context, running that")
327            stage_pk = self.executor.plan.context.get(PLAN_CONTEXT_SELECTED_STAGE)
328            # Because the foreign key to stage.configuration_stage points to
329            # a base stage class, we need to do another lookup
330            stage = Stage.objects.get_subclass(pk=stage_pk)
331            # plan.insert inserts at 1 index, so when stage_ok pops 0,
332            # the configuration stage is next
333            self.executor.plan.insert_stage(stage)
334            return self.executor.stage_ok()
335        return res
336
337    def get_challenge(self) -> AuthenticatorValidationChallenge:
338        challenges = self.executor.plan.context.get(PLAN_CONTEXT_DEVICE_CHALLENGES, [])
339        stages = self.executor.plan.context.get(PLAN_CONTEXT_STAGES, [])
340        stage_challenges = []
341        for stage in stages:
342            serializer = SelectableStageSerializer(
343                data={
344                    "pk": stage.pk,
345                    "name": getattr(stage, "friendly_name", stage.name) or stage.name,
346                    "verbose_name": str(stage._meta.verbose_name)
347                    .replace("Setup Stage", "")
348                    .strip(),
349                    "meta_model_name": f"{stage._meta.app_label}.{stage._meta.model_name}",
350                }
351            )
352            serializer.is_valid()
353            stage_challenges.append(serializer.data)
354        return AuthenticatorValidationChallenge(
355            data={
356                "component": "ak-stage-authenticator-validate",
357                "device_challenges": challenges,
358                "configuration_stages": stage_challenges,
359            }
360        )
361
362    @property
363    def cookie_jwt_key(self) -> str:
364        """Signing key for MFA Cookie for this stage"""
365        return sha256(
366            f"{get_unique_identifier()}:{self.executor.current_stage.pk.hex}".encode("ascii")
367        ).hexdigest()
368
369    def check_mfa_cookie(self, allowed_devices: list[Device]):
370        """Check if an MFA cookie has been set, whether it's valid and applies
371        to the current stage and device.
372
373        The list of devices passed to this function must only contain devices for the
374        correct user and with an allowed class"""
375        if COOKIE_NAME_MFA not in self.request.COOKIES:
376            return
377        stage: AuthenticatorValidateStage = self.executor.current_stage
378        threshold = timedelta_from_string(stage.last_auth_threshold)
379        latest_allowed = datetime.now() + threshold
380        try:
381            payload = decode(self.request.COOKIES[COOKIE_NAME_MFA], self.cookie_jwt_key, ["HS256"])
382            if payload["stage"] != stage.pk.hex:
383                self.logger.warning("Invalid stage PK")
384                return
385            if datetime.fromtimestamp(payload["exp"]) > latest_allowed:
386                self.logger.warning("Expired MFA cookie")
387                return
388            if not any(device.pk == payload["device"] for device in allowed_devices):
389                self.logger.warning("Invalid device PK")
390                return
391            self.logger.info("MFA has been used within threshold")
392            raise FlowSkipStageException()
393        except (PyJWTError, ValueError, TypeError) as exc:
394            self.logger.info("Invalid mfa cookie for device", exc=exc)
395
396    def set_valid_mfa_cookie(self, device: Device) -> HttpResponse:
397        """Set an MFA cookie to allow users to skip MFA validation in this context (browser)
398
399        The cookie is JWT which is signed with a hash of the secret key and the UID of the stage"""
400        stage: AuthenticatorValidateStage = self.executor.current_stage
401        delta = timedelta_from_string(stage.last_auth_threshold)
402        if delta.total_seconds() < 1:
403            self.logger.info("Not setting MFA cookie since threshold is not set.")
404            return self.executor.stage_ok()
405        expiry = datetime.now() + delta
406        cookie_payload = {
407            "device": device.pk,
408            "stage": stage.pk.hex,
409            "exp": expiry.timestamp(),
410        }
411        response = self.executor.stage_ok()
412        cookie = encode(cookie_payload, self.cookie_jwt_key)
413        response.set_cookie(
414            COOKIE_NAME_MFA,
415            cookie,
416            expires=expiry,
417            path=settings.SESSION_COOKIE_PATH,
418            domain=settings.SESSION_COOKIE_DOMAIN,
419            samesite=settings.SESSION_COOKIE_SAMESITE,
420        )
421        return response
422
423    def challenge_invalid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse:
424        update_score(self.request, self.get_pending_user().username, -1)
425        return super().challenge_invalid(response)
426
427    def challenge_valid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse:
428        # All validation is done by the serializer
429        user = self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER)
430        if not user and "webauthn" in response.data:
431            webauthn_device: WebAuthnDevice = response.device
432            self.logger.debug("Set user from user-less flow", user=webauthn_device.user)
433            self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = webauthn_device.user
434            # We already set a default method in the validator above
435            # so this needs to have higher priority
436            self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl"
437            self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
438            self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
439                {
440                    "device": webauthn_device,
441                    "device_type": webauthn_device.device_type,
442                }
443            )
444        return self.set_valid_mfa_cookie(response.device)

Authenticator Validation

response_class = <class 'AuthenticatorValidationChallengeResponse'>
def get_device_challenges(self) -> list[dict]:
169    def get_device_challenges(self) -> list[dict]:
170        """Get a list of all device challenges applicable for the current stage"""
171        challenges = []
172        pending_user = self.get_pending_user()
173        if pending_user.is_anonymous:
174            # We shouldn't get here without any kind of authentication data
175            raise StageInvalidException()
176        # When `pretend_user_exists` is enabled in the identification stage,
177        # `pending_user` will be a user model that isn't save to the DB
178        # hence it doesn't have a PK. In that case we just return an empty list of
179        # authenticators
180        if not pending_user.pk:
181            return []
182        # Convert to a list to have usable log output instead of just <generator ...>
183        user_devices = list(devices_for_user(self.get_pending_user()))
184        self.logger.debug("Got devices for user", devices=user_devices)
185
186        # static and totp are only shown once
187        # since their challenges are device-independent
188        seen_classes = []
189
190        stage: AuthenticatorValidateStage = self.executor.current_stage
191
192        threshold = timedelta_from_string(stage.last_auth_threshold)
193        allowed_devices = []
194
195        has_webauthn_filters_set = stage.webauthn_allowed_device_types.exists()
196
197        for device in user_devices:
198            device_class = device.__class__.__name__.lower().replace("device", "")
199            if device_class not in stage.device_classes:
200                self.logger.debug("device class not allowed", device_class=device_class)
201                continue
202            if isinstance(device, SMSDevice) and device.is_hashed:
203                self.logger.debug("Hashed SMS device, skipping", device=device)
204                continue
205            allowed_devices.append(device)
206            # Ignore WebAuthn devices which are not in the allowed types
207            if (
208                isinstance(device, WebAuthnDevice)
209                and device.device_type
210                and has_webauthn_filters_set
211            ):
212                if not stage.webauthn_allowed_device_types.filter(
213                    pk=device.device_type.pk
214                ).exists():
215                    self.logger.debug(
216                        "WebAuthn device type not allowed", device=device, type=device.device_type
217                    )
218                    continue
219            # Ensure only one challenge per device class
220            # WebAuthn does another device loop to find all WebAuthn devices
221            if device_class in seen_classes:
222                continue
223            if device_class not in seen_classes:
224                seen_classes.append(device_class)
225            challenge = DeviceChallenge(
226                data={
227                    "device_class": device_class,
228                    "device_uid": device.pk,
229                    "challenge": get_challenge_for_device(self, stage, device),
230                    "last_used": device.last_used,
231                }
232            )
233            challenge.is_valid()
234            challenges.append(challenge.data)
235            self.logger.debug("adding challenge for device", challenge=challenge)
236        # check if we have an MFA cookie and if it's valid
237        if threshold.total_seconds() > 0:
238            self.check_mfa_cookie(allowed_devices)
239        return challenges

Get a list of all device challenges applicable for the current stage

def get_webauthn_challenge_without_user(self) -> list[dict]:
241    def get_webauthn_challenge_without_user(self) -> list[dict]:
242        """Get a WebAuthn challenge when no pending user is set."""
243        challenge = DeviceChallenge(
244            data={
245                "device_class": DeviceClasses.WEBAUTHN,
246                "device_uid": -1,
247                "challenge": get_webauthn_challenge_without_user(
248                    self,
249                    self.executor.current_stage,
250                ),
251                "last_used": None,
252            }
253        )
254        challenge.is_valid()
255        return [challenge.data]

Get a WebAuthn challenge when no pending user is set.

def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
257    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:  # noqa: PLR0911
258        """Check if a user is set, and check if the user has any devices
259        if not, we can skip this entire stage"""
260        user = self.get_pending_user()
261        stage: AuthenticatorValidateStage = self.executor.current_stage
262        if user and not user.is_anonymous:
263            try:
264                challenges = self.get_device_challenges()
265            except FlowSkipStageException:
266                return self.executor.stage_ok()
267        else:
268            if self.executor.flow.designation != FlowDesignation.AUTHENTICATION:
269                self.logger.debug("Refusing passwordless flow in non-authentication flow")
270                return self.executor.stage_ok()
271            # Passwordless auth, with just webauthn
272            if DeviceClasses.WEBAUTHN in stage.device_classes:
273                self.logger.debug("Flow without user, getting generic webauthn challenge")
274                challenges = self.get_webauthn_challenge_without_user()
275            else:
276                self.logger.debug("No pending user, continuing")
277                return self.executor.stage_ok()
278        self.executor.plan.context[PLAN_CONTEXT_DEVICE_CHALLENGES] = challenges
279
280        # No allowed devices
281        if len(challenges) < 1:
282            if stage.not_configured_action == NotConfiguredAction.SKIP:
283                self.logger.debug("Authenticator not configured, skipping stage")
284                return self.executor.stage_ok()
285            if stage.not_configured_action == NotConfiguredAction.DENY:
286                self.logger.debug("Authenticator not configured, denying")
287                return self.executor.stage_invalid(_("No (allowed) MFA authenticator configured."))
288            if stage.not_configured_action == NotConfiguredAction.CONFIGURE:
289                self.logger.debug("Authenticator not configured, forcing configure")
290                return self.prepare_stages(user)
291        return super().get(request, *args, **kwargs)

Check if a user is set, and check if the user has any devices if not, we can skip this entire stage

def prepare_stages( self, user: authentik.core.models.User, *args, **kwargs) -> django.http.response.HttpResponse:
293    def prepare_stages(self, user: User, *args, **kwargs) -> HttpResponse:
294        """Check how the user can configure themselves. If no stages are set, return an error.
295        If a single stage is set, insert that stage directly. If multiple are selected, include
296        them in the challenge."""
297        stage: AuthenticatorValidateStage = self.executor.current_stage
298        if not stage.configuration_stages.exists():
299            Event.new(
300                EventAction.CONFIGURATION_ERROR,
301                message=(
302                    "Authenticator validation stage is set to configure user "
303                    "but no configuration flow is set."
304                ),
305                stage=self,
306            ).from_http(self.request).set_user(user).save()
307            return self.executor.stage_invalid()
308        if stage.configuration_stages.count() == 1:
309            next_stage = Stage.objects.get_subclass(pk=stage.configuration_stages.first().pk)
310            self.logger.debug("Single stage configured, auto-selecting", stage=next_stage)
311            self.executor.plan.context[PLAN_CONTEXT_SELECTED_STAGE] = next_stage
312            # Because that normal execution only happens on post, we directly inject it here and
313            # return it
314            self.executor.plan.insert_stage(next_stage)
315            return self.executor.stage_ok()
316        stages = Stage.objects.filter(pk__in=stage.configuration_stages.all()).select_subclasses()
317        self.executor.plan.context[PLAN_CONTEXT_STAGES] = stages
318        return super().get(self.request, *args, **kwargs)

Check how the user can configure themselves. If no stages are set, return an error. If a single stage is set, insert that stage directly. If multiple are selected, include them in the challenge.

def post( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
320    def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
321        res = super().post(request, *args, **kwargs)
322        if (
323            PLAN_CONTEXT_SELECTED_STAGE in self.executor.plan.context
324            and self.executor.current_stage.not_configured_action == NotConfiguredAction.CONFIGURE
325        ):
326            self.logger.debug("Got selected stage in context, running that")
327            stage_pk = self.executor.plan.context.get(PLAN_CONTEXT_SELECTED_STAGE)
328            # Because the foreign key to stage.configuration_stage points to
329            # a base stage class, we need to do another lookup
330            stage = Stage.objects.get_subclass(pk=stage_pk)
331            # plan.insert inserts at 1 index, so when stage_ok pops 0,
332            # the configuration stage is next
333            self.executor.plan.insert_stage(stage)
334            return self.executor.stage_ok()
335        return res

Handle challenge response

def get_challenge( self) -> AuthenticatorValidationChallenge:
337    def get_challenge(self) -> AuthenticatorValidationChallenge:
338        challenges = self.executor.plan.context.get(PLAN_CONTEXT_DEVICE_CHALLENGES, [])
339        stages = self.executor.plan.context.get(PLAN_CONTEXT_STAGES, [])
340        stage_challenges = []
341        for stage in stages:
342            serializer = SelectableStageSerializer(
343                data={
344                    "pk": stage.pk,
345                    "name": getattr(stage, "friendly_name", stage.name) or stage.name,
346                    "verbose_name": str(stage._meta.verbose_name)
347                    .replace("Setup Stage", "")
348                    .strip(),
349                    "meta_model_name": f"{stage._meta.app_label}.{stage._meta.model_name}",
350                }
351            )
352            serializer.is_valid()
353            stage_challenges.append(serializer.data)
354        return AuthenticatorValidationChallenge(
355            data={
356                "component": "ak-stage-authenticator-validate",
357                "device_challenges": challenges,
358                "configuration_stages": stage_challenges,
359            }
360        )

Return the challenge that the client should solve

cookie_jwt_key: str
362    @property
363    def cookie_jwt_key(self) -> str:
364        """Signing key for MFA Cookie for this stage"""
365        return sha256(
366            f"{get_unique_identifier()}:{self.executor.current_stage.pk.hex}".encode("ascii")
367        ).hexdigest()

Signing key for MFA Cookie for this stage

def challenge_invalid( self, response: AuthenticatorValidationChallengeResponse) -> django.http.response.HttpResponse:
423    def challenge_invalid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse:
424        update_score(self.request, self.get_pending_user().username, -1)
425        return super().challenge_invalid(response)

Callback when the challenge has the incorrect format

def challenge_valid( self, response: AuthenticatorValidationChallengeResponse) -> django.http.response.HttpResponse:
427    def challenge_valid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse:
428        # All validation is done by the serializer
429        user = self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER)
430        if not user and "webauthn" in response.data:
431            webauthn_device: WebAuthnDevice = response.device
432            self.logger.debug("Set user from user-less flow", user=webauthn_device.user)
433            self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = webauthn_device.user
434            # We already set a default method in the validator above
435            # so this needs to have higher priority
436            self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl"
437            self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
438            self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update(
439                {
440                    "device": webauthn_device,
441                    "device_type": webauthn_device.device_type,
442                }
443            )
444        return self.set_valid_mfa_cookie(response.device)

Callback when the challenge has the correct format