authentik.stages.consent.stage

authentik consent stage

  1"""authentik consent stage"""
  2
  3from hmac import compare_digest
  4from uuid import uuid4
  5
  6from django.http import HttpRequest, HttpResponse
  7from django.utils.timezone import now
  8from django.utils.translation import gettext as _
  9from rest_framework.exceptions import ValidationError
 10from rest_framework.fields import CharField
 11
 12from authentik.core.api.utils import PassiveSerializer
 13from authentik.flows.challenge import (
 14    Challenge,
 15    ChallengeResponse,
 16    WithUserInfoChallenge,
 17)
 18from authentik.flows.planner import PLAN_CONTEXT_APPLICATION, PLAN_CONTEXT_PENDING_USER
 19from authentik.flows.stage import ChallengeStageView
 20from authentik.lib.utils.time import timedelta_from_string
 21from authentik.stages.consent.models import ConsentMode, ConsentStage, UserConsent
 22
 23PLAN_CONTEXT_CONSENT = "consent"
 24PLAN_CONTEXT_CONSENT_HEADER = "consent_header"
 25PLAN_CONTEXT_CONSENT_PERMISSIONS = "consent_permissions"
 26PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS = "consent_additional_permissions"
 27PLAN_CONTEXT_CONSENT_TOKEN = "goauthentik.io/stages/consent/token"  # nosec
 28
 29
 30class ConsentPermissionSerializer(PassiveSerializer):
 31    """Permission used for consent"""
 32
 33    name = CharField(allow_blank=True)
 34    id = CharField()
 35
 36
 37class ConsentChallenge(WithUserInfoChallenge):
 38    """Challenge info for consent screens"""
 39
 40    header_text = CharField(required=False)
 41    permissions = ConsentPermissionSerializer(many=True)
 42    additional_permissions = ConsentPermissionSerializer(many=True)
 43    component = CharField(default="ak-stage-consent")
 44    token = CharField(required=True)
 45
 46
 47class ConsentChallengeResponse(ChallengeResponse):
 48    """Consent challenge response, any valid response request is valid"""
 49
 50    component = CharField(default="ak-stage-consent")
 51    token = CharField(required=True)
 52
 53    def validate_token(self, token: str):
 54        if not compare_digest(
 55            token, self.stage.executor.plan.context.get(PLAN_CONTEXT_CONSENT_TOKEN, "")
 56        ):
 57            raise ValidationError(_("Invalid consent token, re-showing prompt"))
 58        return token
 59
 60
 61class ConsentStageView(ChallengeStageView):
 62    """Simple consent checker."""
 63
 64    response_class = ConsentChallengeResponse
 65
 66    def get_challenge(self) -> Challenge:
 67        token = str(uuid4())
 68        self.executor.plan.context[PLAN_CONTEXT_CONSENT_TOKEN] = token
 69        data = {
 70            "permissions": self.executor.plan.context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, []),
 71            "additional_permissions": self.executor.plan.context.get(
 72                PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS, []
 73            ),
 74            "token": token,
 75        }
 76        if PLAN_CONTEXT_CONSENT_HEADER in self.executor.plan.context:
 77            data["header_text"] = self.executor.plan.context[PLAN_CONTEXT_CONSENT_HEADER]
 78        challenge = ConsentChallenge(data=data)
 79        return challenge
 80
 81    def should_always_prompt(self) -> bool:
 82        """Check if the current request should require a prompt for non consent reasons,
 83        i.e. this stage injected from another stage, mode is always required or no application
 84        is set."""
 85        current_stage: ConsentStage = self.executor.current_stage
 86        # Make this StageView work when injected, in which case `current_stage` is an instance
 87        # of the base class, and we don't save any consent, as it is assumed to be a one-time
 88        # prompt
 89        if not isinstance(current_stage, ConsentStage):
 90            return True
 91        # For always require, we always return the challenge
 92        if current_stage.mode == ConsentMode.ALWAYS_REQUIRE:
 93            return True
 94        # at this point we need to check consent from database
 95        if PLAN_CONTEXT_APPLICATION not in self.executor.plan.context:
 96            # No application in this plan, hence we can't check DB and require user consent
 97            return True
 98        return None
 99
