authentik.stages.authenticator_email.stage
Email Setup stage
"""Email Setup stage"""

from django.db.models import Q
from django.http import HttpRequest, HttpResponse
from django.http.request import QueryDict
from django.template.exceptions import TemplateSyntaxError
from django.utils.translation import gettext_lazy as _
from rest_framework.exceptions import ValidationError
from rest_framework.fields import BooleanField, CharField

from authentik.events.models import Event, EventAction
from authentik.flows.challenge import (
    Challenge,
    ChallengeResponse,
    WithUserInfoChallenge,
)
from authentik.flows.exceptions import StageInvalidException
from authentik.flows.stage import ChallengeStageView
from authentik.lib.utils.email import mask_email
from authentik.lib.utils.time import timedelta_from_string
from authentik.stages.authenticator_email.models import (
    AuthenticatorEmailStage,
    EmailDevice,
)
from authentik.stages.email.tasks import send_mails
from authentik.stages.email.utils import TemplateEmailMessage
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT

# Plan-context key under which the not-yet-saved EmailDevice is kept
PLAN_CONTEXT_EMAIL_DEVICE = "goauthentik.io/stages/authenticator_email/email_device"
# Key looked up inside the prompt data (PLAN_CONTEXT_PROMPT) for a pre-set address
PLAN_CONTEXT_EMAIL = "email"
# Not referenced in this module
PLAN_CONTEXT_EMAIL_SENT = "email_sent"
# Not referenced in this module; same value as PLAN_CONTEXT_EMAIL
PLAN_CONTEXT_EMAIL_OVERRIDE = "email"


class AuthenticatorEmailChallenge(WithUserInfoChallenge):
    """Authenticator Email Setup challenge"""

    # Masked form of the address the stage already knows, or None
    email = CharField(default=None, allow_blank=True, allow_null=True)
    # Set to true if no previous prompt stage set the email
    # this stage will also check prompt_data.email
    email_required = BooleanField(default=True)
    component = CharField(default="ak-stage-authenticator-email")


class AuthenticatorEmailChallengeResponse(ChallengeResponse):
    """Authenticator Email Challenge response, device is set by get_response_instance"""

    device: EmailDevice

    code = CharField(required=False)
    email = CharField(required=False)

    component = CharField(default="ak-stage-authenticator-email")

    def validate(self, attrs: dict) -> dict:
        """Handle either an email submission or a code submission.

        Without a ``code`` the response must carry an ``email``; the address is
        validated and a message is sent to it. With a ``code`` the token is
        verified against the device and the device is marked confirmed.

        Raises:
            ValidationError: if neither field is present, the address is
                rejected by ``validate_and_send``, or the code does not match.
        """
        if "code" not in attrs:
            if "email" not in attrs:
                raise ValidationError("email required")
            # Only remember the address once it has been accepted. Storing it
            # first would leave a rejected address on the device, which
            # _has_email() later reports as an already-known email.
            self.stage.validate_and_send(attrs["email"])
            self.device.email = attrs["email"]
            return super().validate(attrs)
        if not self.device.verify_token(str(attrs["code"])):
            raise ValidationError(_("Code does not match"))
        self.device.confirmed = True
        return super().validate(attrs)


