authentik.stages.authenticator_webauthn.stage
WebAuthn stage
1"""WebAuthn stage""" 2 3from dataclasses import dataclass 4from uuid import UUID 5 6from cryptography.hazmat.primitives.serialization import Encoding 7from cryptography.x509 import load_der_x509_certificate 8from django.db.models import Q 9from django.http import HttpRequest, HttpResponse 10from django.http.request import QueryDict 11from django.utils.translation import gettext as __ 12from django.utils.translation import gettext_lazy as _ 13from rest_framework.fields import CharField 14from rest_framework.serializers import ValidationError 15from webauthn.helpers.bytes_to_base64url import bytes_to_base64url 16from webauthn.helpers.exceptions import WebAuthnException 17from webauthn.helpers.options_to_json_dict import options_to_json_dict 18from webauthn.helpers.parse_attestation_object import parse_attestation_object 19from webauthn.helpers.structs import ( 20 AttestationConveyancePreference, 21 AuthenticatorAttachment, 22 AuthenticatorSelectionCriteria, 23 PublicKeyCredentialCreationOptions, 24 PublicKeyCredentialHint, 25 ResidentKeyRequirement, 26 UserVerificationRequirement, 27) 28from webauthn.registration.generate_registration_options import generate_registration_options 29from webauthn.registration.verify_registration_response import ( 30 VerifiedRegistration, 31 verify_registration_response, 32) 33 34from authentik.core.api.utils import JSONDictField 35from authentik.core.models import User 36from authentik.crypto.models import fingerprint_sha256 37from authentik.flows.challenge import ( 38 Challenge, 39 ChallengeResponse, 40 WithUserInfoChallenge, 41) 42from authentik.flows.stage import ChallengeStageView 43from authentik.stages.authenticator_webauthn.models import ( 44 UNKNOWN_DEVICE_TYPE_AAGUID, 45 AuthenticatorWebAuthnStage, 46 WebAuthnDevice, 47 WebAuthnDeviceType, 48) 49from authentik.stages.authenticator_webauthn.utils import get_origin, get_rp_id 50 51PLAN_CONTEXT_WEBAUTHN_CHALLENGE = "goauthentik.io/stages/authenticator_webauthn/challenge" 52PLAN_CONTEXT_WEBAUTHN_ATTEMPT = "goauthentik.io/stages/authenticator_webauthn/attempt" 53 54 55@dataclass 56class VerifiedRegistrationData: 57 registration: VerifiedRegistration 58 exists_query: Q 59 attest_cert: str | None = None 60 attest_cert_fingerprint: str | None = None 61 62 63class AuthenticatorWebAuthnChallenge(WithUserInfoChallenge): 64 """WebAuthn Challenge""" 65 66 registration = JSONDictField() 67 component = CharField(default="ak-stage-authenticator-webauthn") 68 69 70class AuthenticatorWebAuthnChallengeResponse(ChallengeResponse): 71 """WebAuthn Challenge response""" 72 73 response = JSONDictField() 74 component = CharField(default="ak-stage-authenticator-webauthn") 75 76 request: HttpRequest 77 user: User 78 79 def validate_response(self, response: dict) -> VerifiedRegistrationData: 80 """Validate webauthn challenge response""" 81 challenge = self.stage.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] 82 83 try: 84 registration: VerifiedRegistration = verify_registration_response( 85 credential=response, 86 expected_challenge=challenge, 87 expected_rp_id=get_rp_id(self.request), 88 expected_origin=get_origin(self.request), 89 ) 90 except WebAuthnException as exc: 91 self.stage.logger.warning("registration failed", exc=exc) 92 raise ValidationError(f"Registration failed. Error: {exc}") from None 93 94 registration_data = VerifiedRegistrationData( 95 registration, 96 exists_query=Q(credential_id=bytes_to_base64url(registration.credential_id)), 97 ) 98 stage: AuthenticatorWebAuthnStage = self.stage.executor.current_stage 99 100 att_obj = parse_attestation_object(registration.attestation_object) 101 if ( 102 att_obj 103 and att_obj.att_stmt 104 and att_obj.att_stmt.x5c is not None 105 and len(att_obj.att_stmt.x5c) > 0 106 ): 107 cert = load_der_x509_certificate(att_obj.att_stmt.x5c[0]) 108 registration_data.attest_cert = cert.public_bytes( 109 encoding=Encoding.PEM, 110 ).decode("utf-8") 111 registration_data.attest_cert_fingerprint = fingerprint_sha256(cert) 112 if stage.prevent_duplicate_devices: 113 registration_data.exists_query |= Q( 114 attestation_certificate_fingerprint=registration_data.attest_cert_fingerprint 115 ) 116 117 credential_id_exists = WebAuthnDevice.objects.filter(registration_data.exists_query).first() 118 if credential_id_exists: 119 raise ValidationError("Credential ID already exists.") 120 121 aaguid = registration.aaguid 122 allowed_aaguids = stage.device_type_restrictions.values_list("aaguid", flat=True) 123 if allowed_aaguids.exists(): 124 invalid_error = ValidationError( 125 _( 126 "Invalid device type. Contact your {brand} administrator for help.".format( 127 brand=self.stage.request.brand.branding_title 128 ) 129 ) 130 ) 131 # If there are any restrictions set and we didn't get an aaguid, invalid 132 if not aaguid: 133 raise invalid_error 134 # If one of the restrictions is the "special" unknown device type UUID 135 # but we do have a device type for the given aaguid, invalid 136 if ( 137 UUID(UNKNOWN_DEVICE_TYPE_AAGUID) in allowed_aaguids 138 and not WebAuthnDeviceType.objects.filter(aaguid=aaguid).exists() 139 ): 140 return registration_data 141 # Otherwise just check if the given aaguid is in the allowed aaguids 142 if UUID(aaguid) not in allowed_aaguids: 143 raise invalid_error 144 return registration_data 145 146 147class AuthenticatorWebAuthnStageView(ChallengeStageView): 148 """WebAuthn stage""" 149 150 response_class = AuthenticatorWebAuthnChallengeResponse 151 152 def get_challenge(self, *args, **kwargs) -> Challenge: 153 stage: AuthenticatorWebAuthnStage = self.executor.current_stage 154 self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0) 155 # clear flow variables prior to starting a new registration 156 self.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None) 157 user = self.get_pending_user() 158 159 # library accepts none so we store null in the database, but if there is a value 160 # set, cast it to string to ensure it's not a django class 161 authenticator_attachment = stage.authenticator_attachment 162 if authenticator_attachment: 163 authenticator_attachment = AuthenticatorAttachment(str(authenticator_attachment)) 164 165 hints = [PublicKeyCredentialHint(h) for h in stage.hints] or None 166 167 # For compatibility with older user agents that don't support hints, 168 # auto-infer authenticatorAttachment from hints when not explicitly set. 169 # https://w3c.github.io/webauthn/#enum-hints 170 if hints and not authenticator_attachment: 171 hint_values = set(stage.hints) 172 cross_platform = {"security-key", "hybrid"} 173 platform = {"client-device"} 174 if hint_values <= cross_platform: 175 authenticator_attachment = AuthenticatorAttachment.CROSS_PLATFORM 176 elif hint_values <= platform: 177 authenticator_attachment = AuthenticatorAttachment.PLATFORM 178 179 registration_options: PublicKeyCredentialCreationOptions = generate_registration_options( 180 rp_id=get_rp_id(self.request), 181 rp_name=self.request.brand.branding_title, 182 user_id=user.uid.encode("utf-8"), 183 user_name=user.username, 184 user_display_name=user.name, 185 authenticator_selection=AuthenticatorSelectionCriteria( 186 resident_key=ResidentKeyRequirement(str(stage.resident_key_requirement)), 187 user_verification=UserVerificationRequirement(str(stage.user_verification)), 188 authenticator_attachment=authenticator_attachment, 189 ), 190 attestation=AttestationConveyancePreference.DIRECT, 191 hints=hints, 192 ) 193 194 self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = registration_options.challenge 195 return AuthenticatorWebAuthnChallenge( 196 data={ 197 "registration": options_to_json_dict(registration_options), 198 } 199 ) 200 201 def get_response_instance(self, data: QueryDict) -> AuthenticatorWebAuthnChallengeResponse: 202 response: AuthenticatorWebAuthnChallengeResponse = super().get_response_instance(data) 203 response.request = self.request 204 response.user = self.get_pending_user() 205 return response 206 207 def challenge_invalid(self, response): 208 stage: AuthenticatorWebAuthnStage = self.executor.current_stage 209 self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0) 210 self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] += 1 211 if ( 212 stage.max_attempts > 0 213 and self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] >= stage.max_attempts 214 ): 215 return self.executor.stage_invalid( 216 __( 217 "Exceeded maximum attempts. " 218 "Contact your {brand} administrator for help.".format( 219 brand=self.request.brand.branding_title 220 ) 221 ) 222 ) 223 return super().challenge_invalid(response) 224 225 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 226 # Webauthn Challenge has already been validated 227 webauthn_credential: VerifiedRegistrationData = response.validated_data["response"] 228 existing_device = WebAuthnDevice.objects.filter(webauthn_credential.exists_query).first() 229 if not existing_device: 230 name = "WebAuthn Device" 231 device_type = WebAuthnDeviceType.objects.filter( 232 aaguid=webauthn_credential.registration.aaguid 233 ).first() 234 if device_type and device_type.description: 235 name = device_type.description 236 WebAuthnDevice.objects.create( 237 name=name, 238 user=self.get_pending_user(), 239 public_key=bytes_to_base64url( 240 webauthn_credential.registration.credential_public_key 241 ), 242 credential_id=bytes_to_base64url(webauthn_credential.registration.credential_id), 243 sign_count=webauthn_credential.registration.sign_count, 244 rp_id=get_rp_id(self.request), 245 device_type=device_type, 246 aaguid=webauthn_credential.registration.aaguid, 247 attestation_certificate_pem=webauthn_credential.attest_cert, 248 attestation_certificate_fingerprint=webauthn_credential.attest_cert_fingerprint, 249 ) 250 else: 251 return self.executor.stage_invalid("Device with Credential ID already exists.") 252 return self.executor.stage_ok()
PLAN_CONTEXT_WEBAUTHN_CHALLENGE =
'goauthentik.io/stages/authenticator_webauthn/challenge'
PLAN_CONTEXT_WEBAUTHN_ATTEMPT =
'goauthentik.io/stages/authenticator_webauthn/attempt'
@dataclass
class
VerifiedRegistrationData:
56@dataclass 57class VerifiedRegistrationData: 58 registration: VerifiedRegistration 59 exists_query: Q 60 attest_cert: str | None = None 61 attest_cert_fingerprint: str | None = None
64class AuthenticatorWebAuthnChallenge(WithUserInfoChallenge): 65 """WebAuthn Challenge""" 66 67 registration = JSONDictField() 68 component = CharField(default="ak-stage-authenticator-webauthn")
WebAuthn Challenge
71class AuthenticatorWebAuthnChallengeResponse(ChallengeResponse): 72 """WebAuthn Challenge response""" 73 74 response = JSONDictField() 75 component = CharField(default="ak-stage-authenticator-webauthn") 76 77 request: HttpRequest 78 user: User 79 80 def validate_response(self, response: dict) -> VerifiedRegistrationData: 81 """Validate webauthn challenge response""" 82 challenge = self.stage.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] 83 84 try: 85 registration: VerifiedRegistration = verify_registration_response( 86 credential=response, 87 expected_challenge=challenge, 88 expected_rp_id=get_rp_id(self.request), 89 expected_origin=get_origin(self.request), 90 ) 91 except WebAuthnException as exc: 92 self.stage.logger.warning("registration failed", exc=exc) 93 raise ValidationError(f"Registration failed. Error: {exc}") from None 94 95 registration_data = VerifiedRegistrationData( 96 registration, 97 exists_query=Q(credential_id=bytes_to_base64url(registration.credential_id)), 98 ) 99 stage: AuthenticatorWebAuthnStage = self.stage.executor.current_stage 100 101 att_obj = parse_attestation_object(registration.attestation_object) 102 if ( 103 att_obj 104 and att_obj.att_stmt 105 and att_obj.att_stmt.x5c is not None 106 and len(att_obj.att_stmt.x5c) > 0 107 ): 108 cert = load_der_x509_certificate(att_obj.att_stmt.x5c[0]) 109 registration_data.attest_cert = cert.public_bytes( 110 encoding=Encoding.PEM, 111 ).decode("utf-8") 112 registration_data.attest_cert_fingerprint = fingerprint_sha256(cert) 113 if stage.prevent_duplicate_devices: 114 registration_data.exists_query |= Q( 115 attestation_certificate_fingerprint=registration_data.attest_cert_fingerprint 116 ) 117 118 credential_id_exists = WebAuthnDevice.objects.filter(registration_data.exists_query).first() 119 if credential_id_exists: 120 raise ValidationError("Credential ID already exists.") 121 122 aaguid = registration.aaguid 123 allowed_aaguids = stage.device_type_restrictions.values_list("aaguid", flat=True) 124 if allowed_aaguids.exists(): 125 invalid_error = ValidationError( 126 _( 127 "Invalid device type. Contact your {brand} administrator for help.".format( 128 brand=self.stage.request.brand.branding_title 129 ) 130 ) 131 ) 132 # If there are any restrictions set and we didn't get an aaguid, invalid 133 if not aaguid: 134 raise invalid_error 135 # If one of the restrictions is the "special" unknown device type UUID 136 # but we do have a device type for the given aaguid, invalid 137 if ( 138 UUID(UNKNOWN_DEVICE_TYPE_AAGUID) in allowed_aaguids 139 and not WebAuthnDeviceType.objects.filter(aaguid=aaguid).exists() 140 ): 141 return registration_data 142 # Otherwise just check if the given aaguid is in the allowed aaguids 143 if UUID(aaguid) not in allowed_aaguids: 144 raise invalid_error 145 return registration_data
WebAuthn Challenge response
80 def validate_response(self, response: dict) -> VerifiedRegistrationData: 81 """Validate webauthn challenge response""" 82 challenge = self.stage.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] 83 84 try: 85 registration: VerifiedRegistration = verify_registration_response( 86 credential=response, 87 expected_challenge=challenge, 88 expected_rp_id=get_rp_id(self.request), 89 expected_origin=get_origin(self.request), 90 ) 91 except WebAuthnException as exc: 92 self.stage.logger.warning("registration failed", exc=exc) 93 raise ValidationError(f"Registration failed. Error: {exc}") from None 94 95 registration_data = VerifiedRegistrationData( 96 registration, 97 exists_query=Q(credential_id=bytes_to_base64url(registration.credential_id)), 98 ) 99 stage: AuthenticatorWebAuthnStage = self.stage.executor.current_stage 100 101 att_obj = parse_attestation_object(registration.attestation_object) 102 if ( 103 att_obj 104 and att_obj.att_stmt 105 and att_obj.att_stmt.x5c is not None 106 and len(att_obj.att_stmt.x5c) > 0 107 ): 108 cert = load_der_x509_certificate(att_obj.att_stmt.x5c[0]) 109 registration_data.attest_cert = cert.public_bytes( 110 encoding=Encoding.PEM, 111 ).decode("utf-8") 112 registration_data.attest_cert_fingerprint = fingerprint_sha256(cert) 113 if stage.prevent_duplicate_devices: 114 registration_data.exists_query |= Q( 115 attestation_certificate_fingerprint=registration_data.attest_cert_fingerprint 116 ) 117 118 credential_id_exists = WebAuthnDevice.objects.filter(registration_data.exists_query).first() 119 if credential_id_exists: 120 raise ValidationError("Credential ID already exists.") 121 122 aaguid = registration.aaguid 123 allowed_aaguids = stage.device_type_restrictions.values_list("aaguid", flat=True) 124 if allowed_aaguids.exists(): 125 invalid_error = ValidationError( 126 _( 127 "Invalid device type. Contact your {brand} administrator for help.".format( 128 brand=self.stage.request.brand.branding_title 129 ) 130 ) 131 ) 132 # If there are any restrictions set and we didn't get an aaguid, invalid 133 if not aaguid: 134 raise invalid_error 135 # If one of the restrictions is the "special" unknown device type UUID 136 # but we do have a device type for the given aaguid, invalid 137 if ( 138 UUID(UNKNOWN_DEVICE_TYPE_AAGUID) in allowed_aaguids 139 and not WebAuthnDeviceType.objects.filter(aaguid=aaguid).exists() 140 ): 141 return registration_data 142 # Otherwise just check if the given aaguid is in the allowed aaguids 143 if UUID(aaguid) not in allowed_aaguids: 144 raise invalid_error 145 return registration_data
Validate webauthn challenge response
148class AuthenticatorWebAuthnStageView(ChallengeStageView): 149 """WebAuthn stage""" 150 151 response_class = AuthenticatorWebAuthnChallengeResponse 152 153 def get_challenge(self, *args, **kwargs) -> Challenge: 154 stage: AuthenticatorWebAuthnStage = self.executor.current_stage 155 self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0) 156 # clear flow variables prior to starting a new registration 157 self.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None) 158 user = self.get_pending_user() 159 160 # library accepts none so we store null in the database, but if there is a value 161 # set, cast it to string to ensure it's not a django class 162 authenticator_attachment = stage.authenticator_attachment 163 if authenticator_attachment: 164 authenticator_attachment = AuthenticatorAttachment(str(authenticator_attachment)) 165 166 hints = [PublicKeyCredentialHint(h) for h in stage.hints] or None 167 168 # For compatibility with older user agents that don't support hints, 169 # auto-infer authenticatorAttachment from hints when not explicitly set. 170 # https://w3c.github.io/webauthn/#enum-hints 171 if hints and not authenticator_attachment: 172 hint_values = set(stage.hints) 173 cross_platform = {"security-key", "hybrid"} 174 platform = {"client-device"} 175 if hint_values <= cross_platform: 176 authenticator_attachment = AuthenticatorAttachment.CROSS_PLATFORM 177 elif hint_values <= platform: 178 authenticator_attachment = AuthenticatorAttachment.PLATFORM 179 180 registration_options: PublicKeyCredentialCreationOptions = generate_registration_options( 181 rp_id=get_rp_id(self.request), 182 rp_name=self.request.brand.branding_title, 183 user_id=user.uid.encode("utf-8"), 184 user_name=user.username, 185 user_display_name=user.name, 186 authenticator_selection=AuthenticatorSelectionCriteria( 187 resident_key=ResidentKeyRequirement(str(stage.resident_key_requirement)), 188 user_verification=UserVerificationRequirement(str(stage.user_verification)), 189 authenticator_attachment=authenticator_attachment, 190 ), 191 attestation=AttestationConveyancePreference.DIRECT, 192 hints=hints, 193 ) 194 195 self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = registration_options.challenge 196 return AuthenticatorWebAuthnChallenge( 197 data={ 198 "registration": options_to_json_dict(registration_options), 199 } 200 ) 201 202 def get_response_instance(self, data: QueryDict) -> AuthenticatorWebAuthnChallengeResponse: 203 response: AuthenticatorWebAuthnChallengeResponse = super().get_response_instance(data) 204 response.request = self.request 205 response.user = self.get_pending_user() 206 return response 207 208 def challenge_invalid(self, response): 209 stage: AuthenticatorWebAuthnStage = self.executor.current_stage 210 self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0) 211 self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] += 1 212 if ( 213 stage.max_attempts > 0 214 and self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] >= stage.max_attempts 215 ): 216 return self.executor.stage_invalid( 217 __( 218 "Exceeded maximum attempts. " 219 "Contact your {brand} administrator for help.".format( 220 brand=self.request.brand.branding_title 221 ) 222 ) 223 ) 224 return super().challenge_invalid(response) 225 226 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 227 # Webauthn Challenge has already been validated 228 webauthn_credential: VerifiedRegistrationData = response.validated_data["response"] 229 existing_device = WebAuthnDevice.objects.filter(webauthn_credential.exists_query).first() 230 if not existing_device: 231 name = "WebAuthn Device" 232 device_type = WebAuthnDeviceType.objects.filter( 233 aaguid=webauthn_credential.registration.aaguid 234 ).first() 235 if device_type and device_type.description: 236 name = device_type.description 237 WebAuthnDevice.objects.create( 238 name=name, 239 user=self.get_pending_user(), 240 public_key=bytes_to_base64url( 241 webauthn_credential.registration.credential_public_key 242 ), 243 credential_id=bytes_to_base64url(webauthn_credential.registration.credential_id), 244 sign_count=webauthn_credential.registration.sign_count, 245 rp_id=get_rp_id(self.request), 246 device_type=device_type, 247 aaguid=webauthn_credential.registration.aaguid, 248 attestation_certificate_pem=webauthn_credential.attest_cert, 249 attestation_certificate_fingerprint=webauthn_credential.attest_cert_fingerprint, 250 ) 251 else: 252 return self.executor.stage_invalid("Device with Credential ID already exists.") 253 return self.executor.stage_ok()
WebAuthn stage
response_class =
<class 'AuthenticatorWebAuthnChallengeResponse'>
153 def get_challenge(self, *args, **kwargs) -> Challenge: 154 stage: AuthenticatorWebAuthnStage = self.executor.current_stage 155 self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0) 156 # clear flow variables prior to starting a new registration 157 self.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None) 158 user = self.get_pending_user() 159 160 # library accepts none so we store null in the database, but if there is a value 161 # set, cast it to string to ensure it's not a django class 162 authenticator_attachment = stage.authenticator_attachment 163 if authenticator_attachment: 164 authenticator_attachment = AuthenticatorAttachment(str(authenticator_attachment)) 165 166 hints = [PublicKeyCredentialHint(h) for h in stage.hints] or None 167 168 # For compatibility with older user agents that don't support hints, 169 # auto-infer authenticatorAttachment from hints when not explicitly set. 170 # https://w3c.github.io/webauthn/#enum-hints 171 if hints and not authenticator_attachment: 172 hint_values = set(stage.hints) 173 cross_platform = {"security-key", "hybrid"} 174 platform = {"client-device"} 175 if hint_values <= cross_platform: 176 authenticator_attachment = AuthenticatorAttachment.CROSS_PLATFORM 177 elif hint_values <= platform: 178 authenticator_attachment = AuthenticatorAttachment.PLATFORM 179 180 registration_options: PublicKeyCredentialCreationOptions = generate_registration_options( 181 rp_id=get_rp_id(self.request), 182 rp_name=self.request.brand.branding_title, 183 user_id=user.uid.encode("utf-8"), 184 user_name=user.username, 185 user_display_name=user.name, 186 authenticator_selection=AuthenticatorSelectionCriteria( 187 resident_key=ResidentKeyRequirement(str(stage.resident_key_requirement)), 188 user_verification=UserVerificationRequirement(str(stage.user_verification)), 189 authenticator_attachment=authenticator_attachment, 190 ), 191 attestation=AttestationConveyancePreference.DIRECT, 192 hints=hints, 193 ) 194 195 self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = registration_options.challenge 196 return AuthenticatorWebAuthnChallenge( 197 data={ 198 "registration": options_to_json_dict(registration_options), 199 } 200 )
Return the challenge that the client should solve
def
get_response_instance( self, data: django.http.request.QueryDict) -> AuthenticatorWebAuthnChallengeResponse:
202 def get_response_instance(self, data: QueryDict) -> AuthenticatorWebAuthnChallengeResponse: 203 response: AuthenticatorWebAuthnChallengeResponse = super().get_response_instance(data) 204 response.request = self.request 205 response.user = self.get_pending_user() 206 return response
Return the response class type
def
challenge_invalid(self, response):
208 def challenge_invalid(self, response): 209 stage: AuthenticatorWebAuthnStage = self.executor.current_stage 210 self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0) 211 self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] += 1 212 if ( 213 stage.max_attempts > 0 214 and self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] >= stage.max_attempts 215 ): 216 return self.executor.stage_invalid( 217 __( 218 "Exceeded maximum attempts. " 219 "Contact your {brand} administrator for help.".format( 220 brand=self.request.brand.branding_title 221 ) 222 ) 223 ) 224 return super().challenge_invalid(response)
Callback when the challenge has the incorrect format
def
challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
226 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 227 # Webauthn Challenge has already been validated 228 webauthn_credential: VerifiedRegistrationData = response.validated_data["response"] 229 existing_device = WebAuthnDevice.objects.filter(webauthn_credential.exists_query).first() 230 if not existing_device: 231 name = "WebAuthn Device" 232 device_type = WebAuthnDeviceType.objects.filter( 233 aaguid=webauthn_credential.registration.aaguid 234 ).first() 235 if device_type and device_type.description: 236 name = device_type.description 237 WebAuthnDevice.objects.create( 238 name=name, 239 user=self.get_pending_user(), 240 public_key=bytes_to_base64url( 241 webauthn_credential.registration.credential_public_key 242 ), 243 credential_id=bytes_to_base64url(webauthn_credential.registration.credential_id), 244 sign_count=webauthn_credential.registration.sign_count, 245 rp_id=get_rp_id(self.request), 246 device_type=device_type, 247 aaguid=webauthn_credential.registration.aaguid, 248 attestation_certificate_pem=webauthn_credential.attest_cert, 249 attestation_certificate_fingerprint=webauthn_credential.attest_cert_fingerprint, 250 ) 251 else: 252 return self.executor.stage_invalid("Device with Credential ID already exists.") 253 return self.executor.stage_ok()
Callback when the challenge has the correct format