authentik.stages.email.stage
authentik multi-stage authentication engine
1"""authentik multi-stage authentication engine""" 2 3import math 4from datetime import UTC, datetime, timedelta 5from hashlib import sha256 6from uuid import uuid4 7 8from django.contrib import messages 9from django.core.cache import cache 10from django.http import HttpRequest, HttpResponse 11from django.http.request import QueryDict 12from django.template.exceptions import TemplateSyntaxError 13from django.urls import reverse 14from django.utils.text import slugify 15from django.utils.timezone import now 16from django.utils.translation import gettext as _ 17from rest_framework.fields import CharField 18from rest_framework.serializers import ValidationError 19 20from authentik.events.models import Event, EventAction 21from authentik.flows.challenge import Challenge, ChallengeResponse 22from authentik.flows.exceptions import StageInvalidException 23from authentik.flows.models import FlowDesignation, FlowToken 24from authentik.flows.planner import PLAN_CONTEXT_IS_RESTORED, PLAN_CONTEXT_PENDING_USER 25from authentik.flows.stage import ChallengeStageView 26from authentik.flows.views.executor import QS_KEY_TOKEN, QS_QUERY 27from authentik.lib.utils.time import timedelta_from_string 28from authentik.stages.email.flow import pickle_flow_token_for_email 29from authentik.stages.email.models import EmailStage 30from authentik.stages.email.tasks import send_mails 31from authentik.stages.email.utils import TemplateEmailMessage 32 33EMAIL_RECOVERY_CACHE_KEY = "goauthentik.io/stages/email/stage/" 34 35PLAN_CONTEXT_EMAIL_SENT = "email_sent" 36PLAN_CONTEXT_EMAIL_OVERRIDE = "email" 37 38 39class EmailChallenge(Challenge): 40 """Email challenge""" 41 42 component = CharField(default="ak-stage-email") 43 44 45class EmailChallengeResponse(ChallengeResponse): 46 """Email challenge resposen. No fields. This challenge is 47 always declared invalid to give the user a chance to retry""" 48 49 component = CharField(default="ak-stage-email") 50 51 def validate(self, attrs): 52 raise ValidationError(detail="email-sent", code="email-sent") 53 54 55class EmailStageView(ChallengeStageView): 56 """Email stage which sends Email for verification""" 57 58 response_class = EmailChallengeResponse 59 60 def get_full_url(self, **kwargs) -> str: 61 """Get full URL to be used in template""" 62 base_url = reverse( 63 "authentik_core:if-flow", 64 kwargs={"flow_slug": self.executor.flow.slug}, 65 ) 66 # Parse query string from current URL (full query string) 67 # this view is only run within a flow executor, where we need to get the query string 68 # from the query= parameter (double encoded); but for the redirect 69 # we need to expand it since it'll go through the flow interface 70 query_params = QueryDict(self.request.GET.get(QS_QUERY), mutable=True) 71 query_params.pop(QS_KEY_TOKEN, None) 72 query_params.update(kwargs) 73 full_url = base_url 74 if len(query_params) > 0: 75 full_url = f"{full_url}?{query_params.urlencode()}" 76 return self.request.build_absolute_uri(full_url) 77 78 def get_token(self) -> FlowToken: 79 """Get token""" 80 pending_user = self.get_pending_user() 81 current_stage: EmailStage = self.executor.current_stage 82 valid_delta = timedelta_from_string(current_stage.token_expiry) + timedelta( 83 minutes=1 84 ) # + 1 because django timesince always rounds down 85 identifier = slugify(f"ak-email-stage-{current_stage.name}-{str(uuid4())}") 86 # Don't check for validity here, we only care if the token exists 87 tokens = FlowToken.objects.filter(identifier=identifier) 88 if not tokens.exists(): 89 return FlowToken.objects.create( 90 expires=now() + valid_delta, 91 user=pending_user, 92 identifier=identifier, 93 flow=self.executor.flow, 94 _plan=pickle_flow_token_for_email(self.executor.plan), 95 revoke_on_execution=False, 96 ) 97 token = tokens.first() 98 # Check if token is expired and rotate key if so 99 if token.is_expired: 100 token.expire_action() 101 return token 102 103 def send_email(self): 104 """Helper function that sends the actual email. Implies that you've 105 already checked that there is a pending user.""" 106 pending_user = self.get_pending_user() 107 if not pending_user.pk and self.executor.flow.designation == FlowDesignation.RECOVERY: 108 # Pending user does not have a primary key, and we're in a recovery flow, 109 # which means the user entered an invalid identifier, so we pretend to send the 110 # email, to not disclose if the user exists 111 return 112 email = self.executor.plan.context.get(PLAN_CONTEXT_EMAIL_OVERRIDE, None) 113 if not email: 114 email = pending_user.email 115 current_stage: EmailStage = self.executor.current_stage 116 token = self.get_token() 117 # Send mail to user 118 try: 119 message = TemplateEmailMessage( 120 subject=_(current_stage.subject), 121 to=[(pending_user.name, email)], 122 language=pending_user.locale(self.request), 123 template_name=current_stage.template, 124 template_context={ 125 "url": self.get_full_url(**{QS_KEY_TOKEN: token.key}), 126 "user": pending_user, 127 "expires": token.expires, 128 "token": token.key, 129 }, 130 ) 131 send_mails(current_stage, message) 132 except TemplateSyntaxError as exc: 133 Event.new( 134 EventAction.CONFIGURATION_ERROR, 135 message=_("Exception occurred while rendering E-mail template"), 136 template=current_stage.template, 137 ).with_exception(exc).from_http(self.request) 138 raise StageInvalidException from exc 139 140 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 141 # Check if the user came back from the email link to verify 142 restore_token: FlowToken = self.executor.plan.context.get(PLAN_CONTEXT_IS_RESTORED, None) 143 user = self.get_pending_user() 144 if restore_token: 145 if restore_token.user != user: 146 self.logger.warning("Flow token for non-matching user, denying request") 147 return self.executor.stage_invalid() 148 messages.success(request, _("Successfully verified Email.")) 149 if self.executor.current_stage.activate_user_on_success: 150 user.is_active = True 151 user.save(update_fields=["is_active"]) 152 return self.executor.stage_ok() 153 if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context: 154 self.logger.debug("No pending user") 155 messages.error(self.request, _("No pending user.")) 156 return self.executor.stage_invalid() 157 # Check if we've already sent the initial e-mail 158 if PLAN_CONTEXT_EMAIL_SENT not in self.executor.plan.context: 159 try: 160 self.send_email() 161 except StageInvalidException as exc: 162 self.logger.debug("Got StageInvalidException", exc=exc) 163 return self.executor.stage_invalid() 164 self.executor.plan.context[PLAN_CONTEXT_EMAIL_SENT] = True 165 return super().get(request, *args, **kwargs) 166 167 def get_challenge(self) -> Challenge: 168 challenge = EmailChallenge( 169 data={ 170 "title": _("Email sent."), 171 } 172 ) 173 return challenge 174 175 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 176 return super().challenge_invalid(response) 177 178 def _get_cache_key(self) -> str: 179 """Return the cache key used for rate limiting email recovery attempts.""" 180 user = self.get_pending_user() 181 user_email_hashed = sha256(user.email.lower().encode("utf-8")).hexdigest() 182 return EMAIL_RECOVERY_CACHE_KEY + user_email_hashed 183 184 def _is_rate_limited(self) -> int | None: 185 """Check whether the email recovery attempt should be rate limited. 186 187 If the request should be rate limited, update the cache and return the 188 remaining time in minutes before the user is allowed to try again. 189 Otherwise, return None.""" 190 cache_key = self._get_cache_key() 191 attempts = cache.get(cache_key, []) 192 193 stage = self.executor.current_stage 194 stage.refresh_from_db() 195 max_attempts = stage.recovery_max_attempts 196 cache_timeout_delta = timedelta_from_string(stage.recovery_cache_timeout) 197 198 _now = now() 199 start_window = _now - cache_timeout_delta 200 201 # Convert unix timestamps to datetime objects for comparison 202 recent_attempts_in_window = [ 203 datetime.fromtimestamp(attempt, UTC) 204 for attempt in attempts 205 if datetime.fromtimestamp(attempt, UTC) > start_window 206 ] 207 208 if len(recent_attempts_in_window) >= max_attempts: 209 retry_after = (min(recent_attempts_in_window) + cache_timeout_delta) - _now 210 minutes_left = max(1, math.ceil(retry_after.total_seconds() / 60)) 211 return minutes_left 212 213 recent_attempts_in_window.append(_now) 214 215 # Convert datetime objects back to unix timestamps to update cache 216 recent_attempts_in_window = [attempt.timestamp() for attempt in recent_attempts_in_window] 217 218 cache.set( 219 cache_key, 220 recent_attempts_in_window, 221 int(cache_timeout_delta.total_seconds()), 222 ) 223 224 return None 225 226 def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse: 227 if minutes_left := self._is_rate_limited(): 228 error = _( 229 "Too many account verification attempts. Please try again after {minutes} minutes." 230 ).format(minutes=minutes_left) 231 messages.error(self.request, error) 232 return super().challenge_invalid(response) 233 234 if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context: 235 messages.error(self.request, _("No pending user.")) 236 return super().challenge_invalid(response) 237 238 self.send_email() 239 messages.success(self.request, _("Email Successfully sent.")) 240 # We can't call stage_ok yet, as we're still waiting 241 # for the user to click the link in the email 242 return super().challenge_invalid(response)
EMAIL_RECOVERY_CACHE_KEY =
'goauthentik.io/stages/email/stage/'
PLAN_CONTEXT_EMAIL_SENT =
'email_sent'
PLAN_CONTEXT_EMAIL_OVERRIDE =
'email'
40class EmailChallenge(Challenge): 41 """Email challenge""" 42 43 component = CharField(default="ak-stage-email")
Email challenge
46class EmailChallengeResponse(ChallengeResponse): 47 """Email challenge resposen. No fields. This challenge is 48 always declared invalid to give the user a chance to retry""" 49 50 component = CharField(default="ak-stage-email") 51 52 def validate(self, attrs): 53 raise ValidationError(detail="email-sent", code="email-sent")
Email challenge resposen. No fields. This challenge is always declared invalid to give the user a chance to retry
56class EmailStageView(ChallengeStageView): 57 """Email stage which sends Email for verification""" 58 59 response_class = EmailChallengeResponse 60 61 def get_full_url(self, **kwargs) -> str: 62 """Get full URL to be used in template""" 63 base_url = reverse( 64 "authentik_core:if-flow", 65 kwargs={"flow_slug": self.executor.flow.slug}, 66 ) 67 # Parse query string from current URL (full query string) 68 # this view is only run within a flow executor, where we need to get the query string 69 # from the query= parameter (double encoded); but for the redirect 70 # we need to expand it since it'll go through the flow interface 71 query_params = QueryDict(self.request.GET.get(QS_QUERY), mutable=True) 72 query_params.pop(QS_KEY_TOKEN, None) 73 query_params.update(kwargs) 74 full_url = base_url 75 if len(query_params) > 0: 76 full_url = f"{full_url}?{query_params.urlencode()}" 77 return self.request.build_absolute_uri(full_url) 78 79 def get_token(self) -> FlowToken: 80 """Get token""" 81 pending_user = self.get_pending_user() 82 current_stage: EmailStage = self.executor.current_stage 83 valid_delta = timedelta_from_string(current_stage.token_expiry) + timedelta( 84 minutes=1 85 ) # + 1 because django timesince always rounds down 86 identifier = slugify(f"ak-email-stage-{current_stage.name}-{str(uuid4())}") 87 # Don't check for validity here, we only care if the token exists 88 tokens = FlowToken.objects.filter(identifier=identifier) 89 if not tokens.exists(): 90 return FlowToken.objects.create( 91 expires=now() + valid_delta, 92 user=pending_user, 93 identifier=identifier, 94 flow=self.executor.flow, 95 _plan=pickle_flow_token_for_email(self.executor.plan), 96 revoke_on_execution=False, 97 ) 98 token = tokens.first() 99 # Check if token is expired and rotate key if so 100 if token.is_expired: 101 token.expire_action() 102 return token 103 104 def send_email(self): 105 """Helper function that sends the actual email. Implies that you've 106 already checked that there is a pending user.""" 107 pending_user = self.get_pending_user() 108 if not pending_user.pk and self.executor.flow.designation == FlowDesignation.RECOVERY: 109 # Pending user does not have a primary key, and we're in a recovery flow, 110 # which means the user entered an invalid identifier, so we pretend to send the 111 # email, to not disclose if the user exists 112 return 113 email = self.executor.plan.context.get(PLAN_CONTEXT_EMAIL_OVERRIDE, None) 114 if not email: 115 email = pending_user.email 116 current_stage: EmailStage = self.executor.current_stage 117 token = self.get_token() 118 # Send mail to user 119 try: 120 message = TemplateEmailMessage( 121 subject=_(current_stage.subject), 122 to=[(pending_user.name, email)], 123 language=pending_user.locale(self.request), 124 template_name=current_stage.template, 125 template_context={ 126 "url": self.get_full_url(**{QS_KEY_TOKEN: token.key}), 127 "user": pending_user, 128 "expires": token.expires, 129 "token": token.key, 130 }, 131 ) 132 send_mails(current_stage, message) 133 except TemplateSyntaxError as exc: 134 Event.new( 135 EventAction.CONFIGURATION_ERROR, 136 message=_("Exception occurred while rendering E-mail template"), 137 template=current_stage.template, 138 ).with_exception(exc).from_http(self.request) 139 raise StageInvalidException from exc 140 141 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 142 # Check if the user came back from the email link to verify 143 restore_token: FlowToken = self.executor.plan.context.get(PLAN_CONTEXT_IS_RESTORED, None) 144 user = self.get_pending_user() 145 if restore_token: 146 if restore_token.user != user: 147 self.logger.warning("Flow token for non-matching user, denying request") 148 return self.executor.stage_invalid() 149 messages.success(request, _("Successfully verified Email.")) 150 if self.executor.current_stage.activate_user_on_success: 151 user.is_active = True 152 user.save(update_fields=["is_active"]) 153 return self.executor.stage_ok() 154 if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context: 155 self.logger.debug("No pending user") 156 messages.error(self.request, _("No pending user.")) 157 return self.executor.stage_invalid() 158 # Check if we've already sent the initial e-mail 159 if PLAN_CONTEXT_EMAIL_SENT not in self.executor.plan.context: 160 try: 161 self.send_email() 162 except StageInvalidException as exc: 163 self.logger.debug("Got StageInvalidException", exc=exc) 164 return self.executor.stage_invalid() 165 self.executor.plan.context[PLAN_CONTEXT_EMAIL_SENT] = True 166 return super().get(request, *args, **kwargs) 167 168 def get_challenge(self) -> Challenge: 169 challenge = EmailChallenge( 170 data={ 171 "title": _("Email sent."), 172 } 173 ) 174 return challenge 175 176 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 177 return super().challenge_invalid(response) 178 179 def _get_cache_key(self) -> str: 180 """Return the cache key used for rate limiting email recovery attempts.""" 181 user = self.get_pending_user() 182 user_email_hashed = sha256(user.email.lower().encode("utf-8")).hexdigest() 183 return EMAIL_RECOVERY_CACHE_KEY + user_email_hashed 184 185 def _is_rate_limited(self) -> int | None: 186 """Check whether the email recovery attempt should be rate limited. 187 188 If the request should be rate limited, update the cache and return the 189 remaining time in minutes before the user is allowed to try again. 190 Otherwise, return None.""" 191 cache_key = self._get_cache_key() 192 attempts = cache.get(cache_key, []) 193 194 stage = self.executor.current_stage 195 stage.refresh_from_db() 196 max_attempts = stage.recovery_max_attempts 197 cache_timeout_delta = timedelta_from_string(stage.recovery_cache_timeout) 198 199 _now = now() 200 start_window = _now - cache_timeout_delta 201 202 # Convert unix timestamps to datetime objects for comparison 203 recent_attempts_in_window = [ 204 datetime.fromtimestamp(attempt, UTC) 205 for attempt in attempts 206 if datetime.fromtimestamp(attempt, UTC) > start_window 207 ] 208 209 if len(recent_attempts_in_window) >= max_attempts: 210 retry_after = (min(recent_attempts_in_window) + cache_timeout_delta) - _now 211 minutes_left = max(1, math.ceil(retry_after.total_seconds() / 60)) 212 return minutes_left 213 214 recent_attempts_in_window.append(_now) 215 216 # Convert datetime objects back to unix timestamps to update cache 217 recent_attempts_in_window = [attempt.timestamp() for attempt in recent_attempts_in_window] 218 219 cache.set( 220 cache_key, 221 recent_attempts_in_window, 222 int(cache_timeout_delta.total_seconds()), 223 ) 224 225 return None 226 227 def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse: 228 if minutes_left := self._is_rate_limited(): 229 error = _( 230 "Too many account verification attempts. Please try again after {minutes} minutes." 231 ).format(minutes=minutes_left) 232 messages.error(self.request, error) 233 return super().challenge_invalid(response) 234 235 if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context: 236 messages.error(self.request, _("No pending user.")) 237 return super().challenge_invalid(response) 238 239 self.send_email() 240 messages.success(self.request, _("Email Successfully sent.")) 241 # We can't call stage_ok yet, as we're still waiting 242 # for the user to click the link in the email 243 return super().challenge_invalid(response)
Email stage which sends Email for verification
response_class =
<class 'EmailChallengeResponse'>
def
get_full_url(self, **kwargs) -> str:
61 def get_full_url(self, **kwargs) -> str: 62 """Get full URL to be used in template""" 63 base_url = reverse( 64 "authentik_core:if-flow", 65 kwargs={"flow_slug": self.executor.flow.slug}, 66 ) 67 # Parse query string from current URL (full query string) 68 # this view is only run within a flow executor, where we need to get the query string 69 # from the query= parameter (double encoded); but for the redirect 70 # we need to expand it since it'll go through the flow interface 71 query_params = QueryDict(self.request.GET.get(QS_QUERY), mutable=True) 72 query_params.pop(QS_KEY_TOKEN, None) 73 query_params.update(kwargs) 74 full_url = base_url 75 if len(query_params) > 0: 76 full_url = f"{full_url}?{query_params.urlencode()}" 77 return self.request.build_absolute_uri(full_url)
Get full URL to be used in template
79 def get_token(self) -> FlowToken: 80 """Get token""" 81 pending_user = self.get_pending_user() 82 current_stage: EmailStage = self.executor.current_stage 83 valid_delta = timedelta_from_string(current_stage.token_expiry) + timedelta( 84 minutes=1 85 ) # + 1 because django timesince always rounds down 86 identifier = slugify(f"ak-email-stage-{current_stage.name}-{str(uuid4())}") 87 # Don't check for validity here, we only care if the token exists 88 tokens = FlowToken.objects.filter(identifier=identifier) 89 if not tokens.exists(): 90 return FlowToken.objects.create( 91 expires=now() + valid_delta, 92 user=pending_user, 93 identifier=identifier, 94 flow=self.executor.flow, 95 _plan=pickle_flow_token_for_email(self.executor.plan), 96 revoke_on_execution=False, 97 ) 98 token = tokens.first() 99 # Check if token is expired and rotate key if so 100 if token.is_expired: 101 token.expire_action() 102 return token
Get token
def
send_email(self):
104 def send_email(self): 105 """Helper function that sends the actual email. Implies that you've 106 already checked that there is a pending user.""" 107 pending_user = self.get_pending_user() 108 if not pending_user.pk and self.executor.flow.designation == FlowDesignation.RECOVERY: 109 # Pending user does not have a primary key, and we're in a recovery flow, 110 # which means the user entered an invalid identifier, so we pretend to send the 111 # email, to not disclose if the user exists 112 return 113 email = self.executor.plan.context.get(PLAN_CONTEXT_EMAIL_OVERRIDE, None) 114 if not email: 115 email = pending_user.email 116 current_stage: EmailStage = self.executor.current_stage 117 token = self.get_token() 118 # Send mail to user 119 try: 120 message = TemplateEmailMessage( 121 subject=_(current_stage.subject), 122 to=[(pending_user.name, email)], 123 language=pending_user.locale(self.request), 124 template_name=current_stage.template, 125 template_context={ 126 "url": self.get_full_url(**{QS_KEY_TOKEN: token.key}), 127 "user": pending_user, 128 "expires": token.expires, 129 "token": token.key, 130 }, 131 ) 132 send_mails(current_stage, message) 133 except TemplateSyntaxError as exc: 134 Event.new( 135 EventAction.CONFIGURATION_ERROR, 136 message=_("Exception occurred while rendering E-mail template"), 137 template=current_stage.template, 138 ).with_exception(exc).from_http(self.request) 139 raise StageInvalidException from exc
Helper function that sends the actual email. Implies that you've already checked that there is a pending user.
def
get( self, request: django.http.request.HttpRequest, *args, **kwargs) -> django.http.response.HttpResponse:
141 def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse: 142 # Check if the user came back from the email link to verify 143 restore_token: FlowToken = self.executor.plan.context.get(PLAN_CONTEXT_IS_RESTORED, None) 144 user = self.get_pending_user() 145 if restore_token: 146 if restore_token.user != user: 147 self.logger.warning("Flow token for non-matching user, denying request") 148 return self.executor.stage_invalid() 149 messages.success(request, _("Successfully verified Email.")) 150 if self.executor.current_stage.activate_user_on_success: 151 user.is_active = True 152 user.save(update_fields=["is_active"]) 153 return self.executor.stage_ok() 154 if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context: 155 self.logger.debug("No pending user") 156 messages.error(self.request, _("No pending user.")) 157 return self.executor.stage_invalid() 158 # Check if we've already sent the initial e-mail 159 if PLAN_CONTEXT_EMAIL_SENT not in self.executor.plan.context: 160 try: 161 self.send_email() 162 except StageInvalidException as exc: 163 self.logger.debug("Got StageInvalidException", exc=exc) 164 return self.executor.stage_invalid() 165 self.executor.plan.context[PLAN_CONTEXT_EMAIL_SENT] = True 166 return super().get(request, *args, **kwargs)
Return a challenge for the frontend to solve
168 def get_challenge(self) -> Challenge: 169 challenge = EmailChallenge( 170 data={ 171 "title": _("Email sent."), 172 } 173 ) 174 return challenge
Return the challenge that the client should solve
def
challenge_valid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
176 def challenge_valid(self, response: ChallengeResponse) -> HttpResponse: 177 return super().challenge_invalid(response)
Callback when the challenge has the correct format
def
challenge_invalid( self, response: authentik.flows.challenge.ChallengeResponse) -> django.http.response.HttpResponse:
227 def challenge_invalid(self, response: ChallengeResponse) -> HttpResponse: 228 if minutes_left := self._is_rate_limited(): 229 error = _( 230 "Too many account verification attempts. Please try again after {minutes} minutes." 231 ).format(minutes=minutes_left) 232 messages.error(self.request, error) 233 return super().challenge_invalid(response) 234 235 if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context: 236 messages.error(self.request, _("No pending user.")) 237 return super().challenge_invalid(response) 238 239 self.send_email() 240 messages.success(self.request, _("Email Successfully sent.")) 241 # We can't call stage_ok yet, as we're still waiting 242 # for the user to click the link in the email 243 return super().challenge_invalid(response)
Callback when the challenge has the incorrect format