authentik.stages.authenticator_webauthn.stage

WebAuthn stage

  1"""WebAuthn stage"""
  2
  3from dataclasses import dataclass
  4from uuid import UUID
  5
  6from cryptography.hazmat.primitives.serialization import Encoding
  7from cryptography.x509 import load_der_x509_certificate
  8from django.db.models import Q
  9from django.http import HttpRequest, HttpResponse
 10from django.http.request import QueryDict
 11from django.utils.translation import gettext as __
 12from django.utils.translation import gettext_lazy as _
 13from rest_framework.fields import CharField
 14from rest_framework.serializers import ValidationError
 15from webauthn.helpers.bytes_to_base64url import bytes_to_base64url
 16from webauthn.helpers.exceptions import WebAuthnException
 17from webauthn.helpers.options_to_json_dict import options_to_json_dict
 18from webauthn.helpers.parse_attestation_object import parse_attestation_object
 19from webauthn.helpers.structs import (
 20    AttestationConveyancePreference,
 21    AuthenticatorAttachment,
 22    AuthenticatorSelectionCriteria,
 23    PublicKeyCredentialCreationOptions,
 24    PublicKeyCredentialHint,
 25    ResidentKeyRequirement,
 26    UserVerificationRequirement,
 27)
 28from webauthn.registration.generate_registration_options import generate_registration_options
 29from webauthn.registration.verify_registration_response import (
 30    VerifiedRegistration,
 31    verify_registration_response,
 32)
 33
 34from authentik.core.api.utils import JSONDictField
 35from authentik.core.models import User
 36from authentik.crypto.models import fingerprint_sha256
 37from authentik.flows.challenge import (
 38    Challenge,
 39    ChallengeResponse,
 40    WithUserInfoChallenge,
 41)
 42from authentik.flows.stage import ChallengeStageView
 43from authentik.stages.authenticator_webauthn.models import (
 44    UNKNOWN_DEVICE_TYPE_AAGUID,
 45    AuthenticatorWebAuthnStage,
 46    WebAuthnDevice,
 47    WebAuthnDeviceType,
 48)
 49from authentik.stages.authenticator_webauthn.utils import get_origin, get_rp_id
 50
# Flow plan context key under which get_challenge stores the generated registration
# challenge; validate_response reads it back as the expected challenge.
PLAN_CONTEXT_WEBAUTHN_CHALLENGE = "goauthentik.io/stages/authenticator_webauthn/challenge"
# Flow plan context key holding the counter that challenge_invalid increments.
PLAN_CONTEXT_WEBAUTHN_ATTEMPT = "goauthentik.io/stages/authenticator_webauthn/attempt"
 53
 54
@dataclass
class VerifiedRegistrationData:
    """Outcome of a verified WebAuthn registration response"""

    # Verified registration as returned by verify_registration_response
    registration: VerifiedRegistration
    # Query matching already-enrolled devices that conflict with this registration
    exists_query: Q
    # PEM encoding of the first certificate in the attestation statement, if one was sent
    attest_cert: str | None = None
    # Result of fingerprint_sha256 for that certificate, if one was sent
    attest_cert_fingerprint: str | None = None
 61
 62
class AuthenticatorWebAuthnChallenge(WithUserInfoChallenge):
    """WebAuthn Challenge

    Carries the registration options the client needs to create a credential."""

    # Registration options as a JSON-compatible dict
    registration = JSONDictField()
    # Component identifier, defaulting to the WebAuthn authenticator stage component
    component = CharField(default="ak-stage-authenticator-webauthn")
 68
 69
 70class AuthenticatorWebAuthnChallengeResponse(ChallengeResponse):
 71    """WebAuthn Challenge response"""
 72
 73    response = JSONDictField()
 74    component = CharField(default="ak-stage-authenticator-webauthn")
 75
 76    request: HttpRequest
 77    user: User
 78
 79    def validate_response(self, response: dict) -> VerifiedRegistrationData:
 80        """Validate webauthn challenge response"""
 81        challenge = self.stage.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE]
 82
 83        try:
 84            registration: VerifiedRegistration = verify_registration_response(
 85                credential=response,
 86                expected_challenge=challenge,
 87                expected_rp_id=get_rp_id(self.request),
 88                expected_origin=get_origin(self.request),
 89            )
 90        except WebAuthnException as exc:
 91            self.stage.logger.warning("registration failed", exc=exc)
 92            raise ValidationError(f"Registration failed. Error: {exc}") from None
 93
 94        registration_data = VerifiedRegistrationData(
 95            registration,
 96            exists_query=Q(credential_id=bytes_to_base64url(registration.credential_id)),
 97        )
 98        stage: AuthenticatorWebAuthnStage = self.stage.executor.current_stage
 99
