authentik.stages.authenticator_validate.challenge

Validation stage challenge checking

  1"""Validation stage challenge checking"""
  2
  3from typing import TYPE_CHECKING
  4from urllib.parse import urlencode
  5
  6from django.http import HttpRequest
  7from django.http.response import Http404
  8from django.shortcuts import get_object_or_404
  9from django.utils.translation import gettext as __
 10from django.utils.translation import gettext_lazy as _
 11from rest_framework.fields import CharField, ChoiceField, DateTimeField
 12from rest_framework.serializers import ValidationError
 13from structlog.stdlib import get_logger
 14from webauthn.authentication.generate_authentication_options import generate_authentication_options
 15from webauthn.authentication.verify_authentication_response import verify_authentication_response
 16from webauthn.helpers import parse_authentication_credential_json
 17from webauthn.helpers.base64url_to_bytes import base64url_to_bytes
 18from webauthn.helpers.exceptions import InvalidAuthenticationResponse, InvalidJSONStructure
 19from webauthn.helpers.options_to_json_dict import options_to_json_dict
 20from webauthn.helpers.structs import PublicKeyCredentialType, UserVerificationRequirement
 21
 22from authentik.core.api.utils import JSONDictField, PassiveSerializer
 23from authentik.core.models import Application, User
 24from authentik.core.signals import login_failed
 25from authentik.events.middleware import audit_ignore
 26from authentik.events.models import Event, EventAction
 27from authentik.flows.planner import PLAN_CONTEXT_APPLICATION
 28from authentik.flows.stage import StageView
 29from authentik.lib.utils.email import mask_email
 30from authentik.lib.utils.time import timedelta_from_string
 31from authentik.root.middleware import ClientIPMiddleware
 32from authentik.stages.authenticator import match_token
 33from authentik.stages.authenticator.models import Device
 34from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice
 35from authentik.stages.authenticator_email.models import EmailDevice
 36from authentik.stages.authenticator_sms.models import SMSDevice
 37from authentik.stages.authenticator_validate.models import AuthenticatorValidateStage, DeviceClasses
 38from authentik.stages.authenticator_webauthn.models import UserVerification, WebAuthnDevice
 39from authentik.stages.authenticator_webauthn.stage import PLAN_CONTEXT_WEBAUTHN_CHALLENGE
 40from authentik.stages.authenticator_webauthn.utils import get_origin, get_rp_id
 41from authentik.stages.password.stage import PLAN_CONTEXT_METHOD_ARGS
 42
 43LOGGER = get_logger()
 44if TYPE_CHECKING:
 45    from authentik.stages.authenticator_validate.stage import AuthenticatorValidateStageView
 46
 47
 48class DeviceChallenge(PassiveSerializer):
 49    """Single device challenge"""
 50
 51    device_class = ChoiceField(choices=DeviceClasses.choices)
 52    device_uid = CharField()
 53    challenge = JSONDictField()
 54    last_used = DateTimeField(allow_null=True)
 55
 56
 57def get_challenge_for_device(
 58    stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage, device: Device
 59) -> dict:
 60    """Generate challenge for a single device"""
 61    if isinstance(device, WebAuthnDevice):
 62        return get_webauthn_challenge(stage_view, stage, device)
 63    if isinstance(device, EmailDevice):
 64        return {"email": mask_email(device.email)}
 65    # Code-based challenges have no hints
 66    return {}
 67
 68
 69def get_webauthn_challenge_without_user(
 70    stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage
 71) -> dict:
 72    """Same as `get_webauthn_challenge`, but allows any client device. We can then later check
 73    who the device belongs to."""
 74    stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
 75    authentication_options = generate_authentication_options(
 76        rp_id=get_rp_id(stage_view.request),
 77        allow_credentials=[],
 78        user_verification=UserVerificationRequirement(stage.webauthn_user_verification),
 79    )
 80    stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = (
 81        authentication_options.challenge
 82    )
 83
 84    options_dict = options_to_json_dict(authentication_options)
 85    if stage.webauthn_hints:
 86        options_dict["hints"] = list(stage.webauthn_hints)
 87    return options_dict
 88
 89
 90def get_webauthn_challenge(
 91    stage_view: AuthenticatorValidateStageView,
 92    stage: AuthenticatorValidateStage,
 93    device: WebAuthnDevice | None = None,
 94) -> dict:
 95    """Send the client a challenge that we'll check later"""
 96    stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
 97
 98    allowed_credentials = []
 99
100    if device:
101        # We want all the user's WebAuthn devices and merge their challenges
102        for user_device in WebAuthnDevice.objects.filter(user=device.user).order_by("name"):
103            user_device: WebAuthnDevice
104            allowed_credentials.append(user_device.descriptor)
105
106    authentication_options = generate_authentication_options(
107        rp_id=get_rp_id(stage_view.request),
108        allow_credentials=allowed_credentials,
109        user_verification=UserVerificationRequirement(stage.webauthn_user_verification),
110    )
111
112    stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = (
113        authentication_options.challenge
114    )
115
116    options_dict = options_to_json_dict(authentication_options)
117    if stage.webauthn_hints:
118        options_dict["hints"] = list(stage.webauthn_hints)
119    return options_dict
120
121
def select_challenge(request: HttpRequest, device: Device):
    """Run the action tied to the device the user selected in the frontend.

    SMS and email devices are handed to their sender function; any other
    device type needs no action here."""
    handlers = (
        (SMSDevice, select_challenge_sms),
        (EmailDevice, select_challenge_email),
    )
    for device_class, handler in handlers:
        if isinstance(device, device_class):
            handler(request, device)
            break
128
129
def select_challenge_sms(request: HttpRequest, device: SMSDevice):
    """Generate a new token on the SMS device and send it through the device's stage"""
    device.generate_token()
    sms_stage = device.stage
    sms_stage.send(request, device.token, device)
134
135
def select_challenge_email(request: HttpRequest, device: EmailDevice):
    """Send Email

    Generates a token that is valid for the stage's `token_expiry` and sends it through
    the device's stage. `request` is not used by this function."""
    # total_seconds() returns a float; convert it so the value matches its declared type.
    valid_secs: int = int(timedelta_from_string(device.stage.token_expiry).total_seconds())
    device.generate_token(valid_secs=valid_secs)
    device.stage.send(device)
141
142
def validate_challenge_code(code: str, stage_view: StageView, user: User) -> Device:
    """Check a code-based challenge. The code is deliberately matched against every
    device of the user, as the user mustn't choose between totp and static devices.

    Returns the matching device. When nothing matches, a `login_failed` signal is sent
    and a `ValidationError` is raised."""
    matched = match_token(user, code)
    if matched:
        return matched

    failure_context = {
        PLAN_CONTEXT_METHOD_ARGS: {
            "device_class": DeviceClasses.TOTP.value,
        }
    }
    login_failed.send(
        sender=__name__,
        credentials={"username": user.username},
        request=stage_view.request,
        stage=stage_view.executor.current_stage,
        context=failure_context,
    )
    raise ValidationError(
        _("Invalid Token. Please ensure the time on your device is accurate and try again.")
    )
163
164
def validate_challenge_webauthn(
    data: dict,
    stage_view: StageView,
    user: User,
    stage: AuthenticatorValidateStage | None = None,
) -> Device:
    """Validate WebAuthn Challenge

    Parses the client's response, looks up the WebAuthn device by its credential ID and
    verifies the assertion against the challenge stored in the flow plan context. When no
    `stage` is passed, the executor's current stage is used. On success the device's sign
    count is updated and the device is returned.

    Raises `ValidationError` when the response can't be parsed, the device is unknown,
    the device belongs to a different (non-anonymous) user, the device type is not allowed
    by the stage, or the assertion fails verification. A failed assertion also sends the
    `login_failed` signal."""
    request = stage_view.request
    challenge = stage_view.executor.plan.context.get(PLAN_CONTEXT_WEBAUTHN_CHALLENGE)
    stage = stage or stage_view.executor.current_stage

    if "MinuteMaid" in request.META.get("HTTP_USER_AGENT", ""):
        # Workaround for Android sign-in, when signing into Google Workspace on android while
        # adding the account to the system (not in Chrome), for some reason `type` is not set
        # so in that case we fall back to `public-key`
        # since that's the only option we support anyways
        data.setdefault("type", PublicKeyCredentialType.PUBLIC_KEY)
    try:
        credential = parse_authentication_credential_json(data)
    except InvalidJSONStructure as exc:
        LOGGER.warning("Invalid WebAuthn challenge response", exc=exc)
        raise ValidationError("Invalid device", "invalid") from None

    device = WebAuthnDevice.objects.filter(credential_id=credential.id).first()
    if not device:
        raise ValidationError("Invalid device", "invalid")
    # We can only check the device's user if the user we're given isn't anonymous
    # as this validation is also used for password-less login where webauthn is the very first
    # step done by a user. Only if this validation happens at a later stage we can check
    # that the device belongs to the user
    if not user.is_anonymous and device.user != user:
        raise ValidationError("Invalid device", "invalid")
    # When a device_type was set when creating the device (2024.4+), and we have a limitation,
    # make sure the device type is allowed.
    if (
        device.device_type
        and stage.webauthn_allowed_device_types.exists()
        and not stage.webauthn_allowed_device_types.filter(pk=device.device_type.pk).exists()
    ):
        raise ValidationError(
            # Translate the template first and substitute the brand afterwards. Formatting
            # before the lookup produces a message ID that no translation catalog contains.
            _("Invalid device type. Contact your {brand} administrator for help.").format(
                brand=stage_view.request.brand.branding_title
            ),
            "invalid",
        )
    try:
        authentication_verification = verify_authentication_response(
            credential=credential,
            expected_challenge=challenge,
            expected_rp_id=get_rp_id(request),
            expected_origin=get_origin(request),
            credential_public_key=base64url_to_bytes(device.public_key),
            credential_current_sign_count=device.sign_count,
            require_user_verification=stage.webauthn_user_verification == UserVerification.REQUIRED,
        )
    except InvalidAuthenticationResponse as exc:
        LOGGER.warning("Assertion failed", exc=exc)
        login_failed.send(
            sender=__name__,
            credentials={"username": user.username},
            request=stage_view.request,
            stage=stage_view.executor.current_stage,
            context={
                PLAN_CONTEXT_METHOD_ARGS: {
                    "device": device,
                    "device_class": DeviceClasses.WEBAUTHN.value,
                    "device_type": device.device_type,
                },
            },
        )
        raise ValidationError("Assertion failed") from exc

    # The sign-count update is wrapped in audit_ignore()
    with audit_ignore():
        device.set_sign_count(authentication_verification.new_sign_count)
    return device
242
243
def validate_challenge_duo(device_pk: int, stage_view: StageView, user: User) -> Device:
    """Duo authentication

    Looks up the Duo device by primary key, checks that it belongs to `user` and asks Duo
    to authenticate the user. The request host (and the application name, when the flow
    plan has one) is passed along as push info.

    Returns the device when Duo allows the login. Raises `Http404` when the device is not
    found or belongs to another user, and `ValidationError` (code `denied`) when Duo does
    not allow the login or the Duo client raises a `RuntimeError`."""
    device = get_object_or_404(DuoDevice, pk=device_pk)
    if device.user != user:
        LOGGER.warning("device mismatch")
        raise Http404
    stage: AuthenticatorDuoStage = device.stage

    # Get additional context for push
    pushinfo = {
        __("Domain"): stage_view.request.get_host(),
    }
    if PLAN_CONTEXT_APPLICATION in stage_view.executor.plan.context:
        pushinfo[__("Application")] = stage_view.executor.plan.context.get(
            PLAN_CONTEXT_APPLICATION, Application()
        ).name

    try:
        response = stage.auth_client().auth(
            "auto",
            user_id=device.duo_user_id,
            ipaddr=ClientIPMiddleware.get_client_ip(stage_view.request),
            # Translate the template first, then fill in the brand; formatting before the
            # lookup produces a message ID that no translation catalog contains.
            type=__("{brand_name} Login request").format(
                brand_name=stage_view.request.brand.branding_title
            ),
            display_username=user.username,
            device="auto",
            pushinfo=urlencode(pushinfo),
        )
        # {'result': 'allow', 'status': 'allow', 'status_msg': 'Success. Logging you in...'}
        # Fail closed: only an explicit "allow" lets the user through, so an unexpected
        # result value is treated the same as a denial.
        if response["result"] != "allow":
            LOGGER.debug(
                "duo push response", result=response["result"], msg=response.get("status_msg")
            )
            login_failed.send(
                sender=__name__,
                credentials={"username": user.username},
                request=stage_view.request,
                stage=stage_view.executor.current_stage,
                context={
                    PLAN_CONTEXT_METHOD_ARGS: {
                        "device_class": DeviceClasses.DUO.value,
                        "duo_response": response,
                    }
                },
            )
            raise ValidationError("Duo denied access", code="denied")
        return device
    except RuntimeError as exc:
        # Record the failure as a configuration error event before rejecting the login
        Event.new(
            EventAction.CONFIGURATION_ERROR,
            message=f"Failed to DUO authenticate user: {str(exc)}",
            user=user,
        ).from_http(stage_view.request, user)
        raise ValidationError("Duo denied access", code="denied") from exc
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class DeviceChallenge(authentik.core.api.utils.PassiveSerializer):
class DeviceChallenge(PassiveSerializer):
    """Serialized representation of the challenge for one device"""

    # Limited to the values declared in DeviceClasses.choices
    device_class = ChoiceField(choices=DeviceClasses.choices)
    device_uid = CharField()
    # Challenge payload, serialized as a JSON dict
    challenge = JSONDictField()
    # Null is accepted for this field
    last_used = DateTimeField(allow_null=True)

Single device challenge

device_class
device_uid
challenge
last_used
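def get_challenge_for_device( stage_view: authentik.stages.authenticator_validate.stage.AuthenticatorValidateStageView, stage: authentik.stages.authenticator_validate.models.AuthenticatorValidateStage, device: authentik.stages.authenticator.models.Device) -> dict: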
def get_challenge_for_device(
    stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage, device: Device
) -> dict:
    """Build the challenge payload for one device.

    WebAuthn devices get the options produced by `get_webauthn_challenge`, email
    devices get their masked address, and all other devices get an empty dict."""
    if isinstance(device, WebAuthnDevice):
        challenge = get_webauthn_challenge(stage_view, stage, device)
    elif isinstance(device, EmailDevice):
        challenge = {"email": mask_email(device.email)}
    else:
        # Code-based challenges have no hints
        challenge = {}
    return challenge

Generate challenge for a single device

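def get_webauthn_challenge_without_user( stage_view: authentik.stages.authenticator_validate.stage.AuthenticatorValidateStageView, stage: authentik.stages.authenticator_validate.models.AuthenticatorValidateStage) -> dict: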
def get_webauthn_challenge_without_user(
    stage_view: AuthenticatorValidateStageView, stage: AuthenticatorValidateStage
) -> dict:
    """Same as `get_webauthn_challenge`, but allows any client device. We can then later check
    who the device belongs to.

    Returns the authentication options as a JSON-compatible dict; the generated challenge
    is stored in the flow plan context under `PLAN_CONTEXT_WEBAUTHN_CHALLENGE`."""
    # Without a device, `get_webauthn_challenge` passes an empty credential allow-list,
    # which is exactly the behaviour this function needs. Delegate instead of keeping a
    # second copy of the option-building code.
    return get_webauthn_challenge(stage_view, stage, device=None)

Same as get_webauthn_challenge, but allows any client device. We can then later check who the device belongs to.

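def get_webauthn_challenge( stage_view: authentik.stages.authenticator_validate.stage.AuthenticatorValidateStageView, stage: authentik.stages.authenticator_validate.models.AuthenticatorValidateStage, device: authentik.stages.authenticator_webauthn.models.WebAuthnDevice | None = None) -> dict: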
 91def get_webauthn_challenge(
 92    stage_view: AuthenticatorValidateStageView,
 93    stage: AuthenticatorValidateStage,
 94    device: WebAuthnDevice | None = None,
 95) -> dict:
 96    """Send the client a challenge that we'll check later"""
 97    stage_view.executor.plan.context.pop(PLAN_CONTEXT_WEBAUTHN_CHALLENGE, None)
 98
 99    allowed_credentials = []
100
101    if device:
102        # We want all the user's WebAuthn devices and merge their challenges
103        for user_device in WebAuthnDevice.objects.filter(user=device.user).order_by("name"):
104            user_device: WebAuthnDevice
105            allowed_credentials.append(user_device.descriptor)
106
107    authentication_options = generate_authentication_options(
108        rp_id=get_rp_id(stage_view.request),
109        allow_credentials=allowed_credentials,
110        user_verification=UserVerificationRequirement(stage.webauthn_user_verification),
111    )
112
113    stage_view.executor.plan.context[PLAN_CONTEXT_WEBAUTHN_CHALLENGE] = (
114        authentication_options.challenge
115    )
116
117    options_dict = options_to_json_dict(authentication_options)
118    if stage.webauthn_hints:
119        options_dict["hints"] = list(stage.webauthn_hints)
120    return options_dict

Send the client a challenge that we'll check later

def select_challenge( request: django.http.request.HttpRequest, device: authentik.stages.authenticator.models.Device):
def select_challenge(request: HttpRequest, device: Device):
    """Run the action tied to the device the user selected in the frontend.

    SMS and email devices are handed to their sender function; any other
    device type needs no action here."""
    handlers = (
        (SMSDevice, select_challenge_sms),
        (EmailDevice, select_challenge_email),
    )
    for device_class, handler in handlers:
        if isinstance(device, device_class):
            handler(request, device)
            break

Callback when the user selected a challenge in the frontend.

def select_challenge_sms( request: django.http.request.HttpRequest, device: authentik.stages.authenticator_sms.models.SMSDevice):
def select_challenge_sms(request: HttpRequest, device: SMSDevice):
    """Generate a new token on the SMS device and send it through the device's stage"""
    device.generate_token()
    sms_stage = device.stage
    sms_stage.send(request, device.token, device)

Send SMS

def select_challenge_email( request: django.http.request.HttpRequest, device: authentik.stages.authenticator_email.models.EmailDevice):
def select_challenge_email(request: HttpRequest, device: EmailDevice):
    """Send Email

    Generates a token that is valid for the stage's `token_expiry` and sends it through
    the device's stage. `request` is not used by this function."""
    # total_seconds() returns a float; convert it so the value matches its declared type.
    valid_secs: int = int(timedelta_from_string(device.stage.token_expiry).total_seconds())
    device.generate_token(valid_secs=valid_secs)
    device.stage.send(device)

Send Email

def validate_challenge_code( code: str, stage_view: authentik.flows.stage.StageView, user: authentik.core.models.User) -> authentik.stages.authenticator.models.Device:
def validate_challenge_code(code: str, stage_view: StageView, user: User) -> Device:
    """Check a code-based challenge. The code is deliberately matched against every
    device of the user, as the user mustn't choose between totp and static devices.

    Returns the matching device. When nothing matches, a `login_failed` signal is sent
    and a `ValidationError` is raised."""
    matched = match_token(user, code)
    if matched:
        return matched

    failure_context = {
        PLAN_CONTEXT_METHOD_ARGS: {
            "device_class": DeviceClasses.TOTP.value,
        }
    }
    login_failed.send(
        sender=__name__,
        credentials={"username": user.username},
        request=stage_view.request,
        stage=stage_view.executor.current_stage,
        context=failure_context,
    )
    raise ValidationError(
        _("Invalid Token. Please ensure the time on your device is accurate and try again.")
    )

Validate code-based challenges. We test against every device, on purpose, as the user mustn't choose between totp and static devices.

def validate_challenge_webauthn(
    data: dict,
    stage_view: StageView,
    user: User,
    stage: AuthenticatorValidateStage | None = None,
) -> Device:
    """Validate WebAuthn Challenge

    Parses the client's response, looks up the WebAuthn device by its credential ID and
    verifies the assertion against the challenge stored in the flow plan context. When no
    `stage` is passed, the executor's current stage is used. On success the device's sign
    count is updated and the device is returned.

    Raises `ValidationError` when the response can't be parsed, the device is unknown,
    the device belongs to a different (non-anonymous) user, the device type is not allowed
    by the stage, or the assertion fails verification. A failed assertion also sends the
    `login_failed` signal."""
    request = stage_view.request
    challenge = stage_view.executor.plan.context.get(PLAN_CONTEXT_WEBAUTHN_CHALLENGE)
    stage = stage or stage_view.executor.current_stage

    if "MinuteMaid" in request.META.get("HTTP_USER_AGENT", ""):
        # Workaround for Android sign-in, when signing into Google Workspace on android while
        # adding the account to the system (not in Chrome), for some reason `type` is not set
        # so in that case we fall back to `public-key`
        # since that's the only option we support anyways
        data.setdefault("type", PublicKeyCredentialType.PUBLIC_KEY)
    try:
        credential = parse_authentication_credential_json(data)
    except InvalidJSONStructure as exc:
        LOGGER.warning("Invalid WebAuthn challenge response", exc=exc)
        raise ValidationError("Invalid device", "invalid") from None

    device = WebAuthnDevice.objects.filter(credential_id=credential.id).first()
    if not device:
        raise ValidationError("Invalid device", "invalid")
    # We can only check the device's user if the user we're given isn't anonymous
    # as this validation is also used for password-less login where webauthn is the very first
    # step done by a user. Only if this validation happens at a later stage we can check
    # that the device belongs to the user
    if not user.is_anonymous and device.user != user:
        raise ValidationError("Invalid device", "invalid")
    # When a device_type was set when creating the device (2024.4+), and we have a limitation,
    # make sure the device type is allowed.
    if (
        device.device_type
        and stage.webauthn_allowed_device_types.exists()
        and not stage.webauthn_allowed_device_types.filter(pk=device.device_type.pk).exists()
    ):
        raise ValidationError(
            # Translate the template first and substitute the brand afterwards. Formatting
            # before the lookup produces a message ID that no translation catalog contains.
            _("Invalid device type. Contact your {brand} administrator for help.").format(
                brand=stage_view.request.brand.branding_title
            ),
            "invalid",
        )
    try:
        authentication_verification = verify_authentication_response(
            credential=credential,
            expected_challenge=challenge,
            expected_rp_id=get_rp_id(request),
            expected_origin=get_origin(request),
            credential_public_key=base64url_to_bytes(device.public_key),
            credential_current_sign_count=device.sign_count,
            require_user_verification=stage.webauthn_user_verification == UserVerification.REQUIRED,
        )
    except InvalidAuthenticationResponse as exc:
        LOGGER.warning("Assertion failed", exc=exc)
        login_failed.send(
            sender=__name__,
            credentials={"username": user.username},
            request=stage_view.request,
            stage=stage_view.executor.current_stage,
            context={
                PLAN_CONTEXT_METHOD_ARGS: {
                    "device": device,
                    "device_class": DeviceClasses.WEBAUTHN.value,
                    "device_type": device.device_type,
                },
            },
        )
        raise ValidationError("Assertion failed") from exc

    # The sign-count update is wrapped in audit_ignore()
    with audit_ignore():
        device.set_sign_count(authentication_verification.new_sign_count)
    return device

Validate WebAuthn Challenge

def validate_challenge_duo( device_pk: int, stage_view: authentik.flows.stage.StageView, user: authentik.core.models.User) -> authentik.stages.authenticator.models.Device:
def validate_challenge_duo(device_pk: int, stage_view: StageView, user: User) -> Device:
    """Duo authentication

    Looks up the Duo device by primary key, checks that it belongs to `user` and asks Duo
    to authenticate the user. The request host (and the application name, when the flow
    plan has one) is passed along as push info.

    Returns the device when Duo allows the login. Raises `Http404` when the device is not
    found or belongs to another user, and `ValidationError` (code `denied`) when Duo does
    not allow the login or the Duo client raises a `RuntimeError`."""
    device = get_object_or_404(DuoDevice, pk=device_pk)
    if device.user != user:
        LOGGER.warning("device mismatch")
        raise Http404
    stage: AuthenticatorDuoStage = device.stage

    # Get additional context for push
    pushinfo = {
        __("Domain"): stage_view.request.get_host(),
    }
    if PLAN_CONTEXT_APPLICATION in stage_view.executor.plan.context:
        pushinfo[__("Application")] = stage_view.executor.plan.context.get(
            PLAN_CONTEXT_APPLICATION, Application()
        ).name

    try:
        response = stage.auth_client().auth(
            "auto",
            user_id=device.duo_user_id,
            ipaddr=ClientIPMiddleware.get_client_ip(stage_view.request),
            # Translate the template first, then fill in the brand; formatting before the
            # lookup produces a message ID that no translation catalog contains.
            type=__("{brand_name} Login request").format(
                brand_name=stage_view.request.brand.branding_title
            ),
            display_username=user.username,
            device="auto",
            pushinfo=urlencode(pushinfo),
        )
        # {'result': 'allow', 'status': 'allow', 'status_msg': 'Success. Logging you in...'}
        # Fail closed: only an explicit "allow" lets the user through, so an unexpected
        # result value is treated the same as a denial.
        if response["result"] != "allow":
            LOGGER.debug(
                "duo push response", result=response["result"], msg=response.get("status_msg")
            )
            login_failed.send(
                sender=__name__,
                credentials={"username": user.username},
                request=stage_view.request,
                stage=stage_view.executor.current_stage,
                context={
                    PLAN_CONTEXT_METHOD_ARGS: {
                        "device_class": DeviceClasses.DUO.value,
                        "duo_response": response,
                    }
                },
            )
            raise ValidationError("Duo denied access", code="denied")
        return device
    except RuntimeError as exc:
        # Record the failure as a configuration error event before rejecting the login
        Event.new(
            EventAction.CONFIGURATION_ERROR,
            message=f"Failed to DUO authenticate user: {str(exc)}",
            user=user,
        ).from_http(stage_view.request, user)
        raise ValidationError("Duo denied access", code="denied") from exc

Duo authentication