authentik.stages.invitation.stage

invitation stage logic

 1"""invitation stage logic"""
 2
 3from deepmerge import always_merger
 4from django.core.exceptions import ValidationError
 5from django.http import HttpRequest, HttpResponse
 6from django.utils.translation import gettext_lazy as _
 7
 8from authentik.flows.stage import StageView
 9from authentik.flows.views.executor import SESSION_KEY_GET
10from authentik.stages.invitation.models import Invitation, InvitationStage
11from authentik.stages.invitation.signals import invitation_used
12from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
13
14QS_INVITATION_TOKEN_KEY = "itoken"  # nosec
15PLAN_CONTEXT_INVITATION_TOKEN = "token"  # nosec
16PLAN_CONTEXT_INVITATION_IN_EFFECT = "invitation_in_effect"
17PLAN_CONTEXT_INVITATION = "invitation"
18
19
20class InvitationStageView(StageView):
21    """Validate an invitation token and merge the invitation's fixed data into the flow context"""
22
23    def get_token(self) -> str | None:
24        """Get token from saved get-arguments or prompt_data"""
25        # Check for ?token= and ?itoken=
26        if QS_INVITATION_TOKEN_KEY in self.request.session.get(SESSION_KEY_GET, {}):
27            return self.request.session[SESSION_KEY_GET][QS_INVITATION_TOKEN_KEY]
28        if PLAN_CONTEXT_INVITATION_TOKEN in self.request.session.get(SESSION_KEY_GET, {}):
29            return self.request.session[SESSION_KEY_GET][PLAN_CONTEXT_INVITATION_TOKEN]
30        # Check for {'token': ''} in the context
31        if PLAN_CONTEXT_INVITATION_TOKEN in self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}):
32            return self.executor.plan.context[PLAN_CONTEXT_PROMPT][PLAN_CONTEXT_INVITATION_TOKEN]
33        return None
34
35    def get_invite(self) -> Invitation | None:
36        """Check the token, find the invite and check its flow"""
37        token = self.get_token()
38        if not token:
39            return None
40        try:
41            invite: Invitation | None = Invitation.objects.filter(pk=token).first()
42        except ValidationError:
43            self.logger.debug("invalid invitation", token=token)
44            return None
45        if not invite:
46            self.logger.debug("invalid invitation", token=token)
47            return None
48        if invite.flow and invite.flow.pk.hex != self.executor.plan.flow_pk:
49            self.logger.debug("invite for incorrect flow", expected=invite.flow.slug)
50            return None
51        return invite
52
53    def dispatch(self, request: HttpRequest) -> HttpResponse:
54        """Apply data to the current flow based on a URL"""
55        stage: InvitationStage = self.executor.current_stage
56
57        invite = self.get_invite()
58        if not invite:
59            if stage.continue_flow_without_invitation:
60                return self.executor.stage_ok()
61            return self.executor.stage_invalid(_("Invalid invite/invite not found"))
62
63        self.executor.plan.context[PLAN_CONTEXT_INVITATION_IN_EFFECT] = True
64        self.executor.plan.context[PLAN_CONTEXT_INVITATION] = invite
65
66        context = {}
67        always_merger.merge(context, self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}))
68        always_merger.merge(context, invite.fixed_data)
69        self.executor.plan.context[PLAN_CONTEXT_PROMPT] = context
70
71        invitation_used.send(sender=self, request=request, invitation=invite)
72        if invite.single_use:
73            invite.delete()
74            self.logger.debug("Deleted invitation", token=str(invite.invite_uuid))
75        return self.executor.stage_ok()
QS_INVITATION_TOKEN_KEY = 'itoken'
PLAN_CONTEXT_INVITATION_TOKEN = 'token'
PLAN_CONTEXT_INVITATION_IN_EFFECT = 'invitation_in_effect'
PLAN_CONTEXT_INVITATION = 'invitation'
class InvitationStageView(authentik.flows.stage.StageView):
21class InvitationStageView(StageView):
22    """Validate an invitation token and merge the invitation's fixed data into the flow context"""
23
24    def get_token(self) -> str | None:
25        """Get token from saved get-arguments or prompt_data"""
26        # Check for ?token= and ?itoken=
27        if QS_INVITATION_TOKEN_KEY in self.request.session.get(SESSION_KEY_GET, {}):
28            return self.request.session[SESSION_KEY_GET][QS_INVITATION_TOKEN_KEY]
29        if PLAN_CONTEXT_INVITATION_TOKEN in self.request.session.get(SESSION_KEY_GET, {}):
30            return self.request.session[SESSION_KEY_GET][PLAN_CONTEXT_INVITATION_TOKEN]
31        # Check for {'token': ''} in the context
32        if PLAN_CONTEXT_INVITATION_TOKEN in self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}):
33            return self.executor.plan.context[PLAN_CONTEXT_PROMPT][PLAN_CONTEXT_INVITATION_TOKEN]
34        return None
35
36    def get_invite(self) -> Invitation | None:
37        """Check the token, find the invite and check its flow"""
38        token = self.get_token()
39        if not token:
40            return None
41        try:
42            invite: Invitation | None = Invitation.objects.filter(pk=token).first()
43        except ValidationError:
44            self.logger.debug("invalid invitation", token=token)
45            return None
46        if not invite:
47            self.logger.debug("invalid invitation", token=token)
48            return None
49        if invite.flow and invite.flow.pk.hex != self.executor.plan.flow_pk:
50            self.logger.debug("invite for incorrect flow", expected=invite.flow.slug)
51            return None
52        return invite
53
54    def dispatch(self, request: HttpRequest) -> HttpResponse:
55        """Apply data to the current flow based on a URL"""
56        stage: InvitationStage = self.executor.current_stage
57
58        invite = self.get_invite()
59        if not invite:
60            if stage.continue_flow_without_invitation:
61                return self.executor.stage_ok()
62            return self.executor.stage_invalid(_("Invalid invite/invite not found"))
63
64        self.executor.plan.context[PLAN_CONTEXT_INVITATION_IN_EFFECT] = True
65        self.executor.plan.context[PLAN_CONTEXT_INVITATION] = invite
66
67        context = {}
68        always_merger.merge(context, self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}))
69        always_merger.merge(context, invite.fixed_data)
70        self.executor.plan.context[PLAN_CONTEXT_PROMPT] = context
71
72        invitation_used.send(sender=self, request=request, invitation=invite)
73        if invite.single_use:
74            invite.delete()
75            self.logger.debug("Deleted invitation", token=str(invite.invite_uuid))
76        return self.executor.stage_ok()

