authentik.stages.captcha.stage

authentik captcha stage

  1"""authentik captcha stage"""
  2
  3from django.http.response import HttpResponse
  4from django.utils.translation import gettext as _
  5from requests import RequestException
  6from rest_framework.fields import BooleanField, CharField
  7from rest_framework.serializers import ValidationError
  8from structlog.stdlib import get_logger
  9
 10from authentik.flows.challenge import (
 11    Challenge,
 12    ChallengeResponse,
 13    WithUserInfoChallenge,
 14)
 15from authentik.flows.stage import ChallengeStageView
 16from authentik.lib.utils.http import get_http_session
 17from authentik.root.middleware import ClientIPMiddleware
 18from authentik.stages.captcha.models import CaptchaStage
 19
# Module-level structlog logger.
LOGGER = get_logger()
# Flow plan context key under which CaptchaStageView.challenge_valid stores the
# validated captcha data together with the stage that handled it.
PLAN_CONTEXT_CAPTCHA = "captcha"
# Optional flow plan context key; when present, its value is sent to the client as
# the site key instead of the stage's public_key.
PLAN_CONTEXT_CAPTCHA_SITE_KEY = "goauthentik.io/stages/captcha/site_key"
# Optional flow plan context key; when it holds a truthy value, that value is sent to
# the verification API as the secret instead of the stage's private_key.
PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY = "goauthentik.io/stages/captcha/private_key"
 24
 25
class CaptchaChallenge(WithUserInfoChallenge):
    """Captcha challenge sent to the client, carrying the site public key"""

    # Component name; defaults to the same value CaptchaChallengeResponse uses.
    component = CharField(default="ak-stage-captcha")

    # Site key: the plan-context override if set, otherwise the stage's public_key
    # (see CaptchaStageView.get_challenge).
    site_key = CharField(required=True)
    # Filled from the stage's js_url in CaptchaStageView.get_challenge.
    js_url = CharField(required=True)
    # Filled from the stage's `interactive` setting in CaptchaStageView.get_challenge.
    interactive = BooleanField(required=True)
 35
 36def verify_captcha_token(stage: CaptchaStage, token: str, remote_ip: str, key: str | None = None):
 37    """Validate captcha token"""
 38    try:
 39        response = get_http_session().post(
 40            stage.api_url,
 41            headers={
 42                "Content-type": "application/x-www-form-urlencoded",
 43            },
 44            data={
 45                "secret": key or stage.private_key,
 46                "response": token,
 47                "remoteip": remote_ip,
 48            },
 49        )
 50        response.raise_for_status()
 51        data = response.json()
 52        if stage.error_on_invalid_score:
 53            if not data.get("success", False):
 54                error_codes = data.get("error-codes", ["unknown-error"])
 55                LOGGER.warning("Failed to verify captcha token", error_codes=error_codes)
 56
 57                # These cases can usually be fixed by simply requesting a new token and retrying.
 58                # [reCAPTCHA](https://developers.google.com/recaptcha/docs/verify#error_code_reference)
 59                # [hCaptcha](https://docs.hcaptcha.com/#siteverify-error-codes-table)
 60                # [Turnstile](https://developers.cloudflare.com/turnstile/get-started/server-side-validation/#error-codes)
 61                retryable_error_codes = [
 62                    "missing-input-response",
 63                    "invalid-input-response",
 64                    "timeout-or-duplicate",
 65                    "expired-input-response",
 66                    "already-seen-response",
 67                ]
 68
 69                if set(error_codes).issubset(set(retryable_error_codes)):
 70                    error_message = _("Invalid captcha response. Retrying may solve this issue.")
 71                else:
 72                    error_message = _("Invalid captcha response")
 73                raise ValidationError(error_message)
 74            if "score" in data:
 75                score = float(data.get("score"))
 76                if stage.score_max_threshold > -1 and score > stage.score_max_threshold:
 77                    raise ValidationError(_("Invalid captcha response"))
 78                if stage.score_min_threshold > -1 and score < stage.score_min_threshold:
 79                    raise ValidationError(_("Invalid captcha response"))
 80    except (RequestException, TypeError) as exc:
 81        raise ValidationError(_("Failed to validate token")) from exc
 82
 83    return data
 84
 85
 86class CaptchaChallengeResponse(ChallengeResponse):
 87    """Validate captcha token"""
 88
 89    token = CharField()
 90    component = CharField(default="ak-stage-captcha")
 91
 92    def validate_token(self, token: str) -> str:
 93        """Validate captcha token"""
 94        stage: CaptchaStage = self.stage.executor.current_stage
 95        client_ip = ClientIPMiddleware.get_client_ip(self.stage.request)
 96
 97        return verify_captcha_token(
 98            stage,
 99            token,
100            client_ip,
101            key=self.stage.executor.plan.context.get(PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY),
102        )
103
104
class CaptchaStageView(ChallengeStageView):
    """Stage view that issues the captcha challenge and records the verified result.

    Token verification itself happens in the response serializer
    (``CaptchaChallengeResponse``).
    """

    response_class = CaptchaChallengeResponse

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the challenge; a site key in the plan context overrides the stage's public key"""
        plan_context = self.executor.plan.context
        stage = self.executor.current_stage
        site_key = plan_context.get(PLAN_CONTEXT_CAPTCHA_SITE_KEY, stage.public_key)
        payload = {
            "js_url": stage.js_url,
            "site_key": site_key,
            "interactive": stage.interactive,
        }
        return CaptchaChallenge(data=payload)

    def challenge_valid(self, response: CaptchaChallengeResponse) -> HttpResponse:
        """Store the validated captcha data in the plan context and complete the stage"""
        # validated_data["token"] holds whatever validate_token returned for the token.
        captcha_result = response.validated_data["token"]
        self.executor.plan.context[PLAN_CONTEXT_CAPTCHA] = {
            "response": captcha_result,
            "stage": self.executor.current_stage,
        }
        return self.executor.stage_ok()
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
PLAN_CONTEXT_CAPTCHA = 'captcha'
PLAN_CONTEXT_CAPTCHA_SITE_KEY = 'goauthentik.io/stages/captcha/site_key'
PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY = 'goauthentik.io/stages/captcha/private_key'
class CaptchaChallenge(authentik.flows.challenge.WithUserInfoChallenge):
class CaptchaChallenge(WithUserInfoChallenge):
    """Captcha challenge sent to the client, carrying the site public key"""

    # Component name; defaults to the same value CaptchaChallengeResponse uses.
    component = CharField(default="ak-stage-captcha")

    # Site key: the plan-context override if set, otherwise the stage's public_key
    # (see CaptchaStageView.get_challenge).
    site_key = CharField(required=True)
    # Filled from the stage's js_url in CaptchaStageView.get_challenge.
    js_url = CharField(required=True)
    # Filled from the stage's `interactive` setting in CaptchaStageView.get_challenge.
    interactive = BooleanField(required=True)

