authentik.stages.authenticator_validate.challenge
Validation stage challenge checking
1"""Validation stage challenge checking""" 2 3from typing import TYPE_CHECKING 4from urllib.parse import urlencode 5 6from django.db import transaction 7from django.http import HttpRequest 8from django.http.response import Http404 9from django.shortcuts import get_object_or_404 10from django.utils.translation import gettext as __ 11from django.utils.translation import gettext_lazy as _ 12from rest_framework.fields import CharField, ChoiceField, DateTimeField 13from rest_framework.serializers import ValidationError 14from structlog.stdlib import get_logger 15from webauthn.authentication.generate_authentication_options import generate_authentication_options 16from webauthn.authentication.verify_authentication_response import verify_authentication_response 17from webauthn.helpers import parse_authentication_credential_json 18from webauthn.helpers.base64url_to_bytes import base64url_to_bytes 19from webauthn.helpers.exceptions import InvalidAuthenticationResponse, InvalidJSONStructure 20from webauthn.helpers.options_to_json_dict import options_to_json_dict 21from webauthn.helpers.structs import PublicKeyCredentialType, UserVerificationRequirement 22 23from authentik.core.api.utils import JSONDictField, PassiveSerializer 24from authentik.core.models import Application, User 25from authentik.core.signals import login_failed 26from authentik.events.middleware import audit_ignore 27from authentik.events.models import Event, EventAction 28from authentik.flows.planner import PLAN_CONTEXT_APPLICATION 29from authentik.flows.stage import StageView 30from authentik.lib.utils.email import mask_email 31from authentik.lib.utils.time import timedelta_from_string 32from authentik.root.middleware import ClientIPMiddleware 33from authentik.stages.authenticator import devices_for_user 34from authentik.stages.authenticator.models import Device, ThrottlingMixin 35from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice 36from 
authentik.stages.authenticator_email.models import EmailDevice 37from authentik.stages.authenticator_sms.models import SMSDevice 38from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses 39from authentik.stages.authenticator_webauthn.models import UserVerification, WebAuthnDevice 40from authentik.stages.authenticator_webauthn.stage import PLAN_CONTEXT_WEBAUTHN_CHALLENGE 41from authentik.stages.authenticator_webauthn.utils import get_origin, get_rp_id 42from authentik.stages.password.stage import PLAN_CONTEXT_METHOD_ARGS 43 44LOGGER = get_logger() 45if TYPE_CHECKING: 46 from authentik.stages.authenticator_validate.stage import AuthenticatorValidateStageView 47 48 49class DeviceChallenge(PassiveSerializer): 50 """Single device challenge""" 51 52 device_class = ChoiceField(choices=DeviceClasses.choices) 53 device_uid = CharField() 54 challenge = JSONDictField() 55 last_used = DateTimeField(allow_null=True) 56 57 58def get_challenge_for_device( 59 stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage, device: Device 60) -> dict: 61 """Generate challenge for a single device""" 62 if isinstance(device, WebAuthnDevice): 63 return get_webauthn_challenge(stage_view, stage, device) 64 if isinstance(device, EmailDevice): 65 return {"email": mask_email(device.email)} 66 # Code-based challenges have no hints 67 return {} 68 69 70def get_webauthn_challenge_without_user( 71 stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage 72) -> dict: 73 """Same as `get_webauthn_challenge`, but allows any client device. 
We can then later check 74 who the device belongs to.""" 75 stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None) 76 authentication_options = generate_authentication_options( 77 rp_id=get_rp_id(stage_view.request), 78 allow_credentials=[], 79 user_verification=UserVerificationRequirement(stage.webauthn_user_verification), 80 ) 81 stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = ( 82 authentication_options.challenge 83 ) 84 85 options_dict = options_to_json_dict(authentication_options) 86 if stage.webauthn_hints: 87 options_dict["hints"] = list(stage.webauthn_hints) 88 return options_dict 89 90 91def get_webauthn_challenge( 92 stage_view: AuthenticatorValidateStageView, 93 stage: AuthenticatorValidateStage, 94 device: WebAuthnDevice | None = None, 95) -> dict: 96 """Send the client a challenge that we'll check later""" 97 stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None) 98 99 allowed_credentials = [] 100 101 if device: 102 # We want all the user's WebAuthn devices and merge their challenges 103 for user_device in WebAuthnDevice.objects.filter(user=device.user).order_by("name"): 104 user_device: WebAuthnDevice 105 allowed_credentials.append(user_device.descriptor) 106 107 authentication_options = generate_authentication_options( 108 rp_id=get_rp_id(stage_view.request), 109 allow_credentials=allowed_credentials, 110 user_verification=UserVerificationRequirement(stage.webauthn_user_verification), 111 ) 112 113 stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = ( 114 authentication_options.challenge 115 ) 116 117 options_dict = options_to_json_dict(authentication_options) 118 if stage.webauthn_hints: 119 options_dict["hints"] = list(stage.webauthn_hints) 120 return options_dict 121 122 123def select_challenge(request: HttpRequest, device: Device): 124 """Callback when the user selected a challenge in the frontend.""" 125 if isinstance(device, SMSDevice): 126 
select_challenge_sms(request, device) 127 elif isinstance(device, EmailDevice): 128 select_challenge_email(request, device) 129 130 131def select_challenge_sms(request: HttpRequest, device: SMSDevice): 132 """Send SMS""" 133 device.generate_token() 134 device.stage.send(request, device.token, device) 135 136 137def select_challenge_email(request: HttpRequest, device: EmailDevice): 138 """Send Email""" 139 valid_secs: int = timedelta_from_string(device.stage.token_expiry).total_seconds() 140 device.generate_token(valid_secs=valid_secs) 141 device.stage.send(device) 142 143 144def validate_challenge_code(code: str, stage_view: StageView, user: User) -> Device: 145 """Validate code-based challenges. We test against every device, on purpose, as 146 the user mustn't choose between totp and static devices.""" 147 148 with transaction.atomic(): 149 for device in devices_for_user(user, for_verify=True): 150 if isinstance(device, ThrottlingMixin): 151 throttling_factor = stage_view.executor.current_stage.get_throttling_factor( 152 DeviceClasses.from_model_label(device.model_label()) 153 ) 154 if throttling_factor is not None: 155 device.set_throttle_factor(throttling_factor) 156 if device.verify_token(code): 157 break 158 else: 159 device = None 160 161 if not device: 162 login_failed.send( 163 sender=__name__, 164 credentials={"username": user.username}, 165 request=stage_view.request, 166 stage=stage_view.executor.current_stage, 167 context={ 168 PLAN_CONTEXT_METHOD_ARGS: { 169 "device_class": DeviceClasses.TOTP.value, 170 } 171 }, 172 ) 173 raise ValidationError( 174 _("Invalid Token. 
Please ensure the time on your device is accurate and try again.") 175 ) 176 return device 177 178 179def validate_challenge_webauthn( 180 data: dict, 181 stage_view: StageView, 182 user: User, 183 stage: AuthenticatorValidateStage | None = None, 184) -> Device: 185 """Validate WebAuthn Challenge""" 186 request = stage_view.request 187 challenge = stage_view.executor.plan.context.get(PLAN_CONTEXT_WEBAUTHN_CHALLENGE) 188 stage = stage or stage_view.executor.current_stage 189 190 if "MinuteMaid" in request.META.get("HTTP_USER_AGENT", ""): 191 # Workaround for Android sign-in, when signing into Google Workspace on android while 192 # adding the account to the system (not in Chrome), for some reason `type` is not set 193 # so in that case we fall back to `public-key` 194 # since that's the only option we support anyways 195 data.setdefault("type", PublicKeyCredentialType.PUBLIC_KEY) 196 try: 197 credential = parse_authentication_credential_json(data) 198 except InvalidJSONStructure as exc: 199 LOGGER.warning("Invalid WebAuthn challenge response", exc=exc) 200 raise ValidationError("Invalid device", "invalid") from None 201 202 device = WebAuthnDevice.objects.filter(credential_id=credential.id).first() 203 if not device: 204 raise ValidationError("Invalid device", "invalid") 205 # We can only check the device's user if the user we're given isn't anonymous 206 # as this validation is also used for password-less login where webauthn is the very first 207 # step done by a user. Only if this validation happens at a later stage we can check 208 # that the device belongs to the user 209 if not user.is_anonymous and device.user != user: 210 raise ValidationError("Invalid device", "invalid") 211 # When a device_type was set when creating the device (2024.4+), and we have a limitation, 212 # make sure the device type is allowed. 
213 if ( 214 device.device_type 215 and stage.webauthn_allowed_device_types.exists() 216 and not stage.webauthn_allowed_device_types.filter(pk=device.device_type.pk).exists() 217 ): 218 raise ValidationError( 219 _( 220 "Invalid device type. Contact your {brand} administrator for help.".format( 221 brand=stage_view.request.brand.branding_title 222 ) 223 ), 224 "invalid", 225 ) 226 try: 227 authentication_verification = verify_authentication_response( 228 credential=credential, 229 expected_challenge=challenge, 230 expected_rp_id=get_rp_id(request), 231 expected_origin=get_origin(request), 232 credential_public_key=base64url_to_bytes(device.public_key), 233 credential_current_sign_count=device.sign_count, 234 require_user_verification=stage.webauthn_user_verification == UserVerification.REQUIRED, 235 ) 236 except InvalidAuthenticationResponse as exc: 237 LOGGER.warning("Assertion failed", exc=exc) 238 login_failed.send( 239 sender=__name__, 240 credentials={"username": user.username}, 241 request=stage_view.request, 242 stage=stage_view.executor.current_stage, 243 context={ 244 PLAN_CONTEXT_METHOD_ARGS: { 245 "device": device, 246 "device_class": DeviceClasses.WEBAUTHN.value, 247 "device_type": device.device_type, 248 }, 249 }, 250 ) 251 raise ValidationError("Assertion failed") from exc 252 253 with audit_ignore(): 254 device.set_sign_count(authentication_verification.new_sign_count) 255 return device 256 257 258def validate_challenge_duo(device_pk: int, stage_view: StageView, user: User) -> Device: 259 """Duo authentication""" 260 device = get_object_or_404(DuoDevice, pk=device_pk) 261 if device.user != user: 262 LOGGER.warning("device mismatch") 263 raise Http404 264 stage: AuthenticatorDuoStage = device.stage 265 266 # Get additional context for push 267 pushinfo = { 268 __("Domain"): stage_view.request.get_host(), 269 } 270 if PLAN_CONTEXT_APPLICATION in stage_view.executor.plan.context: 271 pushinfo[__("Application")] = stage_view.executor.plan.context.get( 
272 PLAN_CONTEXT_APPLICATION, Application() 273 ).name 274 275 try: 276 response = stage.auth_client().auth( 277 "auto", 278 user_id=device.duo_user_id, 279 ipaddr=ClientIPMiddleware.get_client_ip(stage_view.request), 280 type=__( 281 "{brand_name} Login request".format_map( 282 { 283 "brand_name": stage_view.request.brand.branding_title, 284 } 285 ) 286 ), 287 display_username=user.username, 288 device="auto", 289 pushinfo=urlencode(pushinfo), 290 ) 291 # {'result': 'allow', 'status': 'allow', 'status_msg': 'Success. Logging you in...'} 292 if response["result"] == "deny": 293 LOGGER.debug("duo push response", result=response["result"], msg=response["status_msg"]) 294 login_failed.send( 295 sender=__name__, 296 credentials={"username": user.username}, 297 request=stage_view.request, 298 stage=stage_view.executor.current_stage, 299 context={ 300 PLAN_CONTEXT_METHOD_ARGS: { 301 "device_class": DeviceClasses.DUO.value, 302 "duo_response": response, 303 } 304 }, 305 ) 306 raise ValidationError("Duo denied access", code="denied") 307 return device 308 except RuntimeError as exc: 309 Event.new( 310 EventAction.CONFIGURATION_ERROR, 311 message=f"Failed to DUO authenticate user: {str(exc)}", 312 user=user, 313 ).from_http(stage_view.request, user) 314 raise ValidationError("Duo denied access", code="denied") from exc
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class DeviceChallenge(PassiveSerializer):
    """Single device challenge"""

    # Kind of authenticator; only values from DeviceClasses.choices are accepted.
    device_class = ChoiceField(choices=DeviceClasses.choices)
    # NOTE(review): presumably the identifier of the device this challenge targets —
    # confirm against the stage view that builds these challenges.
    device_uid = CharField()
    # Free-form dict payload; presumably what `get_challenge_for_device` returns
    # for the device (WebAuthn options, a masked email hint, or an empty dict).
    challenge = JSONDictField()
    # May be null (allow_null=True).
    last_used = DateTimeField(allow_null=True)
Single device challenge
Inherited Members
def
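get_challenge_for_device( stage_view: authentik.stages.authenticator_validate.stage.AuthenticatorValidateStageView, stage: authentik.stages.authenticator_validate.models.AuthenticatorValidateStage, device: authentik.stages.authenticator.models.Device) -> dict: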
def get_challenge_for_device(
    stage_view: "AuthenticatorValidateStageView",
    stage: AuthenticatorValidateStage,
    device: Device,
) -> dict:
    """Generate challenge for a single device

    WebAuthn devices get the authentication options built by
    `get_webauthn_challenge`; email devices get their masked address as a hint;
    every other device gets an empty dict.

    `stage_view` is annotated with a string because its class is only imported
    under `TYPE_CHECKING` and therefore does not exist at runtime."""
    if isinstance(device, WebAuthnDevice):
        return get_webauthn_challenge(stage_view, stage, device)
    if isinstance(device, EmailDevice):
        return {"email": mask_email(device.email)}
    # Code-based challenges have no hints
    return {}
Generate challenge for a single device
def
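get_webauthn_challenge_without_user( stage_view: authentik.stages.authenticator_validate.stage.AuthenticatorValidateStageView, stage: authentik.stages.authenticator_validate.models.AuthenticatorValidateStage) -> dict: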
def get_webauthn_challenge_without_user(
    stage_view: "AuthenticatorValidateStageView", stage: AuthenticatorValidateStage
) -> dict:
    """Same as `get_webauthn_challenge`, but allows any client device. We can then later check
    who the device belongs to.

    Delegates to `get_webauthn_challenge` with no device, which yields an empty
    `allow_credentials` list while still replacing the challenge stored in the
    flow plan context and applying the stage's WebAuthn hints.

    `stage_view` is annotated with a string because its class is only imported
    under `TYPE_CHECKING` and therefore does not exist at runtime."""
    return get_webauthn_challenge(stage_view, stage, device=None)
Same as get_webauthn_challenge, but allows any client device. We can then later check
who the device belongs to.
def
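get_webauthn_challenge( stage_view: authentik.stages.authenticator_validate.stage.AuthenticatorValidateStageView, stage: authentik.stages.authenticator_validate.models.AuthenticatorValidateStage, device: authentik.stages.authenticator_webauthn.models.WebAuthnDevice | None = None) -> dict: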
def get_webauthn_challenge(
    stage_view: "AuthenticatorValidateStageView",
    stage: AuthenticatorValidateStage,
    device: WebAuthnDevice | None = None,
) -> dict:
    """Send the client a challenge that we'll check later

    Any challenge already stored in the flow plan context is discarded first.
    When `device` is given, the descriptors of all WebAuthn devices of that
    device's user (ordered by name) are offered as allowed credentials;
    otherwise the allow list is empty. The freshly generated challenge is stored
    in the plan context under `PLAN_CONTEXT_WEBAUTHN_CHALLENGE`.

    Returns the authentication options as a JSON-compatible dict, with a
    `hints` entry added when the stage has WebAuthn hints configured.

    `stage_view` is annotated with a string because its class is only imported
    under `TYPE_CHECKING` and therefore does not exist at runtime."""
    stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)

    allowed_credentials = []
    if device:
        # We want all the user's WebAuthn devices and merge their challenges
        allowed_credentials = [
            user_device.descriptor
            for user_device in WebAuthnDevice.objects.filter(user=device.user).order_by("name")
        ]

    authentication_options = generate_authentication_options(
        rp_id=get_rp_id(stage_view.request),
        allow_credentials=allowed_credentials,
        user_verification=UserVerificationRequirement(stage.webauthn_user_verification),
    )

    stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = (
        authentication_options.challenge
    )

    options_dict = options_to_json_dict(authentication_options)
    if stage.webauthn_hints:
        options_dict["hints"] = list(stage.webauthn_hints)
    return options_dict
Send the client a challenge that we'll check later
def
select_challenge( request: django.http.request.HttpRequest, device: authentik.stages.authenticator.models.Device):
def select_challenge(request: HttpRequest, device: Device):
    """Callback when the user selected a challenge in the frontend.

    SMS and email devices have a token sent out through their dedicated
    handlers; any other device type needs no action here."""
    if isinstance(device, SMSDevice):
        select_challenge_sms(request, device)
        return
    if isinstance(device, EmailDevice):
        select_challenge_email(request, device)
Callback when the user selected a challenge in the frontend.
def
select_challenge_sms( request: django.http.request.HttpRequest, device: authentik.stages.authenticator_sms.models.SMSDevice):
def select_challenge_sms(request: HttpRequest, device: SMSDevice):
    """Generate a new token on the SMS device and send it via the device's stage."""
    device.generate_token()
    sms_stage = device.stage
    sms_stage.send(request, device.token, device)
Send SMS
def
select_challenge_email( request: django.http.request.HttpRequest, device: authentik.stages.authenticator_email.models.EmailDevice):
def select_challenge_email(request: HttpRequest, device: EmailDevice):
    """Send Email

    Generates a new token on the device, valid for the duration configured in
    the device stage's `token_expiry`, then sends it through that stage.
    `request` is accepted for signature parity with `select_challenge_sms` but
    is not used here."""
    # timedelta.total_seconds() returns a float, not an int
    valid_secs: float = timedelta_from_string(device.stage.token_expiry).total_seconds()
    device.generate_token(valid_secs=valid_secs)
    device.stage.send(device)
Send Email
def
validate_challenge_code( code: str, stage_view: authentik.flows.stage.StageView, user: authentik.core.models.User) -> authentik.stages.authenticator.models.Device:
def validate_challenge_code(code: str, stage_view: StageView, user: User) -> Device:
    """Check a code against each of the user's devices until one verifies it.

    Every device is tried on purpose, so the user does not have to choose
    between TOTP and static devices. Devices that support throttling get the
    current stage's throttling factor for their device class applied before
    the code is checked, when the stage provides one.

    Returns the first device that verifies the code. When none does, a
    `login_failed` signal is sent and `ValidationError` is raised."""
    matched_device = None
    with transaction.atomic():
        for candidate in devices_for_user(user, for_verify=True):
            if isinstance(candidate, ThrottlingMixin):
                get_factor = stage_view.executor.current_stage.get_throttling_factor
                factor = get_factor(DeviceClasses.from_model_label(candidate.model_label()))
                if factor is not None:
                    candidate.set_throttle_factor(factor)
            if candidate.verify_token(code):
                matched_device = candidate
                break

    # Handled outside the atomic block so that raising does not roll it back.
    if matched_device:
        return matched_device

    failure_context = {
        PLAN_CONTEXT_METHOD_ARGS: {
            "device_class": DeviceClasses.TOTP.value,
        }
    }
    login_failed.send(
        sender=__name__,
        credentials={"username": user.username},
        request=stage_view.request,
        stage=stage_view.executor.current_stage,
        context=failure_context,
    )
    raise ValidationError(
        _("Invalid Token. Please ensure the time on your device is accurate and try again.")
    )
Validate code-based challenges. We test against every device, on purpose, as the user mustn't choose between totp and static devices.
def
validate_challenge_webauthn( data: dict, stage_view: authentik.flows.stage.StageView, user: authentik.core.models.User, stage: authentik.stages.authenticator_validate.models.AuthenticatorValidateStage | None = None) -> authentik.stages.authenticator.models.Device:
def validate_challenge_webauthn(
    data: dict,
    stage_view: StageView,
    user: User,
    stage: AuthenticatorValidateStage | None = None,
) -> Device:
    """Validate WebAuthn Challenge

    Parses the client's assertion from `data`, looks up the WebAuthn device by
    credential ID and verifies the assertion against the challenge stored in
    the flow plan context. On success the device's sign count is updated and
    the device is returned.

    `stage` defaults to the executor's current stage. `data` may be mutated: a
    missing `type` key is filled in for the Android "MinuteMaid" user agent.

    Raises `ValidationError` when the payload cannot be parsed, the device is
    unknown, the device belongs to a different (non-anonymous) user, the device
    type is not allowed by the stage, or the assertion fails verification. In
    the last case a `login_failed` signal is sent first."""
    request = stage_view.request
    challenge = stage_view.executor.plan.context.get(PLAN_CONTEXT_WEBAUTHN_CHALLENGE)
    stage = stage or stage_view.executor.current_stage

    if "MinuteMaid" in request.META.get("HTTP_USER_AGENT", ""):
        # Workaround for Android sign-in, when signing into Google Workspace on android while
        # adding the account to the system (not in Chrome), for some reason `type` is not set
        # so in that case we fall back to `public-key`
        # since that's the only option we support anyways
        data.setdefault("type", PublicKeyCredentialType.PUBLIC_KEY)
    try:
        credential = parse_authentication_credential_json(data)
    except InvalidJSONStructure as exc:
        LOGGER.warning("Invalid WebAuthn challenge response", exc=exc)
        raise ValidationError("Invalid device", "invalid") from None

    device = WebAuthnDevice.objects.filter(credential_id=credential.id).first()
    if not device:
        raise ValidationError("Invalid device", "invalid")
    # We can only check the device's user if the user we're given isn't anonymous
    # as this validation is also used for password-less login where webauthn is the very first
    # step done by a user. Only if this validation happens at a later stage we can check
    # that the device belongs to the user
    if not user.is_anonymous and device.user != user:
        raise ValidationError("Invalid device", "invalid")
    # When a device_type was set when creating the device (2024.4+), and we have a limitation,
    # make sure the device type is allowed.
    if (
        device.device_type
        and stage.webauthn_allowed_device_types.exists()
        and not stage.webauthn_allowed_device_types.filter(pk=device.device_type.pk).exists()
    ):
        raise ValidationError(
            # Translate the template first and substitute afterwards: formatting
            # before the lookup yields a message ID that is never in the catalog,
            # so the text would always be shown untranslated.
            _("Invalid device type. Contact your {brand} administrator for help.").format(
                brand=stage_view.request.brand.branding_title
            ),
            "invalid",
        )
    try:
        authentication_verification = verify_authentication_response(
            credential=credential,
            expected_challenge=challenge,
            expected_rp_id=get_rp_id(request),
            expected_origin=get_origin(request),
            credential_public_key=base64url_to_bytes(device.public_key),
            credential_current_sign_count=device.sign_count,
            require_user_verification=stage.webauthn_user_verification == UserVerification.REQUIRED,
        )
    except InvalidAuthenticationResponse as exc:
        LOGGER.warning("Assertion failed", exc=exc)
        login_failed.send(
            sender=__name__,
            credentials={"username": user.username},
            request=stage_view.request,
            stage=stage_view.executor.current_stage,
            context={
                PLAN_CONTEXT_METHOD_ARGS: {
                    "device": device,
                    "device_class": DeviceClasses.WEBAUTHN.value,
                    "device_type": device.device_type,
                },
            },
        )
        raise ValidationError("Assertion failed") from exc

    # Updating the sign count is bookkeeping, wrapped in audit_ignore()
    with audit_ignore():
        device.set_sign_count(authentication_verification.new_sign_count)
    return device
Validate WebAuthn Challenge
def
validate_challenge_duo( device_pk: int, stage_view: authentik.flows.stage.StageView, user: authentik.core.models.User) -> authentik.stages.authenticator.models.Device:
def validate_challenge_duo(device_pk: int, stage_view: StageView, user: User) -> Device:
    """Duo authentication

    Looks up the Duo device by primary key, checks that it belongs to `user`,
    and triggers an authentication request through the device stage's Duo
    client. The request carries the request host and, when the flow plan has
    one, the application name as push info.

    Returns the device unless Duo reports a `deny` result.

    Raises `Http404` when the device does not exist or belongs to another user.
    Raises `ValidationError` (code `denied`) when Duo denies the request, after
    sending a `login_failed` signal, or when the Duo client raises
    `RuntimeError`, after recording a configuration-error event."""
    device = get_object_or_404(DuoDevice, pk=device_pk)
    if device.user != user:
        LOGGER.warning("device mismatch")
        raise Http404
    stage: AuthenticatorDuoStage = device.stage

    # Get additional context for push
    pushinfo = {
        __("Domain"): stage_view.request.get_host(),
    }
    plan_context = stage_view.executor.plan.context
    if PLAN_CONTEXT_APPLICATION in plan_context:
        # The key is known to be present, so no fallback object is needed
        pushinfo[__("Application")] = plan_context[PLAN_CONTEXT_APPLICATION].name

    try:
        response = stage.auth_client().auth(
            "auto",
            user_id=device.duo_user_id,
            ipaddr=ClientIPMiddleware.get_client_ip(stage_view.request),
            # Translate the template first and substitute afterwards: formatting
            # before the lookup yields a message ID that is never in the catalog.
            type=__("{brand_name} Login request").format_map(
                {
                    "brand_name": stage_view.request.brand.branding_title,
                }
            ),
            display_username=user.username,
            device="auto",
            pushinfo=urlencode(pushinfo),
        )
        # {'result': 'allow', 'status': 'allow', 'status_msg': 'Success. Logging you in...'}
        if response["result"] == "deny":
            LOGGER.debug("duo push response", result=response["result"], msg=response["status_msg"])
            login_failed.send(
                sender=__name__,
                credentials={"username": user.username},
                request=stage_view.request,
                stage=stage_view.executor.current_stage,
                context={
                    PLAN_CONTEXT_METHOD_ARGS: {
                        "device_class": DeviceClasses.DUO.value,
                        "duo_response": response,
                    }
                },
            )
            raise ValidationError("Duo denied access", code="denied")
        return device
    except RuntimeError as exc:
        Event.new(
            EventAction.CONFIGURATION_ERROR,
            message=f"Failed to DUO authenticate user: {str(exc)}",
            user=user,
        ).from_http(stage_view.request, user)
        raise ValidationError("Duo denied access", code="denied") from exc
Duo authentication