100        att_obj = parse_attestation_object(registration.attestation_object)
101        if (
102            att_obj
103            and att_obj.att_stmt
104            and att_obj.att_stmt.x5c is not None
105            and len(att_obj.att_stmt.x5c) > 0
106        ):
107            cert = load_der_x509_certificate(att_obj.att_stmt.x5c[0])
108            registration_data.attest_cert = cert.public_bytes(
109                encoding=Encoding.PEM,
110            ).decode("utf-8")
111            registration_data.attest_cert_fingerprint = fingerprint_sha256(cert)
112            if stage.prevent_duplicate_devices:
113                registration_data.exists_query |= Q(
114                    attestation_certificate_fingerprint=registration_data.attest_cert_fingerprint
115                )
116
117        credential_id_exists = WebAuthnDevice.objects.filter(registration_data.exists_query).first()
118        if credential_id_exists:
119            raise ValidationError("Credential ID already exists.")
120
121        aaguid = registration.aaguid
122        allowed_aaguids = stage.device_type_restrictions.values_list("aaguid", flat=True)
123        if allowed_aaguids.exists():
124            invalid_error = ValidationError(
125                _(
126                    "Invalid device type. Contact your {brand} administrator for help.".format(
127                        brand=self.stage.request.brand.branding_title
128                    )
129                )
130            )
131            # If there are any restrictions set and we didn't get an aaguid, invalid
132            if not aaguid:
133                raise invalid_error
134            # If one of the restrictions is the "special" unknown device type UUID
135            # but we do have a device type for the given aaguid, invalid
136            if (
137                UUID(UNKNOWN_DEVICE_TYPE_AAGUID) in allowed_aaguids
138                and not WebAuthnDeviceType.objects.filter(aaguid=aaguid).exists()
139            ):
140                return registration_data
141            # Otherwise just check if the given aaguid is in the allowed aaguids
142            if UUID(aaguid) not in allowed_aaguids:
143                raise invalid_error
144        return registration_data
145
146
class AuthenticatorWebAuthnStageView(ChallengeStageView):
    """WebAuthn stage

    Issues a WebAuthn registration challenge for the pending user and, once the
    response has been validated, stores the new device."""

    response_class = AuthenticatorWebAuthnChallengeResponse

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the registration options and remember their challenge in the plan context"""
        stage: AuthenticatorWebAuthnStage = self.executor.current_stage
        self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0)
        # clear flow variables prior to starting a new registration
        self.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
        user = self.get_pending_user()

        # library accepts none so we store null in the database, but if there is a value
        # set, cast it to string to ensure it's not a django class
        authenticator_attachment = stage.authenticator_attachment
        if authenticator_attachment:
            authenticator_attachment = AuthenticatorAttachment(str(authenticator_attachment))

        hints = [PublicKeyCredentialHint(h) for h in stage.hints] or None

        # For compatibility with older user agents that don't support hints,
        # auto-infer authenticatorAttachment from hints when not explicitly set.
        # https://w3c.github.io/webauthn/#enum-hints
        if hints and not authenticator_attachment:
            hint_values = set(stage.hints)
            cross_platform = {"security-key", "hybrid"}
            platform = {"client-device"}
            # Only infer an attachment when every hint points to the same kind
            if hint_values <= cross_platform:
                authenticator_attachment = AuthenticatorAttachment.CROSS_PLATFORM
            elif hint_values <= platform:
                authenticator_attachment = AuthenticatorAttachment.PLATFORM

        registration_options: PublicKeyCredentialCreationOptions = generate_registration_options(
            rp_id=get_rp_id(self.request),
            rp_name=self.request.brand.branding_title,
            user_id=user.uid.encode("utf-8"),
            user_name=user.username,
            user_display_name=user.name,
            authenticator_selection=AuthenticatorSelectionCriteria(
                resident_key=ResidentKeyRequirement(str(stage.resident_key_requirement)),
                user_verification=UserVerificationRequirement(str(stage.user_verification)),
                authenticator_attachment=authenticator_attachment,
            ),
            attestation=AttestationConveyancePreference.DIRECT,
            hints=hints,
        )

        # Kept so the response validation can compare against the issued challenge
        self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = registration_options.challenge
        return AuthenticatorWebAuthnChallenge(
            data={
                "registration": options_to_json_dict(registration_options),
            }
        )

    def get_response_instance(self, data: QueryDict) -> AuthenticatorWebAuthnChallengeResponse:
        """Return the response instance with the request and pending user attached"""
        response: AuthenticatorWebAuthnChallengeResponse = super().get_response_instance(data)
        response.request = self.request
        response.user = self.get_pending_user()
        return response

    def challenge_invalid(self, response):
        """Count the failed attempt and fail the stage once max_attempts is reached"""
        stage: AuthenticatorWebAuthnStage = self.executor.current_stage
        self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0)
        self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] += 1
        # A max_attempts of 0 (or less) disables the limit
        if (
            stage.max_attempts > 0
            and self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] >= stage.max_attempts
        ):
            # Translate the message template first and only then fill in the brand;
            # formatting before the lookup never matches a catalog entry
            return self.executor.stage_invalid(
                __(
                    "Exceeded maximum attempts. "
                    "Contact your {brand} administrator for help."
                ).format(brand=self.request.brand.branding_title)
            )
        return super().challenge_invalid(response)

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Create the WebAuthn device for the pending user from the validated response"""
        # Webauthn Challenge has already been validated
        webauthn_credential: VerifiedRegistrationData = response.validated_data["response"]
        # Checked again here, in addition to the check done during validation
        existing_device = WebAuthnDevice.objects.filter(webauthn_credential.exists_query).first()
        if existing_device:
            return self.executor.stage_invalid("Device with Credential ID already exists.")

        registration = webauthn_credential.registration
        device_type = WebAuthnDeviceType.objects.filter(aaguid=registration.aaguid).first()
        # Prefer the known device type's description as the device name
        name = "WebAuthn Device"
        if device_type and device_type.description:
            name = device_type.description
        WebAuthnDevice.objects.create(
            name=name,
            user=self.get_pending_user(),
            public_key=bytes_to_base64url(registration.credential_public_key),
            credential_id=bytes_to_base64url(registration.credential_id),
            sign_count=registration.sign_count,
            rp_id=get_rp_id(self.request),
            device_type=device_type,
            aaguid=registration.aaguid,
            attestation_certificate_pem=webauthn_credential.attest_cert,
            attestation_certificate_fingerprint=webauthn_credential.attest_cert_fingerprint,
        )
        return self.executor.stage_ok()