Validate an invitation token and merge the invitation's fixed data into the flow context

def get_token(self) -> str | None:
24    def get_token(self) -> str | None:
25        """Get token from saved get-arguments or prompt_data"""
26        # Check for ?token= and ?itoken=
27        if QS_INVITATION_TOKEN_KEY in self.request.session.get(SESSION_KEY_GET, {}):
28            return self.request.session[SESSION_KEY_GET][QS_INVITATION_TOKEN_KEY]
29        if PLAN_CONTEXT_INVITATION_TOKEN in self.request.session.get(SESSION_KEY_GET, {}):
30            return self.request.session[SESSION_KEY_GET][PLAN_CONTEXT_INVITATION_TOKEN]
31        # Check for {'token': ''} in the context
32        if PLAN_CONTEXT_INVITATION_TOKEN in self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}):
33            return self.executor.plan.context[PLAN_CONTEXT_PROMPT][PLAN_CONTEXT_INVITATION_TOKEN]
34        return None

Get token from saved get-arguments or prompt_data

def get_invite(self) -> authentik.stages.invitation.models.Invitation | None:
36    def get_invite(self) -> Invitation | None:
37        """Check the token, find the invite and check its flow"""
38        token = self.get_token()
39        if not token:
40            return None
41        try:
42            invite: Invitation | None = Invitation.objects.filter(pk=token).first()
43        except ValidationError:
44            self.logger.debug("invalid invitation", token=token)
45            return None
46        if not invite:
47            self.logger.debug("invalid invitation", token=token)
48            return None
49        if invite.flow and invite.flow.pk.hex != self.executor.plan.flow_pk:
50            self.logger.debug("invite for incorrect flow", expected=invite.flow.slug)
51            return None
52        return invite

Check the token, find the invite and check its flow

def dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
54    def dispatch(self, request: HttpRequest) -> HttpResponse:
55        """Apply data to the current flow based on a URL"""
56        stage: InvitationStage = self.executor.current_stage
57
58        invite = self.get_invite()
59        if not invite:
60            if stage.continue_flow_without_invitation:
61                return self.executor.stage_ok()
62            return self.executor.stage_invalid(_("Invalid invite/invite not found"))
63
64        self.executor.plan.context[PLAN_CONTEXT_INVITATION_IN_EFFECT] = True
65        self.executor.plan.context[PLAN_CONTEXT_INVITATION] = invite
66
67        context = {}
68        always_merger.merge(context, self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}))
69        always_merger.merge(context, invite.fixed_data)
70        self.executor.plan.context[PLAN_CONTEXT_PROMPT] = context
71
72        invitation_used.send(sender=self, request=request, invitation=invite)
73        if invite.single_use:
74            invite.delete()
75            self.logger.debug("Deleted invitation", token=str(invite.invite_uuid))
76        return self.executor.stage_ok()

Apply data to the current flow based on a URL