authentik.stages.authenticator_validate.challenge

Validation stage challenge checking

  1"""Validation stage challenge checking"""
  2
  3from typing import TYPE_CHECKING
  4from urllib.parse import urlencode
  5
  6from django.db import transaction
  7from django.http import HttpRequest
  8from django.http.response import Http404
  9from django.shortcuts import get_object_or_404
 10from django.utils.translation import gettext as __
 11from django.utils.translation import gettext_lazy as _
 12from rest_framework.fields import CharField, ChoiceField, DateTimeField
 13from rest_framework.serializers import ValidationError
 14from structlog.stdlib import get_logger
 15from webauthn.authentication.generate_authentication_options import generate_authentication_options
 16from webauthn.authentication.verify_authentication_response import verify_authentication_response
 17from webauthn.helpers import parse_authentication_credential_json
 18from webauthn.helpers.base64url_to_bytes import base64url_to_bytes
 19from webauthn.helpers.exceptions import InvalidAuthenticationResponse, InvalidJSONStructure
 20from webauthn.helpers.options_to_json_dict import options_to_json_dict
 21from webauthn.helpers.structs import PublicKeyCredentialType, UserVerificationRequirement
 22
 23from authentik.core.api.utils import JSONDictField, PassiveSerializer
 24from authentik.core.models import Application, User
 25from authentik.core.signals import login_failed
 26from authentik.events.middleware import audit_ignore
 27from authentik.events.models import Event, EventAction
 28from authentik.flows.planner import PLAN_CONTEXT_APPLICATION
 29from authentik.flows.stage import StageView
 30from authentik.lib.utils.email import mask_email
 31from authentik.lib.utils.time import timedelta_from_string
 32from authentik.root.middleware import ClientIPMiddleware
 33from authentik.stages.authenticator import devices_for_user
 34from authentik.stages.authenticator.models import Device, ThrottlingMixin
 35from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice
 36from authentik.stages.authenticator_email.models import EmailDevice
 37from authentik.stages.authenticator_sms.models import SMSDevice
 38from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
 39from authentik.stages.authenticator_webauthn.models import UserVerification, WebAuthnDevice
 40from authentik.stages.authenticator_webauthn.stage import PLAN_CONTEXT_WEBAUTHN_CHALLENGE
 41from authentik.stages.authenticator_webauthn.utils import get_origin, get_rp_id
 42from authentik.stages.password.stage import PLAN_CONTEXT_METHOD_ARGS
 43
 44LOGGER = get_logger()
 45if TYPE_CHECKING:
 46    from authentik.stages.authenticator_validate.stage import AuthenticatorValidateStageView
 47
 48
 49class DeviceChallenge(PassiveSerializer):
 50    """Single device challenge"""
 51
 52    device_class = ChoiceField(choices=DeviceClasses.choices)
 53    device_uid = CharField()
 54    challenge = JSONDictField()
 55    last_used = DateTimeField(allow_null=True)
 56
 57
 58def get_challenge_for_device(
 59    stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage, device: Device
 60) -> dict:
 61    """Generate challenge for a single device"""
 62    if isinstance(device, WebAuthnDevice):
 63        return get_webauthn_challenge(stage_view, stage, device)
 64    if isinstance(device, EmailDevice):
 65        return {"email": mask_email(device.email)}
 66    # Code-based challenges have no hints
 67    return {}
 68
 69
 70def get_webauthn_challenge_without_user(
 71    stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage
 72) -> dict:
 73    """Same as `get_webauthn_challenge`, but allows any client device. We can then later check
 74    who the device belongs to."""
 75    stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
 76    authentication_options = generate_authentication_options(
 77        rp_id=get_rp_id(stage_view.request),
 78        allow_credentials=[],
 79        user_verification=UserVerificationRequirement(stage.webauthn_user_verification),
 80    )
 81    stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = (
 82        authentication_options.challenge
 83    )
 84
 85    options_dict = options_to_json_dict(authentication_options)
 86    if stage.webauthn_hints:
 87        options_dict["hints"] = list(stage.webauthn_hints)
 88    return options_dict
 89
 90
 91def get_webauthn_challenge(
 92    stage_view: AuthenticatorValidateStageView,
 93    stage: AuthenticatorValidateStage,
 94    device: WebAuthnDevice | None = None,
 95) -> dict:
 96    """Send the client a challenge that we'll check later"""
 97    stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
 98
 99    allowed_credentials = []