Site public key

component
site_key
js_url
interactive
def verify_captcha_token( stage: authentik.stages.captcha.models.CaptchaStage, token: str, remote_ip: str, key: str | None = None):
def verify_captcha_token(stage: CaptchaStage, token: str, remote_ip: str, key: str | None = None):
    """Validate a captcha token against the stage's verification API.

    Posts the secret, the token and the client IP as form data to ``stage.api_url``
    and returns the decoded JSON body of the reply.

    Args:
        stage: Captcha stage providing the API URL, the default private key and the
            score settings.
        token: Captcha response token submitted by the client.
        remote_ip: Client IP address, forwarded as ``remoteip``.
        key: Optional secret; used instead of ``stage.private_key`` when truthy.

    Returns:
        The decoded JSON reply of the verification API.

    Raises:
        ValidationError: If the request fails or the reply cannot be processed, or,
            only when ``stage.error_on_invalid_score`` is set, if the reply does not
            report success or its score lies outside the configured thresholds.
    """
    try:
        response = get_http_session().post(
            stage.api_url,
            headers={
                "Content-type": "application/x-www-form-urlencoded",
            },
            data={
                "secret": key or stage.private_key,
                "response": token,
                "remoteip": remote_ip,
            },
        )
        response.raise_for_status()
        data = response.json()
        # When error_on_invalid_score is off, the reply is returned unchecked so the
        # caller can still make it available for later decisions.
        if stage.error_on_invalid_score:
            if not data.get("success", False):
                error_codes = data.get("error-codes", ["unknown-error"])
                LOGGER.warning("Failed to verify captcha token", error_codes=error_codes)

                # These cases can usually be fixed by simply requesting a new token and retrying.
                # [reCAPTCHA](https://developers.google.com/recaptcha/docs/verify#error_code_reference)
                # [hCaptcha](https://docs.hcaptcha.com/#siteverify-error-codes-table)
                # [Turnstile](https://developers.cloudflare.com/turnstile/get-started/server-side-validation/#error-codes)
                retryable_error_codes = {
                    "missing-input-response",
                    "invalid-input-response",
                    "timeout-or-duplicate",
                    "expired-input-response",
                    "already-seen-response",
                }

                if set(error_codes).issubset(retryable_error_codes):
                    error_message = _("Invalid captcha response. Retrying may solve this issue.")
                else:
                    error_message = _("Invalid captcha response")
                raise ValidationError(error_message)
            if "score" in data:
                score = float(data.get("score"))
                # A threshold of -1 (or lower) disables the respective bound.
                if stage.score_max_threshold > -1 and score > stage.score_max_threshold:
                    raise ValidationError(_("Invalid captcha response"))
                if stage.score_min_threshold > -1 and score < stage.score_min_threshold:
                    raise ValidationError(_("Invalid captcha response"))
    except (RequestException, TypeError, ValueError) as exc:
        # TypeError: e.g. a null "score" or "error-codes" value in the reply.
        # ValueError: a non-numeric "score", or a JSON decoding error that is not a
        # RequestException subclass (as raised by older requests releases).
        raise ValidationError(_("Failed to validate token")) from exc

    return data