# Flow plan context key under which get_challenge stores the generated registration
# challenge; validate_response reads it back as the expected challenge.
PLAN_CONTEXT_WEBAUTHN_CHALLENGE = 'goauthentik.io/stages/authenticator_webauthn/challenge'
# Flow plan context key holding the counter that challenge_invalid increments.
PLAN_CONTEXT_WEBAUTHN_ATTEMPT = 'goauthentik.io/stages/authenticator_webauthn/attempt'
@dataclass
class VerifiedRegistrationData:
@dataclass
class VerifiedRegistrationData:
    """Outcome of a verified WebAuthn registration response"""

    # Verified registration as returned by verify_registration_response
    registration: VerifiedRegistration
    # Query matching already-enrolled devices that conflict with this registration
    exists_query: Q
    # PEM encoding of the first certificate in the attestation statement, if one was sent
    attest_cert: str | None = None
    # Result of fingerprint_sha256 for that certificate, if one was sent
    attest_cert_fingerprint: str | None = None
VerifiedRegistrationData( registration: webauthn.registration.verify_registration_response.VerifiedRegistration, exists_query: django.db.models.query_utils.Q, attest_cert: str | None = None, attest_cert_fingerprint: str | None = None)
registration: webauthn.registration.verify_registration_response.VerifiedRegistration
exists_query: django.db.models.query_utils.Q
attest_cert: str | None = None
attest_cert_fingerprint: str | None = None
class AuthenticatorWebAuthnChallenge(authentik.flows.challenge.WithUserInfoChallenge):
class AuthenticatorWebAuthnChallenge(WithUserInfoChallenge):
    """WebAuthn Challenge

    Carries the registration options the client needs to create a credential."""

    # Registration options as a JSON-compatible dict
    registration = JSONDictField()
    # Component identifier, defaulting to the WebAuthn authenticator stage component
    component = CharField(default="ak-stage-authenticator-webauthn")