100    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
101        if self.should_always_prompt():
102            return super().get(request, *args, **kwargs)
103        application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION]
104
105        user = self.request.user
106        if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context:
107            user = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
108
109        consent: UserConsent | None = UserConsent.objects.filter(
110            user=user, application=application
111        ).first()
112        self.executor.plan.context[PLAN_CONTEXT_CONSENT] = consent
113
114        if consent:
115            perms = self.executor.plan.context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, [])
116            allowed_perms = set(consent.permissions.split(" ") if consent.permissions != "" else [])
117            requested_perms = set(x["id"] for x in perms)
118
119            if allowed_perms != requested_perms:
120                self.executor.plan.context[PLAN_CONTEXT_CONSENT_PERMISSIONS] = [
121                    x for x in perms if x["id"] in allowed_perms
122                ]
123                self.executor.plan.context[PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS] = [
124                    x for x in perms if x["id"] in requested_perms.difference(allowed_perms)
125                ]
126                return super().get(request, *args, **kwargs)
127            return self.executor.stage_ok()
128
129        # No consent found, return consent prompt
130        return super().get(request, *args, **kwargs)
131
132    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
133        if self.should_always_prompt():
134            return self.executor.stage_ok()
135        current_stage: ConsentStage = self.executor.current_stage
136        application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION]
137        permissions = self.executor.plan.context.get(
138            PLAN_CONTEXT_CONSENT_PERMISSIONS, []
139        ) + self.executor.plan.context.get(PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS, [])
140        permissions_string = " ".join(x["id"] for x in permissions)
141
142        if not self.executor.plan.context.get(PLAN_CONTEXT_CONSENT, None):
143            self.executor.plan.context[PLAN_CONTEXT_CONSENT] = UserConsent(
144                user=self.request.user,
145                application=application,
146            )
147        consent: UserConsent = self.executor.plan.context[PLAN_CONTEXT_CONSENT]
148        consent.permissions = permissions_string
149        if current_stage.mode == ConsentMode.PERMANENT:
150            consent.expiring = False
151        if current_stage.mode == ConsentMode.EXPIRING:
152            consent.expires = now() + timedelta_from_string(current_stage.consent_expire_in)
153        consent.save()
154        return self.executor.stage_ok()
class ConsentPermissionSerializer(authentik.core.api.utils.PassiveSerializer):
31class ConsentPermissionSerializer(PassiveSerializer):
32    """Permission used for consent"""
33
34    name = CharField(allow_blank=True)
35    id = CharField()

Permission used for consent

name
id
class ConsentChallenge(authentik.flows.challenge.WithUserInfoChallenge):
38class ConsentChallenge(WithUserInfoChallenge):
39    """Challenge info for consent screens"""
40
41    header_text = CharField(required=False)
42    permissions = ConsentPermissionSerializer(many=True)
43    additional_permissions = ConsentPermissionSerializer(many=True)
44    component = CharField(default="ak-stage-consent")
45    token = CharField(required=True)

Challenge info for consent screens

header_text
permissions
additional_permissions
component
token
class ConsentChallengeResponse(authentik.flows.challenge.ChallengeResponse):
48class ConsentChallengeResponse(ChallengeResponse):
49    """Consent challenge response, any valid response request is valid"""
50
51    component = CharField(default="ak-stage-consent")
52    token = CharField(required=True)
53
54    def validate_token(self, token: str):
55        if not compare_digest(
56            token, self.stage.executor.plan.context.get(PLAN_CONTEXT_CONSENT_TOKEN, "")
57        ):
58            raise ValidationError(_("Invalid consent token, re-showing prompt"))
59        return token

Consent challenge response, any valid response request is valid

