authentik.stages.email.flow

 1from base64 import b64encode
 2from copy import deepcopy
 3from pickle import dumps  # nosec
 4
 5from django.utils.translation import gettext as _
 6
 7from authentik.flows.models import FlowToken, in_memory_stage
 8from authentik.flows.planner import PLAN_CONTEXT_IS_RESTORED, FlowPlan
 9from authentik.stages.consent.stage import PLAN_CONTEXT_CONSENT_HEADER, ConsentStageView
10
11
12def pickle_flow_token_for_email(plan: FlowPlan):
13    """Insert a consent stage into the flow plan and pickle it for a FlowToken,
14    to be sent via Email. This is to prevent automated email scanners, which sometimes
15    open links in emails in a full browser from breaking the link."""
16    plan_copy = deepcopy(plan)
17    plan_copy.insert_stage(in_memory_stage(EmailTokenRevocationConsentStageView), index=0)
18    plan_copy.context[PLAN_CONTEXT_CONSENT_HEADER] = _("Continue to confirm this email address.")
19    data = dumps(plan_copy)
20    return b64encode(data).decode()
21
22
23class EmailTokenRevocationConsentStageView(ConsentStageView):
24
25    def get(self, request, *args, **kwargs):
26        token: FlowToken = self.executor.plan.context[PLAN_CONTEXT_IS_RESTORED]
27        try:
28            token.refresh_from_db()
29        except FlowToken.DoesNotExist:
30            return self.executor.stage_invalid(
31                _("Link was already used, please request a new link.")
32            )
33        return super().get(request, *args, **kwargs)
34
35    def challenge_valid(self, response):
36        token: FlowToken = self.executor.plan.context[PLAN_CONTEXT_IS_RESTORED]
37        token.delete()
38        return super().challenge_valid(response)
def pickle_flow_token_for_email(plan: authentik.flows.planner.FlowPlan):
13def pickle_flow_token_for_email(plan: FlowPlan):
14    """Insert a consent stage into the flow plan and pickle it for a FlowToken,
15    to be sent via Email. This is to prevent automated email scanners, which sometimes
16    open links in emails in a full browser from breaking the link."""
17    plan_copy = deepcopy(plan)
18    plan_copy.insert_stage(in_memory_stage(EmailTokenRevocationConsentStageView), index=0)
19    plan_copy.context[PLAN_CONTEXT_CONSENT_HEADER] = _("Continue to confirm this email address.")
20    data = dumps(plan_copy)
21    return b64encode(data).decode()

Insert a consent stage into the flow plan and pickle it for a FlowToken, to be sent via Email. This is to prevent automated email scanners, which sometimes open links in emails in a full browser from breaking the link.

class EmailTokenRevocationConsentStageView(authentik.stages.consent.stage.ConsentStageView):
24class EmailTokenRevocationConsentStageView(ConsentStageView):
25
26    def get(self, request, *args, **kwargs):
27        token: FlowToken = self.executor.plan.context[PLAN_CONTEXT_IS_RESTORED]
28        try:
29            token.refresh_from_db()
30        except FlowToken.DoesNotExist:
31            return self.executor.stage_invalid(
32                _("Link was already used, please request a new link.")
33            )
34        return super().get(request, *args, **kwargs)
35
36    def challenge_valid(self, response):
37        token: FlowToken = self.executor.plan.context[PLAN_CONTEXT_IS_RESTORED]
38        token.delete()
39        return super().challenge_valid(response)

Simple consent checker.

def get(self, request, *args, **kwargs):
26    def get(self, request, *args, **kwargs):
27        token: FlowToken = self.executor.plan.context[PLAN_CONTEXT_IS_RESTORED]
28        try:
29            token.refresh_from_db()
30        except FlowToken.DoesNotExist:
31            return self.executor.stage_invalid(
32                _("Link was already used, please request a new link.")
33            )
34        return super().get(request, *args, **kwargs)

Return a challenge for the frontend to solve

def challenge_valid(self, response):
36    def challenge_valid(self, response):
37        token: FlowToken = self.executor.plan.context[PLAN_CONTEXT_IS_RESTORED]
38        token.delete()
39        return super().challenge_valid(response)

Callback when the challenge has the correct format