authentik.stages.email.tests.test_stage

email tests

  1"""email tests"""
  2
  3from hashlib import sha256
  4from unittest.mock import MagicMock, PropertyMock, patch
  5
  6from django.contrib import messages
  7from django.core import mail
  8from django.core.mail.backends.locmem import EmailBackend
  9from django.core.mail.backends.smtp import EmailBackend as SMTPEmailBackend
 10from django.urls import reverse
 11from django.utils.http import urlencode
 12
 13from authentik.brands.models import Brand
 14from authentik.core.tests.utils import RequestFactory, create_test_admin_user, create_test_flow
 15from authentik.flows.markers import StageMarker
 16from authentik.flows.models import FlowDesignation, FlowStageBinding, FlowToken
 17from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER, FlowPlan
 18from authentik.flows.tests import FlowTestCase
 19from authentik.flows.views.executor import QS_KEY_TOKEN, SESSION_KEY_PLAN, FlowExecutorView
 20from authentik.lib.config import CONFIG
 21from authentik.lib.generators import generate_id
 22from authentik.stages.consent.stage import PLAN_CONTEXT_CONSENT_TOKEN
 23from authentik.stages.email.models import EmailStage
 24from authentik.stages.email.stage import PLAN_CONTEXT_EMAIL_OVERRIDE, EmailStageView
 25
 26
 27class TestEmailStage(FlowTestCase):
 28    """Email tests"""
 29
 30    def setUp(self):
 31        super().setUp()
 32        self.factory = RequestFactory()
 33        self.user = create_test_admin_user()
 34        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 35        self.stage = EmailStage.objects.create(
 36            name="email",
 37            activate_user_on_success=True,
 38        )
 39        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
 40
 41    @patch(
 42        "authentik.stages.email.models.EmailStage.backend_class",
 43        PropertyMock(return_value=EmailBackend),
 44    )
 45    def test_rendering(self):
 46        """Test with pending user"""
 47        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 48        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 49        session = self.client.session
 50        session[SESSION_KEY_PLAN] = plan
 51        session.save()
 52
 53        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 54        response = self.client.get(url)
 55        self.assertEqual(response.status_code, 200)
 56
 57    @patch(
 58        "authentik.stages.email.models.EmailStage.backend_class",
 59        PropertyMock(return_value=EmailBackend),
 60    )
 61    def test_rendering_locale(self):
 62        """Test with pending user"""
 63        self.user.attributes = {"settings": {"locale": "de"}}
 64        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 65        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 66        session = self.client.session
 67        session[SESSION_KEY_PLAN] = plan
 68        session.save()
 69
 70        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 71        response = self.client.get(url)
 72        self.assertEqual(response.status_code, 200)
 73        self.assertEqual(len(mail.outbox), 1)
 74        self.assertEqual(mail.outbox[0].subject, "authentik")
 75        self.assertNotIn("Password Reset", mail.outbox[0].alternatives[0][0])
 76
 77    def test_without_user(self):
 78        """Test without pending user"""
 79        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 80        session = self.client.session
 81        session[SESSION_KEY_PLAN] = plan
 82        session.save()
 83
 84        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 85        response = self.client.get(url)
 86        self.assertEqual(response.status_code, 200)
 87
 88    @patch(
 89        "authentik.stages.email.models.EmailStage.backend_class",
 90        PropertyMock(return_value=EmailBackend),
 91    )
 92    def test_pending_user(self):
 93        """Test with pending user"""
 94        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 95        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 96        session = self.client.session
 97        session[SESSION_KEY_PLAN] = plan
 98        session.save()
 99