WebAuthn Challenge

registration
component
class AuthenticatorWebAuthnChallengeResponse(authentik.flows.challenge.ChallengeResponse):
 71class AuthenticatorWebAuthnChallengeResponse(ChallengeResponse):
 72    """WebAuthn Challenge response"""
 73
 74    response = JSONDictField()
 75    component = CharField(default="ak-stage-authenticator-webauthn")
 76
 77    request: HttpRequest
 78    user: User
 79
 80    def validate_response(self, response: dict) -> VerifiedRegistrationData:
 81        """Validate webauthn challenge response"""
 82        challenge = self.stage.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE]
 83
 84        try:
 85            registration: VerifiedRegistration = verify_registration_response(
 86                credential=response,
 87                expected_challenge=challenge,
 88                expected_rp_id=get_rp_id(self.request),
 89                expected_origin=get_origin(self.request),
 90            )
 91        except WebAuthnException as exc:
 92            self.stage.logger.warning("registration failed", exc=exc)
 93            raise ValidationError(f"Registration failed. Error: {exc}") from None
 94
 95        registration_data = VerifiedRegistrationData(
 96            registration,
 97            exists_query=Q(credential_id=bytes_to_base64url(registration.credential_id)),
 98        )
 99        stage: AuthenticatorWebAuthnStage = self.stage.executor.current_stage
100
101        att_obj = parse_attestation_object(registration.attestation_object)
102        if (
103            att_obj
104            and att_obj.att_stmt
105            and att_obj.att_stmt.x5c is not None
106            and len(att_obj.att_stmt.x5c) > 0
107        ):
108            cert = load_der_x509_certificate(att_obj.att_stmt.x5c[0])
109            registration_data.attest_cert = cert.public_bytes(
110                encoding=Encoding.PEM,
111            ).decode("utf-8")
112            registration_data.attest_cert_fingerprint = fingerprint_sha256(cert)
113            if stage.prevent_duplicate_devices:
114                registration_data.exists_query |= Q(
115                    attestation_certificate_fingerprint=registration_data.attest_cert_fingerprint
116                )
117
118        credential_id_exists = WebAuthnDevice.objects.filter(registration_data.exists_query).first()
119        if credential_id_exists:
120            raise ValidationError("Credential ID already exists.")
121
122        aaguid = registration.aaguid
123        allowed_aaguids = stage.device_type_restrictions.values_list("aaguid", flat=True)
124        if allowed_aaguids.exists():
125            invalid_error = ValidationError(
126                _(
127                    "Invalid device type. Contact your {brand} administrator for help.".format(
128                        brand=self.stage.request.brand.branding_title
129                    )
130                )
131            )
132            # If there are any restrictions set and we didn't get an aaguid, invalid
133            if not aaguid:
134                raise invalid_error
135            # If one of the restrictions is the "special" unknown device type UUID
136            # but we do have a device type for the given aaguid, invalid
137            if (
138                UUID(UNKNOWN_DEVICE_TYPE_AAGUID) in allowed_aaguids
139                and not WebAuthnDeviceType.objects.filter(aaguid=aaguid).exists()
140            ):
141                return registration_data
142            # Otherwise just check if the given aaguid is in the allowed aaguids
143            if UUID(aaguid) not in allowed_aaguids:
144                raise invalid_error
145        return registration_data

WebAuthn Challenge response