component
token
def validate_token(self, token: str):
54    def validate_token(self, token: str):
55        if not compare_digest(
56            token, self.stage.executor.plan.context.get(PLAN_CONTEXT_CONSENT_TOKEN, "")
57        ):
58            raise ValidationError(_("Invalid consent token, re-showing prompt"))
59        return token
class ConsentStageView(authentik.flows.stage.ChallengeStageView):
 62class ConsentStageView(ChallengeStageView):
 63    """Simple consent checker."""
 64
 65    response_class = ConsentChallengeResponse
 66
 67    def get_challenge(self) -> Challenge:
 68        token = str(uuid4())
 69        self.executor.plan.context[PLAN_CONTEXT_CONSENT_TOKEN] = token
 70        data = {
 71            "permissions": self.executor.plan.context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, []),
 72            "additional_permissions": self.executor.plan.context.get(
 73                PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS, []
 74            ),
 75            "token": token,
 76        }
 77        if PLAN_CONTEXT_CONSENT_HEADER in self.executor.plan.context:
 78            data["header_text"] = self.executor.plan.context[PLAN_CONTEXT_CONSENT_HEADER]
 79        challenge = ConsentChallenge(data=data)
 80        return challenge
 81
 82    def should_always_prompt(self) -> bool:
 83        """Check if the current request should require a prompt for non consent reasons,
 84        i.e. this stage injected from another stage, mode is always required or no application
 85        is set."""
 86        current_stage: ConsentStage = self.executor.current_stage
 87        # Make this StageView work when injected, in which case `current_stage` is an instance
 88        # of the base class, and we don't save any consent, as it is assumed to be a one-time
 89        # prompt
 90        if not isinstance(current_stage, ConsentStage):
 91            return True
 92        # For always require, we always return the challenge
 93        if current_stage.mode == ConsentMode.ALWAYS_REQUIRE:
 94            return True
 95        # at this point we need to check consent from database
 96        if PLAN_CONTEXT_APPLICATION not in self.executor.plan.context:
 97            # No application in this plan, hence we can't check DB and require user consent
 98            return True
 99        return None
100
101    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
102        if self.should_always_prompt():
103            return super().get(request, *args, **kwargs)
104        application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION]
105
106        user = self.request.user
107        if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context:
108            user = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
109
110        consent: UserConsent | None = UserConsent.objects.filter(
111            user=user, application=application
112        ).first()
113        self.executor.plan.context[PLAN_CONTEXT_CONSENT] = consent
114
115        if consent:
116            perms = self.executor.plan.context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, [])
117            allowed_perms = set(consent.permissions.split(" ") if consent.permissions != "" else [])
118            requested_perms = set(x["id"] for x in perms)
119
120            if allowed_perms != requested_perms:
121                self.executor.plan.context[PLAN_CONTEXT_CONSENT_PERMISSIONS] = [
122                    x for x in perms if x["id"] in allowed_perms
123                ]
124                self.executor.plan.context[PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS] = [
125                    x for x in perms if x["id"] in requested_perms.difference(allowed_perms)
126                ]
127                return super().get(request, *args, **kwargs)
128            return self.executor.stage_ok()
129
130        # No consent found, return consent prompt
131        return super().get(request, *args, **kwargs)
132
133    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
134        if self.should_always_prompt():
135            return self.executor.stage_ok()
136        current_stage: ConsentStage = self.executor.current_stage
137        application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION]
138        permissions = self.executor.plan.context.get(
139            PLAN_CONTEXT_CONSENT_PERMISSIONS, []
140        ) + self.executor.plan.context.get(PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS, [])
141        permissions_string = " ".join(x["id"] for x in permissions)
142
143        if not self.executor.plan.context.get(PLAN_CONTEXT_CONSENT, None):
144            self.executor.plan.context[PLAN_CONTEXT_CONSENT] = UserConsent(
145                user=self.request.user,
146                application=application,
147            )
148        consent: UserConsent = self.executor.plan.context[PLAN_CONTEXT_CONSENT]
149        consent.permissions = permissions_string
150        if current_stage.mode == ConsentMode.PERMANENT:
151            consent.expiring = False
152        if current_stage.mode == ConsentMode.EXPIRING:
153            consent.expires = now() + timedelta_from_string(current_stage.consent_expire_in)
154        consent.save()
155        return self.executor.stage_ok()

Simple consent checker.