100        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
101        response = self.client.post(url)
102        self.assertEqual(response.status_code, 200)
103        self.assertEqual(len(mail.outbox), 1)
104        self.assertEqual(mail.outbox[0].subject, "authentik")
105        self.assertEqual(mail.outbox[0].to, [f"{self.user.name} <{self.user.email}>"])
106
107    @patch(
108        "authentik.stages.email.models.EmailStage.backend_class",
109        PropertyMock(return_value=EmailBackend),
110    )
111    def test_pending_user_override(self):
112        """Test with pending user (override to)"""
113        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
114        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
115        plan.context[PLAN_CONTEXT_EMAIL_OVERRIDE] = "foo@bar.baz"
116        session = self.client.session
117        session[SESSION_KEY_PLAN] = plan
118        session.save()
119
120        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
121        response = self.client.post(url)
122        self.assertEqual(response.status_code, 200)
123        self.assertEqual(len(mail.outbox), 1)
124        self.assertEqual(mail.outbox[0].subject, "authentik")
125        self.assertEqual(mail.outbox[0].to, [f"{self.user.name} <foo@bar.baz>"])
126
127    @patch(
128        "authentik.stages.email.models.EmailStage.backend_class",
129        PropertyMock(return_value=SMTPEmailBackend),
130    )
131    def test_use_global_settings(self):
132        """Test use_global_settings"""
133        host = "some-unique-string"
134        with CONFIG.patch("email.host", host):
135            self.assertEqual(EmailStage(use_global_settings=True).backend.host, host)
136
137    def test_token(self):
138        """Test with token"""
139        # Make sure token exists
140        self.test_pending_user()
141        self.user.is_active = False
142        self.user.save()
143        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
144        session = self.client.session
145        session[SESSION_KEY_PLAN] = plan
146        session.save()
147        token: FlowToken = FlowToken.objects.get(user=self.user)
148
149        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()):
150            # Call the executor shell to preseed the session
151            url = reverse(
152                "authentik_api:flow-executor",
153                kwargs={"flow_slug": self.flow.slug},
154            )
155            url_query = urlencode(
156                {
157                    QS_KEY_TOKEN: token.key,
158                }
159            )
160            url += f"?query={url_query}"
161            self.client.get(url)
162
163            # Call the actual executor to get the JSON Response
164            response = self.client.get(
165                reverse(
166                    "authentik_api:flow-executor",
167                    kwargs={"flow_slug": self.flow.slug},
168                )
169            )
170            self.assertStageResponse(response, self.flow, component="ak-stage-consent")
171            response = self.client.post(
172                reverse(
173                    "authentik_api:flow-executor",
174                    kwargs={"flow_slug": self.flow.slug},
175                ),
176                data={
177                    "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN],
178                },
179                follow=True,
180            )
181
182            self.assertEqual(response.status_code, 200)
183            self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
184
185            session = self.client.session
186            plan: FlowPlan = session[SESSION_KEY_PLAN]
187            self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER], self.user)
188            self.assertTrue(plan.context[PLAN_CONTEXT_PENDING_USER].is_active)
189
190    def test_token_invalid_user(self):
191        """Test with token with invalid user"""
192        # Make sure token exists
193        self.test_pending_user()
194        self.user.is_active = False
195        self.user.save()
196        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
197        session = self.client.session
198        session[SESSION_KEY_PLAN] = plan
199        session.save()
200        # Set flow token user to a different user
201        token: FlowToken = FlowToken.objects.get(user=self.user)
202        token.user = create_test_admin_user()
203        token.revoke_on_execution = True
204        token.save()
205
206        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()):
207            # Call the executor shell to preseed the session
208            url = reverse(
209                "authentik_api:flow-executor",
210                kwargs={"flow_slug": self.flow.slug},
211            )
212            url_query = urlencode(
213                {
214                    QS_KEY_TOKEN: token.key,
215                }
216            )
217            url += f"?query={url_query}"
218            self.client.get(url)
219
220            # Call the actual executor to get the JSON Response
221            response = self.client.get(
222                reverse(
223                    "authentik_api:flow-executor",
224                    kwargs={"flow_slug": self.flow.slug},
225                )
226            )
227
228            self.assertEqual(response.status_code, 200)
229            self.assertStageResponse(response, component="ak-stage-access-denied")
230
231    def test_url_no_params(self):
232        """Test generation of the URL in the EMail"""
233        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
234        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
235        session = self.client.session
236        session[SESSION_KEY_PLAN] = plan
237        session.save()
238
239        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
240        request = self.factory.get(url)
241        stage_view = EmailStageView(
242            FlowExecutorView(
243                request=request,
244                flow=self.flow,
245            ),
246            request=request,
247        )
248        self.assertEqual(stage_view.get_full_url(), f"http://testserver/if/flow/{self.flow.slug}/")
249
250    def test_url_our_params(self):
251        """Test that all of our parameters are passed to the URL correctly"""
252        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
253        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
254        session = self.client.session
255        session[SESSION_KEY_PLAN] = plan
256        session.save()
257
258        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
259        request = self.factory.get(url)
260        stage_view = EmailStageView(
261            FlowExecutorView(
262                request=request,
263                flow=self.flow,
264            ),
265            request=request,
266        )
267        token = generate_id()
268        self.assertEqual(
269            stage_view.get_full_url(**{QS_KEY_TOKEN: token}),
270            f"http://testserver/if/flow/{self.flow.slug}/?flow_token={token}",
271        )
272
273    def test_url_existing_params(self):
274        """Test to ensure that URL params are preserved in the URL being sent"""
275        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
276        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
277        session = self.client.session
278        session[SESSION_KEY_PLAN] = plan
279        session.save()
280
281        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
282        url += "?query=" + urlencode({"foo": "bar"})
283        request = self.factory.get(url)
284        stage_view = EmailStageView(
285            FlowExecutorView(
286                request=request,
287                flow=self.flow,
288            ),
289            request=request,
290        )
291        token = generate_id()
292        self.assertEqual(
293            stage_view.get_full_url(**{QS_KEY_TOKEN: token}),
294            f"http://testserver/if/flow/{self.flow.slug}/?foo=bar&flow_token={token}",
295        )
296
297    def test_get_cache_key(self):
298        """Test to ensure that the correct cache key is returned."""
299        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
300        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
301        session = self.client.session
302        session[SESSION_KEY_PLAN] = plan
303        session.save()
304
305        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
306        request = self.factory.post(url)
307        request.user = self.user
308        request.session = session
309
310        executor = FlowExecutorView(request=request, flow=self.flow)
311        executor.plan = plan
312
313        stage_view = EmailStageView(executor, request=request)
314
315        cache_key = stage_view._get_cache_key()
316
317        expected_hash = sha256(self.user.email.lower().encode("utf-8")).hexdigest()
318        expected_cache_key = "goauthentik.io/stages/email/stage/" + expected_hash
319
320        self.assertEqual(cache_key, expected_cache_key)
321
322    def test_is_rate_limited_returns_none(self):
323        """Test to ensure None is returned if the request shouldn't be rate limited."""
324        self.stage.recovery_max_attempts = 2
325        self.stage.recovery_cache_timeout = "minutes=10"
326        self.stage.save()
327
328        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
329        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
330        session = self.client.session
331        session[SESSION_KEY_PLAN] = plan
332        session.save()
333
334        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
335        request = self.factory.post(url)
336        request.user = self.user
337        request.session = session
338
339        executor = FlowExecutorView(request=request, flow=self.flow)
340        executor.current_stage = self.stage
341        executor.plan = plan
342
343        stage_view = EmailStageView(executor, request=request)
344
345        result = stage_view._is_rate_limited()
346        self.assertIsNone(result)
347
348    def test_is_rate_limited_returns_remaining_time(self):
349        """Test to ensure the remaining time is returned if the request
350        should be rate limited."""
351        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
352        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
353        session = self.client.session
354        session[SESSION_KEY_PLAN] = plan
355        session.save()
356
357        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
358        request = self.factory.post(url)
359        request.user = self.user
360        request.session = session
361
362        executor = FlowExecutorView(request=request, flow=self.flow)
363        executor.current_stage = self.stage
364        executor.plan = plan
365
366        stage_view = EmailStageView(executor, request=request)
367
368        test_cases = [
369            # 2 attempts within 2 minutes
370            (2, "seconds=120", 2),
371            # 4 attempts within 5 minutes
372            (4, "minutes=5", 5),
373            # 6 attempts within 5 minutes. Although 299 seconds is less than
374            # 5 minutes, the user is intentionally shown "5 minutes". This is
375            # because an initial rate limiting message like "Try again after 4 minutes"
376            # can be confusing.
377            (6, "seconds=299", 5),
378        ]
379        for test_case in test_cases:
380            max_attempts, cache_timeout, minutes_remaining = test_case
381            with self.subTest(
382                f"Test recovery with {max_attempts} max attempts and "
383                f"{cache_timeout} cache timeout seconds"
384            ):
385                self.stage.recovery_max_attempts = max_attempts
386                self.stage.recovery_cache_timeout = cache_timeout
387                self.stage.save()
388
389                # Simulate multiple requests
390                for _ in range(max_attempts):
391                    stage_view._is_rate_limited()
392
393                # The following request should be rate-limited
394                result = stage_view._is_rate_limited()
395
396                self.assertEqual(result, minutes_remaining)
397
398    def _challenge_invalid_helper(self):
399        """Helper to test the challenge_invalid() method."""
400        self.stage.recovery_max_attempts = 1
401        self.stage.recovery_cache_timeout = "seconds=300"
402        self.stage.save()
403
404        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
405        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
406        session = self.client.session
407        session[SESSION_KEY_PLAN] = plan
408        session.save()
409
410        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
411        request = self.factory.get(url, user=self.user)
412        request.session = session
413
414        request.brand = Brand.objects.create(domain="foo-domain.com", default=True)
415
416        executor = FlowExecutorView(request=request, flow=self.flow)
417        executor.current_stage = self.stage
418        executor.plan = plan
419
420        stage_view = EmailStageView(executor, request=request)
421        challenge_response = stage_view.get_response_instance(data={})
422        challenge_response.is_valid()
423
424        return challenge_response, stage_view, request
425
426    def test_challenge_invalid_not_rate_limited(self):
427        """Tests that the request is not rate limited and email is sent."""
428        challenge_response, stage_view, request = self._challenge_invalid_helper()
429
430        with patch.object(stage_view, "send_email") as mock_send_email:
431            result = stage_view.challenge_invalid(challenge_response)
432
433            self.assertEqual(result.status_code, 200)
434
435            mock_send_email.assert_called_once()
436
437            message_list = list(messages.get_messages(request))
438            self.assertEqual(len(message_list), 1)
439            self.assertEqual(
440                "Email Successfully sent.",
441                message_list[-1].message,
442            )
443
444    def test_challenge_invalid_returns_error_if_rate_limited(self):
445        """Tests that an error is returned if the request is rate limited. Ensure
446        that an email is not sent."""
447        challenge_response, stage_view, request = self._challenge_invalid_helper()
448
449        # Initial request that shouldn't be rate limited
450        stage_view.challenge_invalid(challenge_response)
451
452        with patch.object(stage_view, "send_email") as mock_send_email:
453            # This next request should be rate limited
454            result = stage_view.challenge_invalid(challenge_response)
455
456            self.assertEqual(result.status_code, 200)
457
458            mock_send_email.assert_not_called()
459
460            message_list = list(messages.get_messages(request))
461            self.assertEqual(len(message_list), 2)
462            self.assertEqual(
463                "Too many account verification attempts. Please try again after 5 minutes.",
464                message_list[-1].message,
465            )
class TestEmailStage(authentik.flows.tests.FlowTestCase):
 28class TestEmailStage(FlowTestCase):
 29    """Email tests"""
 30
 31    def setUp(self):
 32        super().setUp()
 33        self.factory = RequestFactory()
 34        self.user = create_test_admin_user()
 35        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
 36        self.stage = EmailStage.objects.create(
 37            name="email",
 38            activate_user_on_success=True,
 39        )
 40        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)
 41
 42    @patch(
 43        "authentik.stages.email.models.EmailStage.backend_class",
 44        PropertyMock(return_value=EmailBackend),
 45    )
 46    def test_rendering(self):
 47        """Test with pending user"""
 48        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 49        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 50        session = self.client.session
 51        session[SESSION_KEY_PLAN] = plan
 52        session.save()
 53
 54        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 55        response = self.client.get(url)
 56        self.assertEqual(response.status_code, 200)
 57
 58    @patch(
 59        "authentik.stages.email.models.EmailStage.backend_class",
 60        PropertyMock(return_value=EmailBackend),
 61    )
 62    def test_rendering_locale(self):
 63        """Test with pending user"""
 64        self.user.attributes = {"settings": {"locale": "de"}}
 65        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 66        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 67        session = self.client.session
 68        session[SESSION_KEY_PLAN] = plan
 69        session.save()
 70
 71        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 72        response = self.client.get(url)
 73        self.assertEqual(response.status_code, 200)
 74        self.assertEqual(len(mail.outbox), 1)
 75        self.assertEqual(mail.outbox[0].subject, "authentik")
 76        self.assertNotIn("Password Reset", mail.outbox[0].alternatives[0][0])
 77
 78    def test_without_user(self):
 79        """Test without pending user"""
 80        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 81        session = self.client.session
 82        session[SESSION_KEY_PLAN] = plan
 83        session.save()
 84
 85        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
 86        response = self.client.get(url)
 87        self.assertEqual(response.status_code, 200)
 88
 89    @patch(
 90        "authentik.stages.email.models.EmailStage.backend_class",
 91        PropertyMock(return_value=EmailBackend),
 92    )
 93    def test_pending_user(self):
 94        """Test with pending user"""
 95        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 96        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 97        session = self.client.session
 98        session[SESSION_KEY_PLAN] = plan
 99        session.save()
