authentik.stages.consent.stage
authentik consent stage
"""authentik consent stage"""

from hmac import compare_digest
from uuid import uuid4

from django.http import HttpRequest, HttpResponse
from django.utils.timezone import now
from django.utils.translation import gettext as _
from rest_framework.exceptions import ValidationError
from rest_framework.fields import CharField

from authentik.core.api.utils import PassiveSerializer
from authentik.flows.challenge import (
    Challenge,
    ChallengeResponse,
    WithUserInfoChallenge,
)
from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_PENDING_USER
from authentik.flows.stage import ChallengeStageView
from authentik.lib.utils.time import timedelta_from_string
from authentik.stages.consent.models import ConsentMode, ConsentStage, UserConsent

# Keys under which this stage reads and writes values in the flow plan context.
PLAN_CONTEXT_CONSENT = "consent"
PLAN_CONTEXT_CONSENT_HEADER = "consent_header"
PLAN_CONTEXT_CONSENT_PERMISSIONS = "consent_permissions"
PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS = "consent_additional_permissions"
PLAN_CONTEXT_CONSENT_TOKEN = "goauthentik.io/stages/consent/token"  # nosec


class ConsentPermissionSerializer(PassiveSerializer):
    """Permission used for consent"""

    # `name` may be blank; `id` is the value that gets persisted in
    # `UserConsent.permissions` (see `ConsentStageView.challenge_valid`).
    name = CharField(allow_blank=True)
    id = CharField()


class ConsentChallenge(WithUserInfoChallenge):
    """Challenge info for consent screens"""

    header_text = CharField(required=False)
    permissions = ConsentPermissionSerializer(many=True)
    additional_permissions = ConsentPermissionSerializer(many=True)
    component = CharField(default="ak-stage-consent")
    # Per-challenge token that the response has to send back unchanged.
    token = CharField(required=True)


class ConsentChallengeResponse(ChallengeResponse):
    """Consent challenge response, only accepted when it carries the token that was
    issued together with the challenge"""

    component = CharField(default="ak-stage-consent")
    token = CharField(required=True)

    def validate_token(self, token: str):
        """Validate `token` against the token stored in the plan context.

        Returns the token unchanged when it matches and raises `ValidationError`
        otherwise. A missing context token is compared as the empty string.
        """
        if not compare_digest(
            token, self.stage.executor.plan.context.get(PLAN_CONTEXT_CONSENT_TOKEN, "")
        ):
            raise ValidationError(_("Invalid consent token, re-showing prompt"))
        return token


