authentik.stages.captcha.stage
authentik captcha stage
1"""authentik captcha stage""" 2 3from django.http.response import HttpResponse 4from django.utils.translation import gettext as _ 5from requests import RequestException 6from rest_framework.fields import BooleanField, CharField 7from rest_framework.serializers import ValidationError 8from structlog.stdlib import get_logger 9 10from authentik.flows.challenge import ( 11 Challenge, 12 ChallengeResponse, 13 WithUserInfoChallenge, 14) 15from authentik.flows.stage import ChallengeStageView 16from authentik.lib.utils.http import get_http_session 17from authentik.root.middleware import ClientIPMiddleware 18from authentik.stages.captcha.models import CaptchaStage 19 20LOGGER = get_logger() 21PLAN_CONTEXT_CAPTCHA = "captcha" 22PLAN_CONTEXT_CAPTCHA_SITE_KEY = "goauthentik.io/stages/captcha/site_key" 23PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY = "goauthentik.io/stages/captcha/private_key" 24 25 26class CaptchaChallenge(WithUserInfoChallenge): 27 """Site public key""" 28 29 component = CharField(default="ak-stage-captcha") 30 31 site_key = CharField(required=True) 32 js_url = CharField(required=True) 33 interactive = BooleanField(required=True) 34 35 36def verify_captcha_token(stage: CaptchaStage, token: str, remote_ip: str, key: str | None = None): 37 """Validate captcha token""" 38 try: 39 response = get_http_session().post( 40 stage.api_url, 41 headers={ 42 "Content-type": "application/x-www-form-urlencoded", 43 }, 44 data={ 45 "secret": key or stage.private_key, 46 "response": token, 47 "remoteip": remote_ip, 48 }, 49 ) 50 response.raise_for_status() 51 data = response.json() 52 if stage.error_on_invalid_score: 53 if not data.get("success", False): 54 error_codes = data.get("error-codes", ["unknown-error"]) 55 LOGGER.warning("Failed to verify captcha token", error_codes=error_codes) 56 57 # These cases can usually be fixed by simply requesting a new token and retrying. 58 # [reCAPTCHA](https://developers.google.com/recaptcha/docs/verify#error_code_reference) 59 # [hCaptcha](https://docs.hcaptcha.com/#siteverify-error-codes-table) 60 # [Turnstile](https://developers.cloudflare.com/turnstile/get-started/server-side-validation/#error-codes) 61 retryable_error_codes = [ 62 "missing-input-response", 63 "invalid-input-response", 64 "timeout-or-duplicate", 65 "expired-input-response", 66 "already-seen-response", 67 ] 68 69 if set(error_codes).issubset(set(retryable_error_codes)): 70 error_message = _("Invalid captcha response. Retrying may solve this issue.") 71 else: 72 error_message = _("Invalid captcha response") 73 raise ValidationError(error_message) 74 if "score" in data: 75 score = float(data.get("score")) 76 if stage.score_max_threshold > -1 and score > stage.score_max_threshold: 77 raise ValidationError(_("Invalid captcha response")) 78 if stage.score_min_threshold > -1 and score < stage.score_min_threshold: 79 raise ValidationError(_("Invalid captcha response")) 80 except (RequestException, TypeError) as exc: 81 raise ValidationError(_("Failed to validate token")) from exc 82 83 return data 84 85 86class CaptchaChallengeResponse(ChallengeResponse): 87 """Validate captcha token""" 88 89 token = CharField() 90 component = CharField(default="ak-stage-captcha") 91 92 def validate_token(self, token: str) -> str: 93 """Validate captcha token""" 94 stage: CaptchaStage = self.stage.executor.current_stage 95 client_ip = ClientIPMiddleware.get_client_ip(self.stage.request) 96 97 return verify_captcha_token( 98 stage, 99 token, 100 client_ip, 101 key=self.stage.executor.plan.context.get(PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY), 102 ) 103 104 105class CaptchaStageView(ChallengeStageView): 106 """Simple captcha checker, logic is handled in django-captcha module""" 107 108 response_class = CaptchaChallengeResponse 109 110 def get_challenge(self, *args, **kwargs) -> Challenge: 111 site_key = self.executor.plan.context.get( 112 PLAN_CONTEXT_CAPTCHA_SITE_KEY, self.executor.current_stage.public_key 113 ) 114 return CaptchaChallenge( 115 data={ 116 "js_url": self.executor.current_stage.js_url, 117 "site_key": site_key, 118 "interactive": self.executor.current_stage.interactive, 119 } 120 ) 121 122 def challenge_valid(self, response: CaptchaChallengeResponse) -> HttpResponse: 123 response = response.validated_data["token"] 124 self.executor.plan.context[PLAN_CONTEXT_CAPTCHA] = { 125 "response": response, 126 "stage": self.executor.current_stage, 127 } 128 return self.executor.stage_ok()
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
PLAN_CONTEXT_CAPTCHA =
'captcha'
PLAN_CONTEXT_CAPTCHA_SITE_KEY =
'goauthentik.io/stages/captcha/site_key'
PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY =
'goauthentik.io/stages/captcha/private_key'
27class CaptchaChallenge(WithUserInfoChallenge): 28 """Site public key""" 29 30 component = CharField(default="ak-stage-captcha") 31 32 site_key = CharField(required=True) 33 js_url = CharField(required=True) 34 interactive = BooleanField(required=True)
Site public key
def
verify_captcha_token( stage: authentik.stages.captcha.models.CaptchaStage, token: str, remote_ip: str, key: str | None = None):
37def verify_captcha_token(stage: CaptchaStage, token: str, remote_ip: str, key: str | None = None): 38 """Validate captcha token""" 39 try: 40 response = get_http_session().post( 41 stage.api_url, 42 headers={ 43 "Content-type": "application/x-www-form-urlencoded", 44 }, 45 data={ 46 "secret": key or stage.private_key, 47 "response": token, 48 "remoteip": remote_ip, 49 }, 50 ) 51 response.raise_for_status() 52 data = response.json() 53 if stage.error_on_invalid_score: 54 if not data.get("success", False): 55 error_codes = data.get("error-codes", ["unknown-error"]) 56 LOGGER.warning("Failed to verify captcha token", error_codes=error_codes) 57 58 # These cases can usually be fixed by simply requesting a new token and retrying. 59 # [reCAPTCHA](https://developers.google.com/recaptcha/docs/verify#error_code_reference) 60 # [hCaptcha](https://docs.hcaptcha.com/#siteverify-error-codes-table) 61 # [Turnstile](https://developers.cloudflare.com/turnstile/get-started/server-side-validation/#error-codes) 62 retryable_error_codes = [ 63 "missing-input-response", 64 "invalid-input-response", 65 "timeout-or-duplicate", 66 "expired-input-response", 67 "already-seen-response", 68 ] 69 70 if set(error_codes).issubset(set(retryable_error_codes)): 71 error_message = _("Invalid captcha response. Retrying may solve this issue.") 72 else: 73 error_message = _("Invalid captcha response") 74 raise ValidationError(error_message) 75 if "score" in data: 76 score = float(data.get("score")) 77 if stage.score_max_threshold > -1 and score > stage.score_max_threshold: 78 raise ValidationError(_("Invalid captcha response")) 79 if stage.score_min_threshold > -1 and score < stage.score_min_threshold: 80 raise ValidationError(_("Invalid captcha response")) 81 except (RequestException, TypeError) as exc: 82 raise ValidationError(_("Failed to validate token")) from exc 83 84 return data
Validate captcha token
87class CaptchaChallengeResponse(ChallengeResponse): 88 """Validate captcha token""" 89 90 token = CharField() 91 component = CharField(default="ak-stage-captcha") 92 93 def validate_token(self, token: str) -> str: 94 """Validate captcha token""" 95 stage: CaptchaStage = self.stage.executor.current_stage 96 client_ip = ClientIPMiddleware.get_client_ip(self.stage.request) 97 98 return verify_captcha_token( 99 stage, 100 token, 101 client_ip, 102 key=self.stage.executor.plan.context.get(PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY), 103 )
Validate captcha token
def
validate_token(self, token: str) -> str:
93 def validate_token(self, token: str) -> str: 94 """Validate captcha token""" 95 stage: CaptchaStage = self.stage.executor.current_stage 96 client_ip = ClientIPMiddleware.get_client_ip(self.stage.request) 97 98 return verify_captcha_token( 99 stage, 100 token, 101 client_ip, 102 key=self.stage.executor.plan.context.get(PLAN_CONTEXT_CAPTCHA_PRIVATE_KEY), 103 )
Validate captcha token
106class CaptchaStageView(ChallengeStageView): 107 """Simple captcha checker, logic is handled in django-captcha module""" 108 109 response_class = CaptchaChallengeResponse 110 111 def get_challenge(self, *args, **kwargs) -> Challenge: 112 site_key = self.executor.plan.context.get( 113 PLAN_CONTEXT_CAPTCHA_SITE_KEY, self.executor.current_stage.public_key 114 ) 115 return CaptchaChallenge( 116 data={ 117 "js_url": self.executor.current_stage.js_url, 118 "site_key": site_key, 119 "interactive": self.executor.current_stage.interactive, 120 } 121 ) 122 123 def challenge_valid(self, response: CaptchaChallengeResponse) -> HttpResponse: 124 response = response.validated_data["token"] 125 self.executor.plan.context[PLAN_CONTEXT_CAPTCHA] = { 126 "response": response, 127 "stage": self.executor.current_stage, 128 } 129 return self.executor.stage_ok()
Simple captcha checker, logic is handled in django-captcha module
response_class =
<class 'CaptchaChallengeResponse'>
111 def get_challenge(self, *args, **kwargs) -> Challenge: 112 site_key = self.executor.plan.context.get( 113 PLAN_CONTEXT_CAPTCHA_SITE_KEY, self.executor.current_stage.public_key 114 ) 115 return CaptchaChallenge( 116 data={ 117 "js_url": self.executor.current_stage.js_url, 118 "site_key": site_key, 119 "interactive": self.executor.current_stage.interactive, 120 } 121 )
Return the challenge that the client should solve
def
challenge_valid( self, response: CaptchaChallengeResponse) -> django.http.response.HttpResponse:
123 def challenge_valid(self, response: CaptchaChallengeResponse) -> HttpResponse: 124 response = response.validated_data["token"] 125 self.executor.plan.context[PLAN_CONTEXT_CAPTCHA] = { 126 "response": response, 127 "stage": self.executor.current_stage, 128 } 129 return self.executor.stage_ok()
Callback when the challenge has the correct format