response
component
request: django.http.request.HttpRequest
def validate_response( self, response: dict) -> VerifiedRegistrationData:
 80    def validate_response(self, response: dict) -> VerifiedRegistrationData:
 81        """Validate webauthn challenge response"""
 82        challenge = self.stage.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE]
 83
 84        try:
 85            registration: VerifiedRegistration = verify_registration_response(
 86                credential=response,
 87                expected_challenge=challenge,
 88                expected_rp_id=get_rp_id(self.request),
 89                expected_origin=get_origin(self.request),
 90            )
 91        except WebAuthnException as exc:
 92            self.stage.logger.warning("registration failed", exc=exc)
 93            raise ValidationError(f"Registration failed. Error: {exc}") from None
 94
 95        registration_data = VerifiedRegistrationData(
 96            registration,
 97            exists_query=Q(credential_id=bytes_to_base64url(registration.credential_id)),
 98        )
 99        stage: AuthenticatorWebAuthnStage = self.stage.executor.current_stage
100
101        att_obj = parse_attestation_object(registration.attestation_object)
102        if (
103            att_obj
104            and att_obj.att_stmt
105            and att_obj.att_stmt.x5c is not None
106            and len(att_obj.att_stmt.x5c) > 0
107        ):
108            cert = load_der_x509_certificate(att_obj.att_stmt.x5c[0])
109            registration_data.attest_cert = cert.public_bytes(
110                encoding=Encoding.PEM,
111            ).decode("utf-8")
112            registration_data.attest_cert_fingerprint = fingerprint_sha256(cert)
113            if stage.prevent_duplicate_devices:
114                registration_data.exists_query |= Q(
115                    attestation_certificate_fingerprint=registration_data.attest_cert_fingerprint
116                )
117
118        credential_id_exists = WebAuthnDevice.objects.filter(registration_data.exists_query).first()
119        if credential_id_exists:
120            raise ValidationError("Credential ID already exists.")
121
122        aaguid = registration.aaguid
123        allowed_aaguids = stage.device_type_restrictions.values_list("aaguid", flat=True)
124        if allowed_aaguids.exists():
125            invalid_error = ValidationError(
126                _(
127                    "Invalid device type. Contact your {brand} administrator for help.".format(
128                        brand=self.stage.request.brand.branding_title
129                    )
130                )
131            )
132            # If there are any restrictions set and we didn't get an aaguid, invalid
133            if not aaguid:
134                raise invalid_error
135            # If one of the restrictions is the "special" unknown device type UUID
136            # but we do have a device type for the given aaguid, invalid
137            if (
138                UUID(UNKNOWN_DEVICE_TYPE_AAGUID) in allowed_aaguids
139                and not WebAuthnDeviceType.objects.filter(aaguid=aaguid).exists()
140            ):
141                return registration_data
142            # Otherwise just check if the given aaguid is in the allowed aaguids
143            if UUID(aaguid) not in allowed_aaguids:
144                raise invalid_error
145        return registration_data

Validate webauthn challenge response

