authentik.stages.authenticator_email.stage

Email Setup stage

  1"""Email Setup stage"""
  2
  3from django.db.models import Q
  4from django.http import HttpRequest, HttpResponse
  5from django.http.request import QueryDict
  6from django.template.exceptions import TemplateSyntaxError
  7from django.utils.translation import gettext_lazy as _
  8from rest_framework.exceptions import ValidationError
  9from rest_framework.fields import BooleanField, CharField
 10
 11from authentik.events.models import Event, EventAction
 12from authentik.flows.challenge import (
 13    Challenge,
 14    ChallengeResponse,
 15    WithUserInfoChallenge,
 16)
 17from authentik.flows.exceptions import StageInvalidException
 18from authentik.flows.stage import ChallengeStageView
 19from authentik.lib.utils.email import mask_email
 20from authentik.lib.utils.time import timedelta_from_string
 21from authentik.stages.authenticator_email.models import (
 22    AuthenticatorEmailStage,
 23    EmailDevice,
 24)
 25from authentik.stages.email.tasks import send_mails
 26from authentik.stages.email.utils import TemplateEmailMessage
 27from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 28
 29PLAN_CONTEXT_EMAIL_DEVICE = "goauthentik.io/stages/authenticator_email/email_device"
 30PLAN_CONTEXT_EMAIL = "email"
 31PLAN_CONTEXT_EMAIL_SENT = "email_sent"
 32PLAN_CONTEXT_EMAIL_OVERRIDE = "email"
 33
 34
 35class AuthenticatorEmailChallenge(WithUserInfoChallenge):
 36    """Authenticator Email Setup challenge"""
 37
 38    # Set to true if no previous prompt stage set the email
 39    # this stage will also check prompt_data.email
 40    email = CharField(default=None, allow_blank=True, allow_null=True)
 41    email_required = BooleanField(default=True)
 42    component = CharField(default="ak-stage-authenticator-email")
 43
 44
 45class AuthenticatorEmailChallengeResponse(ChallengeResponse):
 46    """Authenticator Email Challenge response, device is set by get_response_instance"""
 47
 48    device: EmailDevice
 49
 50    code = CharField(required=False)
 51    email = CharField(required=False)
 52
 53    component = CharField(default="ak-stage-authenticator-email")
 54
 55    def validate(self, attrs: dict) -> dict:
 56        """Check"""
 57        if "code" not in attrs:
 58            if "email" not in attrs:
 59                raise ValidationError("email required")
 60            self.device.email = attrs["email"]
 61            self.stage.validate_and_send(attrs["email"])
 62            return super().validate(attrs)
 63        if not self.device.verify_token(str(attrs["code"])):
 64            raise ValidationError(_("Code does not match"))
 65        self.device.confirmed = True
 66        return super().validate(attrs)
 67
 68
 69class AuthenticatorEmailStageView(ChallengeStageView):
 70    """Authenticator Email Setup stage"""
 71
 72    response_class = AuthenticatorEmailChallengeResponse
 73
 74    def validate_and_send(self, email: str):
 75        """Validate email and send message"""
 76        pending_user = self.get_pending_user()
 77
 78        stage: AuthenticatorEmailStage = self.executor.current_stage
 79        if EmailDevice.objects.filter(Q(email=email), stage=stage.pk).exists():
 80            raise ValidationError(_("Invalid email"))
 81
 82        device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
 83
 84        try:
 85            message = TemplateEmailMessage(
 86                subject=_(stage.subject),
 87                to=[(pending_user.name, email)],
 88                language=pending_user.locale(self.request),
 89                template_name=stage.template,
 90                template_context={
 91                    "user": pending_user,
 92                    "expires": device.valid_until,
 93                    "token": device.token,
 94                },
 95            )
 96
 97            send_mails(stage, message)
 98        except TemplateSyntaxError as exc:
 99            Event.new(
100                EventAction.CONFIGURATION_ERROR,
101                message=_("Exception occurred while rendering E-mail template"),
102                template=stage.template,
103            ).with_exception(exc).from_http(self.request)
104            raise StageInvalidException from exc
105
106    def _has_email(self) -> str | None:
107        context = self.executor.plan.context
108
109        # Check user's email attribute
110        user = self.get_pending_user()
111        if user.email:
112            self.logger.debug("got email from user attributes")
113            return user.email
114        # Check plan context for email
115        if PLAN_CONTEXT_EMAIL in context.get(PLAN_CONTEXT_PROMPT, {}):
116            self.logger.debug("got email from plan context")
117            return context.get(PLAN_CONTEXT_PROMPT, {}).get(PLAN_CONTEXT_EMAIL)
118        # Check device for email
119        if PLAN_CONTEXT_EMAIL_DEVICE in self.executor.plan.context:
120            self.logger.debug("got email from device in session")
121            device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
122            if device.email == "":
123                return None
124            return device.email
125        return None
126
127    def get_challenge(self, *args, **kwargs) -> Challenge:
128        email = self._has_email()
129        return AuthenticatorEmailChallenge(
130            data={
131                "email": mask_email(email),
132                "email_required": email is None,
133            }
134        )
135
136    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
137        response = super().get_response_instance(data)
138        response.device = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
139        return response
140
141    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
142        user = self.get_pending_user()
143
144        stage: AuthenticatorEmailStage = self.executor.current_stage
145        # For the moment we only allow one email device per user
146        if EmailDevice.objects.filter(Q(user=user), stage=stage.pk).exists():
147            return self.executor.stage_invalid(
148                _("The user already has an email address registered for MFA.")
149            )
150        if PLAN_CONTEXT_EMAIL_DEVICE not in self.executor.plan.context:
151            device = EmailDevice(user=user, confirmed=False, stage=stage, name="Email Device")
152            valid_secs: int = timedelta_from_string(stage.token_expiry).total_seconds()
153            device.generate_token(valid_secs=valid_secs, commit=False)
154            self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE] = device
155            if email := self._has_email():
156                device.email = email
157                try:
158                    self.validate_and_send(email)
159                except ValidationError as exc:
160                    # We had an email given already (at this point only possible from flow
161                    # context), but an error occurred while sending (most likely)
162                    # due to a duplicate device, so delete the email we got given, reset the state
163                    # (ish) and retry
164                    device.email = ""
165                    self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop(
166                        PLAN_CONTEXT_EMAIL, None
167                    )
168                    self.executor.plan.context.pop(PLAN_CONTEXT_EMAIL_DEVICE, None)
169                    self.logger.warning("failed to send email to pre-set address", exc=exc)
170                    return self.get(request, *args, **kwargs)
171        return super().get(request, *args, **kwargs)
172
173    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
174        """Email Token is validated by challenge"""
175        device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
176        if not device.confirmed:
177            return self.challenge_invalid(response)
178        device.save()
179        del self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
180        return self.executor.stage_ok()
PLAN_CONTEXT_EMAIL_DEVICE = 'goauthentik.io/stages/authenticator_email/email_device'
PLAN_CONTEXT_EMAIL = 'email'
PLAN_CONTEXT_EMAIL_SENT = 'email_sent'
PLAN_CONTEXT_EMAIL_OVERRIDE = 'email'
class AuthenticatorEmailChallenge(authentik.flows.challenge.WithUserInfoChallenge):
36class AuthenticatorEmailChallenge(WithUserInfoChallenge):
37    """Authenticator Email Setup challenge"""
38
39    # Set to true if no previous prompt stage set the email
40    # this stage will also check prompt_data.email
41    email = CharField(default=None, allow_blank=True, allow_null=True)
42    email_required = BooleanField(default=True)
43    component = CharField(default="ak-stage-authenticator-email")