class AuthenticatorEmailStageView(ChallengeStageView):
    """Authenticator Email Setup stage"""

    response_class = AuthenticatorEmailChallengeResponse

    def validate_and_send(self, email: str):
        """Validate email and send message

        Raises:
            ValidationError: if an EmailDevice with this address already exists
                for the current stage.
            StageInvalidException: if the email template fails with a
                TemplateSyntaxError.
        """
        pending_user = self.get_pending_user()

        stage: AuthenticatorEmailStage = self.executor.current_stage
        if EmailDevice.objects.filter(Q(email=email), stage=stage.pk).exists():
            raise ValidationError(_("Invalid email"))

        device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]

        try:
            message = TemplateEmailMessage(
                subject=_(stage.subject),
                to=[(pending_user.name, email)],
                language=pending_user.locale(self.request),
                template_name=stage.template,
                template_context={
                    "user": pending_user,
                    "expires": device.valid_until,
                    "token": device.token,
                },
            )

            send_mails(stage, message)
        except TemplateSyntaxError as exc:
            Event.new(
                EventAction.CONFIGURATION_ERROR,
                message=_("Exception occurred while rendering E-mail template"),
                template=stage.template,
            ).with_exception(exc).from_http(self.request)
            raise StageInvalidException from exc

    def _has_email(self) -> str | None:
        """Return the address to use, or None when the user must enter one.

        Sources are checked in order: the pending user's ``email`` attribute,
        the prompt data in the plan context, then the device in the plan context.
        """
        context = self.executor.plan.context

        # Check user's email attribute
        user = self.get_pending_user()
        if user.email:
            self.logger.debug("got email from user attributes")
            return user.email
        # Check plan context for email
        prompt_data = context.get(PLAN_CONTEXT_PROMPT, {})
        if PLAN_CONTEXT_EMAIL in prompt_data:
            self.logger.debug("got email from plan context")
            return prompt_data.get(PLAN_CONTEXT_EMAIL)
        # Check device for email
        if PLAN_CONTEXT_EMAIL_DEVICE in context:
            self.logger.debug("got email from device in session")
            device: EmailDevice = context[PLAN_CONTEXT_EMAIL_DEVICE]
            if device.email == "":
                return None
            return device.email
        return None

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the challenge: masked known address plus whether one is required."""
        email = self._has_email()
        return AuthenticatorEmailChallenge(
            data={
                "email": mask_email(email),
                "email_required": email is None,
            }
        )

    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
        """Create the response and attach the device from the plan context to it."""
        response = super().get_response_instance(data)
        response.device = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
        return response

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Prepare the device and, when an address is already known, send the token.

        Returns the stage-invalid response if the user already has an email
        device for this stage, or if the address taken from the user's
        attributes is rejected.
        """
        user = self.get_pending_user()

        stage: AuthenticatorEmailStage = self.executor.current_stage
        # For the moment we only allow one email device per user
        if EmailDevice.objects.filter(Q(user=user), stage=stage.pk).exists():
            return self.executor.stage_invalid(
                _("The user already has an email address registered for MFA.")
            )
        if PLAN_CONTEXT_EMAIL_DEVICE not in self.executor.plan.context:
            device = EmailDevice(user=user, confirmed=False, stage=stage, name="Email Device")
            # total_seconds() returns a float
            valid_secs: float = timedelta_from_string(stage.token_expiry).total_seconds()
            device.generate_token(valid_secs=valid_secs, commit=False)
            self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE] = device
            if email := self._has_email():
                device.email = email
                try:
                    self.validate_and_send(email)
                except ValidationError as exc:
                    # We had an email given already, but an error occurred while
                    # sending (most likely) due to a duplicate device, so delete
                    # the email we got given, reset the state (ish) and retry
                    device.email = ""
                    self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop(
                        PLAN_CONTEXT_EMAIL, None
                    )
                    self.executor.plan.context.pop(PLAN_CONTEXT_EMAIL_DEVICE, None)
                    self.logger.warning("failed to send email to pre-set address", exc=exc)
                    if user.email:
                        # _has_email() prefers user.email, which the reset above
                        # cannot clear; retrying would fail the same way forever
                        # and end in a RecursionError.
                        return self.executor.stage_invalid(_("Invalid email"))
                    return self.get(request, *args, **kwargs)
        return super().get(request, *args, **kwargs)

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Email Token is validated by challenge"""
        device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
        if not device.confirmed:
            return self.challenge_invalid(response)
        device.save()
        del self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
        return self.executor.stage_ok()
PLAN_CONTEXT_EMAIL_DEVICE =
'goauthentik.io/stages/authenticator_email/email_device'
PLAN_CONTEXT_EMAIL =
'email'
PLAN_CONTEXT_EMAIL_SENT =
'email_sent'
PLAN_CONTEXT_EMAIL_OVERRIDE =
'email'
class AuthenticatorEmailChallenge(WithUserInfoChallenge):
    """Challenge sent to the frontend while an email authenticator is set up."""

    # Address shown to the user; may be blank or null when none is known yet
    email = CharField(default=None, allow_blank=True, allow_null=True)

    # True when no earlier prompt stage supplied an address
    # (the stage also looks at prompt_data.email)
    email_required = BooleanField(default=True)

    component = CharField(default="ak-stage-authenticator-email")
Authenticator Email Setup challenge
class AuthenticatorEmailChallengeResponse(ChallengeResponse):
    """Authenticator Email Challenge response, device is set by get_response_instance"""

    device: EmailDevice

    code = CharField(required=False)
    email = CharField(required=False)

    component = CharField(default="ak-stage-authenticator-email")

    def validate(self, attrs: dict) -> dict:
        """Handle either an email submission or a code submission.

        Without a ``code`` the response must carry an ``email``; the address is
        validated and a message is sent to it. With a ``code`` the token is
        verified against the device and the device is marked confirmed.

        Raises:
            ValidationError: if neither field is present, the address is
                rejected by ``validate_and_send``, or the code does not match.
        """
        if "code" not in attrs:
            if "email" not in attrs:
                raise ValidationError("email required")
            # Only remember the address once it has been accepted; assigning it
            # first would leave a rejected address stored on the device.
            self.stage.validate_and_send(attrs["email"])
            self.device.email = attrs["email"]
            return super().validate(attrs)
        if not self.device.verify_token(str(attrs["code"])):
            raise ValidationError(_("Code does not match"))
        self.device.confirmed = True
        return super().validate(attrs)
Authenticator Email Challenge response, device is set by get_response_instance
def
validate(self, attrs: dict) -> dict:
def validate(self, attrs: dict) -> dict:
    """Handle either an email submission or a code submission.

    Without a ``code`` the response must carry an ``email``; the address is
    validated and a message is sent to it. With a ``code`` the token is
    verified against the device and the device is marked confirmed.

    Raises:
        ValidationError: if neither field is present, the address is
            rejected by ``validate_and_send``, or the code does not match.
    """
    if "code" not in attrs:
        if "email" not in attrs:
            raise ValidationError("email required")
        # Only remember the address once it has been accepted; assigning it
        # first would leave a rejected address stored on the device.
        self.stage.validate_and_send(attrs["email"])
        self.device.email = attrs["email"]
        return super().validate(attrs)
    if not self.device.verify_token(str(attrs["code"])):
        raise ValidationError(_("Code does not match"))
    self.device.confirmed = True
    return super().validate(attrs)
Check
class AuthenticatorEmailStageView(ChallengeStageView):
    """Authenticator Email Setup stage"""

    response_class = AuthenticatorEmailChallengeResponse

    def validate_and_send(self, email: str):
        """Validate email and send message

        Raises:
            ValidationError: if an EmailDevice with this address already exists
                for the current stage.
            StageInvalidException: if the email template fails with a
                TemplateSyntaxError.
        """
        pending_user = self.get_pending_user()

        stage: AuthenticatorEmailStage = self.executor.current_stage
        if EmailDevice.objects.filter(Q(email=email), stage=stage.pk).exists():
            raise ValidationError(_("Invalid email"))

        device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]

        try:
            message = TemplateEmailMessage(
                subject=_(stage.subject),
                to=[(pending_user.name, email)],
                language=pending_user.locale(self.request),
                template_name=stage.template,
                template_context={
                    "user": pending_user,
                    "expires": device.valid_until,
                    "token": device.token,
                },
            )

            send_mails(stage, message)
        except TemplateSyntaxError as exc:
            Event.new(
                EventAction.CONFIGURATION_ERROR,
                message=_("Exception occurred while rendering E-mail template"),
                template=stage.template,
            ).with_exception(exc).from_http(self.request)
            raise StageInvalidException from exc

    def _has_email(self) -> str | None:
        """Return the address to use, or None when the user must enter one.

        Sources are checked in order: the pending user's ``email`` attribute,
        the prompt data in the plan context, then the device in the plan context.
        """
        context = self.executor.plan.context

        # Check user's email attribute
        user = self.get_pending_user()
        if user.email:
            self.logger.debug("got email from user attributes")
            return user.email
        # Check plan context for email
        prompt_data = context.get(PLAN_CONTEXT_PROMPT, {})
        if PLAN_CONTEXT_EMAIL in prompt_data:
            self.logger.debug("got email from plan context")
            return prompt_data.get(PLAN_CONTEXT_EMAIL)
        # Check device for email
        if PLAN_CONTEXT_EMAIL_DEVICE in context:
            self.logger.debug("got email from device in session")
            device: EmailDevice = context[PLAN_CONTEXT_EMAIL_DEVICE]
            if device.email == "":
                return None
            return device.email
        return None

    def get_challenge(self, *args, **kwargs) -> Challenge:
        """Build the challenge: masked known address plus whether one is required."""
        email = self._has_email()
        return AuthenticatorEmailChallenge(
            data={
                "email": mask_email(email),
                "email_required": email is None,
            }
        )

    def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
        """Create the response and attach the device from the plan context to it."""
        response = super().get_response_instance(data)
        response.device = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
        return response

    def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
        """Prepare the device and, when an address is already known, send the token.

        Returns the stage-invalid response if the user already has an email
        device for this stage, or if the address taken from the user's
        attributes is rejected.
        """
        user = self.get_pending_user()

        stage: AuthenticatorEmailStage = self.executor.current_stage
        # For the moment we only allow one email device per user
        if EmailDevice.objects.filter(Q(user=user), stage=stage.pk).exists():
            return self.executor.stage_invalid(
                _("The user already has an email address registered for MFA.")
            )
        if PLAN_CONTEXT_EMAIL_DEVICE not in self.executor.plan.context:
            device = EmailDevice(user=user, confirmed=False, stage=stage, name="Email Device")
            # total_seconds() returns a float
            valid_secs: float = timedelta_from_string(stage.token_expiry).total_seconds()
            device.generate_token(valid_secs=valid_secs, commit=False)
            self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE] = device
            if email := self._has_email():
                device.email = email
                try:
                    self.validate_and_send(email)
                except ValidationError as exc:
                    # We had an email given already, but an error occurred while
                    # sending (most likely) due to a duplicate device, so delete
                    # the email we got given, reset the state (ish) and retry
                    device.email = ""
                    self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop(
                        PLAN_CONTEXT_EMAIL, None
                    )
                    self.executor.plan.context.pop(PLAN_CONTEXT_EMAIL_DEVICE, None)
                    self.logger.warning("failed to send email to pre-set address", exc=exc)
                    if user.email:
                        # _has_email() prefers user.email, which the reset above
                        # cannot clear; retrying would fail the same way forever
                        # and end in a RecursionError.
                        return self.executor.stage_invalid(_("Invalid email"))
                    return self.get(request, *args, **kwargs)
        return super().get(request, *args, **kwargs)

    def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
        """Email Token is validated by challenge"""
        device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
        if not device.confirmed:
            return self.challenge_invalid(response)
        device.save()
        del self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]
        return self.executor.stage_ok()
Authenticator Email Setup stage
response_class =
<class 'AuthenticatorEmailChallengeResponse'>
def
validate_and_send(self, email: str):
def validate_and_send(self, email: str):
    """Reject an address already used on this stage, otherwise mail the token.

    Raises ValidationError when an EmailDevice with the same address exists
    for the current stage, and StageInvalidException when the template raises
    a TemplateSyntaxError.
    """
    recipient = self.get_pending_user()
    current_stage: AuthenticatorEmailStage = self.executor.current_stage

    duplicates = EmailDevice.objects.filter(Q(email=email), stage=current_stage.pk)
    if duplicates.exists():
        raise ValidationError(_("Invalid email"))

    pending_device: EmailDevice = self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE]

    try:
        # Values made available to the email template
        render_context = {
            "user": recipient,
            "expires": pending_device.valid_until,
            "token": pending_device.token,
        }
        mail = TemplateEmailMessage(
            subject=_(current_stage.subject),
            to=[(recipient.name, email)],
            language=recipient.locale(self.request),
            template_name=current_stage.template,
            template_context=render_context,
        )
        send_mails(current_stage, mail)
    except TemplateSyntaxError as exc:
        # A broken template is a configuration problem: build an event for it
        # and abort the stage.
        error_event = Event.new(
            EventAction.CONFIGURATION_ERROR,
            message=_("Exception occurred while rendering E-mail template"),
            template=current_stage.template,
        )
        error_event.with_exception(exc).from_http(self.request)
        raise StageInvalidException from exc
Validate email and send message
def get_challenge(self, *args, **kwargs) -> Challenge:
    """Build the setup challenge from whatever address is already known.

    The address is passed through mask_email; the user is asked for one only
    when none was found.
    """
    known_email = self._has_email()
    payload = {
        "email": mask_email(known_email),
        "email_required": known_email is None,
    }
    return AuthenticatorEmailChallenge(data=payload)
Return the challenge that the client should solve
def
get_response_instance( self, data: django.http.request.QueryDict) -> authentik.flows.challenge.ChallengeResponse:
def get_response_instance(self, data: QueryDict) -> ChallengeResponse:
    """Create the response object and hand it the device stored in the plan context."""
    instance = super().get_response_instance(data)
    plan_context = self.executor.plan.context
    instance.device = plan_context[PLAN_CONTEXT_EMAIL_DEVICE]
    return instance
Return the response class type
def
get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
    """Prepare the device and, when an address is already known, send the token.

    Returns the stage-invalid response if the user already has an email
    device for this stage, or if the address taken from the user's
    attributes is rejected.
    """
    user = self.get_pending_user()

    stage: AuthenticatorEmailStage = self.executor.current_stage
    # For the moment we only allow one email device per user
    if EmailDevice.objects.filter(Q(user=user), stage=stage.pk).exists():
        return self.executor.stage_invalid(
            _("The user already has an email address registered for MFA.")
        )
    if PLAN_CONTEXT_EMAIL_DEVICE not in self.executor.plan.context:
        device = EmailDevice(user=user, confirmed=False, stage=stage, name="Email Device")
        # total_seconds() returns a float
        valid_secs: float = timedelta_from_string(stage.token_expiry).total_seconds()
        device.generate_token(valid_secs=valid_secs, commit=False)
        self.executor.plan.context[PLAN_CONTEXT_EMAIL_DEVICE] = device
        if email := self._has_email():
            device.email = email
            try:
                self.validate_and_send(email)
            except ValidationError as exc:
                # We had an email given already, but an error occurred while
                # sending (most likely) due to a duplicate device, so delete
                # the email we got given, reset the state (ish) and retry
                device.email = ""
                self.executor.plan.context.get(PLAN_CONTEXT_PROMPT, {}).pop(
                    PLAN_CONTEXT_EMAIL, None
                )
                self.executor.plan.context.pop(PLAN_CONTEXT_EMAIL_DEVICE, None)
                self.logger.warning("failed to send email to pre-set address", exc=exc)
                if user.email:
                    # The address lookup prefers user.email, which the reset
                    # above cannot clear; retrying would fail the same way
                    # forever and end in a RecursionError.
                    return self.executor.stage_invalid(_("Invalid email"))
                return self.get(request, *args, **kwargs)
    return super().get(request, *args, **kwargs)
Return a challenge for the frontend to solve
def
challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
def challenge_valid(self, response: ChallengeResponse) -> HttpResponse:
    """Persist the confirmed device and finish the stage.

    The token itself was already checked while the response was validated;
    an unconfirmed device sends the user back to the challenge.
    """
    plan_context = self.executor.plan.context
    pending_device: EmailDevice = plan_context[PLAN_CONTEXT_EMAIL_DEVICE]
    if pending_device.confirmed:
        pending_device.save()
        # The device is stored now, so drop it from the plan context
        del plan_context[PLAN_CONTEXT_EMAIL_DEVICE]
        return self.executor.stage_ok()
    return self.challenge_invalid(response)
Email Token is validated by challenge