100
101    if device:
102        # We want all the user's WebAuthn devices and merge their challenges
103        for user_device in WebAuthnDevice.objects.filter(user=device.user).order_by("name"):
104            user_device: WebAuthnDevice
105            allowed_credentials.append(user_device.descriptor)
106
107    authentication_options = generate_authentication_options(
108        rp_id=get_rp_id(stage_view.request),
109        allow_credentials=allowed_credentials,
110        user_verification=UserVerificationRequirement(stage.webauthn_user_verification),
111    )
112
113    stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = (
114        authentication_options.challenge
115    )
116
117    options_dict = options_to_json_dict(authentication_options)
118    if stage.webauthn_hints:
119        options_dict["hints"] = list(stage.webauthn_hints)
120    return options_dict
121
122
123def select_challenge(request: HttpRequest, device: Device):
124    """Callback when the user selected a challenge in the frontend."""
125    if isinstance(device, SMSDevice):
126        select_challenge_sms(request, device)
127    elif isinstance(device, EmailDevice):
128        select_challenge_email(request, device)
129
130
131def select_challenge_sms(request: HttpRequest, device: SMSDevice):
132    """Send SMS"""
133    device.generate_token()
134    device.stage.send(request, device.token, device)
135
136
137def select_challenge_email(request: HttpRequest, device: EmailDevice):
138    """Send Email"""
139    valid_secs: int = timedelta_from_string(device.stage.token_expiry).total_seconds()
140    device.generate_token(valid_secs=valid_secs)
141    device.stage.send(device)
142
143
144def validate_challenge_code(code: str, stage_view: StageView, user: User) -> Device:
145    """Validate code-based challenges. We test against every device, on purpose, as
146    the user mustn't choose between totp and static devices."""
147
148    with transaction.atomic():
149        for device in devices_for_user(user, for_verify=True):
150            if isinstance(device, ThrottlingMixin):
151                throttling_factor = stage_view.executor.current_stage.get_throttling_factor(
152                    DeviceClasses.from_model_label(device.model_label())
153                )
154                if throttling_factor is not None:
155                    device.set_throttle_factor(throttling_factor)
156            if device.verify_token(code):
157                break
158        else:
159            device = None
160
161    if not device:
162        login_failed.send(
163            sender=__name__,
164            credentials={"username": user.username},
165            request=stage_view.request,
166            stage=stage_view.executor.current_stage,
167            context={
168                PLAN_CONTEXT_METHOD_ARGS: {
169                    "device_class": DeviceClasses.TOTP.value,
170                }
171            },
172        )
173        raise ValidationError(
174            _("Invalid Token. Please ensure the time on your device is accurate and try again.")
175        )
176    return device
177
178
179def validate_challenge_webauthn(
180    data: dict,
181    stage_view: StageView,
182    user: User,
183    stage: AuthenticatorValidateStage | None = None,
184) -> Device:
185    """Validate WebAuthn Challenge"""
186    request = stage_view.request
187    challenge = stage_view.executor.plan.context.get(PLAN_CONTEXT_WEBAUTHN_CHALLENGE)
188    stage = stage or stage_view.executor.current_stage
189
190    if "MinuteMaid" in request.META.get("HTTP_USER_AGENT", ""):
191        # Workaround for Android sign-in, when signing into Google Workspace on android while
192        # adding the account to the system (not in Chrome), for some reason `type` is not set
193        # so in that case we fall back to `public-key`
194        # since that's the only option we support anyways
195        data.setdefault("type", PublicKeyCredentialType.PUBLIC_KEY)
196    try:
197        credential = parse_authentication_credential_json(data)
198    except InvalidJSONStructure as exc:
199        LOGGER.warning("Invalid WebAuthn challenge response", exc=exc)
200        raise ValidationError("Invalid device", "invalid") from None
201
202    device = WebAuthnDevice.objects.filter(credential_id=credential.id).first()
203    if not device:
204        raise ValidationError("Invalid device", "invalid")
205    # We can only check the device's user if the user we're given isn't anonymous
206    # as this validation is also used for password-less login where webauthn is the very first
207    # step done by a user. Only if this validation happens at a later stage we can check
208    # that the device belongs to the user
209    if not user.is_anonymous and device.user != user:
210        raise ValidationError("Invalid device", "invalid")
211    # When a device_type was set when creating the device (2024.4+), and we have a limitation,
212    # make sure the device type is allowed.
213    if (
214        device.device_type
215        and stage.webauthn_allowed_device_types.exists()
216        and not stage.webauthn_allowed_device_types.filter(pk=device.device_type.pk).exists()
217    ):
218        raise ValidationError(
219            _(
220                "Invalid device type. Contact your {brand} administrator for help.".format(
221                    brand=stage_view.request.brand.branding_title
222                )
223            ),
224            "invalid",
225        )
226    try:
227        authentication_verification = verify_authentication_response(
228            credential=credential,
229            expected_challenge=challenge,
230            expected_rp_id=get_rp_id(request),
231            expected_origin=get_origin(request),
232            credential_public_key=base64url_to_bytes(device.public_key),
233            credential_current_sign_count=device.sign_count,
234            require_user_verification=stage.webauthn_user_verification == UserVerification.REQUIRED,
235        )
236    except InvalidAuthenticationResponse as exc:
237        LOGGER.warning("Assertion failed", exc=exc)
238        login_failed.send(
239            sender=__name__,
240            credentials={"username": user.username},
241            request=stage_view.request,
242            stage=stage_view.executor.current_stage,
243            context={
244                PLAN_CONTEXT_METHOD_ARGS: {
245                    "device": device,
246                    "device_class": DeviceClasses.WEBAUTHN.value,
247                    "device_type": device.device_type,
248                },
249            },
250        )
251        raise ValidationError("Assertion failed") from exc
252
253    with audit_ignore():
254        device.set_sign_count(authentication_verification.new_sign_count)
255    return device
256
257
258def validate_challenge_duo(device_pk: int, stage_view: StageView, user: User) -> Device:
259    """Duo authentication"""
260    device = get_object_or_404(DuoDevice, pk=device_pk)
261    if device.user != user:
262        LOGGER.warning("device mismatch")
263        raise Http404
264    stage: AuthenticatorDuoStage = device.stage
265
266    # Get additional context for push
267    pushinfo = {
268        __("Domain"): stage_view.request.get_host(),
269    }
270    if PLAN_CONTEXT_APPLICATION in stage_view.executor.plan.context:
271        pushinfo[__("Application")] = stage_view.executor.plan.context.get(
272            PLAN_CONTEXT_APPLICATION, Application()
273        ).name
274
275    try:
276        response = stage.auth_client().auth(
277            "auto",
278            user_id=device.duo_user_id,
279            ipaddr=ClientIPMiddleware.get_client_ip(stage_view.request),
280            type=__(
281                "{brand_name} Login request".format_map(
282                    {
283                        "brand_name": stage_view.request.brand.branding_title,
284                    }
285                )
286            ),
287            display_username=user.username,
288            device="auto",
289            pushinfo=urlencode(pushinfo),
290        )
291        # {'result': 'allow', 'status': 'allow', 'status_msg': 'Success. Logging you in...'}
292        if response["result"] == "deny":
293            LOGGER.debug("duo push response", result=response["result"], msg=response["status_msg"])
294            login_failed.send(
295                sender=__name__,
296                credentials={"username": user.username},
297                request=stage_view.request,
298                stage=stage_view.executor.current_stage,
299                context={
300                    PLAN_CONTEXT_METHOD_ARGS: {
301                        "device_class": DeviceClasses.DUO.value,
302                        "duo_response": response,
303                    }
304                },
305            )
306            raise ValidationError("Duo denied access", code="denied")
307        return device
308    except RuntimeError as exc:
309        Event.new(
310            EventAction.CONFIGURATION_ERROR,
311            message=f"Failed to DUO authenticate user: {str(exc)}",
312            user=user,
313        ).from_http(stage_view.request, user)
314        raise ValidationError("Duo denied access", code="denied") from exc
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class DeviceChallenge(authentik.core.api.utils.PassiveSerializer):
50class DeviceChallenge(PassiveSerializer):
51    """Single device challenge"""
52
53    device_class = ChoiceField(choices=DeviceClasses.choices)
54    device_uid = CharField()
55    challenge = JSONDictField()
56    last_used = DateTimeField(allow_null=True)