100
101        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
102        response = self.client.post(url)
103        self.assertEqual(response.status_code, 200)
104        self.assertEqual(len(mail.outbox), 1)
105        self.assertEqual(mail.outbox[0].subject, "authentik")
106        self.assertEqual(mail.outbox[0].to, [f"{self.user.name} <{self.user.email}>"])
107
108    @patch(
109        "authentik.stages.email.models.EmailStage.backend_class",
110        PropertyMock(return_value=EmailBackend),
111    )
112    def test_pending_user_override(self):
113        """Test with pending user (override to)"""
114        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
115        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
116        plan.context[PLAN_CONTEXT_EMAIL_OVERRIDE] = "foo@bar.baz"
117        session = self.client.session
118        session[SESSION_KEY_PLAN] = plan
119        session.save()
120
121        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
122        response = self.client.post(url)
123        self.assertEqual(response.status_code, 200)
124        self.assertEqual(len(mail.outbox), 1)
125        self.assertEqual(mail.outbox[0].subject, "authentik")
126        self.assertEqual(mail.outbox[0].to, [f"{self.user.name} <foo@bar.baz>"])
127
128    @patch(
129        "authentik.stages.email.models.EmailStage.backend_class",
130        PropertyMock(return_value=SMTPEmailBackend),
131    )
132    def test_use_global_settings(self):
133        """Test use_global_settings"""
134        host = "some-unique-string"
135        with CONFIG.patch("email.host", host):
136            self.assertEqual(EmailStage(use_global_settings=True).backend.host, host)
137
138    def test_token(self):
139        """Test with token"""
140        # Make sure token exists
141        self.test_pending_user()
142        self.user.is_active = False
143        self.user.save()
144        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
145        session = self.client.session
146        session[SESSION_KEY_PLAN] = plan
147        session.save()
148        token: FlowToken = FlowToken.objects.get(user=self.user)
149
150        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()):
151            # Call the executor shell to preseed the session
152            url = reverse(
153                "authentik_api:flow-executor",
154                kwargs={"flow_slug": self.flow.slug},
155            )
156            url_query = urlencode(
157                {
158                    QS_KEY_TOKEN: token.key,
159                }
160            )
161            url += f"?query={url_query}"
162            self.client.get(url)
163
164            # Call the actual executor to get the JSON Response
165            response = self.client.get(
166                reverse(
167                    "authentik_api:flow-executor",
168                    kwargs={"flow_slug": self.flow.slug},
169                )
170            )
171            self.assertStageResponse(response, self.flow, component="ak-stage-consent")
172            response = self.client.post(
173                reverse(
174                    "authentik_api:flow-executor",
175                    kwargs={"flow_slug": self.flow.slug},
176                ),
177                data={
178                    "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN],
179                },
180                follow=True,
181            )
182
183            self.assertEqual(response.status_code, 200)
184            self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
185
186            session = self.client.session
187            plan: FlowPlan = session[SESSION_KEY_PLAN]
188            self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER], self.user)
189            self.assertTrue(plan.context[PLAN_CONTEXT_PENDING_USER].is_active)
190
191    def test_token_invalid_user(self):
192        """Test with token with invalid user"""
193        # Make sure token exists
194        self.test_pending_user()
195        self.user.is_active = False
196        self.user.save()
197        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
198        session = self.client.session
199        session[SESSION_KEY_PLAN] = plan
200        session.save()
201        # Set flow token user to a different user
202        token: FlowToken = FlowToken.objects.get(user=self.user)
203        token.user = create_test_admin_user()
204        token.revoke_on_execution = True
205        token.save()
206
207        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()):
208            # Call the executor shell to preseed the session
209            url = reverse(
210                "authentik_api:flow-executor",
211                kwargs={"flow_slug": self.flow.slug},
212            )
213            url_query = urlencode(
214                {
215                    QS_KEY_TOKEN: token.key,
216                }
217            )
218            url += f"?query={url_query}"
219            self.client.get(url)
220
221            # Call the actual executor to get the JSON Response
222            response = self.client.get(
223                reverse(
224                    "authentik_api:flow-executor",
225                    kwargs={"flow_slug": self.flow.slug},
226                )
227            )
228
229            self.assertEqual(response.status_code, 200)
230            self.assertStageResponse(response, component="ak-stage-access-denied")
231
232    def test_url_no_params(self):
233        """Test generation of the URL in the EMail"""
234        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
235        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
236        session = self.client.session
237        session[SESSION_KEY_PLAN] = plan
238        session.save()
239
240        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
241        request = self.factory.get(url)
242        stage_view = EmailStageView(
243            FlowExecutorView(
244                request=request,
245                flow=self.flow,
246            ),
247            request=request,
248        )
249        self.assertEqual(stage_view.get_full_url(), f"http://testserver/if/flow/{self.flow.slug}/")
250
251    def test_url_our_params(self):
252        """Test that all of our parameters are passed to the URL correctly"""
253        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
254        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
255        session = self.client.session
256        session[SESSION_KEY_PLAN] = plan
257        session.save()
258
259        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
260        request = self.factory.get(url)
261        stage_view = EmailStageView(
262            FlowExecutorView(
263                request=request,
264                flow=self.flow,
265            ),
266            request=request,
267        )
268        token = generate_id()
269        self.assertEqual(
270            stage_view.get_full_url(**{QS_KEY_TOKEN: token}),
271            f"http://testserver/if/flow/{self.flow.slug}/?flow_token={token}",
272        )
273
274    def test_url_existing_params(self):
275        """Test to ensure that URL params are preserved in the URL being sent"""
276        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
277        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
278        session = self.client.session
279        session[SESSION_KEY_PLAN] = plan
280        session.save()
281
282        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
283        url += "?query=" + urlencode({"foo": "bar"})
284        request = self.factory.get(url)
285        stage_view = EmailStageView(
286            FlowExecutorView(
287                request=request,
288                flow=self.flow,
289            ),
290            request=request,
291        )
292        token = generate_id()
293        self.assertEqual(
294            stage_view.get_full_url(**{QS_KEY_TOKEN: token}),
295            f"http://testserver/if/flow/{self.flow.slug}/?foo=bar&flow_token={token}",
296        )
297
298    def test_get_cache_key(self):
299        """Test to ensure that the correct cache key is returned."""
300        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
301        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
302        session = self.client.session
303        session[SESSION_KEY_PLAN] = plan
304        session.save()
305
306        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
307        request = self.factory.post(url)
308        request.user = self.user
309        request.session = session
310
311        executor = FlowExecutorView(request=request, flow=self.flow)
312        executor.plan = plan
313
314        stage_view = EmailStageView(executor, request=request)
315
316        cache_key = stage_view._get_cache_key()
317
318        expected_hash = sha256(self.user.email.lower().encode("utf-8")).hexdigest()
319        expected_cache_key = "goauthentik.io/stages/email/stage/" + expected_hash
320
321        self.assertEqual(cache_key, expected_cache_key)
322
323    def test_is_rate_limited_returns_none(self):
324        """Test to ensure None is returned if the request shouldn't be rate limited."""
325        self.stage.recovery_max_attempts = 2
326        self.stage.recovery_cache_timeout = "minutes=10"
327        self.stage.save()
328
329        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
330        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
331        session = self.client.session
332        session[SESSION_KEY_PLAN] = plan
333        session.save()
334
335        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
336        request = self.factory.post(url)
337        request.user = self.user
338        request.session = session
339
340        executor = FlowExecutorView(request=request, flow=self.flow)
341        executor.current_stage = self.stage
342        executor.plan = plan
343
344        stage_view = EmailStageView(executor, request=request)
345
346        result = stage_view._is_rate_limited()
347        self.assertIsNone(result)
348
349    def test_is_rate_limited_returns_remaining_time(self):
350        """Test to ensure the remaining time is returned if the request
351        should be rate limited."""
352        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
353        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
354        session = self.client.session
355        session[SESSION_KEY_PLAN] = plan
356        session.save()
357
358        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
359        request = self.factory.post(url)
360        request.user = self.user
361        request.session = session
362
363        executor = FlowExecutorView(request=request, flow=self.flow)
364        executor.current_stage = self.stage
365        executor.plan = plan
366
367        stage_view = EmailStageView(executor, request=request)
368
369        test_cases = [
370            # 2 attempts within 2 minutes
371            (2, "seconds=120", 2),
372            # 4 attempts within 5 minutes
373            (4, "minutes=5", 5),
374            # 6 attempts within 5 minutes. Although 299 seconds is less than
375            # 5 minutes, the user is intentionally shown "5 minutes". This is
376            # because an initial rate limiting message like "Try again after 4 minutes"
377            # can be confusing.
378            (6, "seconds=299", 5),
379        ]
380        for test_case in test_cases:
381            max_attempts, cache_timeout, minutes_remaining = test_case
382            with self.subTest(
383                f"Test recovery with {max_attempts} max attempts and "
384                f"{cache_timeout} cache timeout seconds"
385            ):
386                self.stage.recovery_max_attempts = max_attempts
387                self.stage.recovery_cache_timeout = cache_timeout
388                self.stage.save()
389
390                # Simulate multiple requests
391                for _ in range(max_attempts):
392                    stage_view._is_rate_limited()
393
394                # The following request should be rate-limited
395                result = stage_view._is_rate_limited()
396
397                self.assertEqual(result, minutes_remaining)
398
399    def _challenge_invalid_helper(self):
400        """Helper to test the challenge_invalid() method."""
401        self.stage.recovery_max_attempts = 1
402        self.stage.recovery_cache_timeout = "seconds=300"
403        self.stage.save()
404
405        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
406        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
407        session = self.client.session
408        session[SESSION_KEY_PLAN] = plan
409        session.save()
410
411        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
412        request = self.factory.get(url, user=self.user)
413        request.session = session
414
415        request.brand = Brand.objects.create(domain="foo-domain.com", default=True)
416
417        executor = FlowExecutorView(request=request, flow=self.flow)
418        executor.current_stage = self.stage
419        executor.plan = plan
420
421        stage_view = EmailStageView(executor, request=request)
422        challenge_response = stage_view.get_response_instance(data={})
423        challenge_response.is_valid()
424
425        return challenge_response, stage_view, request
426
427    def test_challenge_invalid_not_rate_limited(self):
428        """Tests that the request is not rate limited and email is sent."""
429        challenge_response, stage_view, request = self._challenge_invalid_helper()
430
431        with patch.object(stage_view, "send_email") as mock_send_email:
432            result = stage_view.challenge_invalid(challenge_response)
433
434            self.assertEqual(result.status_code, 200)
435
436            mock_send_email.assert_called_once()
437
438            message_list = list(messages.get_messages(request))
439            self.assertEqual(len(message_list), 1)
440            self.assertEqual(
441                "Email Successfully sent.",
442                message_list[-1].message,
443            )
444
445    def test_challenge_invalid_returns_error_if_rate_limited(self):
446        """Tests that an error is returned if the request is rate limited. Ensure
447        that an email is not sent."""
448        challenge_response, stage_view, request = self._challenge_invalid_helper()
449
450        # Initial request that shouldn't be rate limited
451        stage_view.challenge_invalid(challenge_response)
452
453        with patch.object(stage_view, "send_email") as mock_send_email:
454            # This next request should be rate limited
455            result = stage_view.challenge_invalid(challenge_response)
456
457            self.assertEqual(result.status_code, 200)
458
459            mock_send_email.assert_not_called()
460
461            message_list = list(messages.get_messages(request))
462            self.assertEqual(len(message_list), 2)
463            self.assertEqual(
464                "Too many account verification attempts. Please try again after 5 minutes.",
465                message_list[-1].message,
466            )