class ConsentStageView(ChallengeStageView):
    """Simple consent checker."""

    response_class = ConsentChallengeResponse

    def _consent_user(self):
        """Return the user whose consent is looked up and stored: the pending user
        from the plan context if present, otherwise the user of the current request."""
        context = self.executor.plan.context
        if PLAN_CONTEXT_PENDING_USER in context:
            return context[PLAN_CONTEXT_PENDING_USER]
        return self.request.user

    def get_challenge(self) -> Challenge:
        """Return the challenge that the client should solve.

        A fresh token is generated and stored in the plan context on every call, so
        only a response to the most recently issued challenge passes validation.
        """
        token = str(uuid4())
        self.executor.plan.context[PLAN_CONTEXT_CONSENT_TOKEN] = token
        data = {
            "permissions": self.executor.plan.context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, []),
            "additional_permissions": self.executor.plan.context.get(
                PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS, []
            ),
            "token": token,
        }
        if PLAN_CONTEXT_CONSENT_HEADER in self.executor.plan.context:
            data["header_text"] = self.executor.plan.context[PLAN_CONTEXT_CONSENT_HEADER]
        challenge = ConsentChallenge(data=data)
        return challenge

    def should_always_prompt(self) -> bool:
        """Check if the current request should require a prompt for non-consent reasons,
        i.e. this stage was injected from another stage, mode is always required or no
        application is set."""
        current_stage: ConsentStage = self.executor.current_stage
        # Make this StageView work when injected, in which case `current_stage` is an instance
        # of the base class, and we don't save any consent, as it is assumed to be a one-time
        # prompt
        if not isinstance(current_stage, ConsentStage):
            return True
        # For always require, we always return the challenge
        if current_stage.mode == ConsentMode.ALWAYS_REQUIRE:
            return True
        # at this point we need to check consent from database
        if PLAN_CONTEXT_APPLICATION not in self.executor.plan.context:
            # No application in this plan, hence we can't check DB and require user consent
            return True
        # Return a real bool to honour the annotation (previously an implicit-falsy None)
        return False

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Return a challenge for the frontend to solve, unless a stored consent already
        covers exactly the requested permissions, in which case the stage is passed."""
        if self.should_always_prompt():
            return super().get(request, *args, **kwargs)
        context = self.executor.plan.context
        application = context[PLAN_CONTEXT_APPLICATION]

        consent: UserConsent | None = UserConsent.objects.filter(
            user=self._consent_user(), application=application
        ).first()
        # Stored (even when None) so challenge_valid can update the same object
        context[PLAN_CONTEXT_CONSENT] = consent

        if not consent:
            # No consent found, return consent prompt
            return super().get(request, *args, **kwargs)

        perms = context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, [])
        # "".split(" ") would yield [""], hence the explicit empty-string check
        allowed_perms = set(consent.permissions.split(" ")) if consent.permissions != "" else set()
        requested_perms = {x["id"] for x in perms}
        if allowed_perms == requested_perms:
            return self.executor.stage_ok()

        # Permissions differ: split the request into already-granted and new permissions
        # and prompt again. The difference is computed once, not once per permission.
        new_perms = requested_perms - allowed_perms
        context[PLAN_CONTEXT_CONSENT_PERMISSIONS] = [x for x in perms if x["id"] in allowed_perms]
        context[PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS] = [
            x for x in perms if x["id"] in new_perms
        ]
        return super().get(request, *args, **kwargs)

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Callback when the challenge has the correct format. Persists the consent
        (unless the prompt was unconditional) and passes the stage."""
        if self.should_always_prompt():
            return self.executor.stage_ok()
        current_stage: ConsentStage = self.executor.current_stage
        context = self.executor.plan.context
        application = context[PLAN_CONTEXT_APPLICATION]
        permissions = context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, []) + context.get(
            PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS, []
        )
        permissions_string = " ".join(x["id"] for x in permissions)

        if not context.get(PLAN_CONTEXT_CONSENT, None):
            # Create the consent for the same user `get` looked it up for
            context[PLAN_CONTEXT_CONSENT] = UserConsent(
                user=self._consent_user(),
                application=application,
            )
        consent: UserConsent = context[PLAN_CONTEXT_CONSENT]
        consent.permissions = permissions_string
        if current_stage.mode == ConsentMode.PERMANENT:
            consent.expiring = False
        elif current_stage.mode == ConsentMode.EXPIRING:
            # NOTE(review): a reused consent that has expiring=False is not switched back
            # here - confirm whether this mode should also set `expiring = True`.
            consent.expires = now() + timedelta_from_string(current_stage.consent_expire_in)
        consent.save()
        return self.executor.stage_ok()
PLAN_CONTEXT_CONSENT =
'consent'
PLAN_CONTEXT_CONSENT_HEADER =
'consent_header'
PLAN_CONTEXT_CONSENT_PERMISSIONS =
'consent_permissions'
PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS =
'consent_additional_permissions'
PLAN_CONTEXT_CONSENT_TOKEN =
'goauthentik.io/stages/consent/token'
class ConsentPermissionSerializer(PassiveSerializer):
    """A single permission shown on the consent prompt"""

    # Display name of the permission; blank values are accepted.
    name = CharField(allow_blank=True)
    # Identifier of the permission.
    id = CharField()
Permission used for consent
Inherited Members
class ConsentChallenge(WithUserInfoChallenge):
    """Data sent to the client to render a consent screen"""

    # Optional header text for the prompt.
    header_text = CharField(required=False)
    # Two separate lists of permissions, each entry a ConsentPermissionSerializer.
    permissions = ConsentPermissionSerializer(many=True)
    additional_permissions = ConsentPermissionSerializer(many=True)
    component = CharField(default="ak-stage-consent")
    # Mandatory token accompanying the challenge.
    token = CharField(required=True)
Challenge info for consent screens
class ConsentChallengeResponse(ChallengeResponse):
    """Response to a consent challenge; it must carry the token found in the plan context"""

    component = CharField(default="ak-stage-consent")
    token = CharField(required=True)

    def validate_token(self, token: str):
        """Return `token` if it equals the token stored in the plan context,
        otherwise raise `ValidationError`."""
        # A missing context token is compared as the empty string
        expected_token = self.stage.executor.plan.context.get(PLAN_CONTEXT_CONSENT_TOKEN, "")
        if compare_digest(token, expected_token):
            return token
        raise ValidationError(_("Invalid consent token, re-showing prompt"))