Authenticator Email Setup challenge

email
email_required
component
class AuthenticatorEmailChallengeResponse(authentik.flows.challenge.ChallengeResponse):
46class AuthenticatorEmailChallengeResponse(ChallengeResponse):
47    """Authenticator Email Challenge response, device is set by get_response_instance"""
48
49    device: EmailDevice
50
51    code = CharField(required=False)
52    email = CharField(required=False)
53
54    component = CharField(default="ak-stage-authenticator-email")
55
56    def validate(self, attrs: dict) -> dict:
57        """Check"""
58        if "code" not in attrs:
59            if "email" not in attrs:
60                raise ValidationError("email required")
61            self.device.email = attrs["email"]
62            self.stage.validate_and_send(attrs["email"])
63            return super().validate(attrs)
64        if not self.device.verify_token(str(attrs["code"])):
65            raise ValidationError(_("Code does not match"))
66        self.device.confirmed = True
67        return super().validate(attrs)

Authenticator Email Challenge response, device is set by get_response_instance

code
email
component
def validate(self, attrs: dict) -> dict:
56    def validate(self, attrs: dict) -> dict:
57        """Check"""
58        if "code" not in attrs:
59            if "email" not in attrs:
60                raise ValidationError("email required")
61            self.device.email = attrs["email"]
62            self.stage.validate_and_send(attrs["email"])
63            return super().validate(attrs)
64        if not self.device.verify_token(str(attrs["code"])):
65            raise ValidationError(_("Code does not match"))
66        self.device.confirmed = True
67        return super().validate(attrs)