Email tests

def setUp(self):
31    def setUp(self):
32        super().setUp()
33        self.factory = RequestFactory()
34        self.user = create_test_admin_user()
35        self.flow = create_test_flow(FlowDesignation.AUTHENTICATION)
36        self.stage = EmailStage.objects.create(
37            name="email",
38            activate_user_on_success=True,
39        )
40        self.binding = FlowStageBinding.objects.create(target=self.flow, stage=self.stage, order=2)

Hook method for setting up the test fixture before exercising it.

@patch('authentik.stages.email.models.EmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_rendering(self):
42    @patch(
43        "authentik.stages.email.models.EmailStage.backend_class",
44        PropertyMock(return_value=EmailBackend),
45    )
46    def test_rendering(self):
47        """Test with pending user"""
48        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
49        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
50        session = self.client.session
51        session[SESSION_KEY_PLAN] = plan
52        session.save()
53
54        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
55        response = self.client.get(url)
56        self.assertEqual(response.status_code, 200)

Test with pending user

@patch('authentik.stages.email.models.EmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_rendering_locale(self):
58    @patch(
59        "authentik.stages.email.models.EmailStage.backend_class",
60        PropertyMock(return_value=EmailBackend),
61    )
62    def test_rendering_locale(self):
63        """Test with pending user"""
64        self.user.attributes = {"settings": {"locale": "de"}}
65        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
66        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
67        session = self.client.session
68        session[SESSION_KEY_PLAN] = plan
69        session.save()
70
71        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
72        response = self.client.get(url)
73        self.assertEqual(response.status_code, 200)
74        self.assertEqual(len(mail.outbox), 1)
75        self.assertEqual(mail.outbox[0].subject, "authentik")
76        self.assertNotIn("Password Reset", mail.outbox[0].alternatives[0][0])

Test with pending user whose locale is set to "de"

def test_without_user(self):
78    def test_without_user(self):
79        """Test without pending user"""
80        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
81        session = self.client.session
82        session[SESSION_KEY_PLAN] = plan
83        session.save()
84
85        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
86        response = self.client.get(url)
87        self.assertEqual(response.status_code, 200)

Test without pending user

@patch('authentik.stages.email.models.EmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_pending_user(self):
 89    @patch(
 90        "authentik.stages.email.models.EmailStage.backend_class",
 91        PropertyMock(return_value=EmailBackend),
 92    )
 93    def test_pending_user(self):
 94        """Test with pending user"""
 95        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
 96        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
 97        session = self.client.session
 98        session[SESSION_KEY_PLAN] = plan
 99        session.save()
100
101        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
102        response = self.client.post(url)
103        self.assertEqual(response.status_code, 200)
104        self.assertEqual(len(mail.outbox), 1)
105        self.assertEqual(mail.outbox[0].subject, "authentik")
106        self.assertEqual(mail.outbox[0].to, [f"{self.user.name} <{self.user.email}>"])

Test with pending user

@patch('authentik.stages.email.models.EmailStage.backend_class', PropertyMock(return_value=EmailBackend))
def test_pending_user_override(self):
108    @patch(
109        "authentik.stages.email.models.EmailStage.backend_class",
110        PropertyMock(return_value=EmailBackend),
111    )
112    def test_pending_user_override(self):
113        """Test with pending user (override to)"""
114        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
115        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
116        plan.context[PLAN_CONTEXT_EMAIL_OVERRIDE] = "foo@bar.baz"
117        session = self.client.session
118        session[SESSION_KEY_PLAN] = plan
119        session.save()
120
121        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
122        response = self.client.post(url)
123        self.assertEqual(response.status_code, 200)
124        self.assertEqual(len(mail.outbox), 1)
125        self.assertEqual(mail.outbox[0].subject, "authentik")
126        self.assertEqual(mail.outbox[0].to, [f"{self.user.name} <foo@bar.baz>"])

Test with pending user (override to)

@patch('authentik.stages.email.models.EmailStage.backend_class', PropertyMock(return_value=SMTPEmailBackend))
def test_use_global_settings(self):
128    @patch(
129        "authentik.stages.email.models.EmailStage.backend_class",
130        PropertyMock(return_value=SMTPEmailBackend),
131    )
132    def test_use_global_settings(self):
133        """Test use_global_settings"""
134        host = "some-unique-string"
135        with CONFIG.patch("email.host", host):
136            self.assertEqual(EmailStage(use_global_settings=True).backend.host, host)

Test use_global_settings

def test_token(self):
138    def test_token(self):
139        """Test with token"""
140        # Make sure token exists
141        self.test_pending_user()
142        self.user.is_active = False
143        self.user.save()
144        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
145        session = self.client.session
146        session[SESSION_KEY_PLAN] = plan
147        session.save()
148        token: FlowToken = FlowToken.objects.get(user=self.user)
149
150        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()):
151            # Call the executor shell to preseed the session
152            url = reverse(
153                "authentik_api:flow-executor",
154                kwargs={"flow_slug": self.flow.slug},
155            )
156            url_query = urlencode(
157                {
158                    QS_KEY_TOKEN: token.key,
159                }
160            )
161            url += f"?query={url_query}"
162            self.client.get(url)
163
164            # Call the actual executor to get the JSON Response
165            response = self.client.get(
166                reverse(
167                    "authentik_api:flow-executor",
168                    kwargs={"flow_slug": self.flow.slug},
169                )
170            )
171            self.assertStageResponse(response, self.flow, component="ak-stage-consent")
172            response = self.client.post(
173                reverse(
174                    "authentik_api:flow-executor",
175                    kwargs={"flow_slug": self.flow.slug},
176                ),
177                data={
178                    "token": self.get_flow_plan().context[PLAN_CONTEXT_CONSENT_TOKEN],
179                },
180                follow=True,
181            )
182
183            self.assertEqual(response.status_code, 200)
184            self.assertStageRedirects(response, reverse("authentik_core:root-redirect"))
185
186            session = self.client.session
187            plan: FlowPlan = session[SESSION_KEY_PLAN]
188            self.assertEqual(plan.context[PLAN_CONTEXT_PENDING_USER], self.user)
189            self.assertTrue(plan.context[PLAN_CONTEXT_PENDING_USER].is_active)

Test with token

def test_token_invalid_user(self):
191    def test_token_invalid_user(self):
192        """Test with token with invalid user"""
193        # Make sure token exists
194        self.test_pending_user()
195        self.user.is_active = False
196        self.user.save()
197        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
198        session = self.client.session
199        session[SESSION_KEY_PLAN] = plan
200        session.save()
201        # Set flow token user to a different user
202        token: FlowToken = FlowToken.objects.get(user=self.user)
203        token.user = create_test_admin_user()
204        token.revoke_on_execution = True
205        token.save()
206
207        with patch("authentik.flows.views.executor.FlowExecutorView.cancel", MagicMock()):
208            # Call the executor shell to preseed the session
209            url = reverse(
210                "authentik_api:flow-executor",
211                kwargs={"flow_slug": self.flow.slug},
212            )
213            url_query = urlencode(
214                {
215                    QS_KEY_TOKEN: token.key,
216                }
217            )
218            url += f"?query={url_query}"
219            self.client.get(url)
220
221            # Call the actual executor to get the JSON Response
222            response = self.client.get(
223                reverse(
224                    "authentik_api:flow-executor",
225                    kwargs={"flow_slug": self.flow.slug},
226                )
227            )
228
229            self.assertEqual(response.status_code, 200)
230            self.assertStageResponse(response, component="ak-stage-access-denied")

Test with token with invalid user

def test_url_no_params(self):
232    def test_url_no_params(self):
233        """Test generation of the URL in the EMail"""
234        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
235        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
236        session = self.client.session
237        session[SESSION_KEY_PLAN] = plan
238        session.save()
239
240        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
241        request = self.factory.get(url)
242        stage_view = EmailStageView(
243            FlowExecutorView(
244                request=request,
245                flow=self.flow,
246            ),
247            request=request,
248        )
249        self.assertEqual(stage_view.get_full_url(), f"http://testserver/if/flow/{self.flow.slug}/")

Test generation of the URL in the email

def test_url_our_params(self):
251    def test_url_our_params(self):
252        """Test that all of our parameters are passed to the URL correctly"""
253        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
254        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
255        session = self.client.session
256        session[SESSION_KEY_PLAN] = plan
257        session.save()
258
259        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
260        request = self.factory.get(url)
261        stage_view = EmailStageView(
262            FlowExecutorView(
263                request=request,
264                flow=self.flow,
265            ),
266            request=request,
267        )
268        token = generate_id()
269        self.assertEqual(
270            stage_view.get_full_url(**{QS_KEY_TOKEN: token}),
271            f"http://testserver/if/flow/{self.flow.slug}/?flow_token={token}",
272        )

Test that all of our parameters are passed to the URL correctly

def test_url_existing_params(self):
274    def test_url_existing_params(self):
275        """Test to ensure that URL params are preserved in the URL being sent"""
276        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
277        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
278        session = self.client.session
279        session[SESSION_KEY_PLAN] = plan
280        session.save()
281
282        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
283        url += "?query=" + urlencode({"foo": "bar"})
284        request = self.factory.get(url)
285        stage_view = EmailStageView(
286            FlowExecutorView(
287                request=request,
288                flow=self.flow,
289            ),
290            request=request,
291        )
292        token = generate_id()
293        self.assertEqual(
294            stage_view.get_full_url(**{QS_KEY_TOKEN: token}),
295            f"http://testserver/if/flow/{self.flow.slug}/?foo=bar&flow_token={token}",
296        )

Test to ensure that URL params are preserved in the URL being sent

def test_get_cache_key(self):
298    def test_get_cache_key(self):
299        """Test to ensure that the correct cache key is returned."""
300        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
301        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
302        session = self.client.session
303        session[SESSION_KEY_PLAN] = plan
304        session.save()
305
306        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
307        request = self.factory.post(url)
308        request.user = self.user
309        request.session = session
310
311        executor = FlowExecutorView(request=request, flow=self.flow)
312        executor.plan = plan
313
314        stage_view = EmailStageView(executor, request=request)
315
316        cache_key = stage_view._get_cache_key()
317
318        expected_hash = sha256(self.user.email.lower().encode("utf-8")).hexdigest()
319        expected_cache_key = "goauthentik.io/stages/email/stage/" + expected_hash
320
321        self.assertEqual(cache_key, expected_cache_key)

Test to ensure that the correct cache key is returned.

def test_is_rate_limited_returns_none(self):
323    def test_is_rate_limited_returns_none(self):
324        """Test to ensure None is returned if the request shouldn't be rate limited."""
325        self.stage.recovery_max_attempts = 2
326        self.stage.recovery_cache_timeout = "minutes=10"
327        self.stage.save()
328
329        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
330        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
331        session = self.client.session
332        session[SESSION_KEY_PLAN] = plan
333        session.save()
334
335        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
336        request = self.factory.post(url)
337        request.user = self.user
338        request.session = session
339
340        executor = FlowExecutorView(request=request, flow=self.flow)
341        executor.current_stage = self.stage
342        executor.plan = plan
343
344        stage_view = EmailStageView(executor, request=request)
345
346        result = stage_view._is_rate_limited()
347        self.assertIsNone(result)

Test to ensure None is returned if the request shouldn't be rate limited.

def test_is_rate_limited_returns_remaining_time(self):
349    def test_is_rate_limited_returns_remaining_time(self):
350        """Test to ensure the remaining time is returned if the request
351        should be rate limited."""
352        plan = FlowPlan(flow_pk=self.flow.pk.hex, bindings=[self.binding], markers=[StageMarker()])
353        plan.context[PLAN_CONTEXT_PENDING_USER] = self.user
354        session = self.client.session
355        session[SESSION_KEY_PLAN] = plan
356        session.save()
357
358        url = reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug})
359        request = self.factory.post(url)
360        request.user = self.user
361        request.session = session
362
363        executor = FlowExecutorView(request=request, flow=self.flow)
364        executor.current_stage = self.stage
365        executor.plan = plan
366
367        stage_view = EmailStageView(executor, request=request)
368
369        test_cases = [
370            # 2 attempts within 2 minutes
371            (2, "seconds=120", 2),
372            # 4 attempts within 5 minutes
373            (4, "minutes=5", 5),
374            # 6 attempts within 5 minutes. Although 299 seconds is less than
375            # 5 minutes, the user is intentionally shown "5 minutes". This is
376            # because an initial rate limiting message like "Try again after 4 minutes"
377            # can be confusing.
378            (6, "seconds=299", 5),
379        ]
380        for test_case in test_cases:
381            max_attempts, cache_timeout, minutes_remaining = test_case
382            with self.subTest(
383                f"Test recovery with {max_attempts} max attempts and "
384                f"{cache_timeout} cache timeout seconds"
385            ):
386                self.stage.recovery_max_attempts = max_attempts
387                self.stage.recovery_cache_timeout = cache_timeout
388                self.stage.save()
389
390                # Simulate multiple requests
391                for _ in range(max_attempts):
392                    stage_view._is_rate_limited()
393
394                # The following request should be rate-limited
395                result = stage_view._is_rate_limited()
396
397                self.assertEqual(result, minutes_remaining)