Single device challenge

device_class
device_uid
challenge
last_used
def get_challenge_for_device(unknown):
59def get_challenge_for_device(
60    stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage, device: Device
61) -> dict:
62    """Generate challenge for a single device"""
63    if isinstance(device, WebAuthnDevice):
64        return get_webauthn_challenge(stage_view, stage, device)
65    if isinstance(device, EmailDevice):
66        return {"email": mask_email(device.email)}
67    # Code-based challenges have no hints
68    return {}

Generate challenge for a single device

def get_webauthn_challenge_without_user(unknown):
71def get_webauthn_challenge_without_user(
72    stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage
73) -> dict:
74    """Same as `get_webauthn_challenge`, but allows any client device. We can then later check
75    who the device belongs to."""
76    stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
77    authentication_options = generate_authentication_options(
78        rp_id=get_rp_id(stage_view.request),
79        allow_credentials=[],
80        user_verification=UserVerificationRequirement(stage.webauthn_user_verification),
81    )
82    stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = (
83        authentication_options.challenge
84    )
85
86    options_dict = options_to_json_dict(authentication_options)
87    if stage.webauthn_hints:
88        options_dict["hints"] = list(stage.webauthn_hints)
89    return options_dict

Same as get_webauthn_challenge, but allows any client device. We can then later check who the device belongs to.

