authentik.stages.authenticator_validate.stage
Authenticator Validation
1"""Authenticator Validation""" 2 3from datetime import datetime 4from hashlib import sha256 5 6from django.conf import settings 7from django.http import HttpRequest, HttpResponse 8from django.utils.timezone import now 9from django.utils.translation import gettext_lazy as _ 10from jwt import PyJWTError, decode, encode 11from rest_framework.fields import CharField, IntegerField, ListField, UUIDField 12from rest_framework.serializers import ValidationError 13 14from authentik.core.api.utils import JSONDictField, PassiveSerializer 15from authentik.core.models import User 16from authentik.events.middleware import audit_ignore 17from authentik.events.models import Event, EventAction 18from authentik.flows.challenge import ChallengeResponse, WithUserInfoChallenge 19from authentik.flows.exceptions import FlowSkipStageException, StageInvalidException 20from authentik.flows.models import FlowDesignation, NotConfiguredAction, Stage 21from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER 22from authentik.flows.stage import ChallengeStageView 23from authentik.lib.utils.time import timedelta_from_string 24from authentik.policies.reputation.signals import update_score 25from authentik.stages.authenticator import devices_for_user 26from authentik.stages.authenticator.models import Device 27from authentik.stages.authenticator_email.models import EmailDevice 28from authentik.stages.authenticator_sms.models import SMSDevice 29from authentik.stages.authenticator_validate.challenge import ( 30 DeviceChallenge, 31 get_challenge_for_device, 32 get_webauthn_challenge_without_user, 33 select_challenge, 34 validate_challenge_code, 35 validate_challenge_duo, 36 validate_challenge_webauthn, 37) 38from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses 39from authentik.stages.authenticator_webauthn.models import WebAuthnDevice 40from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS 41from authentik.tenants.utils import get_unique_identifier 42 43COOKIE_NAME_MFA = "authentik_mfa" 44 45PLAN_CONTEXT_STAGES = "goauthentik.io/stages/authenticator_validate/stages" 46PLAN_CONTEXT_SELECTED_STAGE = "goauthentik.io/stages/authenticator_validate/selected_stage" 47PLAN_CONTEXT_DEVICE_CHALLENGES = "goauthentik.io/stages/authenticator_validate/device_challenges" 48 49 50class SelectableStageSerializer(PassiveSerializer): 51 """Serializer for stages which can be selected by users""" 52 53 pk = UUIDField() 54 name = CharField() 55 verbose_name = CharField() 56 meta_model_name = CharField() 57 58 59class AuthenticatorValidationChallenge(WithUserInfoChallenge): 60 """Authenticator challenge""" 61 62 device_challenges = ListField(child=DeviceChallenge()) 63 component = CharField(default="ak-stage-authenticator-validate") 64 configuration_stages = ListField(child=SelectableStageSerializer()) 65 66 67class AuthenticatorValidationChallengeResponse(ChallengeResponse): 68 """Challenge used for Code-based and WebAuthn authenticators""" 69 70 device: Device | None 71 72 selected_challenge = DeviceChallenge(required=False) 73 selected_stage = CharField(required=False) 74 75 code = CharField(required=False) 76 webauthn = JSONDictField(required=False) 77 duo = IntegerField(required=False) 78 component = CharField(default="ak-stage-authenticator-validate") 79 80 def _challenge_allowed(self, classes: list): 81 device_challenges: list[dict] = self.stage.executor.plan.context.get( 82 PLAN_CONTEXT_DEVICE_CHALLENGES, [] 83 ) 84 if not any(x["device_class"] in classes for x in device_challenges): 85 raise ValidationError("No compatible device class allowed") 86 87 def validate_code(self, code: str) -> str: 88 """Validate code-based response, raise error if code isn't allowed""" 89 self._challenge_allowed( 90 [DeviceClasses.TOTP, DeviceClasses.STATIC, DeviceClasses.SMS, DeviceClasses.EMAIL] 91 ) 92 self.device = validate_challenge_code(code, self.stage, self.stage.get_pending_user()) 93 return code 94 95 def validate_webauthn(self, webauthn: dict) -> dict: 96 """Validate webauthn response, raise error if webauthn wasn't allowed 97 or response is invalid""" 98 self._challenge_allowed([DeviceClasses.WEBAUTHN]) 99 self.device = validate_challenge_webauthn( 100 webauthn, self.stage, self.stage.get_pending_user() 101 ) 102 return webauthn 103 104 def validate_duo(self, duo: int) -> int: 105 """Initiate Duo authentication""" 106 self._challenge_allowed([DeviceClasses.DUO]) 107 self.device = validate_challenge_duo(duo, self.stage, self.stage.get_pending_user()) 108 return duo 109 110 def validate_selected_challenge(self, challenge: dict) -> dict: 111 """Check which challenge the user has selected. Actual logic only used for SMS stage.""" 112 # First check if the challenge is valid 113 allowed = False 114 for device_challenge in self.stage.executor.plan.context.get( 115 PLAN_CONTEXT_DEVICE_CHALLENGES, [] 116 ): 117 if device_challenge.get("device_class", "") == challenge.get( 118 "device_class", "" 119 ) and device_challenge.get("device_uid", "") == challenge.get("device_uid", ""): 120 allowed = True 121 if not allowed: 122 raise ValidationError("invalid challenge selected") 123 124 device_class = challenge.get("device_class", "") 125 if device_class == "sms": 126 devices = SMSDevice.objects.filter(pk=int(challenge.get("device_uid", "0"))) 127 if not devices.exists(): 128 raise ValidationError("invalid challenge selected") 129 select_challenge(self.stage.request, devices.first()) 130 elif device_class == "email": 131 devices = EmailDevice.objects.filter(pk=int(challenge.get("device_uid", "0"))) 132 if not devices.exists(): 133 raise ValidationError("invalid challenge selected") 134 select_challenge(self.stage.request, devices.first()) 135 return challenge 136 137 def validate_selected_stage(self, stage_pk: str) -> str: 138 """Check that the selected stage is valid""" 139 stages = self.stage.executor.plan.context.get(PLAN_CONTEXT_STAGES, []) 140 if not any(str(stage.pk) == stage_pk for stage in stages): 141 raise ValidationError("Selected stage is invalid") 142 self.stage.logger.debug("Setting selected stage to ", stage=stage_pk) 143 self.stage.executor.plan.context[PLAN_CONTEXT_SELECTED_STAGE] = stage_pk 144 return stage_pk 145 146 def validate(self, attrs: dict): 147 # Checking if the given data is from a valid device class is done above 148 # Here we only check if the any data was sent at all 149 if "code" not in attrs and "webauthn" not in attrs and "duo" not in attrs: 150 raise ValidationError("Empty response") 151 self.stage.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "auth_mfa") 152 self.stage.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 153 self.stage.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].setdefault("mfa_devices", []) 154 self.stage.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS]["mfa_devices"].append( 155 self.device 156 ) 157 with audit_ignore(): 158 self.device.last_used = now() 159 self.device.save() 160 return attrs 161 162 163class AuthenticatorValidateStageView(ChallengeStageView): 164 """Authenticator Validation""" 165 166 response_class = AuthenticatorValidationChallengeResponse 167 168 def get_device_challenges(self) -> list[dict]: 169 """Get a list of all device challenges applicable for the current stage""" 170 challenges = [] 171 pending_user = self.get_pending_user() 172 if pending_user.is_anonymous: 173 # We shouldn't get here without any kind of authentication data 174 raise StageInvalidException() 175 # When `pretend_user_exists` is enabled in the identification stage, 176 # `pending_user` will be a user model that isn't save to the DB 177 # hence it doesn't have a PK. In that case we just return an empty list of 178 # authenticators 179 if not pending_user.pk: 180 return [] 181 # Convert to a list to have usable log output instead of just <generator ...> 182 user_devices = list(devices_for_user(self.get_pending_user())) 183 self.logger.debug("Got devices for user", devices=user_devices) 184 185 # static and totp are only shown once 186 # since their challenges are device-independent 187 seen_classes = [] 188 189 stage: AuthenticatorValidateStage = self.executor.current_stage 190 191 threshold = timedelta_from_string(stage.last_auth_threshold) 192 allowed_devices = [] 193 194 has_webauthn_filters_set = stage.webauthn_allowed_device_types.exists() 195 196 for device in user_devices: 197 device_class = device.__class__.__name__.lower().replace("device", "") 198 if device_class not in stage.device_classes: 199 self.logger.debug("device class not allowed", device_class=device_class) 200 continue 201 if isinstance(device, SMSDevice) and device.is_hashed: 202 self.logger.debug("Hashed SMS device, skipping", device=device) 203 continue 204 allowed_devices.append(device) 205 # Ignore WebAuthn devices which are not in the allowed types 206 if ( 207 isinstance(device, WebAuthnDevice) 208 and device.device_type 209 and has_webauthn_filters_set 210 ): 211 if not stage.webauthn_allowed_device_types.filter( 212 pk=device.device_type.pk 213 ).exists(): 214 self.logger.debug( 215 "WebAuthn device type not allowed", device=device, type=device.device_type 216 ) 217 continue 218 # Ensure only one challenge per device class 219 # WebAuthn does another device loop to find all WebAuthn devices 220 if device_class in seen_classes: 221 continue 222 if device_class not in seen_classes: 223 seen_classes.append(device_class) 224 challenge = DeviceChallenge( 225 data={ 226 "device_class": device_class, 227 "device_uid": device.pk, 228 "challenge": get_challenge_for_device(self, stage, device), 229 "last_used": device.last_used, 230 } 231 ) 232 challenge.is_valid() 233 challenges.append(challenge.data) 234 self.logger.debug("adding challenge for device", challenge=challenge) 235 # check if we have an MFA cookie and if it's valid 236 if threshold.total_seconds() > 0: 237 self.check_mfa_cookie(allowed_devices) 238 return challenges 239 240 def get_webauthn_challenge_without_user(self) -> list[dict]: 241 """Get a WebAuthn challenge when no pending user is set.""" 242 challenge = DeviceChallenge( 243 data={ 244 "device_class": DeviceClasses.WEBAUTHN, 245 "device_uid": -1, 246 "challenge": get_webauthn_challenge_without_user( 247 self, 248 self.executor.current_stage, 249 ), 250 "last_used": None, 251 } 252 ) 253 challenge.is_valid() 254 return [challenge.data] 255 256 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: # noqa: PLR0911 257 """Check if a user is set, and check if the user has any devices 258 if not, we can skip this entire stage""" 259 user = self.get_pending_user() 260 stage: AuthenticatorValidateStage = self.executor.current_stage 261 if user and not user.is_anonymous: 262 try: 263 challenges = self.get_device_challenges() 264 except FlowSkipStageException: 265 return self.executor.stage_ok() 266 else: 267 if self.executor.flow.designation != FlowDesignation.AUTHENTICATION: 268 self.logger.debug("Refusing passwordless flow in non-authentication flow") 269 return self.executor.stage_ok() 270 # Passwordless auth, with just webauthn 271 if DeviceClasses.WEBAUTHN in stage.device_classes: 272 self.logger.debug("Flow without user, getting generic webauthn challenge") 273 challenges = self.get_webauthn_challenge_without_user() 274 else: 275 self.logger.debug("No pending user, continuing") 276 return self.executor.stage_ok() 277 self.executor.plan.context[PLAN_CONTEXT_DEVICE_CHALLENGES] = challenges 278 279 # No allowed devices 280 if len(challenges) < 1: 281 if stage.not_configured_action == NotConfiguredAction.SKIP: 282 self.logger.debug("Authenticator not configured, skipping stage") 283 return self.executor.stage_ok() 284 if stage.not_configured_action == NotConfiguredAction.DENY: 285 self.logger.debug("Authenticator not configured, denying") 286 return self.executor.stage_invalid(_("No (allowed) MFA authenticator configured.")) 287 if stage.not_configured_action == NotConfiguredAction.CONFIGURE: 288 self.logger.debug("Authenticator not configured, forcing configure") 289 return self.prepare_stages(user) 290 return super().get(request, *args, **kwargs) 291 292 def prepare_stages(self, user: User, *args, **kwargs) -> HttpResponse: 293 """Check how the user can configure themselves. If no stages are set, return an error. 294 If a single stage is set, insert that stage directly. If multiple are selected, include 295 them in the challenge.""" 296 stage: AuthenticatorValidateStage = self.executor.current_stage 297 if not stage.configuration_stages.exists(): 298 Event.new( 299 EventAction.CONFIGURATION_ERROR, 300 message=( 301 "Authenticator validation stage is set to configure user " 302 "but no configuration flow is set." 303 ), 304 stage=self, 305 ).from_http(self.request).set_user(user).save() 306 return self.executor.stage_invalid() 307 if stage.configuration_stages.count() == 1: 308 next_stage = Stage.objects.get_subclass(pk=stage.configuration_stages.first().pk) 309 self.logger.debug("Single stage configured, auto-selecting", stage=next_stage) 310 self.executor.plan.context[PLAN_CONTEXT_SELECTED_STAGE] = next_stage 311 # Because that normal execution only happens on post, we directly inject it here and 312 # return it 313 self.executor.plan.insert_stage(next_stage) 314 return self.executor.stage_ok() 315 stages = Stage.objects.filter(pk__in=stage.configuration_stages.all()).select_subclasses() 316 self.executor.plan.context[PLAN_CONTEXT_STAGES] = stages 317 return super().get(self.request, *args, **kwargs) 318 319 def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 320 res = super().post(request, *args, **kwargs) 321 if ( 322 PLAN_CONTEXT_SELECTED_STAGE in self.executor.plan.context 323 and self.executor.current_stage.not_configured_action == NotConfiguredAction.CONFIGURE 324 ): 325 self.logger.debug("Got selected stage in context, running that") 326 stage_pk = self.executor.plan.context.get(PLAN_CONTEXT_SELECTED_STAGE) 327 # Because the foreign key to stage.configuration_stage points to 328 # a base stage class, we need to do another lookup 329 stage = Stage.objects.get_subclass(pk=stage_pk) 330 # plan.insert inserts at 1 index, so when stage_ok pops 0, 331 # the configuration stage is next 332 self.executor.plan.insert_stage(stage) 333 return self.executor.stage_ok() 334 return res 335 336 def get_challenge(self) -> AuthenticatorValidationChallenge: 337 challenges = self.executor.plan.context.get(PLAN_CONTEXT_DEVICE_CHALLENGES, []) 338 stages = self.executor.plan.context.get(PLAN_CONTEXT_STAGES, []) 339 stage_challenges = [] 340 for stage in stages: 341 serializer = SelectableStageSerializer( 342 data={ 343 "pk": stage.pk, 344 "name": getattr(stage, "friendly_name", stage.name) or stage.name, 345 "verbose_name": str(stage._meta.verbose_name) 346 .replace("Setup Stage", "") 347 .strip(), 348 "meta_model_name": f"{stage._meta.app_label}.{stage._meta.model_name}", 349 } 350 ) 351 serializer.is_valid() 352 stage_challenges.append(serializer.data) 353 return AuthenticatorValidationChallenge( 354 data={ 355 "component": "ak-stage-authenticator-validate", 356 "device_challenges": challenges, 357 "configuration_stages": stage_challenges, 358 } 359 ) 360 361 @property 362 def cookie_jwt_key(self) -> str: 363 """Signing key for MFA Cookie for this stage""" 364 return sha256( 365 f"{get_unique_identifier()}:{self.executor.current_stage.pk.hex}".encode("ascii") 366 ).hexdigest() 367 368 def check_mfa_cookie(self, allowed_devices: list[Device]): 369 """Check if an MFA cookie has been set, whether it's valid and applies 370 to the current stage and device. 371 372 The list of devices passed to this function must only contain devices for the 373 correct user and with an allowed class""" 374 if COOKIE_NAME_MFA not in self.request.COOKIES: 375 return 376 stage: AuthenticatorValidateStage = self.executor.current_stage 377 threshold = timedelta_from_string(stage.last_auth_threshold) 378 latest_allowed = datetime.now() + threshold 379 try: 380 payload = decode(self.request.COOKIES[COOKIE_NAME_MFA], self.cookie_jwt_key, ["HS256"]) 381 if payload["stage"] != stage.pk.hex: 382 self.logger.warning("Invalid stage PK") 383 return 384 if datetime.fromtimestamp(payload["exp"]) > latest_allowed: 385 self.logger.warning("Expired MFA cookie") 386 return 387 if not any(device.pk == payload["device"] for device in allowed_devices): 388 self.logger.warning("Invalid device PK") 389 return 390 self.logger.info("MFA has been used within threshold") 391 raise FlowSkipStageException() 392 except (PyJWTError, ValueError, TypeError) as exc: 393 self.logger.info("Invalid mfa cookie for device", exc=exc) 394 395 def set_valid_mfa_cookie(self, device: Device) -> HttpResponse: 396 """Set an MFA cookie to allow users to skip MFA validation in this context (browser) 397 398 The cookie is JWT which is signed with a hash of the secret key and the UID of the stage""" 399 stage: AuthenticatorValidateStage = self.executor.current_stage 400 delta = timedelta_from_string(stage.last_auth_threshold) 401 if delta.total_seconds() < 1: 402 self.logger.info("Not setting MFA cookie since threshold is not set.") 403 return self.executor.stage_ok() 404 expiry = datetime.now() + delta 405 cookie_payload = { 406 "device": device.pk, 407 "stage": stage.pk.hex, 408 "exp": expiry.timestamp(), 409 } 410 response = self.executor.stage_ok() 411 cookie = encode(cookie_payload, self.cookie_jwt_key) 412 response.set_cookie( 413 COOKIE_NAME_MFA, 414 cookie, 415 expires=expiry, 416 path=settings.SESSION_COOKIE_PATH, 417 domain=settings.SESSION_COOKIE_DOMAIN, 418 samesite=settings.SESSION_COOKIE_SAMESITE, 419 ) 420 return response 421 422 def challenge_invalid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse: 423 update_score(self.request, self.get_pending_user().username, -1) 424 return super().challenge_invalid(response) 425 426 def challenge_valid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse: 427 # All validation is done by the serializer 428 user = self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER) 429 if not user and "webauthn" in response.data: 430 webauthn_device: WebAuthnDevice = response.device 431 self.logger.debug("Set user from user-less flow", user=webauthn_device.user) 432 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = webauthn_device.user 433 # We already set a default method in the validator above 434 # so this needs to have higher priority 435 self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl" 436 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 437 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update( 438 { 439 "device": webauthn_device, 440 "device_type": webauthn_device.device_type, 441 } 442 ) 443 return self.set_valid_mfa_cookie(response.device)
51class SelectableStageSerializer(PassiveSerializer): 52 """Serializer for stages which can be selected by users""" 53 54 pk = UUIDField() 55 name = CharField() 56 verbose_name = CharField() 57 meta_model_name = CharField()
Serializer for stages which can be selected by users
Inherited Members
60class AuthenticatorValidationChallenge(WithUserInfoChallenge): 61 """Authenticator challenge""" 62 63 device_challenges = ListField(child=DeviceChallenge()) 64 component = CharField(default="ak-stage-authenticator-validate") 65 configuration_stages = ListField(child=SelectableStageSerializer())
Authenticator challenge
68class AuthenticatorValidationChallengeResponse(ChallengeResponse): 69 """Challenge used for Code-based and WebAuthn authenticators""" 70 71 device: Device | None 72 73 selected_challenge = DeviceChallenge(required=False) 74 selected_stage = CharField(required=False) 75 76 code = CharField(required=False) 77 webauthn = JSONDictField(required=False) 78 duo = IntegerField(required=False) 79 component = CharField(default="ak-stage-authenticator-validate") 80 81 def _challenge_allowed(self, classes: list): 82 device_challenges: list[dict] = self.stage.executor.plan.context.get( 83 PLAN_CONTEXT_DEVICE_CHALLENGES, [] 84 ) 85 if not any(x["device_class"] in classes for x in device_challenges): 86 raise ValidationError("No compatible device class allowed") 87 88 def validate_code(self, code: str) -> str: 89 """Validate code-based response, raise error if code isn't allowed""" 90 self._challenge_allowed( 91 [DeviceClasses.TOTP, DeviceClasses.STATIC, DeviceClasses.SMS, DeviceClasses.EMAIL] 92 ) 93 self.device = validate_challenge_code(code, self.stage, self.stage.get_pending_user()) 94 return code 95 96 def validate_webauthn(self, webauthn: dict) -> dict: 97 """Validate webauthn response, raise error if webauthn wasn't allowed 98 or response is invalid""" 99 self._challenge_allowed([DeviceClasses.WEBAUTHN]) 100 self.device = validate_challenge_webauthn( 101 webauthn, self.stage, self.stage.get_pending_user() 102 ) 103 return webauthn 104 105 def validate_duo(self, duo: int) -> int: 106 """Initiate Duo authentication""" 107 self._challenge_allowed([DeviceClasses.DUO]) 108 self.device = validate_challenge_duo(duo, self.stage, self.stage.get_pending_user()) 109 return duo 110 111 def validate_selected_challenge(self, challenge: dict) -> dict: 112 """Check which challenge the user has selected. Actual logic only used for SMS stage.""" 113 # First check if the challenge is valid 114 allowed = False 115 for device_challenge in self.stage.executor.plan.context.get( 116 PLAN_CONTEXT_DEVICE_CHALLENGES, [] 117 ): 118 if device_challenge.get("device_class", "") == challenge.get( 119 "device_class", "" 120 ) and device_challenge.get("device_uid", "") == challenge.get("device_uid", ""): 121 allowed = True 122 if not allowed: 123 raise ValidationError("invalid challenge selected") 124 125 device_class = challenge.get("device_class", "") 126 if device_class == "sms": 127 devices = SMSDevice.objects.filter(pk=int(challenge.get("device_uid", "0"))) 128 if not devices.exists(): 129 raise ValidationError("invalid challenge selected") 130 select_challenge(self.stage.request, devices.first()) 131 elif device_class == "email": 132 devices = EmailDevice.objects.filter(pk=int(challenge.get("device_uid", "0"))) 133 if not devices.exists(): 134 raise ValidationError("invalid challenge selected") 135 select_challenge(self.stage.request, devices.first()) 136 return challenge 137 138 def validate_selected_stage(self, stage_pk: str) -> str: 139 """Check that the selected stage is valid""" 140 stages = self.stage.executor.plan.context.get(PLAN_CONTEXT_STAGES, []) 141 if not any(str(stage.pk) == stage_pk for stage in stages): 142 raise ValidationError("Selected stage is invalid") 143 self.stage.logger.debug("Setting selected stage to ", stage=stage_pk) 144 self.stage.executor.plan.context[PLAN_CONTEXT_SELECTED_STAGE] = stage_pk 145 return stage_pk 146 147 def validate(self, attrs: dict): 148 # Checking if the given data is from a valid device class is done above 149 # Here we only check if the any data was sent at all 150 if "code" not in attrs and "webauthn" not in attrs and "duo" not in attrs: 151 raise ValidationError("Empty response") 152 self.stage.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "auth_mfa") 153 self.stage.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 154 self.stage.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].setdefault("mfa_devices", []) 155 self.stage.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS]["mfa_devices"].append( 156 self.device 157 ) 158 with audit_ignore(): 159 self.device.last_used = now() 160 self.device.save() 161 return attrs
Challenge used for Code-based and WebAuthn authenticators
88 def validate_code(self, code: str) -> str: 89 """Validate code-based response, raise error if code isn't allowed""" 90 self._challenge_allowed( 91 [DeviceClasses.TOTP, DeviceClasses.STATIC, DeviceClasses.SMS, DeviceClasses.EMAIL] 92 ) 93 self.device = validate_challenge_code(code, self.stage, self.stage.get_pending_user()) 94 return code
Validate code-based response, raise error if code isn't allowed
96 def validate_webauthn(self, webauthn: dict) -> dict: 97 """Validate webauthn response, raise error if webauthn wasn't allowed 98 or response is invalid""" 99 self._challenge_allowed([DeviceClasses.WEBAUTHN]) 100 self.device = validate_challenge_webauthn( 101 webauthn, self.stage, self.stage.get_pending_user() 102 ) 103 return webauthn
Validate webauthn response, raise error if webauthn wasn't allowed or response is invalid
105 def validate_duo(self, duo: int) -> int: 106 """Initiate Duo authentication""" 107 self._challenge_allowed([DeviceClasses.DUO]) 108 self.device = validate_challenge_duo(duo, self.stage, self.stage.get_pending_user()) 109 return duo
Initiate Duo authentication
111 def validate_selected_challenge(self, challenge: dict) -> dict: 112 """Check which challenge the user has selected. Actual logic only used for SMS stage.""" 113 # First check if the challenge is valid 114 allowed = False 115 for device_challenge in self.stage.executor.plan.context.get( 116 PLAN_CONTEXT_DEVICE_CHALLENGES, [] 117 ): 118 if device_challenge.get("device_class", "") == challenge.get( 119 "device_class", "" 120 ) and device_challenge.get("device_uid", "") == challenge.get("device_uid", ""): 121 allowed = True 122 if not allowed: 123 raise ValidationError("invalid challenge selected") 124 125 device_class = challenge.get("device_class", "") 126 if device_class == "sms": 127 devices = SMSDevice.objects.filter(pk=int(challenge.get("device_uid", "0"))) 128 if not devices.exists(): 129 raise ValidationError("invalid challenge selected") 130 select_challenge(self.stage.request, devices.first()) 131 elif device_class == "email": 132 devices = EmailDevice.objects.filter(pk=int(challenge.get("device_uid", "0"))) 133 if not devices.exists(): 134 raise ValidationError("invalid challenge selected") 135 select_challenge(self.stage.request, devices.first()) 136 return challenge
Check which challenge the user has selected. Actual logic only used for SMS stage.
138 def validate_selected_stage(self, stage_pk: str) -> str: 139 """Check that the selected stage is valid""" 140 stages = self.stage.executor.plan.context.get(PLAN_CONTEXT_STAGES, []) 141 if not any(str(stage.pk) == stage_pk for stage in stages): 142 raise ValidationError("Selected stage is invalid") 143 self.stage.logger.debug("Setting selected stage to ", stage=stage_pk) 144 self.stage.executor.plan.context[PLAN_CONTEXT_SELECTED_STAGE] = stage_pk 145 return stage_pk
Check that the selected stage is valid
147 def validate(self, attrs: dict): 148 # Checking if the given data is from a valid device class is done above 149 # Here we only check if the any data was sent at all 150 if "code" not in attrs and "webauthn" not in attrs and "duo" not in attrs: 151 raise ValidationError("Empty response") 152 self.stage.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD, "auth_mfa") 153 self.stage.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 154 self.stage.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].setdefault("mfa_devices", []) 155 self.stage.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS]["mfa_devices"].append( 156 self.device 157 ) 158 with audit_ignore(): 159 self.device.last_used = now() 160 self.device.save() 161 return attrs
164class AuthenticatorValidateStageView(ChallengeStageView): 165 """Authenticator Validation""" 166 167 response_class = AuthenticatorValidationChallengeResponse 168 169 def get_device_challenges(self) -> list[dict]: 170 """Get a list of all device challenges applicable for the current stage""" 171 challenges = [] 172 pending_user = self.get_pending_user() 173 if pending_user.is_anonymous: 174 # We shouldn't get here without any kind of authentication data 175 raise StageInvalidException() 176 # When `pretend_user_exists` is enabled in the identification stage, 177 # `pending_user` will be a user model that isn't save to the DB 178 # hence it doesn't have a PK. In that case we just return an empty list of 179 # authenticators 180 if not pending_user.pk: 181 return [] 182 # Convert to a list to have usable log output instead of just <generator ...> 183 user_devices = list(devices_for_user(self.get_pending_user())) 184 self.logger.debug("Got devices for user", devices=user_devices) 185 186 # static and totp are only shown once 187 # since their challenges are device-independent 188 seen_classes = [] 189 190 stage: AuthenticatorValidateStage = self.executor.current_stage 191 192 threshold = timedelta_from_string(stage.last_auth_threshold) 193 allowed_devices = [] 194 195 has_webauthn_filters_set = stage.webauthn_allowed_device_types.exists() 196 197 for device in user_devices: 198 device_class = device.__class__.__name__.lower().replace("device", "") 199 if device_class not in stage.device_classes: 200 self.logger.debug("device class not allowed", device_class=device_class) 201 continue 202 if isinstance(device, SMSDevice) and device.is_hashed: 203 self.logger.debug("Hashed SMS device, skipping", device=device) 204 continue 205 allowed_devices.append(device) 206 # Ignore WebAuthn devices which are not in the allowed types 207 if ( 208 isinstance(device, WebAuthnDevice) 209 and device.device_type 210 and has_webauthn_filters_set 211 ): 212 if not stage.webauthn_allowed_device_types.filter( 213 pk=device.device_type.pk 214 ).exists(): 215 self.logger.debug( 216 "WebAuthn device type not allowed", device=device, type=device.device_type 217 ) 218 continue 219 # Ensure only one challenge per device class 220 # WebAuthn does another device loop to find all WebAuthn devices 221 if device_class in seen_classes: 222 continue 223 if device_class not in seen_classes: 224 seen_classes.append(device_class) 225 challenge = DeviceChallenge( 226 data={ 227 "device_class": device_class, 228 "device_uid": device.pk, 229 "challenge": get_challenge_for_device(self, stage, device), 230 "last_used": device.last_used, 231 } 232 ) 233 challenge.is_valid() 234 challenges.append(challenge.data) 235 self.logger.debug("adding challenge for device", challenge=challenge) 236 # check if we have an MFA cookie and if it's valid 237 if threshold.total_seconds() > 0: 238 self.check_mfa_cookie(allowed_devices) 239 return challenges 240 241 def get_webauthn_challenge_without_user(self) -> list[dict]: 242 """Get a WebAuthn challenge when no pending user is set.""" 243 challenge = DeviceChallenge( 244 data={ 245 "device_class": DeviceClasses.WEBAUTHN, 246 "device_uid": -1, 247 "challenge": get_webauthn_challenge_without_user( 248 self, 249 self.executor.current_stage, 250 ), 251 "last_used": None, 252 } 253 ) 254 challenge.is_valid() 255 return [challenge.data] 256 257 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: # noqa: PLR0911 258 """Check if a user is set, and check if the user has any devices 259 if not, we can skip this entire stage""" 260 user = self.get_pending_user() 261 stage: AuthenticatorValidateStage = self.executor.current_stage 262 if user and not user.is_anonymous: 263 try: 264 challenges = self.get_device_challenges() 265 except FlowSkipStageException: 266 return self.executor.stage_ok() 267 else: 268 if self.executor.flow.designation != FlowDesignation.AUTHENTICATION: 269 self.logger.debug("Refusing passwordless flow in non-authentication flow") 270 return self.executor.stage_ok() 271 # Passwordless auth, with just webauthn 272 if DeviceClasses.WEBAUTHN in stage.device_classes: 273 self.logger.debug("Flow without user, getting generic webauthn challenge") 274 challenges = self.get_webauthn_challenge_without_user() 275 else: 276 self.logger.debug("No pending user, continuing") 277 return self.executor.stage_ok() 278 self.executor.plan.context[PLAN_CONTEXT_DEVICE_CHALLENGES] = challenges 279 280 # No allowed devices 281 if len(challenges) < 1: 282 if stage.not_configured_action == NotConfiguredAction.SKIP: 283 self.logger.debug("Authenticator not configured, skipping stage") 284 return self.executor.stage_ok() 285 if stage.not_configured_action == NotConfiguredAction.DENY: 286 self.logger.debug("Authenticator not configured, denying") 287 return self.executor.stage_invalid(_("No (allowed) MFA authenticator configured.")) 288 if stage.not_configured_action == NotConfiguredAction.CONFIGURE: 289 self.logger.debug("Authenticator not configured, forcing configure") 290 return self.prepare_stages(user) 291 return super().get(request, *args, **kwargs) 292 293 def prepare_stages(self, user: User, *args, **kwargs) -> HttpResponse: 294 """Check how the user can configure themselves. If no stages are set, return an error. 295 If a single stage is set, insert that stage directly. If multiple are selected, include 296 them in the challenge.""" 297 stage: AuthenticatorValidateStage = self.executor.current_stage 298 if not stage.configuration_stages.exists(): 299 Event.new( 300 EventAction.CONFIGURATION_ERROR, 301 message=( 302 "Authenticator validation stage is set to configure user " 303 "but no configuration flow is set." 304 ), 305 stage=self, 306 ).from_http(self.request).set_user(user).save() 307 return self.executor.stage_invalid() 308 if stage.configuration_stages.count() == 1: 309 next_stage = Stage.objects.get_subclass(pk=stage.configuration_stages.first().pk) 310 self.logger.debug("Single stage configured, auto-selecting", stage=next_stage) 311 self.executor.plan.context[PLAN_CONTEXT_SELECTED_STAGE] = next_stage 312 # Because that normal execution only happens on post, we directly inject it here and 313 # return it 314 self.executor.plan.insert_stage(next_stage) 315 return self.executor.stage_ok() 316 stages = Stage.objects.filter(pk__in=stage.configuration_stages.all()).select_subclasses() 317 self.executor.plan.context[PLAN_CONTEXT_STAGES] = stages 318 return super().get(self.request, *args, **kwargs) 319 320 def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 321 res = super().post(request, *args, **kwargs) 322 if ( 323 PLAN_CONTEXT_SELECTED_STAGE in self.executor.plan.context 324 and self.executor.current_stage.not_configured_action == NotConfiguredAction.CONFIGURE 325 ): 326 self.logger.debug("Got selected stage in context, running that") 327 stage_pk = self.executor.plan.context.get(PLAN_CONTEXT_SELECTED_STAGE) 328 # Because the foreign key to stage.configuration_stage points to 329 # a base stage class, we need to do another lookup 330 stage = Stage.objects.get_subclass(pk=stage_pk) 331 # plan.insert inserts at 1 index, so when stage_ok pops 0, 332 # the configuration stage is next 333 self.executor.plan.insert_stage(stage) 334 return self.executor.stage_ok() 335 return res 336 337 def get_challenge(self) -> AuthenticatorValidationChallenge: 338 challenges = self.executor.plan.context.get(PLAN_CONTEXT_DEVICE_CHALLENGES, []) 339 stages = self.executor.plan.context.get(PLAN_CONTEXT_STAGES, []) 340 stage_challenges = [] 341 for stage in stages: 342 serializer = SelectableStageSerializer( 343 data={ 344 "pk": stage.pk, 345 "name": getattr(stage, "friendly_name", stage.name) or stage.name, 346 "verbose_name": str(stage._meta.verbose_name) 347 .replace("Setup Stage", "") 348 .strip(), 349 "meta_model_name": f"{stage._meta.app_label}.{stage._meta.model_name}", 350 } 351 ) 352 serializer.is_valid() 353 stage_challenges.append(serializer.data) 354 return AuthenticatorValidationChallenge( 355 data={ 356 "component": "ak-stage-authenticator-validate", 357 "device_challenges": challenges, 358 "configuration_stages": stage_challenges, 359 } 360 ) 361 362 @property 363 def cookie_jwt_key(self) -> str: 364 """Signing key for MFA Cookie for this stage""" 365 return sha256( 366 f"{get_unique_identifier()}:{self.executor.current_stage.pk.hex}".encode("ascii") 367 ).hexdigest() 368 369 def check_mfa_cookie(self, allowed_devices: list[Device]): 370 """Check if an MFA cookie has been set, whether it's valid and applies 371 to the current stage and device. 372 373 The list of devices passed to this function must only contain devices for the 374 correct user and with an allowed class""" 375 if COOKIE_NAME_MFA not in self.request.COOKIES: 376 return 377 stage: AuthenticatorValidateStage = self.executor.current_stage 378 threshold = timedelta_from_string(stage.last_auth_threshold) 379 latest_allowed = datetime.now() + threshold 380 try: 381 payload = decode(self.request.COOKIES[COOKIE_NAME_MFA], self.cookie_jwt_key, ["HS256"]) 382 if payload["stage"] != stage.pk.hex: 383 self.logger.warning("Invalid stage PK") 384 return 385 if datetime.fromtimestamp(payload["exp"]) > latest_allowed: 386 self.logger.warning("Expired MFA cookie") 387 return 388 if not any(device.pk == payload["device"] for device in allowed_devices): 389 self.logger.warning("Invalid device PK") 390 return 391 self.logger.info("MFA has been used within threshold") 392 raise FlowSkipStageException() 393 except (PyJWTError, ValueError, TypeError) as exc: 394 self.logger.info("Invalid mfa cookie for device", exc=exc) 395 396 def set_valid_mfa_cookie(self, device: Device) -> HttpResponse: 397 """Set an MFA cookie to allow users to skip MFA validation in this context (browser) 398 399 The cookie is JWT which is signed with a hash of the secret key and the UID of the stage""" 400 stage: AuthenticatorValidateStage = self.executor.current_stage 401 delta = timedelta_from_string(stage.last_auth_threshold) 402 if delta.total_seconds() < 1: 403 self.logger.info("Not setting MFA cookie since threshold is not set.") 404 return self.executor.stage_ok() 405 expiry = datetime.now() + delta 406 cookie_payload = { 407 "device": device.pk, 408 "stage": stage.pk.hex, 409 "exp": expiry.timestamp(), 410 } 411 response = self.executor.stage_ok() 412 cookie = encode(cookie_payload, self.cookie_jwt_key) 413 response.set_cookie( 414 COOKIE_NAME_MFA, 415 cookie, 416 expires=expiry, 417 path=settings.SESSION_COOKIE_PATH, 418 domain=settings.SESSION_COOKIE_DOMAIN, 419 samesite=settings.SESSION_COOKIE_SAMESITE, 420 ) 421 return response 422 423 def challenge_invalid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse: 424 update_score(self.request, self.get_pending_user().username, -1) 425 return super().challenge_invalid(response) 426 427 def challenge_valid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse: 428 # All validation is done by the serializer 429 user = self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER) 430 if not user and "webauthn" in response.data: 431 webauthn_device: WebAuthnDevice = response.device 432 self.logger.debug("Set user from user-less flow", user=webauthn_device.user) 433 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = webauthn_device.user 434 # We already set a default method in the validator above 435 # so this needs to have higher priority 436 self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl" 437 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 438 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update( 439 { 440 "device": webauthn_device, 441 "device_type": webauthn_device.device_type, 442 } 443 ) 444 return self.set_valid_mfa_cookie(response.device)
Authenticator Validation
169 def get_device_challenges(self) -> list[dict]: 170 """Get a list of all device challenges applicable for the current stage""" 171 challenges = [] 172 pending_user = self.get_pending_user() 173 if pending_user.is_anonymous: 174 # We shouldn't get here without any kind of authentication data 175 raise StageInvalidException() 176 # When `pretend_user_exists` is enabled in the identification stage, 177 # `pending_user` will be a user model that isn't save to the DB 178 # hence it doesn't have a PK. In that case we just return an empty list of 179 # authenticators 180 if not pending_user.pk: 181 return [] 182 # Convert to a list to have usable log output instead of just <generator ...> 183 user_devices = list(devices_for_user(self.get_pending_user())) 184 self.logger.debug("Got devices for user", devices=user_devices) 185 186 # static and totp are only shown once 187 # since their challenges are device-independent 188 seen_classes = [] 189 190 stage: AuthenticatorValidateStage = self.executor.current_stage 191 192 threshold = timedelta_from_string(stage.last_auth_threshold) 193 allowed_devices = [] 194 195 has_webauthn_filters_set = stage.webauthn_allowed_device_types.exists() 196 197 for device in user_devices: 198 device_class = device.__class__.__name__.lower().replace("device", "") 199 if device_class not in stage.device_classes: 200 self.logger.debug("device class not allowed", device_class=device_class) 201 continue 202 if isinstance(device, SMSDevice) and device.is_hashed: 203 self.logger.debug("Hashed SMS device, skipping", device=device) 204 continue 205 allowed_devices.append(device) 206 # Ignore WebAuthn devices which are not in the allowed types 207 if ( 208 isinstance(device, WebAuthnDevice) 209 and device.device_type 210 and has_webauthn_filters_set 211 ): 212 if not stage.webauthn_allowed_device_types.filter( 213 pk=device.device_type.pk 214 ).exists(): 215 self.logger.debug( 216 "WebAuthn device type not allowed", device=device, type=device.device_type 217 ) 218 continue 219 # Ensure only one challenge per device class 220 # WebAuthn does another device loop to find all WebAuthn devices 221 if device_class in seen_classes: 222 continue 223 if device_class not in seen_classes: 224 seen_classes.append(device_class) 225 challenge = DeviceChallenge( 226 data={ 227 "device_class": device_class, 228 "device_uid": device.pk, 229 "challenge": get_challenge_for_device(self, stage, device), 230 "last_used": device.last_used, 231 } 232 ) 233 challenge.is_valid() 234 challenges.append(challenge.data) 235 self.logger.debug("adding challenge for device", challenge=challenge) 236 # check if we have an MFA cookie and if it's valid 237 if threshold.total_seconds() > 0: 238 self.check_mfa_cookie(allowed_devices) 239 return challenges
Get a list of all device challenges applicable for the current stage
241 def get_webauthn_challenge_without_user(self) -> list[dict]: 242 """Get a WebAuthn challenge when no pending user is set.""" 243 challenge = DeviceChallenge( 244 data={ 245 "device_class": DeviceClasses.WEBAUTHN, 246 "device_uid": -1, 247 "challenge": get_webauthn_challenge_without_user( 248 self, 249 self.executor.current_stage, 250 ), 251 "last_used": None, 252 } 253 ) 254 challenge.is_valid() 255 return [challenge.data]
Get a WebAuthn challenge when no pending user is set.
257 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: # noqa: PLR0911 258 """Check if a user is set, and check if the user has any devices 259 if not, we can skip this entire stage""" 260 user = self.get_pending_user() 261 stage: AuthenticatorValidateStage = self.executor.current_stage 262 if user and not user.is_anonymous: 263 try: 264 challenges = self.get_device_challenges() 265 except FlowSkipStageException: 266 return self.executor.stage_ok() 267 else: 268 if self.executor.flow.designation != FlowDesignation.AUTHENTICATION: 269 self.logger.debug("Refusing passwordless flow in non-authentication flow") 270 return self.executor.stage_ok() 271 # Passwordless auth, with just webauthn 272 if DeviceClasses.WEBAUTHN in stage.device_classes: 273 self.logger.debug("Flow without user, getting generic webauthn challenge") 274 challenges = self.get_webauthn_challenge_without_user() 275 else: 276 self.logger.debug("No pending user, continuing") 277 return self.executor.stage_ok() 278 self.executor.plan.context[PLAN_CONTEXT_DEVICE_CHALLENGES] = challenges 279 280 # No allowed devices 281 if len(challenges) < 1: 282 if stage.not_configured_action == NotConfiguredAction.SKIP: 283 self.logger.debug("Authenticator not configured, skipping stage") 284 return self.executor.stage_ok() 285 if stage.not_configured_action == NotConfiguredAction.DENY: 286 self.logger.debug("Authenticator not configured, denying") 287 return self.executor.stage_invalid(_("No (allowed) MFA authenticator configured.")) 288 if stage.not_configured_action == NotConfiguredAction.CONFIGURE: 289 self.logger.debug("Authenticator not configured, forcing configure") 290 return self.prepare_stages(user) 291 return super().get(request, *args, **kwargs)
Check if a user is set, and check if the user has any devices if not, we can skip this entire stage
293 def prepare_stages(self, user: User, *args, **kwargs) -> HttpResponse: 294 """Check how the user can configure themselves. If no stages are set, return an error. 295 If a single stage is set, insert that stage directly. If multiple are selected, include 296 them in the challenge.""" 297 stage: AuthenticatorValidateStage = self.executor.current_stage 298 if not stage.configuration_stages.exists(): 299 Event.new( 300 EventAction.CONFIGURATION_ERROR, 301 message=( 302 "Authenticator validation stage is set to configure user " 303 "but no configuration flow is set." 304 ), 305 stage=self, 306 ).from_http(self.request).set_user(user).save() 307 return self.executor.stage_invalid() 308 if stage.configuration_stages.count() == 1: 309 next_stage = Stage.objects.get_subclass(pk=stage.configuration_stages.first().pk) 310 self.logger.debug("Single stage configured, auto-selecting", stage=next_stage) 311 self.executor.plan.context[PLAN_CONTEXT_SELECTED_STAGE] = next_stage 312 # Because that normal execution only happens on post, we directly inject it here and 313 # return it 314 self.executor.plan.insert_stage(next_stage) 315 return self.executor.stage_ok() 316 stages = Stage.objects.filter(pk__in=stage.configuration_stages.all()).select_subclasses() 317 self.executor.plan.context[PLAN_CONTEXT_STAGES] = stages 318 return super().get(self.request, *args, **kwargs)
Check how the user can configure themselves. If no stages are set, return an error. If a single stage is set, insert that stage directly. If multiple are selected, include them in the challenge.
320 def post(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 321 res = super().post(request, *args, **kwargs) 322 if ( 323 PLAN_CONTEXT_SELECTED_STAGE in self.executor.plan.context 324 and self.executor.current_stage.not_configured_action == NotConfiguredAction.CONFIGURE 325 ): 326 self.logger.debug("Got selected stage in context, running that") 327 stage_pk = self.executor.plan.context.get(PLAN_CONTEXT_SELECTED_STAGE) 328 # Because the foreign key to stage.configuration_stage points to 329 # a base stage class, we need to do another lookup 330 stage = Stage.objects.get_subclass(pk=stage_pk) 331 # plan.insert inserts at 1 index, so when stage_ok pops 0, 332 # the configuration stage is next 333 self.executor.plan.insert_stage(stage) 334 return self.executor.stage_ok() 335 return res
Handle challenge response
337 def get_challenge(self) -> AuthenticatorValidationChallenge: 338 challenges = self.executor.plan.context.get(PLAN_CONTEXT_DEVICE_CHALLENGES, []) 339 stages = self.executor.plan.context.get(PLAN_CONTEXT_STAGES, []) 340 stage_challenges = [] 341 for stage in stages: 342 serializer = SelectableStageSerializer( 343 data={ 344 "pk": stage.pk, 345 "name": getattr(stage, "friendly_name", stage.name) or stage.name, 346 "verbose_name": str(stage._meta.verbose_name) 347 .replace("Setup Stage", "") 348 .strip(), 349 "meta_model_name": f"{stage._meta.app_label}.{stage._meta.model_name}", 350 } 351 ) 352 serializer.is_valid() 353 stage_challenges.append(serializer.data) 354 return AuthenticatorValidationChallenge( 355 data={ 356 "component": "ak-stage-authenticator-validate", 357 "device_challenges": challenges, 358 "configuration_stages": stage_challenges, 359 } 360 )
Return the challenge that the client should solve
423 def challenge_invalid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse: 424 update_score(self.request, self.get_pending_user().username, -1) 425 return super().challenge_invalid(response)
Callback when the challenge has the incorrect format
427 def challenge_valid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse: 428 # All validation is done by the serializer 429 user = self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER) 430 if not user and "webauthn" in response.data: 431 webauthn_device: WebAuthnDevice = response.device 432 self.logger.debug("Set user from user-less flow", user=webauthn_device.user) 433 self.executor.plan.context[PLAN_CONTEXT_PENDING_USER] = webauthn_device.user 434 # We already set a default method in the validator above 435 # so this needs to have higher priority 436 self.executor.plan.context[PLAN_CONTEXT_METHOD] = "auth_webauthn_pwl" 437 self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {}) 438 self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].update( 439 { 440 "device": webauthn_device, 441 "device_type": webauthn_device.device_type, 442 } 443 ) 444 return self.set_valid_mfa_cookie(response.device)
Callback when the challenge has the correct format