Test to ensure the remaining time is returned if the request should be rate limited.

def test_challenge_invalid_not_rate_limited(self):
427    def test_challenge_invalid_not_rate_limited(self):
428        """Tests that the request is not rate limited and email is sent."""
429        challenge_response, stage_view, request = self._challenge_invalid_helper()
430
431        with patch.object(stage_view, "send_email") as mock_send_email:
432            result = stage_view.challenge_invalid(challenge_response)
433
434            self.assertEqual(result.status_code, 200)
435
436            mock_send_email.assert_called_once()
437
438            message_list = list(messages.get_messages(request))
439            self.assertEqual(len(message_list), 1)
440            self.assertEqual(
441                "Email Successfully sent.",
442                message_list[-1].message,
443            )

Tests that the request is not rate limited and email is sent.

def test_challenge_invalid_returns_error_if_rate_limited(self):
445    def test_challenge_invalid_returns_error_if_rate_limited(self):
446        """Tests that an error is returned if the request is rate limited. Ensure
447        that an email is not sent."""
448        challenge_response, stage_view, request = self._challenge_invalid_helper()
449
450        # Initial request that shouldn't be rate limited
451        stage_view.challenge_invalid(challenge_response)
452
453        with patch.object(stage_view, "send_email") as mock_send_email:
454            # This next request should be rate limited
455            result = stage_view.challenge_invalid(challenge_response)
456
457            self.assertEqual(result.status_code, 200)
458
459            mock_send_email.assert_not_called()
460
461            message_list = list(messages.get_messages(request))
462            self.assertEqual(len(message_list), 2)
463            self.assertEqual(
464                "Too many account verification attempts. Please try again after 5 minutes.",
465                message_list[-1].message,
466            )

Tests that an error is returned if the request is rate limited. Ensure that an email is not sent.