def get_webauthn_challenge(unknown):
 92def get_webauthn_challenge(
 93    stage_view: AuthenticatorValidateStageView,
 94    stage: AuthenticatorValidateStage,
 95    device: WebAuthnDevice | None = None,
 96) -> dict:
 97    """Send the client a challenge that we'll check later"""
 98    stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
 99
100    allowed_credentials = []
101
102    if device:
103        # We want all the user's WebAuthn devices and merge their challenges
104        for user_device in WebAuthnDevice.objects.filter(user=device.user).order_by("name"):
105            user_device: WebAuthnDevice
106            allowed_credentials.append(user_device.descriptor)
107
108    authentication_options = generate_authentication_options(
109        rp_id=get_rp_id(stage_view.request),
110        allow_credentials=allowed_credentials,
111        user_verification=UserVerificationRequirement(stage.webauthn_user_verification),
112    )
113
114    stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = (
115        authentication_options.challenge
116    )
117
118    options_dict = options_to_json_dict(authentication_options)
119    if stage.webauthn_hints:
120        options_dict["hints"] = list(stage.webauthn_hints)
121    return options_dict

Send the client a challenge that we'll check later

def select_challenge( request: django.http.request.HttpRequest, device: authentik.stages.authenticator.models.Device):
124def select_challenge(request: HttpRequest, device: Device):
125    """Callback when the user selected a challenge in the frontend."""
126    if isinstance(device, SMSDevice):
127        select_challenge_sms(request, device)
128    elif isinstance(device, EmailDevice):
129        select_challenge_email(request, device)