class AuthenticatorWebAuthnStageView(authentik.flows.stage.ChallengeStageView):
class AuthenticatorWebAuthnStageView(ChallengeStageView):
    """WebAuthn stage

    Issues a WebAuthn registration challenge for the pending user and, once the
    response has been validated, stores the new device."""

    response_class = AuthenticatorWebAuthnChallengeResponse

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the registration options and remember their challenge in the plan context"""
        stage: AuthenticatorWebAuthnStage = self.executor.current_stage
        self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0)
        # clear flow variables prior to starting a new registration
        self.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
        user = self.get_pending_user()

        # library accepts none so we store null in the database, but if there is a value
        # set, cast it to string to ensure it's not a django class
        authenticator_attachment = stage.authenticator_attachment
        if authenticator_attachment:
            authenticator_attachment = AuthenticatorAttachment(str(authenticator_attachment))

        hints = [PublicKeyCredentialHint(h) for h in stage.hints] or None

        # For compatibility with older user agents that don't support hints,
        # auto-infer authenticatorAttachment from hints when not explicitly set.
        # https://w3c.github.io/webauthn/#enum-hints
        if hints and not authenticator_attachment:
            hint_values = set(stage.hints)
            cross_platform = {"security-key", "hybrid"}
            platform = {"client-device"}
            # Only infer an attachment when every hint points to the same kind
            if hint_values <= cross_platform:
                authenticator_attachment = AuthenticatorAttachment.CROSS_PLATFORM
            elif hint_values <= platform:
                authenticator_attachment = AuthenticatorAttachment.PLATFORM

        registration_options: PublicKeyCredentialCreationOptions = generate_registration_options(
            rp_id=get_rp_id(self.request),
            rp_name=self.request.brand.branding_title,
            user_id=user.uid.encode("utf-8"),
            user_name=user.username,
            user_display_name=user.name,
            authenticator_selection=AuthenticatorSelectionCriteria(
                resident_key=ResidentKeyRequirement(str(stage.resident_key_requirement)),
                user_verification=UserVerificationRequirement(str(stage.user_verification)),
                authenticator_attachment=authenticator_attachment,
            ),
            attestation=AttestationConveyancePreference.DIRECT,
            hints=hints,
        )

        # Kept so the response validation can compare against the issued challenge
        self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = registration_options.challenge
        return AuthenticatorWebAuthnChallenge(
            data={
                "registration": options_to_json_dict(registration_options),
            }
        )

    def get_response_instance(self, data: QueryDict) -> AuthenticatorWebAuthnChallengeResponse:
        """Return the response instance with the request and pending user attached"""
        response: AuthenticatorWebAuthnChallengeResponse = super().get_response_instance(data)
        response.request = self.request
        response.user = self.get_pending_user()
        return response

    def challenge_invalid(self, response):
        """Count the failed attempt and fail the stage once max_attempts is reached"""
        stage: AuthenticatorWebAuthnStage = self.executor.current_stage
        self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0)
        self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] += 1
        # A max_attempts of 0 (or less) disables the limit
        if (
            stage.max_attempts > 0
            and self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] >= stage.max_attempts
        ):
            # Translate the message template first and only then fill in the brand;
            # formatting before the lookup never matches a catalog entry
            return self.executor.stage_invalid(
                __(
                    "Exceeded maximum attempts. "
                    "Contact your {brand} administrator for help."
                ).format(brand=self.request.brand.branding_title)
            )
        return super().challenge_invalid(response)

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Create the WebAuthn device for the pending user from the validated response"""
        # Webauthn Challenge has already been validated
        webauthn_credential: VerifiedRegistrationData = response.validated_data["response"]
        # Checked again here, in addition to the check done during validation
        existing_device = WebAuthnDevice.objects.filter(webauthn_credential.exists_query).first()
        if existing_device:
            return self.executor.stage_invalid("Device with Credential ID already exists.")

        registration = webauthn_credential.registration
        device_type = WebAuthnDeviceType.objects.filter(aaguid=registration.aaguid).first()
        # Prefer the known device type's description as the device name
        name = "WebAuthn Device"
        if device_type and device_type.description:
            name = device_type.description
        WebAuthnDevice.objects.create(
            name=name,
            user=self.get_pending_user(),
            public_key=bytes_to_base64url(registration.credential_public_key),
            credential_id=bytes_to_base64url(registration.credential_id),
            sign_count=registration.sign_count,
            rp_id=get_rp_id(self.request),
            device_type=device_type,
            aaguid=registration.aaguid,
            attestation_certificate_pem=webauthn_credential.attest_cert,
            attestation_certificate_fingerprint=webauthn_credential.attest_cert_fingerprint,
        )
        return self.executor.stage_ok()

WebAuthn stage

