authentik.stages.authenticator_validate.challenge
Validation stage challenge checking
1"""Validation stage challenge checking""" 2 3from typing import TYPE_CHECKING 4from urllib.parse import urlencode 5 6from django.http import HttpRequest 7from django.http.response import Http404 8from django.shortcuts import get_object_or_404 9from django.utils.translation import gettext as __ 10from django.utils.translation import gettext_lazy as _ 11from rest_framework.fields import CharField, ChoiceField, DateTimeField 12from rest_framework.serializers import ValidationError 13from structlog.stdlib import get_logger 14from webauthn.authentication.generate_authentication_options import generate_authentication_options 15from webauthn.authentication.verify_authentication_response import verify_authentication_response 16from webauthn.helpers import parse_authentication_credential_json 17from webauthn.helpers.base64url_to_bytes import base64url_to_bytes 18from webauthn.helpers.exceptions import InvalidAuthenticationResponse, InvalidJSONStructure 19from webauthn.helpers.options_to_json_dict import options_to_json_dict 20from webauthn.helpers.structs import PublicKeyCredentialType, UserVerificationRequirement 21 22from authentik.core.api.utils import JSONDictField, PassiveSerializer 23from authentik.core.models import Application, User 24from authentik.core.signals import login_failed 25from authentik.events.middleware import audit_ignore 26from authentik.events.models import Event, EventAction 27from authentik.flows.planner import PLAN_CONTEXT_APPLICATION 28from authentik.flows.stage import StageView 29from authentik.lib.utils.email import mask_email 30from authentik.lib.utils.time import timedelta_from_string 31from authentik.root.middleware import ClientIPMiddleware 32from authentik.stages.authenticator import match_token 33from authentik.stages.authenticator.models import Device 34from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice 35from authentik.stages.authenticator_email.models import EmailDevice 36from authentik.stages.authenticator_sms.models import SMSDevice 37from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses 38from authentik.stages.authenticator_webauthn.models import UserVerification, WebAuthnDevice 39from authentik.stages.authenticator_webauthn.stage import PLAN_CONTEXT_WEBAUTHN_CHALLENGE 40from authentik.stages.authenticator_webauthn.utils import get_origin, get_rp_id 41from authentik.stages.password.stage import PLAN_CONTEXT_METHOD_ARGS 42 43LOGGER = get_logger() 44if TYPE_CHECKING: 45 from authentik.stages.authenticator_validate.stage import AuthenticatorValidateStageView 46 47 48class DeviceChallenge(PassiveSerializer): 49 """Single device challenge""" 50 51 device_class = ChoiceField(choices=DeviceClasses.choices) 52 device_uid = CharField() 53 challenge = JSONDictField() 54 last_used = DateTimeField(allow_null=True) 55 56 57def get_challenge_for_device( 58 stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage, device: Device 59) -> dict: 60 """Generate challenge for a single device""" 61 if isinstance(device, WebAuthnDevice): 62 return get_webauthn_challenge(stage_view, stage, device) 63 if isinstance(device, EmailDevice): 64 return {"email": mask_email(device.email)} 65 # Code-based challenges have no hints 66 return {} 67 68 69def get_webauthn_challenge_without_user( 70 stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage 71) -> dict: 72 """Same as `get_webauthn_challenge`, but allows any client device. We can then later check 73 who the device belongs to.""" 74 stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None) 75 authentication_options = generate_authentication_options( 76 rp_id=get_rp_id(stage_view.request), 77 allow_credentials=[], 78 user_verification=UserVerificationRequirement(stage.webauthn_user_verification), 79 ) 80 stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = ( 81 authentication_options.challenge 82 ) 83 84 options_dict = options_to_json_dict(authentication_options) 85 if stage.webauthn_hints: 86 options_dict["hints"] = list(stage.webauthn_hints) 87 return options_dict 88 89 90def get_webauthn_challenge( 91 stage_view: AuthenticatorValidateStageView, 92 stage: AuthenticatorValidateStage, 93 device: WebAuthnDevice | None = None, 94) -> dict: 95 """Send the client a challenge that we'll check later""" 96 stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None) 97 98 allowed_credentials = [] 99 100 if device: 101 # We want all the user's WebAuthn devices and merge their challenges 102 for user_device in WebAuthnDevice.objects.filter(user=device.user).order_by("name"): 103 user_device: WebAuthnDevice 104 allowed_credentials.append(user_device.descriptor) 105 106 authentication_options = generate_authentication_options( 107 rp_id=get_rp_id(stage_view.request), 108 allow_credentials=allowed_credentials, 109 user_verification=UserVerificationRequirement(stage.webauthn_user_verification), 110 ) 111 112 stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = ( 113 authentication_options.challenge 114 ) 115 116 options_dict = options_to_json_dict(authentication_options) 117 if stage.webauthn_hints: 118 options_dict["hints"] = list(stage.webauthn_hints) 119 return options_dict 120 121 122def select_challenge(request: HttpRequest, device: Device): 123 """Callback when the user selected a challenge in the frontend.""" 124 if isinstance(device, SMSDevice): 125 select_challenge_sms(request, device) 126 elif isinstance(device, EmailDevice): 127 select_challenge_email(request, device) 128 129 130def select_challenge_sms(request: HttpRequest, device: SMSDevice): 131 """Send SMS""" 132 device.generate_token() 133 device.stage.send(request, device.token, device) 134 135 136def select_challenge_email(request: HttpRequest, device: EmailDevice): 137 """Send Email""" 138 valid_secs: int = timedelta_from_string(device.stage.token_expiry).total_seconds() 139 device.generate_token(valid_secs=valid_secs) 140 device.stage.send(device) 141 142 143def validate_challenge_code(code: str, stage_view: StageView, user: User) -> Device: 144 """Validate code-based challenges. We test against every device, on purpose, as 145 the user mustn't choose between totp and static devices.""" 146 device = match_token(user, code) 147 if not device: 148 login_failed.send( 149 sender=__name__, 150 credentials={"username": user.username}, 151 request=stage_view.request, 152 stage=stage_view.executor.current_stage, 153 context={ 154 PLAN_CONTEXT_METHOD_ARGS: { 155 "device_class": DeviceClasses.TOTP.value, 156 } 157 }, 158 ) 159 raise ValidationError( 160 _("Invalid Token. Please ensure the time on your device is accurate and try again.") 161 ) 162 return device 163 164 165def validate_challenge_webauthn( 166 data: dict, 167 stage_view: StageView, 168 user: User, 169 stage: AuthenticatorValidateStage | None = None, 170) -> Device: 171 """Validate WebAuthn Challenge""" 172 request = stage_view.request 173 challenge = stage_view.executor.plan.context.get(PLAN_CONTEXT_WEBAUTHN_CHALLENGE) 174 stage = stage or stage_view.executor.current_stage 175 176 if "MinuteMaid" in request.META.get("HTTP_USER_AGENT", ""): 177 # Workaround for Android sign-in, when signing into Google Workspace on android while 178 # adding the account to the system (not in Chrome), for some reason `type` is not set 179 # so in that case we fall back to `public-key` 180 # since that's the only option we support anyways 181 data.setdefault("type", PublicKeyCredentialType.PUBLIC_KEY) 182 try: 183 credential = parse_authentication_credential_json(data) 184 except InvalidJSONStructure as exc: 185 LOGGER.warning("Invalid WebAuthn challenge response", exc=exc) 186 raise ValidationError("Invalid device", "invalid") from None 187 188 device = WebAuthnDevice.objects.filter(credential_id=credential.id).first() 189 if not device: 190 raise ValidationError("Invalid device", "invalid") 191 # We can only check the device's user if the user we're given isn't anonymous 192 # as this validation is also used for password-less login where webauthn is the very first 193 # step done by a user. Only if this validation happens at a later stage we can check 194 # that the device belongs to the user 195 if not user.is_anonymous and device.user != user: 196 raise ValidationError("Invalid device", "invalid") 197 # When a device_type was set when creating the device (2024.4+), and we have a limitation, 198 # make sure the device type is allowed. 199 if ( 200 device.device_type 201 and stage.webauthn_allowed_device_types.exists() 202 and not stage.webauthn_allowed_device_types.filter(pk=device.device_type.pk).exists() 203 ): 204 raise ValidationError( 205 _( 206 "Invalid device type. Contact your {brand} administrator for help.".format( 207 brand=stage_view.request.brand.branding_title 208 ) 209 ), 210 "invalid", 211 ) 212 try: 213 authentication_verification = verify_authentication_response( 214 credential=credential, 215 expected_challenge=challenge, 216 expected_rp_id=get_rp_id(request), 217 expected_origin=get_origin(request), 218 credential_public_key=base64url_to_bytes(device.public_key), 219 credential_current_sign_count=device.sign_count, 220 require_user_verification=stage.webauthn_user_verification == UserVerification.REQUIRED, 221 ) 222 except InvalidAuthenticationResponse as exc: 223 LOGGER.warning("Assertion failed", exc=exc) 224 login_failed.send( 225 sender=__name__, 226 credentials={"username": user.username}, 227 request=stage_view.request, 228 stage=stage_view.executor.current_stage, 229 context={ 230 PLAN_CONTEXT_METHOD_ARGS: { 231 "device": device, 232 "device_class": DeviceClasses.WEBAUTHN.value, 233 "device_type": device.device_type, 234 }, 235 }, 236 ) 237 raise ValidationError("Assertion failed") from exc 238 239 with audit_ignore(): 240 device.set_sign_count(authentication_verification.new_sign_count) 241 return device 242 243 244def validate_challenge_duo(device_pk: int, stage_view: StageView, user: User) -> Device: 245 """Duo authentication""" 246 device = get_object_or_404(DuoDevice, pk=device_pk) 247 if device.user != user: 248 LOGGER.warning("device mismatch") 249 raise Http404 250 stage: AuthenticatorDuoStage = device.stage 251 252 # Get additional context for push 253 pushinfo = { 254 __("Domain"): stage_view.request.get_host(), 255 } 256 if PLAN_CONTEXT_APPLICATION in stage_view.executor.plan.context: 257 pushinfo[__("Application")] = stage_view.executor.plan.context.get( 258 PLAN_CONTEXT_APPLICATION, Application() 259 ).name 260 261 try: 262 response = stage.auth_client().auth( 263 "auto", 264 user_id=device.duo_user_id, 265 ipaddr=ClientIPMiddleware.get_client_ip(stage_view.request), 266 type=__( 267 "{brand_name} Login request".format_map( 268 { 269 "brand_name": stage_view.request.brand.branding_title, 270 } 271 ) 272 ), 273 display_username=user.username, 274 device="auto", 275 pushinfo=urlencode(pushinfo), 276 ) 277 # {'result': 'allow', 'status': 'allow', 'status_msg': 'Success. Logging you in...'} 278 if response["result"] == "deny": 279 LOGGER.debug("duo push response", result=response["result"], msg=response["status_msg"]) 280 login_failed.send( 281 sender=__name__, 282 credentials={"username": user.username}, 283 request=stage_view.request, 284 stage=stage_view.executor.current_stage, 285 context={ 286 PLAN_CONTEXT_METHOD_ARGS: { 287 "device_class": DeviceClasses.DUO.value, 288 "duo_response": response, 289 } 290 }, 291 ) 292 raise ValidationError("Duo denied access", code="denied") 293 return device 294 except RuntimeError as exc: 295 Event.new( 296 EventAction.CONFIGURATION_ERROR, 297 message=f"Failed to DUO authenticate user: {str(exc)}", 298 user=user, 299 ).from_http(stage_view.request, user) 300 raise ValidationError("Duo denied access", code="denied") from exc
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
49class DeviceChallenge(PassiveSerializer): 50 """Single device challenge""" 51 52 device_class = ChoiceField(choices=DeviceClasses.choices) 53 device_uid = CharField() 54 challenge = JSONDictField() 55 last_used = DateTimeField(allow_null=True)
Single device challenge
Inherited Members
def
get_challenge_for_device(unknown):
58def get_challenge_for_device( 59 stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage, device: Device 60) -> dict: 61 """Generate challenge for a single device""" 62 if isinstance(device, WebAuthnDevice): 63 return get_webauthn_challenge(stage_view, stage, device) 64 if isinstance(device, EmailDevice): 65 return {"email": mask_email(device.email)} 66 # Code-based challenges have no hints 67 return {}
Generate challenge for a single device
def
get_webauthn_challenge_without_user(unknown):
70def get_webauthn_challenge_without_user( 71 stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage 72) -> dict: 73 """Same as `get_webauthn_challenge`, but allows any client device. We can then later check 74 who the device belongs to.""" 75 stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None) 76 authentication_options = generate_authentication_options( 77 rp_id=get_rp_id(stage_view.request), 78 allow_credentials=[], 79 user_verification=UserVerificationRequirement(stage.webauthn_user_verification), 80 ) 81 stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = ( 82 authentication_options.challenge 83 ) 84 85 options_dict = options_to_json_dict(authentication_options) 86 if stage.webauthn_hints: 87 options_dict["hints"] = list(stage.webauthn_hints) 88 return options_dict
Same as get_webauthn_challenge, but allows any client device. We can then later check
who the device belongs to.
def
get_webauthn_challenge(unknown):
91def get_webauthn_challenge( 92 stage_view: AuthenticatorValidateStageView, 93 stage: AuthenticatorValidateStage, 94 device: WebAuthnDevice | None = None, 95) -> dict: 96 """Send the client a challenge that we'll check later""" 97 stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None) 98 99 allowed_credentials = [] 100 101 if device: 102 # We want all the user's WebAuthn devices and merge their challenges 103 for user_device in WebAuthnDevice.objects.filter(user=device.user).order_by("name"): 104 user_device: WebAuthnDevice 105 allowed_credentials.append(user_device.descriptor) 106 107 authentication_options = generate_authentication_options( 108 rp_id=get_rp_id(stage_view.request), 109 allow_credentials=allowed_credentials, 110 user_verification=UserVerificationRequirement(stage.webauthn_user_verification), 111 ) 112 113 stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = ( 114 authentication_options.challenge 115 ) 116 117 options_dict = options_to_json_dict(authentication_options) 118 if stage.webauthn_hints: 119 options_dict["hints"] = list(stage.webauthn_hints) 120 return options_dict
Send the client a challenge that we'll check later
def
select_challenge( request: django.http.request.HttpRequest, device: authentik.stages.authenticator.models.Device):
123def select_challenge(request: HttpRequest, device: Device): 124 """Callback when the user selected a challenge in the frontend.""" 125 if isinstance(device, SMSDevice): 126 select_challenge_sms(request, device) 127 elif isinstance(device, EmailDevice): 128 select_challenge_email(request, device)
Callback when the user selected a challenge in the frontend.
def
select_challenge_sms( request: django.http.request.HttpRequest, device: authentik.stages.authenticator_sms.models.SMSDevice):
131def select_challenge_sms(request: HttpRequest, device: SMSDevice): 132 """Send SMS""" 133 device.generate_token() 134 device.stage.send(request, device.token, device)
Send SMS
def
select_challenge_email( request: django.http.request.HttpRequest, device: authentik.stages.authenticator_email.models.EmailDevice):
137def select_challenge_email(request: HttpRequest, device: EmailDevice): 138 """Send Email""" 139 valid_secs: int = timedelta_from_string(device.stage.token_expiry).total_seconds() 140 device.generate_token(valid_secs=valid_secs) 141 device.stage.send(device)
Send Email
def
validate_challenge_code( code: str, stage_view: authentik.flows.stage.StageView, user: authentik.core.models.User) -> authentik.stages.authenticator.models.Device:
144def validate_challenge_code(code: str, stage_view: StageView, user: User) -> Device: 145 """Validate code-based challenges. We test against every device, on purpose, as 146 the user mustn't choose between totp and static devices.""" 147 device = match_token(user, code) 148 if not device: 149 login_failed.send( 150 sender=__name__, 151 credentials={"username": user.username}, 152 request=stage_view.request, 153 stage=stage_view.executor.current_stage, 154 context={ 155 PLAN_CONTEXT_METHOD_ARGS: { 156 "device_class": DeviceClasses.TOTP.value, 157 } 158 }, 159 ) 160 raise ValidationError( 161 _("Invalid Token. Please ensure the time on your device is accurate and try again.") 162 ) 163 return device
Validate code-based challenges. We test against every device, on purpose, as the user mustn't choose between totp and static devices.
def
validate_challenge_webauthn( data: dict, stage_view: authentik.flows.stage.StageView, user: authentik.core.models.User, stage: authentik.stages.authenticator_validate.models.AuthenticatorValidateStage | None = None) -> authentik.stages.authenticator.models.Device:
166def validate_challenge_webauthn( 167 data: dict, 168 stage_view: StageView, 169 user: User, 170 stage: AuthenticatorValidateStage | None = None, 171) -> Device: 172 """Validate WebAuthn Challenge""" 173 request = stage_view.request 174 challenge = stage_view.executor.plan.context.get(PLAN_CONTEXT_WEBAUTHN_CHALLENGE) 175 stage = stage or stage_view.executor.current_stage 176 177 if "MinuteMaid" in request.META.get("HTTP_USER_AGENT", ""): 178 # Workaround for Android sign-in, when signing into Google Workspace on android while 179 # adding the account to the system (not in Chrome), for some reason `type` is not set 180 # so in that case we fall back to `public-key` 181 # since that's the only option we support anyways 182 data.setdefault("type", PublicKeyCredentialType.PUBLIC_KEY) 183 try: 184 credential = parse_authentication_credential_json(data) 185 except InvalidJSONStructure as exc: 186 LOGGER.warning("Invalid WebAuthn challenge response", exc=exc) 187 raise ValidationError("Invalid device", "invalid") from None 188 189 device = WebAuthnDevice.objects.filter(credential_id=credential.id).first() 190 if not device: 191 raise ValidationError("Invalid device", "invalid") 192 # We can only check the device's user if the user we're given isn't anonymous 193 # as this validation is also used for password-less login where webauthn is the very first 194 # step done by a user. Only if this validation happens at a later stage we can check 195 # that the device belongs to the user 196 if not user.is_anonymous and device.user != user: 197 raise ValidationError("Invalid device", "invalid") 198 # When a device_type was set when creating the device (2024.4+), and we have a limitation, 199 # make sure the device type is allowed. 200 if ( 201 device.device_type 202 and stage.webauthn_allowed_device_types.exists() 203 and not stage.webauthn_allowed_device_types.filter(pk=device.device_type.pk).exists() 204 ): 205 raise ValidationError( 206 _( 207 "Invalid device type. Contact your {brand} administrator for help.".format( 208 brand=stage_view.request.brand.branding_title 209 ) 210 ), 211 "invalid", 212 ) 213 try: 214 authentication_verification = verify_authentication_response( 215 credential=credential, 216 expected_challenge=challenge, 217 expected_rp_id=get_rp_id(request), 218 expected_origin=get_origin(request), 219 credential_public_key=base64url_to_bytes(device.public_key), 220 credential_current_sign_count=device.sign_count, 221 require_user_verification=stage.webauthn_user_verification == UserVerification.REQUIRED, 222 ) 223 except InvalidAuthenticationResponse as exc: 224 LOGGER.warning("Assertion failed", exc=exc) 225 login_failed.send( 226 sender=__name__, 227 credentials={"username": user.username}, 228 request=stage_view.request, 229 stage=stage_view.executor.current_stage, 230 context={ 231 PLAN_CONTEXT_METHOD_ARGS: { 232 "device": device, 233 "device_class": DeviceClasses.WEBAUTHN.value, 234 "device_type": device.device_type, 235 }, 236 }, 237 ) 238 raise ValidationError("Assertion failed") from exc 239 240 with audit_ignore(): 241 device.set_sign_count(authentication_verification.new_sign_count) 242 return device
Validate WebAuthn Challenge
def
validate_challenge_duo( device_pk: int, stage_view: authentik.flows.stage.StageView, user: authentik.core.models.User) -> authentik.stages.authenticator.models.Device:
245def validate_challenge_duo(device_pk: int, stage_view: StageView, user: User) -> Device: 246 """Duo authentication""" 247 device = get_object_or_404(DuoDevice, pk=device_pk) 248 if device.user != user: 249 LOGGER.warning("device mismatch") 250 raise Http404 251 stage: AuthenticatorDuoStage = device.stage 252 253 # Get additional context for push 254 pushinfo = { 255 __("Domain"): stage_view.request.get_host(), 256 } 257 if PLAN_CONTEXT_APPLICATION in stage_view.executor.plan.context: 258 pushinfo[__("Application")] = stage_view.executor.plan.context.get( 259 PLAN_CONTEXT_APPLICATION, Application() 260 ).name 261 262 try: 263 response = stage.auth_client().auth( 264 "auto", 265 user_id=device.duo_user_id, 266 ipaddr=ClientIPMiddleware.get_client_ip(stage_view.request), 267 type=__( 268 "{brand_name} Login request".format_map( 269 { 270 "brand_name": stage_view.request.brand.branding_title, 271 } 272 ) 273 ), 274 display_username=user.username, 275 device="auto", 276 pushinfo=urlencode(pushinfo), 277 ) 278 # {'result': 'allow', 'status': 'allow', 'status_msg': 'Success. Logging you in...'} 279 if response["result"] == "deny": 280 LOGGER.debug("duo push response", result=response["result"], msg=response["status_msg"]) 281 login_failed.send( 282 sender=__name__, 283 credentials={"username": user.username}, 284 request=stage_view.request, 285 stage=stage_view.executor.current_stage, 286 context={ 287 PLAN_CONTEXT_METHOD_ARGS: { 288 "device_class": DeviceClasses.DUO.value, 289 "duo_response": response, 290 } 291 }, 292 ) 293 raise ValidationError("Duo denied access", code="denied") 294 return device 295 except RuntimeError as exc: 296 Event.new( 297 EventAction.CONFIGURATION_ERROR, 298 message=f"Failed to DUO authenticate user: {str(exc)}", 299 user=user, 300 ).from_http(stage_view.request, user) 301 raise ValidationError("Duo denied access", code="denied") from exc
Duo authentication