authentik.stages.invitation.stage
invitation stage logic
1"""invitation stage logic""" 2 3from deepmerge import always_merger 4from django.core.exceptions import ValidationError 5from django.http import HttpRequest, HttpResponse 6from django.utils.translation import gettext_lazy as _ 7 8from authentik.flows.stage import StageView 9from authentik.flows.views.executor import SESSION_KEY_GET 10from authentik.stages.invitation.models import Invitation, InvitationStage 11from authentik.stages.invitation.signals import invitation_used 12from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT 13 14QS_INVITATION_TOKEN_KEY = "itoken" # nosec 15PLAN_CONTEXT_INVITATION_TOKEN = "token" # nosec 16PLAN_CONTEXT_INVITATION_IN_EFFECT = "invitation_in_effect" 17PLAN_CONTEXT_INVITATION = "invitation" 18 19 20class InvitationStageView(StageView): 21 """Finalise Authentication flow by logging the user in""" 22 23 def get_token(self) -> str | None: 24 """Get token from saved get-arguments or prompt_data""" 25 # Check for ?token= and ?itoken= 26 if QS_INVITATION_TOKEN_KEY in self.request.session.get(SESSION_KEY_GET, {}): 27 return self.request.session[SESSION_KEY_GET][QS_INVITATION_TOKEN_KEY] 28 if PLAN_CONTEXT_INVITATION_TOKEN in self.request.session.get(SESSION_KEY_GET, {}): 29 return self.request.session[SESSION_KEY_GET][PLAN_CONTEXT_INVITATION_TOKEN] 30 # Check for {'token': ''} in the context 31 if PLAN_CONTEXT_INVITATION_TOKEN in self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}): 32 return self.executor.plan.context[PLAN_CONTEXT_PROMPT][PLAN_CONTEXT_INVITATION_TOKEN] 33 return None 34 35 def get_invite(self) -> Invitation | None: 36 """Check the token, find the invite and check it's flow""" 37 token = self.get_token() 38 if not token: 39 return None 40 try: 41 invite: Invitation | None = Invitation.objects.filter(pk=token).first() 42 except ValidationError: 43 self.logger.debug("invalid invitation", token=token) 44 return None 45 if not invite: 46 self.logger.debug("invalid invitation", token=token) 47 return None 48 if invite.flow and invite.flow.pk.hex != self.executor.plan.flow_pk: 49 self.logger.debug("invite for incorrect flow", expected=invite.flow.slug) 50 return None 51 return invite 52 53 def dispatch(self, request: HttpRequest) -> HttpResponse: 54 """Apply data to the current flow based on a URL""" 55 stage: InvitationStage = self.executor.current_stage 56 57 invite = self.get_invite() 58 if not invite: 59 if stage.continue_flow_without_invitation: 60 return self.executor.stage_ok() 61 return self.executor.stage_invalid(_("Invalid invite/invite not found")) 62 63 self.executor.plan.context[PLAN_CONTEXT_INVITATION_IN_EFFECT] = True 64 self.executor.plan.context[PLAN_CONTEXT_INVITATION] = invite 65 66 context = {} 67 always_merger.merge(context, self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {})) 68 always_merger.merge(context, invite.fixed_data) 69 self.executor.plan.context[PLAN_CONTEXT_PROMPT] = context 70 71 invitation_used.send(sender=self, request=request, invitation=invite) 72 if invite.single_use: 73 invite.delete() 74 self.logger.debug("Deleted invitation", token=str(invite.invite_uuid)) 75 return self.executor.stage_ok()
QS_INVITATION_TOKEN_KEY =
'itoken'
PLAN_CONTEXT_INVITATION_TOKEN =
'token'
PLAN_CONTEXT_INVITATION_IN_EFFECT =
'invitation_in_effect'
PLAN_CONTEXT_INVITATION =
'invitation'
21class InvitationStageView(StageView): 22 """Finalise Authentication flow by logging the user in""" 23 24 def get_token(self) -> str | None: 25 """Get token from saved get-arguments or prompt_data""" 26 # Check for ?token= and ?itoken= 27 if QS_INVITATION_TOKEN_KEY in self.request.session.get(SESSION_KEY_GET, {}): 28 return self.request.session[SESSION_KEY_GET][QS_INVITATION_TOKEN_KEY] 29 if PLAN_CONTEXT_INVITATION_TOKEN in self.request.session.get(SESSION_KEY_GET, {}): 30 return self.request.session[SESSION_KEY_GET][PLAN_CONTEXT_INVITATION_TOKEN] 31 # Check for {'token': ''} in the context 32 if PLAN_CONTEXT_INVITATION_TOKEN in self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}): 33 return self.executor.plan.context[PLAN_CONTEXT_PROMPT][PLAN_CONTEXT_INVITATION_TOKEN] 34 return None 35 36 def get_invite(self) -> Invitation | None: 37 """Check the token, find the invite and check it's flow""" 38 token = self.get_token() 39 if not token: 40 return None 41 try: 42 invite: Invitation | None = Invitation.objects.filter(pk=token).first() 43 except ValidationError: 44 self.logger.debug("invalid invitation", token=token) 45 return None 46 if not invite: 47 self.logger.debug("invalid invitation", token=token) 48 return None 49 if invite.flow and invite.flow.pk.hex != self.executor.plan.flow_pk: 50 self.logger.debug("invite for incorrect flow", expected=invite.flow.slug) 51 return None 52 return invite 53 54 def dispatch(self, request: HttpRequest) -> HttpResponse: 55 """Apply data to the current flow based on a URL""" 56 stage: InvitationStage = self.executor.current_stage 57 58 invite = self.get_invite() 59 if not invite: 60 if stage.continue_flow_without_invitation: 61 return self.executor.stage_ok() 62 return self.executor.stage_invalid(_("Invalid invite/invite not found")) 63 64 self.executor.plan.context[PLAN_CONTEXT_INVITATION_IN_EFFECT] = True 65 self.executor.plan.context[PLAN_CONTEXT_INVITATION] = invite 66 67 context = {} 68 always_merger.merge(context, self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {})) 69 always_merger.merge(context, invite.fixed_data) 70 self.executor.plan.context[PLAN_CONTEXT_PROMPT] = context 71 72 invitation_used.send(sender=self, request=request, invitation=invite) 73 if invite.single_use: 74 invite.delete() 75 self.logger.debug("Deleted invitation", token=str(invite.invite_uuid)) 76 return self.executor.stage_ok()
Finalise Authentication flow by logging the user in
def
get_token(self) -> str | None:
24 def get_token(self) -> str | None: 25 """Get token from saved get-arguments or prompt_data""" 26 # Check for ?token= and ?itoken= 27 if QS_INVITATION_TOKEN_KEY in self.request.session.get(SESSION_KEY_GET, {}): 28 return self.request.session[SESSION_KEY_GET][QS_INVITATION_TOKEN_KEY] 29 if PLAN_CONTEXT_INVITATION_TOKEN in self.request.session.get(SESSION_KEY_GET, {}): 30 return self.request.session[SESSION_KEY_GET][PLAN_CONTEXT_INVITATION_TOKEN] 31 # Check for {'token': ''} in the context 32 if PLAN_CONTEXT_INVITATION_TOKEN in self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}): 33 return self.executor.plan.context[PLAN_CONTEXT_PROMPT][PLAN_CONTEXT_INVITATION_TOKEN] 34 return None
Get token from saved get-arguments or prompt_data
36 def get_invite(self) -> Invitation | None: 37 """Check the token, find the invite and check it's flow""" 38 token = self.get_token() 39 if not token: 40 return None 41 try: 42 invite: Invitation | None = Invitation.objects.filter(pk=token).first() 43 except ValidationError: 44 self.logger.debug("invalid invitation", token=token) 45 return None 46 if not invite: 47 self.logger.debug("invalid invitation", token=token) 48 return None 49 if invite.flow and invite.flow.pk.hex != self.executor.plan.flow_pk: 50 self.logger.debug("invite for incorrect flow", expected=invite.flow.slug) 51 return None 52 return invite
Check the token, find the invite and check it's flow
def
dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
54 def dispatch(self, request: HttpRequest) -> HttpResponse: 55 """Apply data to the current flow based on a URL""" 56 stage: InvitationStage = self.executor.current_stage 57 58 invite = self.get_invite() 59 if not invite: 60 if stage.continue_flow_without_invitation: 61 return self.executor.stage_ok() 62 return self.executor.stage_invalid(_("Invalid invite/invite not found")) 63 64 self.executor.plan.context[PLAN_CONTEXT_INVITATION_IN_EFFECT] = True 65 self.executor.plan.context[PLAN_CONTEXT_INVITATION] = invite 66 67 context = {} 68 always_merger.merge(context, self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {})) 69 always_merger.merge(context, invite.fixed_data) 70 self.executor.plan.context[PLAN_CONTEXT_PROMPT] = context 71 72 invitation_used.send(sender=self, request=request, invitation=invite) 73 if invite.single_use: 74 invite.delete() 75 self.logger.debug("Deleted invitation", token=str(invite.invite_uuid)) 76 return self.executor.stage_ok()
Apply data to the current flow based on a URL