Consent challenge response; it is only valid when the submitted token matches the token stored in the flow plan context.
class ConsentStageView(ChallengeStageView):
    """Simple consent checker."""

    response_class = ConsentChallengeResponse

    def _consent_user(self):
        """Return the user whose consent is looked up and stored: the pending user
        from the plan context if present, otherwise the user of the current request."""
        context = self.executor.plan.context
        if PLAN_CONTEXT_PENDING_USER in context:
            return context[PLAN_CONTEXT_PENDING_USER]
        return self.request.user

    def get_challenge(self) -> Challenge:
        """Return the challenge that the client should solve.

        A fresh token is generated and stored in the plan context on every call, so
        only a response to the most recently issued challenge passes validation.
        """
        token = str(uuid4())
        self.executor.plan.context[PLAN_CONTEXT_CONSENT_TOKEN] = token
        data = {
            "permissions": self.executor.plan.context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, []),
            "additional_permissions": self.executor.plan.context.get(
                PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS, []
            ),
            "token": token,
        }
        if PLAN_CONTEXT_CONSENT_HEADER in self.executor.plan.context:
            data["header_text"] = self.executor.plan.context[PLAN_CONTEXT_CONSENT_HEADER]
        challenge = ConsentChallenge(data=data)
        return challenge

    def should_always_prompt(self) -> bool:
        """Check if the current request should require a prompt for non-consent reasons,
        i.e. this stage was injected from another stage, mode is always required or no
        application is set."""
        current_stage: ConsentStage = self.executor.current_stage
        # Make this StageView work when injected, in which case `current_stage` is an instance
        # of the base class, and we don't save any consent, as it is assumed to be a one-time
        # prompt
        if not isinstance(current_stage, ConsentStage):
            return True
        # For always require, we always return the challenge
        if current_stage.mode == ConsentMode.ALWAYS_REQUIRE:
            return True
        # at this point we need to check consent from database
        if PLAN_CONTEXT_APPLICATION not in self.executor.plan.context:
            # No application in this plan, hence we can't check DB and require user consent
            return True
        # Return a real bool to honour the annotation (previously an implicit-falsy None)
        return False

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Return a challenge for the frontend to solve, unless a stored consent already
        covers exactly the requested permissions, in which case the stage is passed."""
        if self.should_always_prompt():
            return super().get(request, *args, **kwargs)
        context = self.executor.plan.context
        application = context[PLAN_CONTEXT_APPLICATION]

        consent: UserConsent | None = UserConsent.objects.filter(
            user=self._consent_user(), application=application
        ).first()
        # Stored (even when None) so challenge_valid can update the same object
        context[PLAN_CONTEXT_CONSENT] = consent

        if not consent:
            # No consent found, return consent prompt
            return super().get(request, *args, **kwargs)

        perms = context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, [])
        # "".split(" ") would yield [""], hence the explicit empty-string check
        allowed_perms = set(consent.permissions.split(" ")) if consent.permissions != "" else set()
        requested_perms = {x["id"] for x in perms}
        if allowed_perms == requested_perms:
            return self.executor.stage_ok()

        # Permissions differ: split the request into already-granted and new permissions
        # and prompt again. The difference is computed once, not once per permission.
        new_perms = requested_perms - allowed_perms
        context[PLAN_CONTEXT_CONSENT_PERMISSIONS] = [x for x in perms if x["id"] in allowed_perms]
        context[PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS] = [
            x for x in perms if x["id"] in new_perms
        ]
        return super().get(request, *args, **kwargs)

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Callback when the challenge has the correct format. Persists the consent
        (unless the prompt was unconditional) and passes the stage."""
        if self.should_always_prompt():
            return self.executor.stage_ok()
        current_stage: ConsentStage = self.executor.current_stage
        context = self.executor.plan.context
        application = context[PLAN_CONTEXT_APPLICATION]
        permissions = context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, []) + context.get(
            PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS, []
        )
        permissions_string = " ".join(x["id"] for x in permissions)

        if not context.get(PLAN_CONTEXT_CONSENT, None):
            # Create the consent for the same user `get` looked it up for
            context[PLAN_CONTEXT_CONSENT] = UserConsent(
                user=self._consent_user(),
                application=application,
            )
        consent: UserConsent = context[PLAN_CONTEXT_CONSENT]
        consent.permissions = permissions_string
        if current_stage.mode == ConsentMode.PERMANENT:
            consent.expiring = False
        elif current_stage.mode == ConsentMode.EXPIRING:
            # NOTE(review): a reused consent that has expiring=False is not switched back
            # here - confirm whether this mode should also set `expiring = True`.
            consent.expires = now() + timedelta_from_string(current_stage.consent_expire_in)
        consent.save()
        return self.executor.stage_ok()
Simple consent checker.
response_class =
<class 'ConsentChallengeResponse'>
def get_challenge(self) -> Challenge:
    """Build the consent challenge from the plan context.

    Generates a new token, stores it in the plan context and includes it in the
    challenge together with the requested permissions and, if set, the header text.
    """
    plan_context = self.executor.plan.context
    consent_token = str(uuid4())
    plan_context[PLAN_CONTEXT_CONSENT_TOKEN] = consent_token
    payload = {
        "permissions": plan_context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, []),
        "additional_permissions": plan_context.get(PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS, []),
        "token": consent_token,
    }
    # The header is optional and only forwarded when the plan provides one
    if PLAN_CONTEXT_CONSENT_HEADER in plan_context:
        payload["header_text"] = plan_context[PLAN_CONTEXT_CONSENT_HEADER]
    return ConsentChallenge(data=payload)