Validate captcha token

class CaptchaChallengeResponse(authentik.flows.challenge.ChallengeResponse):
 87class CaptchaChallengeResponse(ChallengeResponse):
 88    """Validate captcha token"""
 89
 90    token = CharField()
 91    component = CharField(default="ak-stage-captcha")
 92
 93    def validate_token(self, token: str) -> str:
 94        """Validate captcha token"""
 95        stage: CaptchaStage = self.stage.executor.current_stage
 96        client_ip = ClientIPMiddleware.get_client_ip(self.stage.request)
 97
 98        return verify_captcha_token(
 99            stage,
100            token,
101            client_ip,
102            key=self.stage.executor.plan.context.get(PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY),
103        )

Validate captcha token

token
component
def validate_token(self, token: str) -> str:
 93    def validate_token(self, token: str) -> str:
 94        """Validate captcha token"""
 95        stage: CaptchaStage = self.stage.executor.current_stage
 96        client_ip = ClientIPMiddleware.get_client_ip(self.stage.request)
 97
 98        return verify_captcha_token(
 99            stage,
100            token,
101            client_ip,
102            key=self.stage.executor.plan.context.get(PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY),
103        )

Validate captcha token

class CaptchaStageView(authentik.flows.stage.ChallengeStageView):
class CaptchaStageView(ChallengeStageView):
    """Stage view that issues the captcha challenge and records the verified result.

    Token verification itself happens in the response serializer
    (``CaptchaChallengeResponse``).
    """

    response_class = CaptchaChallengeResponse

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the challenge; a site key in the plan context overrides the stage's public key"""
        plan_context = self.executor.plan.context
        stage = self.executor.current_stage
        site_key = plan_context.get(PLAN_CONTEXT_CAPTCHA_SITE_KEY, stage.public_key)
        payload = {
            "js_url": stage.js_url,
            "site_key": site_key,
            "interactive": stage.interactive,
        }
        return CaptchaChallenge(data=payload)

    def challenge_valid(self, response: CaptchaChallengeResponse) -> HttpResponse:
        """Store the validated captcha data in the plan context and complete the stage"""
        # validated_data["token"] holds whatever validate_token returned for the token.
        captcha_result = response.validated_data["token"]
        self.executor.plan.context[PLAN_CONTEXT_CAPTCHA] = {
            "response": captcha_result,
            "stage": self.executor.current_stage,
        }
        return self.executor.stage_ok()

Captcha stage view; submitted tokens are verified by CaptchaChallengeResponse against the stage's configured verification API

response_class = <class 'CaptchaChallengeResponse'>
def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
111    def get_challenge(self, *args, **kwargs) -> Challenge:
112        site_key = self.executor.plan.context.get(
113            PLAN_CONTEXT_CAPTCHA_SITE_KEY, self.executor.current_stage.public_key
114        )
115        return CaptchaChallenge(
116            data={
117                "js_url": self.executor.current_stage.js_url,
118                "site_key": site_key,
119                "interactive": self.executor.current_stage.interactive,
120            }
121        )

Return the challenge that the client should solve

def challenge_valid( self, response: CaptchaChallengeResponse) -> django.http.response.HttpResponse:
123    def challenge_valid(self, response: CaptchaChallengeResponse) -> HttpResponse:
124        response = response.validated_data["token"]
125        self.executor.plan.context[PLAN_CONTEXT_CAPTCHA] = {
126            "response": response,
127            "stage": self.executor.current_stage,
128        }
129        return self.executor.stage_ok()

Callback when the challenge has the correct format