response_class = <class 'AuthenticatorWebAuthnChallengeResponse'>
def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
153    def get_challenge(self, *args, **kwargs) -> Challenge:
154        stage: AuthenticatorWebAuthnStage = self.executor.current_stage
155        self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0)
156        # clear flow variables prior to starting a new registration
157        self.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
158        user = self.get_pending_user()
159
160        # library accepts none so we store null in the database, but if there is a value
161        # set, cast it to string to ensure it's not a django class
162        authenticator_attachment = stage.authenticator_attachment
163        if authenticator_attachment:
164            authenticator_attachment = AuthenticatorAttachment(str(authenticator_attachment))
165
166        hints = [PublicKeyCredentialHint(h) for h in stage.hints] or None
167
168        # For compatibility with older user agents that don't support hints,
169        # auto-infer authenticatorAttachment from hints when not explicitly set.
170        # https://w3c.github.io/webauthn/#enum-hints
171        if hints and not authenticator_attachment:
172            hint_values = set(stage.hints)
173            cross_platform = {"security-key", "hybrid"}
174            platform = {"client-device"}
175            if hint_values <= cross_platform:
176                authenticator_attachment = AuthenticatorAttachment.CROSS_PLATFORM
177            elif hint_values <= platform:
178                authenticator_attachment = AuthenticatorAttachment.PLATFORM
179
180        registration_options: PublicKeyCredentialCreationOptions = generate_registration_options(
181            rp_id=get_rp_id(self.request),
182            rp_name=self.request.brand.branding_title,
183            user_id=user.uid.encode("utf-8"),
184            user_name=user.username,
185            user_display_name=user.name,
186            authenticator_selection=AuthenticatorSelectionCriteria(
187                resident_key=ResidentKeyRequirement(str(stage.resident_key_requirement)),
188                user_verification=UserVerificationRequirement(str(stage.user_verification)),
189                authenticator_attachment=authenticator_attachment,
190            ),
191            attestation=AttestationConveyancePreference.DIRECT,
192            hints=hints,
193        )
194
195        self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = registration_options.challenge
196        return AuthenticatorWebAuthnChallenge(
197            data={
198                "registration": options_to_json_dict(registration_options),
199            }
200        )

Return the challenge that the client should solve

def get_response_instance( self, data: django.http.request.QueryDict) -> AuthenticatorWebAuthnChallengeResponse:
202    def get_response_instance(self, data: QueryDict) -> AuthenticatorWebAuthnChallengeResponse:
203        response: AuthenticatorWebAuthnChallengeResponse = super().get_response_instance(data)
204        response.request = self.request
205        response.user = self.get_pending_user()
206        return response

Return the response instance, with the current request and pending user attached

def challenge_invalid(self, response):
208    def challenge_invalid(self, response):
209        stage: AuthenticatorWebAuthnStage = self.executor.current_stage
210        self.executor.plan.context.setdefault(PLAN_CONTEXT_WEBAUTHN_ATTEMPT, 0)
211        self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] += 1
212        if (
213            stage.max_attempts > 0
214            and self.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_ATTEMPT] >= stage.max_attempts
215        ):
216            return self.executor.stage_invalid(
217                __(
218                    "Exceeded maximum attempts. "
219                    "Contact your {brand} administrator for help.".format(
220                        brand=self.request.brand.branding_title
221                    )
222                )
223            )
224        return super().challenge_invalid(response)

Callback when the challenge has the incorrect format

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
226    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
227        # Webauthn Challenge has already been validated
228        webauthn_credential: VerifiedRegistrationData = response.validated_data["response"]
229        existing_device = WebAuthnDevice.objects.filter(webauthn_credential.exists_query).first()
230        if not existing_device:
231            name = "WebAuthn Device"
232            device_type = WebAuthnDeviceType.objects.filter(
233                aaguid=webauthn_credential.registration.aaguid
234            ).first()
235            if device_type and device_type.description:
236                name = device_type.description
237            WebAuthnDevice.objects.create(
238                name=name,
239                user=self.get_pending_user(),
240                public_key=bytes_to_base64url(
241                    webauthn_credential.registration.credential_public_key
242                ),
243                credential_id=bytes_to_base64url(webauthn_credential.registration.credential_id),
244                sign_count=webauthn_credential.registration.sign_count,
245                rp_id=get_rp_id(self.request),
246                device_type=device_type,
247                aaguid=webauthn_credential.registration.aaguid,
248                attestation_certificate_pem=webauthn_credential.attest_cert,
249                attestation_certificate_fingerprint=webauthn_credential.attest_cert_fingerprint,
250            )
251        else:
252            return self.executor.stage_invalid("Device with Credential ID already exists.")
253        return self.executor.stage_ok()

Callback when the challenge has the correct format