Return the challenge that the client should solve
def
should_always_prompt(self) -> bool:
def should_always_prompt(self) -> bool:
    """Check if the current request should require a prompt for non-consent reasons,
    i.e. this stage was injected from another stage, mode is always required or no
    application is set.

    Returns True when the prompt must be shown unconditionally, False when stored
    consent may be consulted instead.
    """
    current_stage: ConsentStage = self.executor.current_stage
    # Make this StageView work when injected, in which case `current_stage` is an instance
    # of the base class, and we don't save any consent, as it is assumed to be a one-time
    # prompt
    if not isinstance(current_stage, ConsentStage):
        return True
    # For always require, we always return the challenge
    if current_stage.mode == ConsentMode.ALWAYS_REQUIRE:
        return True
    # at this point we need to check consent from database
    if PLAN_CONTEXT_APPLICATION not in self.executor.plan.context:
        # No application in this plan, hence we can't check DB and require user consent
        return True
    # Return a real bool to honour the annotation (previously an implicit-falsy None)
    return False
Check if the current request should require a prompt for non-consent reasons, i.e. this stage was injected from another stage, the mode is always required, or no application is set.
def
get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Return a challenge for the frontend to solve, unless a stored consent already
    covers exactly the requested permissions, in which case the stage is passed."""
    if self.should_always_prompt():
        return super().get(request, *args, **kwargs)
    context = self.executor.plan.context
    application = context[PLAN_CONTEXT_APPLICATION]

    # Prefer the pending user of the plan over the user of the request
    user = self.request.user
    if PLAN_CONTEXT_PENDING_USER in context:
        user = context[PLAN_CONTEXT_PENDING_USER]

    consent: UserConsent | None = UserConsent.objects.filter(
        user=user, application=application
    ).first()
    # Stored (even when None) so challenge_valid can update the same object
    context[PLAN_CONTEXT_CONSENT] = consent

    if not consent:
        # No consent found, return consent prompt
        return super().get(request, *args, **kwargs)

    perms = context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, [])
    # "".split(" ") would yield [""], hence the explicit empty-string check
    allowed_perms = set(consent.permissions.split(" ")) if consent.permissions != "" else set()
    requested_perms = {x["id"] for x in perms}
    if allowed_perms == requested_perms:
        return self.executor.stage_ok()

    # Permissions differ: split the request into already-granted and new permissions
    # and prompt again. The difference is computed once, not once per permission.
    new_perms = requested_perms - allowed_perms
    context[PLAN_CONTEXT_CONSENT_PERMISSIONS] = [x for x in perms if x["id"] in allowed_perms]
    context[PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS] = [x for x in perms if x["id"] in new_perms]
    return super().get(request, *args, **kwargs)
Return a challenge for the frontend to solve
def
challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
    """Callback when the challenge has the correct format.

    When the prompt was unconditional (see `should_always_prompt`) nothing is saved.
    Otherwise the consent held in the plan context is updated, or a new one is
    created, with all requested permissions and then saved.
    """
    if self.should_always_prompt():
        return self.executor.stage_ok()
    current_stage: ConsentStage = self.executor.current_stage
    context = self.executor.plan.context
    application = context[PLAN_CONTEXT_APPLICATION]
    permissions = context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, []) + context.get(
        PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS, []
    )
    permissions_string = " ".join(x["id"] for x in permissions)

    if not context.get(PLAN_CONTEXT_CONSENT, None):
        # Create the consent for the same user `get` looks it up for: the pending
        # user of the plan if there is one, otherwise the user of the request.
        user = self.request.user
        if PLAN_CONTEXT_PENDING_USER in context:
            user = context[PLAN_CONTEXT_PENDING_USER]
        context[PLAN_CONTEXT_CONSENT] = UserConsent(
            user=user,
            application=application,
        )
    consent: UserConsent = context[PLAN_CONTEXT_CONSENT]
    consent.permissions = permissions_string
    if current_stage.mode == ConsentMode.PERMANENT:
        consent.expiring = False
    elif current_stage.mode == ConsentMode.EXPIRING:
        # NOTE(review): a reused consent that has expiring=False is not switched back
        # here - confirm whether this mode should also set `expiring = True`.
        consent.expires = now() + timedelta_from_string(current_stage.consent_expire_in)
    consent.save()
    return self.executor.stage_ok()
Callback when the challenge has the correct format