Callback when the user selected a challenge in the frontend.

def select_challenge_sms( request: django.http.request.HttpRequest, device: authentik.stages.authenticator_sms.models.SMSDevice):
132def select_challenge_sms(request: HttpRequest, device: SMSDevice):
133    """Send SMS"""
134    device.generate_token()
135    device.stage.send(request, device.token, device)

Send SMS

def select_challenge_email( request: django.http.request.HttpRequest, device: authentik.stages.authenticator_email.models.EmailDevice):
138def select_challenge_email(request: HttpRequest, device: EmailDevice):
139    """Send Email"""
140    valid_secs: int = timedelta_from_string(device.stage.token_expiry).total_seconds()
141    device.generate_token(valid_secs=valid_secs)
142    device.stage.send(device)

Send Email

def validate_challenge_code( code: str, stage_view: authentik.flows.stage.StageView, user: authentik.core.models.User) -> authentik.stages.authenticator.models.Device:
145def validate_challenge_code(code: str, stage_view: StageView, user: User) -> Device:
146    """Validate code-based challenges. We test against every device, on purpose, as
147    the user mustn't choose between totp and static devices."""
148
149    with transaction.atomic():
150        for device in devices_for_user(user, for_verify=True):
151            if isinstance(device, ThrottlingMixin):
152                throttling_factor = stage_view.executor.current_stage.get_throttling_factor(
153                    DeviceClasses.from_model_label(device.model_label())
154                )
155                if throttling_factor is not None:
156                    device.set_throttle_factor(throttling_factor)
157            if device.verify_token(code):
158                break
159        else:
160            device = None
161
162    if not device:
163        login_failed.send(
164            sender=__name__,
165            credentials={"username": user.username},
166            request=stage_view.request,
167            stage=stage_view.executor.current_stage,
168            context={
169                PLAN_CONTEXT_METHOD_ARGS: {
170                    "device_class": DeviceClasses.TOTP.value,
171                }
172            },
173        )
174        raise ValidationError(
175            _("Invalid Token. Please ensure the time on your device is accurate and try again.")
176        )
177    return device

Validate code-based challenges. We test against every device, on purpose, as the user mustn't choose between totp and static devices.

180def validate_challenge_webauthn(
181    data: dict,
182    stage_view: StageView,
183    user: User,
184    stage: AuthenticatorValidateStage | None = None,
185) -> Device:
186    """Validate WebAuthn Challenge"""
187    request = stage_view.request
188    challenge = stage_view.executor.plan.context.get(PLAN_CONTEXT_WEBAUTHN_CHALLENGE)
189    stage = stage or stage_view.executor.current_stage
190
191    if "MinuteMaid" in request.META.get("HTTP_USER_AGENT", ""):
192        # Workaround for Android sign-in, when signing into Google Workspace on android while
193        # adding the account to the system (not in Chrome), for some reason `type` is not set
194        # so in that case we fall back to `public-key`
195        # since that's the only option we support anyways
196        data.setdefault("type", PublicKeyCredentialType.PUBLIC_KEY)
197    try:
198        credential = parse_authentication_credential_json(data)
199    except InvalidJSONStructure as exc:
200        LOGGER.warning("Invalid WebAuthn challenge response", exc=exc)
201        raise ValidationError("Invalid device", "invalid") from None
202
203    device = WebAuthnDevice.objects.filter(credential_id=credential.id).first()
204    if not device:
205        raise ValidationError("Invalid device", "invalid")
206    # We can only check the device's user if the user we're given isn't anonymous
207    # as this validation is also used for password-less login where webauthn is the very first
208    # step done by a user. Only if this validation happens at a later stage we can check
209    # that the device belongs to the user
210    if not user.is_anonymous and device.user != user:
211        raise ValidationError("Invalid device", "invalid")
212    # When a device_type was set when creating the device (2024.4+), and we have a limitation,
213    # make sure the device type is allowed.
214    if (
215        device.device_type
216        and stage.webauthn_allowed_device_types.exists()
217        and not stage.webauthn_allowed_device_types.filter(pk=device.device_type.pk).exists()
218    ):
219        raise ValidationError(
220            _(
221                "Invalid device type. Contact your {brand} administrator for help.".format(
222                    brand=stage_view.request.brand.branding_title
223                )
224            ),
225            "invalid",
226        )
227    try:
228        authentication_verification = verify_authentication_response(
229            credential=credential,
230            expected_challenge=challenge,
231            expected_rp_id=get_rp_id(request),
232            expected_origin=get_origin(request),
233            credential_public_key=base64url_to_bytes(device.public_key),
234            credential_current_sign_count=device.sign_count,
235            require_user_verification=stage.webauthn_user_verification == UserVerification.REQUIRED,
236        )
237    except InvalidAuthenticationResponse as exc:
238        LOGGER.warning("Assertion failed", exc=exc)
239        login_failed.send(
240            sender=__name__,
241            credentials={"username": user.username},
242            request=stage_view.request,
243            stage=stage_view.executor.current_stage,
244            context={
245                PLAN_CONTEXT_METHOD_ARGS: {
246                    "device": device,
247                    "device_class": DeviceClasses.WEBAUTHN.value,
248                    "device_type": device.device_type,
249                },
250            },
251        )
252        raise ValidationError("Assertion failed") from exc
253
254    with audit_ignore():
255        device.set_sign_count(authentication_verification.new_sign_count)
256    return device