Check

class AuthenticatorEmailStageView(authentik.flows.stage.ChallengeStageView):
 70class AuthenticatorEmailStageView(ChallengeStageView):
 71    """Authenticator Email Setup stage"""
 72
 73    response_class = AuthenticatorEmailChallengeResponse
 74
 75    def validate_and_send(self, email: str):
 76        """Validate email and send message"""
 77        pending_user = self.get_pending_user()
 78
 79        stage: AuthenticatorEmailStage = self.executor.current_stage
 80        if EmailDevice.objects.filter(Q(email=email), stage=stage.pk).exists():
 81            raise ValidationError(_("Invalid email"))
 82
 83        device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
 84
 85        try:
 86            message = TemplateEmailMessage(
 87                subject=_(stage.subject),
 88                to=[(pending_user.name, email)],
 89                language=pending_user.locale(self.request),
 90                template_name=stage.template,
 91                template_context={
 92                    "user": pending_user,
 93                    "expires": device.valid_until,
 94                    "token": device.token,
 95                },
 96            )
 97
 98            send_mails(stage, message)
 99        except TemplateSyntaxError as exc:
100            Event.new(
101                EventAction.CONFIGURATION_ERROR,
102                message=_("Exception occurred while rendering E-mail template"),
103                template=stage.template,
104            ).with_exception(exc).from_http(self.request)
105            raise StageInvalidException from exc
106
107    def _has_email(self) -> str | None:
108        context = self.executor.plan.context
109
110        # Check user's email attribute
111        user = self.get_pending_user()
112        if user.email:
113            self.logger.debug("got email from user attributes")
114            return user.email
115        # Check plan context for email
116        if PLAN_CONTEXT_EMAIL in context.get(PLAN_CONTEXT_PROMPT, {}):
117            self.logger.debug("got email from plan context")
118            return context.get(PLAN_CONTEXT_PROMPT, {}).get(PLAN_CONTEXT_EMAIL)
119        # Check device for email
120        if PLAN_CONTEXT_EMAIL_DEVICE in self.executor.plan.context:
121            self.logger.debug("got email from device in session")
122            device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
123            if device.email == "":
124                return None
125            return device.email
126        return None
127
128    def get_challenge(self, *args, **kwargs) -> Challenge:
129        email = self._has_email()
130        return AuthenticatorEmailChallenge(
131            data={
132                "email": mask_email(email),
133                "email_required": email is None,
134            }
135        )
136
137    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
138        response = super().get_response_instance(data)
139        response.device = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
140        return response
141
142    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
143        user = self.get_pending_user()
144
145        stage: AuthenticatorEmailStage = self.executor.current_stage
146        # For the moment we only allow one email device per user
147        if EmailDevice.objects.filter(Q(user=user), stage=stage.pk).exists():
148            return self.executor.stage_invalid(
149                _("The user already has an email address registered for MFA.")
150            )
151        if PLAN_CONTEXT_EMAIL_DEVICE not in self.executor.plan.context:
152            device = EmailDevice(user=user, confirmed=False, stage=stage, name="Email Device")
153            valid_secs: int = timedelta_from_string(stage.token_expiry).total_seconds()
154            device.generate_token(valid_secs=valid_secs, commit=False)
155            self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE] = device
156            if email := self._has_email():
157                device.email = email
158                try:
159                    self.validate_and_send(email)
160                except ValidationError as exc:
161                    # We had an email given already (at this point only possible from flow
162                    # context), but an error occurred while sending (most likely)
163                    # due to a duplicate device, so delete the email we got given, reset the state
164                    # (ish) and retry
165                    device.email = ""
166                    self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop(
167                        PLAN_CONTEXT_EMAIL, None
168                    )
169                    self.executor.plan.context.pop(PLAN_CONTEXT_EMAIL_DEVICE, None)
170                    self.logger.warning("failed to send email to pre-set address", exc=exc)
171                    return self.get(request, *args, **kwargs)
172        return super().get(request, *args, **kwargs)
173
174    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
175        """Email Token is validated by challenge"""
176        device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
177        if not device.confirmed:
178            return self.challenge_invalid(response)
179        device.save()
180        del self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
181        return self.executor.stage_ok()

