authentik.stages.email.tests.test_stage
email tests
1"""email tests""" 2 3from hashlib import sha256 4from unittest.mock import MagicMock, PropertyMock, patch 5 6from django.contrib import messages 7from django.core import mail 8from django.core.mail.backends.locmem import EmailBackend 9from django.core.mail.backends.smtp import EmailBackend as SMTPEmailBackend 10from django.urls import reverse 11from django.utils.http import urlencode 12 13from authentik.brands.models import Brand 14from authentik.core.tests.utils import RequestFactory, create_test_admin_user, create_test_flow 15from authentik.flows.markers import StageMarker 16from authentik.flows.models import FlowDesignation, FlowStageBinding, FlowToken 17from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan 18from authentik.flows.tests import FlowTestCase 19from authentik.flows.views.executor import QS_KEY_TOKEN, SESSION_KEY_PLAN, FlowExecutorView 20from authentik.lib.config import CONFIG 21from authentik.lib.generators import generate_id 22from authentik.stages.consent.stage import PLAN_CONTEXT_CONSENT_TOKEN 23from authentik.stages.email.models import EmailStage 24from authentik.stages.email.stage import PLAN_CONTEXT_EMAIL_OVERRIDE, EmailStageView 25 26 27class TestEmailStage(FlowTestCase): 28 """Email tests""" 29 30 def setUp(self): 31 super().setUp() 32 self.factory = RequestFactory() 33 self.user = create_test_admin_user() 34 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 35 self.stage = EmailStage.objects.create( 36 name="email", 37 activate_user_on_success=True, 38 ) 39 self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2) 40 41 @patch( 42 "authentik.stages.email.models.EmailStage.backend_class", 43 PropertyMock(return_value=EmailBackend), 44 ) 45 def test_rendering(self): 46 """Test with pending user""" 47 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 48 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 49 session = self.client.session 50 session[SESSION_KEY_PLAN] = plan 51 session.save() 52 53 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 54 response = self.client.get(url) 55 self.assertEqual(response.status_code, 200) 56 57 @patch( 58 "authentik.stages.email.models.EmailStage.backend_class", 59 PropertyMock(return_value=EmailBackend), 60 ) 61 def test_rendering_locale(self): 62 """Test with pending user""" 63 self.user.attributes = {"settings": {"locale": "de"}} 64 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 65 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 66 session = self.client.session 67 session[SESSION_KEY_PLAN] = plan 68 session.save() 69 70 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 71 response = self.client.get(url) 72 self.assertEqual(response.status_code, 200) 73 self.assertEqual(len(mail.outbox), 1) 74 self.assertEqual(mail.outbox[0].subject, "authentik") 75 self.assertNotIn("Password Reset", mail.outbox[0].alternatives[0][0]) 76 77 def test_without_user(self): 78 """Test without pending user""" 79 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 80 session = self.client.session 81 session[SESSION_KEY_PLAN] = plan 82 session.save() 83 84 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 85 response = self.client.get(url) 86 self.assertEqual(response.status_code, 200) 87 88 @patch( 89 "authentik.stages.email.models.EmailStage.backend_class", 90 PropertyMock(return_value=EmailBackend), 91 ) 92 def test_pending_user(self): 93 """Test with pending user""" 94 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 95 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 96 session = self.client.session 97 session[SESSION_KEY_PLAN] = plan 98 session.save() 99 100 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 101 response = self.client.post(url) 102 self.assertEqual(response.status_code, 200) 103 self.assertEqual(len(mail.outbox), 1) 104 self.assertEqual(mail.outbox[0].subject, "authentik") 105 self.assertEqual(mail.outbox[0].to, [f"{self.user.name} <{self.user.email}>"]) 106 107 @patch( 108 "authentik.stages.email.models.EmailStage.backend_class", 109 PropertyMock(return_value=EmailBackend), 110 ) 111 def test_pending_user_override(self): 112 """Test with pending user (override to)""" 113 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 114 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 115 plan.context[PLAN_CONTEXT_EMAIL_OVERRIDE] = "foo@bar.baz" 116 session = self.client.session 117 session[SESSION_KEY_PLAN] = plan 118 session.save() 119 120 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 121 response = self.client.post(url) 122 self.assertEqual(response.status_code, 200) 123 self.assertEqual(len(mail.outbox), 1) 124 self.assertEqual(mail.outbox[0].subject, "authentik") 125 self.assertEqual(mail.outbox[0].to, [f"{self.user.name} <foo@bar.baz>"]) 126 127 @patch( 128 "authentik.stages.email.models.EmailStage.backend_class", 129 PropertyMock(return_value=SMTPEmailBackend), 130 ) 131 def test_use_global_settings(self): 132 """Test use_global_settings""" 133 host = "some-unique-string" 134 with CONFIG.patch("email.host", host): 135 self.assertEqual(EmailStage(use_global_settings=True).backend.host, host) 136 137 def test_token(self): 138 """Test with token""" 139 # Make sure token exists 140 self.test_pending_user() 141 self.user.is_active = False 142 self.user.save() 143 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 144 session = self.client.session 145 session[SESSION_KEY_PLAN] = plan 146 session.save() 147 token: FlowToken = FlowToken.objects.get(user=self.user) 148 149 with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()): 150 # Call the executor shell to preseed the session 151 url = reverse( 152 "authentik_api:flow-executor", 153 kwargs={"flow_slug": self.flow.slug}, 154 ) 155 url_query = urlencode( 156 { 157 QS_KEY_TOKEN: token.key, 158 } 159 ) 160 url += f"?query={url_query}" 161 self.client.get(url) 162 163 # Call the actual executor to get the JSON Response 164 response = self.client.get( 165 reverse( 166 "authentik_api:flow-executor", 167 kwargs={"flow_slug": self.flow.slug}, 168 ) 169 ) 170 self.assertStageResponse(response, self.flow, component="ak-stage-consent") 171 response = self.client.post( 172 reverse( 173 "authentik_api:flow-executor", 174 kwargs={"flow_slug": self.flow.slug}, 175 ), 176 data={ 177 "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN], 178 }, 179 follow=True, 180 ) 181 182 self.assertEqual(response.status_code, 200) 183 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 184 185 session = self.client.session 186 plan: FlowPlan = session[SESSION_KEY_PLAN] 187 self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER], self.user) 188 self.assertTrue(plan.context[PLAN_CONTEXT_PENDING_USER].is_active) 189 190 def test_token_invalid_user(self): 191 """Test with token with invalid user""" 192 # Make sure token exists 193 self.test_pending_user() 194 self.user.is_active = False 195 self.user.save() 196 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 197 session = self.client.session 198 session[SESSION_KEY_PLAN] = plan 199 session.save() 200 # Set flow token user to a different user 201 token: FlowToken = FlowToken.objects.get(user=self.user) 202 token.user = create_test_admin_user() 203 token.revoke_on_execution = True 204 token.save() 205 206 with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()): 207 # Call the executor shell to preseed the session 208 url = reverse( 209 "authentik_api:flow-executor", 210 kwargs={"flow_slug": self.flow.slug}, 211 ) 212 url_query = urlencode( 213 { 214 QS_KEY_TOKEN: token.key, 215 } 216 ) 217 url += f"?query={url_query}" 218 self.client.get(url) 219 220 # Call the actual executor to get the JSON Response 221 response = self.client.get( 222 reverse( 223 "authentik_api:flow-executor", 224 kwargs={"flow_slug": self.flow.slug}, 225 ) 226 ) 227 228 self.assertEqual(response.status_code, 200) 229 self.assertStageResponse(response, component="ak-stage-access-denied") 230 231 def test_url_no_params(self): 232 """Test generation of the URL in the EMail""" 233 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 234 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 235 session = self.client.session 236 session[SESSION_KEY_PLAN] = plan 237 session.save() 238 239 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 240 request = self.factory.get(url) 241 stage_view = EmailStageView( 242 FlowExecutorView( 243 request=request, 244 flow=self.flow, 245 ), 246 request=request, 247 ) 248 self.assertEqual(stage_view.get_full_url(), f"http://testserver/if/flow/{self.flow.slug}/") 249 250 def test_url_our_params(self): 251 """Test that all of our parameters are passed to the URL correctly""" 252 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 253 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 254 session = self.client.session 255 session[SESSION_KEY_PLAN] = plan 256 session.save() 257 258 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 259 request = self.factory.get(url) 260 stage_view = EmailStageView( 261 FlowExecutorView( 262 request=request, 263 flow=self.flow, 264 ), 265 request=request, 266 ) 267 token = generate_id() 268 self.assertEqual( 269 stage_view.get_full_url(**{QS_KEY_TOKEN: token}), 270 f"http://testserver/if/flow/{self.flow.slug}/?flow_token={token}", 271 ) 272 273 def test_url_existing_params(self): 274 """Test to ensure that URL params are preserved in the URL being sent""" 275 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 276 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 277 session = self.client.session 278 session[SESSION_KEY_PLAN] = plan 279 session.save() 280 281 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 282 url += "?query=" + urlencode({"foo": "bar"}) 283 request = self.factory.get(url) 284 stage_view = EmailStageView( 285 FlowExecutorView( 286 request=request, 287 flow=self.flow, 288 ), 289 request=request, 290 ) 291 token = generate_id() 292 self.assertEqual( 293 stage_view.get_full_url(**{QS_KEY_TOKEN: token}), 294 f"http://testserver/if/flow/{self.flow.slug}/?foo=bar&flow_token={token}", 295 ) 296 297 def test_get_cache_key(self): 298 """Test to ensure that the correct cache key is returned.""" 299 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 300 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 301 session = self.client.session 302 session[SESSION_KEY_PLAN] = plan 303 session.save() 304 305 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 306 request = self.factory.post(url) 307 request.user = self.user 308 request.session = session 309 310 executor = FlowExecutorView(request=request, flow=self.flow) 311 executor.plan = plan 312 313 stage_view = EmailStageView(executor, request=request) 314 315 cache_key = stage_view._get_cache_key() 316 317 expected_hash = sha256(self.user.email.lower().encode("utf-8")).hexdigest() 318 expected_cache_key = "goauthentik.io/stages/email/stage/" + expected_hash 319 320 self.assertEqual(cache_key, expected_cache_key) 321 322 def test_is_rate_limited_returns_none(self): 323 """Test to ensure None is returned if the request shouldn't be rate limited.""" 324 self.stage.recovery_max_attempts = 2 325 self.stage.recovery_cache_timeout = "minutes=10" 326 self.stage.save() 327 328 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 329 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 330 session = self.client.session 331 session[SESSION_KEY_PLAN] = plan 332 session.save() 333 334 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 335 request = self.factory.post(url) 336 request.user = self.user 337 request.session = session 338 339 executor = FlowExecutorView(request=request, flow=self.flow) 340 executor.current_stage = self.stage 341 executor.plan = plan 342 343 stage_view = EmailStageView(executor, request=request) 344 345 result = stage_view._is_rate_limited() 346 self.assertIsNone(result) 347 348 def test_is_rate_limited_returns_remaining_time(self): 349 """Test to ensure the remaining time is returned if the request 350 should be rate limited.""" 351 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 352 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 353 session = self.client.session 354 session[SESSION_KEY_PLAN] = plan 355 session.save() 356 357 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 358 request = self.factory.post(url) 359 request.user = self.user 360 request.session = session 361 362 executor = FlowExecutorView(request=request, flow=self.flow) 363 executor.current_stage = self.stage 364 executor.plan = plan 365 366 stage_view = EmailStageView(executor, request=request) 367 368 test_cases = [ 369 # 2 attempts within 2 minutes 370 (2, "seconds=120", 2), 371 # 4 attempts within 5 minutes 372 (4, "minutes=5", 5), 373 # 6 attempts within 5 minutes. Although 299 seconds is less than 374 # 5 minutes, the user is intentionally shown "5 minutes". This is 375 # because an initial rate limiting message like "Try again after 4 minutes" 376 # can be confusing. 377 (6, "seconds=299", 5), 378 ] 379 for test_case in test_cases: 380 max_attempts, cache_timeout, minutes_remaining = test_case 381 with self.subTest( 382 f"Test recovery with {max_attempts} max attempts and " 383 f"{cache_timeout} cache timeout seconds" 384 ): 385 self.stage.recovery_max_attempts = max_attempts 386 self.stage.recovery_cache_timeout = cache_timeout 387 self.stage.save() 388 389 # Simulate multiple requests 390 for _ in range(max_attempts): 391 stage_view._is_rate_limited() 392 393 # The following request should be rate-limited 394 result = stage_view._is_rate_limited() 395 396 self.assertEqual(result, minutes_remaining) 397 398 def _challenge_invalid_helper(self): 399 """Helper to test the challenge_invalid() method.""" 400 self.stage.recovery_max_attempts = 1 401 self.stage.recovery_cache_timeout = "seconds=300" 402 self.stage.save() 403 404 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 405 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 406 session = self.client.session 407 session[SESSION_KEY_PLAN] = plan 408 session.save() 409 410 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 411 request = self.factory.get(url, user=self.user) 412 request.session = session 413 414 request.brand = Brand.objects.create(domain="foo-domain.com", default=True) 415 416 executor = FlowExecutorView(request=request, flow=self.flow) 417 executor.current_stage = self.stage 418 executor.plan = plan 419 420 stage_view = EmailStageView(executor, request=request) 421 challenge_response = stage_view.get_response_instance(data={}) 422 challenge_response.is_valid() 423 424 return challenge_response, stage_view, request 425 426 def test_challenge_invalid_not_rate_limited(self): 427 """Tests that the request is not rate limited and email is sent.""" 428 challenge_response, stage_view, request = self._challenge_invalid_helper() 429 430 with patch.object(stage_view, "send_email") as mock_send_email: 431 result = stage_view.challenge_invalid(challenge_response) 432 433 self.assertEqual(result.status_code, 200) 434 435 mock_send_email.assert_called_once() 436 437 message_list = list(messages.get_messages(request)) 438 self.assertEqual(len(message_list), 1) 439 self.assertEqual( 440 "Email Successfully sent.", 441 message_list[-1].message, 442 ) 443 444 def test_challenge_invalid_returns_error_if_rate_limited(self): 445 """Tests that an error is returned if the request is rate limited. Ensure 446 that an email is not sent.""" 447 challenge_response, stage_view, request = self._challenge_invalid_helper() 448 449 # Initial request that shouldn't be rate limited 450 stage_view.challenge_invalid(challenge_response) 451 452 with patch.object(stage_view, "send_email") as mock_send_email: 453 # This next request should be rate limited 454 result = stage_view.challenge_invalid(challenge_response) 455 456 self.assertEqual(result.status_code, 200) 457 458 mock_send_email.assert_not_called() 459 460 message_list = list(messages.get_messages(request)) 461 self.assertEqual(len(message_list), 2) 462 self.assertEqual( 463 "Too many account verification attempts. Please try again after 5 minutes.", 464 message_list[-1].message, 465 )
28class TestEmailStage(FlowTestCase): 29 """Email tests""" 30 31 def setUp(self): 32 super().setUp() 33 self.factory = RequestFactory() 34 self.user = create_test_admin_user() 35 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 36 self.stage = EmailStage.objects.create( 37 name="email", 38 activate_user_on_success=True, 39 ) 40 self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2) 41 42 @patch( 43 "authentik.stages.email.models.EmailStage.backend_class", 44 PropertyMock(return_value=EmailBackend), 45 ) 46 def test_rendering(self): 47 """Test with pending user""" 48 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 49 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 50 session = self.client.session 51 session[SESSION_KEY_PLAN] = plan 52 session.save() 53 54 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 55 response = self.client.get(url) 56 self.assertEqual(response.status_code, 200) 57 58 @patch( 59 "authentik.stages.email.models.EmailStage.backend_class", 60 PropertyMock(return_value=EmailBackend), 61 ) 62 def test_rendering_locale(self): 63 """Test with pending user""" 64 self.user.attributes = {"settings": {"locale": "de"}} 65 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 66 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 67 session = self.client.session 68 session[SESSION_KEY_PLAN] = plan 69 session.save() 70 71 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 72 response = self.client.get(url) 73 self.assertEqual(response.status_code, 200) 74 self.assertEqual(len(mail.outbox), 1) 75 self.assertEqual(mail.outbox[0].subject, "authentik") 76 self.assertNotIn("Password Reset", mail.outbox[0].alternatives[0][0]) 77 78 def test_without_user(self): 79 """Test without pending user""" 80 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 81 session = self.client.session 82 session[SESSION_KEY_PLAN] = plan 83 session.save() 84 85 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 86 response = self.client.get(url) 87 self.assertEqual(response.status_code, 200) 88 89 @patch( 90 "authentik.stages.email.models.EmailStage.backend_class", 91 PropertyMock(return_value=EmailBackend), 92 ) 93 def test_pending_user(self): 94 """Test with pending user""" 95 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 96 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 97 session = self.client.session 98 session[SESSION_KEY_PLAN] = plan 99 session.save() 100 101 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 102 response = self.client.post(url) 103 self.assertEqual(response.status_code, 200) 104 self.assertEqual(len(mail.outbox), 1) 105 self.assertEqual(mail.outbox[0].subject, "authentik") 106 self.assertEqual(mail.outbox[0].to, [f"{self.user.name} <{self.user.email}>"]) 107 108 @patch( 109 "authentik.stages.email.models.EmailStage.backend_class", 110 PropertyMock(return_value=EmailBackend), 111 ) 112 def test_pending_user_override(self): 113 """Test with pending user (override to)""" 114 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 115 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 116 plan.context[PLAN_CONTEXT_EMAIL_OVERRIDE] = "foo@bar.baz" 117 session = self.client.session 118 session[SESSION_KEY_PLAN] = plan 119 session.save() 120 121 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 122 response = self.client.post(url) 123 self.assertEqual(response.status_code, 200) 124 self.assertEqual(len(mail.outbox), 1) 125 self.assertEqual(mail.outbox[0].subject, "authentik") 126 self.assertEqual(mail.outbox[0].to, [f"{self.user.name} <foo@bar.baz>"]) 127 128 @patch( 129 "authentik.stages.email.models.EmailStage.backend_class", 130 PropertyMock(return_value=SMTPEmailBackend), 131 ) 132 def test_use_global_settings(self): 133 """Test use_global_settings""" 134 host = "some-unique-string" 135 with CONFIG.patch("email.host", host): 136 self.assertEqual(EmailStage(use_global_settings=True).backend.host, host) 137 138 def test_token(self): 139 """Test with token""" 140 # Make sure token exists 141 self.test_pending_user() 142 self.user.is_active = False 143 self.user.save() 144 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 145 session = self.client.session 146 session[SESSION_KEY_PLAN] = plan 147 session.save() 148 token: FlowToken = FlowToken.objects.get(user=self.user) 149 150 with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()): 151 # Call the executor shell to preseed the session 152 url = reverse( 153 "authentik_api:flow-executor", 154 kwargs={"flow_slug": self.flow.slug}, 155 ) 156 url_query = urlencode( 157 { 158 QS_KEY_TOKEN: token.key, 159 } 160 ) 161 url += f"?query={url_query}" 162 self.client.get(url) 163 164 # Call the actual executor to get the JSON Response 165 response = self.client.get( 166 reverse( 167 "authentik_api:flow-executor", 168 kwargs={"flow_slug": self.flow.slug}, 169 ) 170 ) 171 self.assertStageResponse(response, self.flow, component="ak-stage-consent") 172 response = self.client.post( 173 reverse( 174 "authentik_api:flow-executor", 175 kwargs={"flow_slug": self.flow.slug}, 176 ), 177 data={ 178 "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN], 179 }, 180 follow=True, 181 ) 182 183 self.assertEqual(response.status_code, 200) 184 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 185 186 session = self.client.session 187 plan: FlowPlan = session[SESSION_KEY_PLAN] 188 self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER], self.user) 189 self.assertTrue(plan.context[PLAN_CONTEXT_PENDING_USER].is_active) 190 191 def test_token_invalid_user(self): 192 """Test with token with invalid user""" 193 # Make sure token exists 194 self.test_pending_user() 195 self.user.is_active = False 196 self.user.save() 197 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 198 session = self.client.session 199 session[SESSION_KEY_PLAN] = plan 200 session.save() 201 # Set flow token user to a different user 202 token: FlowToken = FlowToken.objects.get(user=self.user) 203 token.user = create_test_admin_user() 204 token.revoke_on_execution = True 205 token.save() 206 207 with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()): 208 # Call the executor shell to preseed the session 209 url = reverse( 210 "authentik_api:flow-executor", 211 kwargs={"flow_slug": self.flow.slug}, 212 ) 213 url_query = urlencode( 214 { 215 QS_KEY_TOKEN: token.key, 216 } 217 ) 218 url += f"?query={url_query}" 219 self.client.get(url) 220 221 # Call the actual executor to get the JSON Response 222 response = self.client.get( 223 reverse( 224 "authentik_api:flow-executor", 225 kwargs={"flow_slug": self.flow.slug}, 226 ) 227 ) 228 229 self.assertEqual(response.status_code, 200) 230 self.assertStageResponse(response, component="ak-stage-access-denied") 231 232 def test_url_no_params(self): 233 """Test generation of the URL in the EMail""" 234 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 235 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 236 session = self.client.session 237 session[SESSION_KEY_PLAN] = plan 238 session.save() 239 240 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 241 request = self.factory.get(url) 242 stage_view = EmailStageView( 243 FlowExecutorView( 244 request=request, 245 flow=self.flow, 246 ), 247 request=request, 248 ) 249 self.assertEqual(stage_view.get_full_url(), f"http://testserver/if/flow/{self.flow.slug}/") 250 251 def test_url_our_params(self): 252 """Test that all of our parameters are passed to the URL correctly""" 253 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 254 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 255 session = self.client.session 256 session[SESSION_KEY_PLAN] = plan 257 session.save() 258 259 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 260 request = self.factory.get(url) 261 stage_view = EmailStageView( 262 FlowExecutorView( 263 request=request, 264 flow=self.flow, 265 ), 266 request=request, 267 ) 268 token = generate_id() 269 self.assertEqual( 270 stage_view.get_full_url(**{QS_KEY_TOKEN: token}), 271 f"http://testserver/if/flow/{self.flow.slug}/?flow_token={token}", 272 ) 273 274 def test_url_existing_params(self): 275 """Test to ensure that URL params are preserved in the URL being sent""" 276 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 277 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 278 session = self.client.session 279 session[SESSION_KEY_PLAN] = plan 280 session.save() 281 282 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 283 url += "?query=" + urlencode({"foo": "bar"}) 284 request = self.factory.get(url) 285 stage_view = EmailStageView( 286 FlowExecutorView( 287 request=request, 288 flow=self.flow, 289 ), 290 request=request, 291 ) 292 token = generate_id() 293 self.assertEqual( 294 stage_view.get_full_url(**{QS_KEY_TOKEN: token}), 295 f"http://testserver/if/flow/{self.flow.slug}/?foo=bar&flow_token={token}", 296 ) 297 298 def test_get_cache_key(self): 299 """Test to ensure that the correct cache key is returned.""" 300 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 301 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 302 session = self.client.session 303 session[SESSION_KEY_PLAN] = plan 304 session.save() 305 306 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 307 request = self.factory.post(url) 308 request.user = self.user 309 request.session = session 310 311 executor = FlowExecutorView(request=request, flow=self.flow) 312 executor.plan = plan 313 314 stage_view = EmailStageView(executor, request=request) 315 316 cache_key = stage_view._get_cache_key() 317 318 expected_hash = sha256(self.user.email.lower().encode("utf-8")).hexdigest() 319 expected_cache_key = "goauthentik.io/stages/email/stage/" + expected_hash 320 321 self.assertEqual(cache_key, expected_cache_key) 322 323 def test_is_rate_limited_returns_none(self): 324 """Test to ensure None is returned if the request shouldn't be rate limited.""" 325 self.stage.recovery_max_attempts = 2 326 self.stage.recovery_cache_timeout = "minutes=10" 327 self.stage.save() 328 329 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 330 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 331 session = self.client.session 332 session[SESSION_KEY_PLAN] = plan 333 session.save() 334 335 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 336 request = self.factory.post(url) 337 request.user = self.user 338 request.session = session 339 340 executor = FlowExecutorView(request=request, flow=self.flow) 341 executor.current_stage = self.stage 342 executor.plan = plan 343 344 stage_view = EmailStageView(executor, request=request) 345 346 result = stage_view._is_rate_limited() 347 self.assertIsNone(result) 348 349 def test_is_rate_limited_returns_remaining_time(self): 350 """Test to ensure the remaining time is returned if the request 351 should be rate limited.""" 352 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 353 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 354 session = self.client.session 355 session[SESSION_KEY_PLAN] = plan 356 session.save() 357 358 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 359 request = self.factory.post(url) 360 request.user = self.user 361 request.session = session 362 363 executor = FlowExecutorView(request=request, flow=self.flow) 364 executor.current_stage = self.stage 365 executor.plan = plan 366 367 stage_view = EmailStageView(executor, request=request) 368 369 test_cases = [ 370 # 2 attempts within 2 minutes 371 (2, "seconds=120", 2), 372 # 4 attempts within 5 minutes 373 (4, "minutes=5", 5), 374 # 6 attempts within 5 minutes. Although 299 seconds is less than 375 # 5 minutes, the user is intentionally shown "5 minutes". This is 376 # because an initial rate limiting message like "Try again after 4 minutes" 377 # can be confusing. 378 (6, "seconds=299", 5), 379 ] 380 for test_case in test_cases: 381 max_attempts, cache_timeout, minutes_remaining = test_case 382 with self.subTest( 383 f"Test recovery with {max_attempts} max attempts and " 384 f"{cache_timeout} cache timeout seconds" 385 ): 386 self.stage.recovery_max_attempts = max_attempts 387 self.stage.recovery_cache_timeout = cache_timeout 388 self.stage.save() 389 390 # Simulate multiple requests 391 for _ in range(max_attempts): 392 stage_view._is_rate_limited() 393 394 # The following request should be rate-limited 395 result = stage_view._is_rate_limited() 396 397 self.assertEqual(result, minutes_remaining) 398 399 def _challenge_invalid_helper(self): 400 """Helper to test the challenge_invalid() method.""" 401 self.stage.recovery_max_attempts = 1 402 self.stage.recovery_cache_timeout = "seconds=300" 403 self.stage.save() 404 405 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 406 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 407 session = self.client.session 408 session[SESSION_KEY_PLAN] = plan 409 session.save() 410 411 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 412 request = self.factory.get(url, user=self.user) 413 request.session = session 414 415 request.brand = Brand.objects.create(domain="foo-domain.com", default=True) 416 417 executor = FlowExecutorView(request=request, flow=self.flow) 418 executor.current_stage = self.stage 419 executor.plan = plan 420 421 stage_view = EmailStageView(executor, request=request) 422 challenge_response = stage_view.get_response_instance(data={}) 423 challenge_response.is_valid() 424 425 return challenge_response, stage_view, request 426 427 def test_challenge_invalid_not_rate_limited(self): 428 """Tests that the request is not rate limited and email is sent.""" 429 challenge_response, stage_view, request = self._challenge_invalid_helper() 430 431 with patch.object(stage_view, "send_email") as mock_send_email: 432 result = stage_view.challenge_invalid(challenge_response) 433 434 self.assertEqual(result.status_code, 200) 435 436 mock_send_email.assert_called_once() 437 438 message_list = list(messages.get_messages(request)) 439 self.assertEqual(len(message_list), 1) 440 self.assertEqual( 441 "Email Successfully sent.", 442 message_list[-1].message, 443 ) 444 445 def test_challenge_invalid_returns_error_if_rate_limited(self): 446 """Tests that an error is returned if the request is rate limited. Ensure 447 that an email is not sent.""" 448 challenge_response, stage_view, request = self._challenge_invalid_helper() 449 450 # Initial request that shouldn't be rate limited 451 stage_view.challenge_invalid(challenge_response) 452 453 with patch.object(stage_view, "send_email") as mock_send_email: 454 # This next request should be rate limited 455 result = stage_view.challenge_invalid(challenge_response) 456 457 self.assertEqual(result.status_code, 200) 458 459 mock_send_email.assert_not_called() 460 461 message_list = list(messages.get_messages(request)) 462 self.assertEqual(len(message_list), 2) 463 self.assertEqual( 464 "Too many account verification attempts. Please try again after 5 minutes.", 465 message_list[-1].message, 466 )
Email tests
31 def setUp(self): 32 super().setUp() 33 self.factory = RequestFactory() 34 self.user = create_test_admin_user() 35 self.flow = create_test_flow(FlowDesignation.AUTHENTICATION) 36 self.stage = EmailStage.objects.create( 37 name="email", 38 activate_user_on_success=True, 39 ) 40 self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
Hook method for setting up the test fixture before exercising it.
42 @patch( 43 "authentik.stages.email.models.EmailStage.backend_class", 44 PropertyMock(return_value=EmailBackend), 45 ) 46 def test_rendering(self): 47 """Test with pending user""" 48 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 49 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 50 session = self.client.session 51 session[SESSION_KEY_PLAN] = plan 52 session.save() 53 54 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 55 response = self.client.get(url) 56 self.assertEqual(response.status_code, 200)
Test with pending user
58 @patch( 59 "authentik.stages.email.models.EmailStage.backend_class", 60 PropertyMock(return_value=EmailBackend), 61 ) 62 def test_rendering_locale(self): 63 """Test with pending user""" 64 self.user.attributes = {"settings": {"locale": "de"}} 65 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 66 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 67 session = self.client.session 68 session[SESSION_KEY_PLAN] = plan 69 session.save() 70 71 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 72 response = self.client.get(url) 73 self.assertEqual(response.status_code, 200) 74 self.assertEqual(len(mail.outbox), 1) 75 self.assertEqual(mail.outbox[0].subject, "authentik") 76 self.assertNotIn("Password Reset", mail.outbox[0].alternatives[0][0])
Test with pending user
78 def test_without_user(self): 79 """Test without pending user""" 80 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 81 session = self.client.session 82 session[SESSION_KEY_PLAN] = plan 83 session.save() 84 85 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 86 response = self.client.get(url) 87 self.assertEqual(response.status_code, 200)
Test without pending user
89 @patch( 90 "authentik.stages.email.models.EmailStage.backend_class", 91 PropertyMock(return_value=EmailBackend), 92 ) 93 def test_pending_user(self): 94 """Test with pending user""" 95 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 96 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 97 session = self.client.session 98 session[SESSION_KEY_PLAN] = plan 99 session.save() 100 101 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 102 response = self.client.post(url) 103 self.assertEqual(response.status_code, 200) 104 self.assertEqual(len(mail.outbox), 1) 105 self.assertEqual(mail.outbox[0].subject, "authentik") 106 self.assertEqual(mail.outbox[0].to, [f"{self.user.name} <{self.user.email}>"])
Test with pending user
108 @patch( 109 "authentik.stages.email.models.EmailStage.backend_class", 110 PropertyMock(return_value=EmailBackend), 111 ) 112 def test_pending_user_override(self): 113 """Test with pending user (override to)""" 114 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 115 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 116 plan.context[PLAN_CONTEXT_EMAIL_OVERRIDE] = "foo@bar.baz" 117 session = self.client.session 118 session[SESSION_KEY_PLAN] = plan 119 session.save() 120 121 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 122 response = self.client.post(url) 123 self.assertEqual(response.status_code, 200) 124 self.assertEqual(len(mail.outbox), 1) 125 self.assertEqual(mail.outbox[0].subject, "authentik") 126 self.assertEqual(mail.outbox[0].to, [f"{self.user.name} <foo@bar.baz>"])
Test with pending user (override to)
128 @patch( 129 "authentik.stages.email.models.EmailStage.backend_class", 130 PropertyMock(return_value=SMTPEmailBackend), 131 ) 132 def test_use_global_settings(self): 133 """Test use_global_settings""" 134 host = "some-unique-string" 135 with CONFIG.patch("email.host", host): 136 self.assertEqual(EmailStage(use_global_settings=True).backend.host, host)
Test use_global_settings
138 def test_token(self): 139 """Test with token""" 140 # Make sure token exists 141 self.test_pending_user() 142 self.user.is_active = False 143 self.user.save() 144 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 145 session = self.client.session 146 session[SESSION_KEY_PLAN] = plan 147 session.save() 148 token: FlowToken = FlowToken.objects.get(user=self.user) 149 150 with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()): 151 # Call the executor shell to preseed the session 152 url = reverse( 153 "authentik_api:flow-executor", 154 kwargs={"flow_slug": self.flow.slug}, 155 ) 156 url_query = urlencode( 157 { 158 QS_KEY_TOKEN: token.key, 159 } 160 ) 161 url += f"?query={url_query}" 162 self.client.get(url) 163 164 # Call the actual executor to get the JSON Response 165 response = self.client.get( 166 reverse( 167 "authentik_api:flow-executor", 168 kwargs={"flow_slug": self.flow.slug}, 169 ) 170 ) 171 self.assertStageResponse(response, self.flow, component="ak-stage-consent") 172 response = self.client.post( 173 reverse( 174 "authentik_api:flow-executor", 175 kwargs={"flow_slug": self.flow.slug}, 176 ), 177 data={ 178 "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN], 179 }, 180 follow=True, 181 ) 182 183 self.assertEqual(response.status_code, 200) 184 self.assertStageRedirects(response, reverse("authentik_core:root-redirect")) 185 186 session = self.client.session 187 plan: FlowPlan = session[SESSION_KEY_PLAN] 188 self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER], self.user) 189 self.assertTrue(plan.context[PLAN_CONTEXT_PENDING_USER].is_active)
Test with token
191 def test_token_invalid_user(self): 192 """Test with token with invalid user""" 193 # Make sure token exists 194 self.test_pending_user() 195 self.user.is_active = False 196 self.user.save() 197 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 198 session = self.client.session 199 session[SESSION_KEY_PLAN] = plan 200 session.save() 201 # Set flow token user to a different user 202 token: FlowToken = FlowToken.objects.get(user=self.user) 203 token.user = create_test_admin_user() 204 token.revoke_on_execution = True 205 token.save() 206 207 with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()): 208 # Call the executor shell to preseed the session 209 url = reverse( 210 "authentik_api:flow-executor", 211 kwargs={"flow_slug": self.flow.slug}, 212 ) 213 url_query = urlencode( 214 { 215 QS_KEY_TOKEN: token.key, 216 } 217 ) 218 url += f"?query={url_query}" 219 self.client.get(url) 220 221 # Call the actual executor to get the JSON Response 222 response = self.client.get( 223 reverse( 224 "authentik_api:flow-executor", 225 kwargs={"flow_slug": self.flow.slug}, 226 ) 227 ) 228 229 self.assertEqual(response.status_code, 200) 230 self.assertStageResponse(response, component="ak-stage-access-denied")
Test with token with invalid user
232 def test_url_no_params(self): 233 """Test generation of the URL in the EMail""" 234 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 235 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 236 session = self.client.session 237 session[SESSION_KEY_PLAN] = plan 238 session.save() 239 240 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 241 request = self.factory.get(url) 242 stage_view = EmailStageView( 243 FlowExecutorView( 244 request=request, 245 flow=self.flow, 246 ), 247 request=request, 248 ) 249 self.assertEqual(stage_view.get_full_url(), f"http://testserver/if/flow/{self.flow.slug}/")
Test generation of the URL in the EMail
251 def test_url_our_params(self): 252 """Test that all of our parameters are passed to the URL correctly""" 253 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 254 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 255 session = self.client.session 256 session[SESSION_KEY_PLAN] = plan 257 session.save() 258 259 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 260 request = self.factory.get(url) 261 stage_view = EmailStageView( 262 FlowExecutorView( 263 request=request, 264 flow=self.flow, 265 ), 266 request=request, 267 ) 268 token = generate_id() 269 self.assertEqual( 270 stage_view.get_full_url(**{QS_KEY_TOKEN: token}), 271 f"http://testserver/if/flow/{self.flow.slug}/?flow_token={token}", 272 )
Test that all of our parameters are passed to the URL correctly
274 def test_url_existing_params(self): 275 """Test to ensure that URL params are preserved in the URL being sent""" 276 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 277 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 278 session = self.client.session 279 session[SESSION_KEY_PLAN] = plan 280 session.save() 281 282 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 283 url += "?query=" + urlencode({"foo": "bar"}) 284 request = self.factory.get(url) 285 stage_view = EmailStageView( 286 FlowExecutorView( 287 request=request, 288 flow=self.flow, 289 ), 290 request=request, 291 ) 292 token = generate_id() 293 self.assertEqual( 294 stage_view.get_full_url(**{QS_KEY_TOKEN: token}), 295 f"http://testserver/if/flow/{self.flow.slug}/?foo=bar&flow_token={token}", 296 )
Test to ensure that URL params are preserved in the URL being sent
298 def test_get_cache_key(self): 299 """Test to ensure that the correct cache key is returned.""" 300 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 301 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 302 session = self.client.session 303 session[SESSION_KEY_PLAN] = plan 304 session.save() 305 306 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 307 request = self.factory.post(url) 308 request.user = self.user 309 request.session = session 310 311 executor = FlowExecutorView(request=request, flow=self.flow) 312 executor.plan = plan 313 314 stage_view = EmailStageView(executor, request=request) 315 316 cache_key = stage_view._get_cache_key() 317 318 expected_hash = sha256(self.user.email.lower().encode("utf-8")).hexdigest() 319 expected_cache_key = "goauthentik.io/stages/email/stage/" + expected_hash 320 321 self.assertEqual(cache_key, expected_cache_key)
Test to ensure that the correct cache key is returned.
323 def test_is_rate_limited_returns_none(self): 324 """Test to ensure None is returned if the request shouldn't be rate limited.""" 325 self.stage.recovery_max_attempts = 2 326 self.stage.recovery_cache_timeout = "minutes=10" 327 self.stage.save() 328 329 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 330 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 331 session = self.client.session 332 session[SESSION_KEY_PLAN] = plan 333 session.save() 334 335 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 336 request = self.factory.post(url) 337 request.user = self.user 338 request.session = session 339 340 executor = FlowExecutorView(request=request, flow=self.flow) 341 executor.current_stage = self.stage 342 executor.plan = plan 343 344 stage_view = EmailStageView(executor, request=request) 345 346 result = stage_view._is_rate_limited() 347 self.assertIsNone(result)
Test to ensure None is returned if the request shouldn't be rate limited.
349 def test_is_rate_limited_returns_remaining_time(self): 350 """Test to ensure the remaining time is returned if the request 351 should be rate limited.""" 352 plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()]) 353 plan.context[PLAN_CONTEXT_PENDING_USER] = self.user 354 session = self.client.session 355 session[SESSION_KEY_PLAN] = plan 356 session.save() 357 358 url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}) 359 request = self.factory.post(url) 360 request.user = self.user 361 request.session = session 362 363 executor = FlowExecutorView(request=request, flow=self.flow) 364 executor.current_stage = self.stage 365 executor.plan = plan 366 367 stage_view = EmailStageView(executor, request=request) 368 369 test_cases = [ 370 # 2 attempts within 2 minutes 371 (2, "seconds=120", 2), 372 # 4 attempts within 5 minutes 373 (4, "minutes=5", 5), 374 # 6 attempts within 5 minutes. Although 299 seconds is less than 375 # 5 minutes, the user is intentionally shown "5 minutes". This is 376 # because an initial rate limiting message like "Try again after 4 minutes" 377 # can be confusing. 378 (6, "seconds=299", 5), 379 ] 380 for test_case in test_cases: 381 max_attempts, cache_timeout, minutes_remaining = test_case 382 with self.subTest( 383 f"Test recovery with {max_attempts} max attempts and " 384 f"{cache_timeout} cache timeout seconds" 385 ): 386 self.stage.recovery_max_attempts = max_attempts 387 self.stage.recovery_cache_timeout = cache_timeout 388 self.stage.save() 389 390 # Simulate multiple requests 391 for _ in range(max_attempts): 392 stage_view._is_rate_limited() 393 394 # The following request should be rate-limited 395 result = stage_view._is_rate_limited() 396 397 self.assertEqual(result, minutes_remaining)
Test to ensure the remaining time is returned if the request should be rate limited.
427 def test_challenge_invalid_not_rate_limited(self): 428 """Tests that the request is not rate limited and email is sent.""" 429 challenge_response, stage_view, request = self._challenge_invalid_helper() 430 431 with patch.object(stage_view, "send_email") as mock_send_email: 432 result = stage_view.challenge_invalid(challenge_response) 433 434 self.assertEqual(result.status_code, 200) 435 436 mock_send_email.assert_called_once() 437 438 message_list = list(messages.get_messages(request)) 439 self.assertEqual(len(message_list), 1) 440 self.assertEqual( 441 "Email Successfully sent.", 442 message_list[-1].message, 443 )
Tests that the request is not rate limited and email is sent.
445 def test_challenge_invalid_returns_error_if_rate_limited(self): 446 """Tests that an error is returned if the request is rate limited. Ensure 447 that an email is not sent.""" 448 challenge_response, stage_view, request = self._challenge_invalid_helper() 449 450 # Initial request that shouldn't be rate limited 451 stage_view.challenge_invalid(challenge_response) 452 453 with patch.object(stage_view, "send_email") as mock_send_email: 454 # This next request should be rate limited 455 result = stage_view.challenge_invalid(challenge_response) 456 457 self.assertEqual(result.status_code, 200) 458 459 mock_send_email.assert_not_called() 460 461 message_list = list(messages.get_messages(request)) 462 self.assertEqual(len(message_list), 2) 463 self.assertEqual( 464 "Too many account verification attempts. Please try again after 5 minutes.", 465 message_list[-1].message, 466 )
Tests that an error is returned if the request is rate limited. Ensure that an email is not sent.