Validate WebAuthn Challenge

def validate_challenge_duo( device_pk: int, stage_view: authentik.flows.stage.StageView, user: authentik.core.models.User) -> authentik.stages.authenticator.models.Device:
259def validate_challenge_duo(device_pk: int, stage_view: StageView, user: User) -> Device:
260    """Duo authentication"""
261    device = get_object_or_404(DuoDevice, pk=device_pk)
262    if device.user != user:
263        LOGGER.warning("device mismatch")
264        raise Http404
265    stage: AuthenticatorDuoStage = device.stage
266
267    # Get additional context for push
268    pushinfo = {
269        __("Domain"): stage_view.request.get_host(),
270    }
271    if PLAN_CONTEXT_APPLICATION in stage_view.executor.plan.context:
272        pushinfo[__("Application")] = stage_view.executor.plan.context.get(
273            PLAN_CONTEXT_APPLICATION, Application()
274        ).name
275
276    try:
277        response = stage.auth_client().auth(
278            "auto",
279            user_id=device.duo_user_id,
280            ipaddr=ClientIPMiddleware.get_client_ip(stage_view.request),
281            type=__(
282                "{brand_name} Login request".format_map(
283                    {
284                        "brand_name": stage_view.request.brand.branding_title,
285                    }
286                )
287            ),
288            display_username=user.username,
289            device="auto",
290            pushinfo=urlencode(pushinfo),
291        )
292        # {'result': 'allow', 'status': 'allow', 'status_msg': 'Success. Logging you in...'}
293        if response["result"] == "deny":
294            LOGGER.debug("duo push response", result=response["result"], msg=response["status_msg"])
295            login_failed.send(
296                sender=__name__,
297                credentials={"username": user.username},
298                request=stage_view.request,
299                stage=stage_view.executor.current_stage,
300                context={
301                    PLAN_CONTEXT_METHOD_ARGS: {
302                        "device_class": DeviceClasses.DUO.value,
303                        "duo_response": response,
304                    }
305                },
306            )
307            raise ValidationError("Duo denied access", code="denied")
308        return device
309    except RuntimeError as exc:
310        Event.new(
311            EventAction.CONFIGURATION_ERROR,
312            message=f"Failed to DUO authenticate user: {str(exc)}",
313            user=user,
314        ).from_http(stage_view.request, user)
315        raise ValidationError("Duo denied access", code="denied") from exc

Duo authentication