response_class = <class 'ConsentChallengeResponse'>
def get_challenge(self) -> authentik.flows.challenge.Challenge:
67    def get_challenge(self) -> Challenge:
68        token = str(uuid4())
69        self.executor.plan.context[PLAN_CONTEXT_CONSENT_TOKEN] = token
70        data = {
71            "permissions": self.executor.plan.context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, []),
72            "additional_permissions": self.executor.plan.context.get(
73                PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS, []
74            ),
75            "token": token,
76        }
77        if PLAN_CONTEXT_CONSENT_HEADER in self.executor.plan.context:
78            data["header_text"] = self.executor.plan.context[PLAN_CONTEXT_CONSENT_HEADER]
79        challenge = ConsentChallenge(data=data)
80        return challenge

Return the challenge that the client should solve

def should_always_prompt(self) -> bool:
82    def should_always_prompt(self) -> bool:
83        """Check if the current request should require a prompt for non consent reasons,
84        i.e. this stage injected from another stage, mode is always required or no application
85        is set."""
86        current_stage: ConsentStage = self.executor.current_stage
87        # Make this StageView work when injected, in which case `current_stage` is an instance
88        # of the base class, and we don't save any consent, as it is assumed to be a one-time
89        # prompt
90        if not isinstance(current_stage, ConsentStage):
91            return True
92        # For always require, we always return the challenge
93        if current_stage.mode == ConsentMode.ALWAYS_REQUIRE:
94            return True
95        # at this point we need to check consent from database
96        if PLAN_CONTEXT_APPLICATION not in self.executor.plan.context:
97            # No application in this plan, hence we can't check DB and require user consent
98            return True
99        return None

Check if the current request should require a prompt for non consent reasons, i.e. this stage injected from another stage, mode is always required or no application is set.

def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
101    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
102        if self.should_always_prompt():
103            return super().get(request, *args, **kwargs)
104        application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION]
105
106        user = self.request.user
107        if PLAN_CONTEXT_PENDING_USER in self.executor.plan.context:
108            user = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
109
110        consent: UserConsent | None = UserConsent.objects.filter(
111            user=user, application=application
112        ).first()
113        self.executor.plan.context[PLAN_CONTEXT_CONSENT] = consent
114
115        if consent:
116            perms = self.executor.plan.context.get(PLAN_CONTEXT_CONSENT_PERMISSIONS, [])
117            allowed_perms = set(consent.permissions.split(" ") if consent.permissions != "" else [])
118            requested_perms = set(x["id"] for x in perms)
119
120            if allowed_perms != requested_perms:
121                self.executor.plan.context[PLAN_CONTEXT_CONSENT_PERMISSIONS] = [
122                    x for x in perms if x["id"] in allowed_perms
123                ]
124                self.executor.plan.context[PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS] = [
125                    x for x in perms if x["id"] in requested_perms.difference(allowed_perms)
126                ]
127                return super().get(request, *args, **kwargs)
128            return self.executor.stage_ok()
129
130        # No consent found, return consent prompt
131        return super().get(request, *args, **kwargs)

Return a challenge for the frontend to solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
133    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
134        if self.should_always_prompt():
135            return self.executor.stage_ok()
136        current_stage: ConsentStage = self.executor.current_stage
137        application = self.executor.plan.context[PLAN_CONTEXT_APPLICATION]
138        permissions = self.executor.plan.context.get(
139            PLAN_CONTEXT_CONSENT_PERMISSIONS, []
140        ) + self.executor.plan.context.get(PLAN_CONTEXT_CONSENT_EXTRA_PERMISSIONS, [])
141        permissions_string = " ".join(x["id"] for x in permissions)
142
143        if not self.executor.plan.context.get(PLAN_CONTEXT_CONSENT, None):
144            self.executor.plan.context[PLAN_CONTEXT_CONSENT] = UserConsent(
145                user=self.request.user,
146                application=application,
147            )
148        consent: UserConsent = self.executor.plan.context[PLAN_CONTEXT_CONSENT]
149        consent.permissions = permissions_string
150        if current_stage.mode == ConsentMode.PERMANENT:
151            consent.expiring = False
152        if current_stage.mode == ConsentMode.EXPIRING:
153            consent.expires = now() + timedelta_from_string(current_stage.consent_expire_in)
154        consent.save()
155        return self.executor.stage_ok()

Callback when the challenge has the correct format