Authenticator Email Setup stage

response_class = <class 'AuthenticatorEmailChallengeResponse'>
def validate_and_send(self, email: str):
 75    def validate_and_send(self, email: str):
 76        """Validate email and send message"""
 77        pending_user = self.get_pending_user()
 78
 79        stage: AuthenticatorEmailStage = self.executor.current_stage
 80        if EmailDevice.objects.filter(Q(email=email), stage=stage.pk).exists():
 81            raise ValidationError(_("Invalid email"))
 82
 83        device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
 84
 85        try:
 86            message = TemplateEmailMessage(
 87                subject=_(stage.subject),
 88                to=[(pending_user.name, email)],
 89                language=pending_user.locale(self.request),
 90                template_name=stage.template,
 91                template_context={
 92                    "user": pending_user,
 93                    "expires": device.valid_until,
 94                    "token": device.token,
 95                },
 96            )
 97
 98            send_mails(stage, message)
 99        except TemplateSyntaxError as exc:
100            Event.new(
101                EventAction.CONFIGURATION_ERROR,
102                message=_("Exception occurred while rendering E-mail template"),
103                template=stage.template,
104            ).with_exception(exc).from_http(self.request)
105            raise StageInvalidException from exc

Validate email and send message

def get_challenge(self, *args, **kwargs) -> authentik.flows.challenge.Challenge:
128    def get_challenge(self, *args, **kwargs) -> Challenge:
129        email = self._has_email()
130        return AuthenticatorEmailChallenge(
131            data={
132                "email": mask_email(email),
133                "email_required": email is None,
134            }
135        )

Return the challenge that the client should solve

def get_response_instance( self, data: django.http.request.QueryDict) -> authentik.flows.challenge.ChallengeResponse:
137    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
138        response = super().get_response_instance(data)
139        response.device = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
140        return response

Return the response class type

def get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
142    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
143        user = self.get_pending_user()
144
145        stage: AuthenticatorEmailStage = self.executor.current_stage
146        # For the moment we only allow one email device per user
147        if EmailDevice.objects.filter(Q(user=user), stage=stage.pk).exists():
148            return self.executor.stage_invalid(
149                _("The user already has an email address registered for MFA.")
150            )
151        if PLAN_CONTEXT_EMAIL_DEVICE not in self.executor.plan.context:
152            device = EmailDevice(user=user, confirmed=False, stage=stage, name="Email Device")
153            valid_secs: int = timedelta_from_string(stage.token_expiry).total_seconds()
154            device.generate_token(valid_secs=valid_secs, commit=False)
155            self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE] = device
156            if email := self._has_email():
157                device.email = email
158                try:
159                    self.validate_and_send(email)
160                except ValidationError as exc:
161                    # We had an email given already (at this point only possible from flow
162                    # context), but an error occurred while sending (most likely)
163                    # due to a duplicate device, so delete the email we got given, reset the state
164                    # (ish) and retry
165                    device.email = ""
166                    self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop(
167                        PLAN_CONTEXT_EMAIL, None
168                    )
169                    self.executor.plan.context.pop(PLAN_CONTEXT_EMAIL_DEVICE, None)
170                    self.logger.warning("failed to send email to pre-set address", exc=exc)
171                    return self.get(request, *args, **kwargs)
172        return super().get(request, *args, **kwargs)

Return a challenge for the frontend to solve

def challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
174    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
175        """Email Token is validated by challenge"""
176        device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
177        if not device.confirmed:
178            return self.challenge_invalid(response)
179        device.save()
180        del self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
181        return self.executor.stage_ok()

Email Token is validated by challenge